## Initial setup

:::{.callout-note}
Before running this notebook, follow the [Setup Instructions](https://program.ml.school/setup.html) for the program.
:::

Let's start by setting up the environment and preparing to run the notebook.


In [1]:
#| hide

%load_ext autoreload
%autoreload 2
%load_ext dotenv
%dotenv

import sys
import logging
import ipytest
import json
from pathlib import Path


CODE_FOLDER = Path("code")
CODE_FOLDER.mkdir(parents=True, exist_ok=True)
INFERENCE_CODE_FOLDER = CODE_FOLDER / "inference"
INFERENCE_CODE_FOLDER.mkdir(parents=True, exist_ok=True)

sys.path.extend([f"./{CODE_FOLDER}", f"./{INFERENCE_CODE_FOLDER}"])

DATA_FILEPATH = "penguins.csv"

ipytest.autoconfig(raise_on_error=True)

# By default, The SageMaker SDK logs events related to the default
# configuration using the INFO level. To prevent these from spoiling
# the output of this notebook cells, we can change the logging
# level to ERROR instead.
logging.getLogger("sagemaker.config").setLevel(logging.ERROR)

In [2]:
LOCAL_MODE = False

In [3]:
import os

bucket = os.environ["BUCKET"]
role = os.environ["ROLE"]

S3_LOCATION = f"s3://{bucket}/penguins"

In [4]:
architecture = !(uname -m)
IS_APPLE_M_CHIP = architecture[0] == "arm64"

In [5]:
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession, LocalPipelineSession

pipeline_session = PipelineSession(default_bucket=bucket) if not LOCAL_MODE else None

if LOCAL_MODE:
    config = {
        "session": LocalPipelineSession(default_bucket=bucket),
        "instance_type": "local",
        # We need to use a custom Docker image when we run the pipeline
        # in Local Model on an ARM64 machine.
        "image": "sagemaker-tensorflow-toolkit-local" if IS_APPLE_M_CHIP else None,
    }
else:
    config = {
        "session": pipeline_session,
        "instance_type": "ml.m5.xlarge",
        "image": None,
    }

config["framework_version"] = "2.11"
config["py_version"] = "py39"

In [6]:
import boto3

sagemaker_session = sagemaker.session.Session()
sagemaker_client = boto3.client("sagemaker")
iam_client = boto3.client("iam")
region = boto3.Session().region_name

The first step is to create the script that will split and transform the input data.


In [21]:
%%writefile {CODE_FOLDER}/preprocessor.py
#| label: preprocessing-script
#| echo: true
#| output: false
#| filename: preprocessor.py
#| code-line-numbers: true

import os
import tarfile
import tempfile
import joblib
import numpy as np
import pandas as pd

from pathlib import Path
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.impute import SimpleImputer
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler, OrdinalEncoder


def preprocess(base_directory, to_upper=False):
    """
    This function loads the supplied data, splits it and transforms it.
    """

    df = _read_data_from_input_csv_files(base_directory)

    # print(df)
    if to_upper:
        df['island'] = df['island'].str.upper()
    else:
        df['island'] = df['island'].str.lower()

    output_path = Path(base_directory) / "capitalized-output"
    df.to_csv(output_path / "capitalized-islands.csv", header=True, index=False)
    

def _read_data_from_input_csv_files(base_directory):
    """
    This function reads every CSV file available and concatenates
    them into a single dataframe.
    """

    input_directory = Path(base_directory) / "input"
    files = [file for file in input_directory.glob("*.csv")]
    
    if len(files) == 0:
        raise ValueError(f"The are no CSV files in {str(input_directory)}/")
        
    raw_data = [pd.read_csv(file) for file in files]
    df = pd.concat(raw_data)
    return df

    
if __name__ == "__main__":
    preprocess(base_directory="/opt/ml/processing")

Overwriting code/preprocessor.py


### Step 3 - Setting up the Processing Step

Let's now define the [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) that we'll use in the pipeline to run the script that will split and transform the data.


In [22]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="15d")

In [23]:
from sagemaker.workflow.parameters import ParameterString

dataset_location = ParameterString(
    name="dataset_location",
    default_value=f"{S3_LOCATION}/data",
)

# dataset_location = ParameterString(
#     name="dataset_location",
#     default_value=f"{S3_LOCATION}/random_data",
# )

In [24]:
# | code: true
# | output: false

from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig


processor = SKLearnProcessor(
    base_job_name="preprocess-data",
    framework_version="1.2-1",
    # By default, a new account doesn't have access to `ml.m5.xlarge` instances.
    # If you haven't requested a quota increase yet, you can use an
    # `ml.t3.medium` instance type instead. This will work out of the box, but
    # the Processing Job will take significantly longer than it should have.
    # To get access to `ml.m5.xlarge` instances, you can request a quota
    # increase under the Service Quotas section in your AWS account.
    instance_type=config["instance_type"],
    instance_count=2,
    role=role,
    sagemaker_session=config["session"],
)

preprocessing_step = ProcessingStep(
    name="capitalize-islands",
    step_args=processor.run(
        code=f"{CODE_FOLDER}/preprocessor.py",
        inputs=[
            ProcessingInput(
                source=dataset_location, 
                destination="/opt/ml/processing/input", 
                s3_data_distribution_type="ShardedByS3Key"
            )
        ],
        outputs=[
            ProcessingOutput(
                output_name="capitalized",
                source="/opt/ml/processing/capitalized-output", # Updated to correct output source directory
                destination=f"{S3_LOCATION}/preprocessing/capitalized",
            ),
        ],
    ),
    cache_config=cache_config,
)

pipeline_definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

session1_pipeline = Pipeline(
    name="session1-pipeline",
    parameters=[dataset_location],
    steps=[
        preprocessing_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

session1_pipeline.upsert(role_arn=role)

session1_pipeline.start()

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3




_PipelineExecution(arn='arn:aws:sagemaker:us-east-2:992382774880:pipeline/session1-pipeline/execution/42u806fotycq', sagemaker_session=<sagemaker.workflow.pipeline_context.PipelineSession object at 0x103f97820>)

### Assignments

-   <span style="padding:4px; background-color: #f2a68a; color: #000;"><strong>Assignment 1.4</strong></span> For this assignment, we want to run a distributed Processing Job across multiple instances to capitalize the `island` column of the dataset. Your dataset will consist of 10 different files stored in S3. Set up a Processing Step using two instances. When specifying the input to the Processing Step, you must set the `ProcessingInput.s3_data_distribution_type` attribute to `ShardedByS3Key`. By doing this, SageMaker will run a cluster with two instances simultaneously, each with access to half the files. Check the [`S3DataDistributionType`](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_S3DataSource.html) documentation for more information.

