In [1]:
import boto3
import sagemaker
import sagemaker.session
import os
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.workflow.steps import TrainingStep
from sagemaker.sklearn.model import SKLearnModel
from sagemaker.workflow.model_step import ModelStep
from sagemaker.transformer import Transformer
from sagemaker.workflow.steps import TransformStep
from sagemaker.inputs import TransformInput
from sagemaker.workflow.pipeline import Pipeline

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
# Session handed to the processor, estimator and model below, so that the results of
# their .run() / .fit() / .create() calls can be passed to pipeline steps as step_args.
pipeline_session = PipelineSession()

# Regular SageMaker session and the default bucket associated with it
sagemaker_session = sagemaker.session.Session()
default_bucket = sagemaker_session.default_bucket()

# Execution role and region of the current environment
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

In [3]:
# Local folder for the scripts written by the %%writefile cells (processing, training, inference)
os.makedirs("05_full_pipeline", exist_ok=True)

In [4]:
%%writefile 05_full_pipeline/processing.py

import os
import pandas as pd
import numpy as np

from sklearn import datasets
from sklearn.model_selection import train_test_split


def sanitize_column_names(columns):
    """Return ``columns`` with spaces replaced by underscores and parentheses removed.

    Args:
        columns: pandas ``Index`` of string column labels.

    Returns:
        A new ``Index`` with the cleaned labels, e.g. ``'sepal length (cm)'``
        becomes ``'sepal_length_cm'``.
    """
    # regex=False treats every pattern as a plain literal, so '(' and ')' are never
    # parsed as regular expressions regardless of the installed pandas version's default.
    return (
        columns.str.replace(' ', '_', regex=False)
        .str.replace('(', '', regex=False)
        .str.replace(')', '', regex=False)
    )


if __name__ == '__main__':

    # Build the iris frame with the label stored in a "class" column
    iris = datasets.load_iris()
    df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
    df["class"] = pd.Series(iris.target)
    df = df[df['class'].isin([0, 1])]  # Keep only classes 0 and 1 to get a binary classification problem
    df = df[[list(df.columns)[-1]] + list(df.columns)[:-1]]  # Move the target to the first column
    df.columns = sanitize_column_names(df.columns)

    # Stratified, seeded split so both label values appear in train and test reproducibly
    train_df, test_df = train_test_split(df, test_size=0.33, random_state=42, stratify=df["class"])

    # Row layouts: label followed by four features, and features only.
    # NOTE: np.savetxt ignores `delimiter` when `fmt` is a multi-specifier string,
    # so the fields are separated by ', ' exactly as spelled in these formats.
    labelled_row_fmt = '%1.1f, %1.3f, %1.3f, %1.3f, %1.3f'
    feature_row_fmt = '%1.3f, %1.3f, %1.3f, %1.3f'

    iris_train = train_df.to_numpy()
    np.savetxt('/opt/ml/processing/output/train/iris_train.csv', iris_train, delimiter=',', fmt=labelled_row_fmt)

    iris_test = test_df.to_numpy()
    np.savetxt('/opt/ml/processing/output/test/iris_test.csv', iris_test, delimiter=',', fmt=labelled_row_fmt)

    # Inference input is the test set without the label column
    iris_inference = test_df.iloc[:, 1:].to_numpy()
    np.savetxt('/opt/ml/processing/output/inference/iris_inference.csv', iris_inference, delimiter=',', fmt=feature_row_fmt)

    # NOTE(review): this file lands in the output root, not in train/test/inference;
    # confirm the processing job declares an output covering it if it is needed downstream.
    column_names_list = ','.join(df.columns)
    with open('/opt/ml/processing/output/column_names.csv', 'w') as file:
        file.write(column_names_list)

Overwriting 05_full_pipeline/processing.py


In [5]:
# Processor that will run the preprocessing script in a scikit-learn container
sklearn_processor = SKLearnProcessor(
    role=role,
    sagemaker_session=pipeline_session,
    base_job_name="05-full-pipeline",
    framework_version="0.23-1",
    instance_count=1,
    instance_type='ml.t3.medium',
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [6]:
# Every output channel maps the container folder /opt/ml/processing/output/<channel>
# to <prefix>/<channel> in S3, so the three outputs are generated from one template.
processing_output_prefix = 's3://sagemaker-bucket-ds/PIPELINE/05-full-pipeline'
processing_outputs = [
    ProcessingOutput(
        output_name=channel,
        source=f"/opt/ml/processing/output/{channel}",
        destination=f"{processing_output_prefix}/{channel}",
    )
    for channel in ("train", "test", "inference")
]

# The script reads no input data (inputs=[]); it only produces the outputs above.
processor_args = sklearn_processor.run(
    inputs=[],
    outputs=processing_outputs,
    code="05_full_pipeline/processing.py",
)

# First pipeline step: run the preprocessing script
step_process = ProcessingStep(
    name="Preprocessing",
    step_args=processor_args,
)



In [7]:
%%writefile 05_full_pipeline/train.py

from __future__ import print_function

import argparse
import joblib
import os
import pandas as pd

from sklearn import tree

def find_training_file(train_dir):
    """Return the path of the training file inside ``train_dir``.

    The choice is deterministic: regular files are sorted by name, ``.csv`` files are
    preferred, and the first candidate is returned. Sub-directories are ignored.

    Args:
        train_dir: Folder that holds the training channel data.

    Returns:
        Full path of the selected file.

    Raises:
        FileNotFoundError: If ``train_dir`` contains no regular files.
    """
    # os.listdir returns entries in arbitrary order, so sort to make the pick reproducible
    file_names = sorted(
        name for name in os.listdir(train_dir)
        if os.path.isfile(os.path.join(train_dir, name))
    )
    if not file_names:
        raise FileNotFoundError("No training file found in '{}'".format(train_dir))

    # Prefer CSV files, but fall back to any file so differently named inputs still work
    csv_names = [name for name in file_names if name.lower().endswith('.csv')]
    return os.path.join(train_dir, (csv_names or file_names)[0])


if __name__ == '__main__':
    model_dir = os.environ['SM_MODEL_DIR']  # Folder where the model must be saved
    train_dir = os.environ['SM_CHANNEL_TRAIN']  # Folder where the train data is stored

    # A single training file is expected in the channel
    train_file_path = find_training_file(train_dir)

    train_data = pd.read_csv(train_file_path, header=None, engine="python")

    # Labels are in the first column, features in the remaining ones
    train_y = train_data.iloc[:, 0]
    train_X = train_data.iloc[:, 1:]

    # Train the model
    # Hyperparameters are hardcoded
    clf = tree.DecisionTreeClassifier(max_leaf_nodes=30)
    clf = clf.fit(train_X, train_y)

    # Save the model object under the file name that the inference script loads
    joblib.dump(clf, os.path.join(model_dir, "model.joblib"))

Overwriting 05_full_pipeline/train.py


In [8]:
# Estimator that runs 05_full_pipeline/train.py in a scikit-learn container
sklearn = SKLearn(
    entry_point='train.py', # The file with the training code
    source_dir='05_full_pipeline', # The folder with the training code
    framework_version='1.2-1', # Version of SKLearn which will be used
    instance_type='ml.m5.large', # Instance type that will be used
    role=role, # Role that will be used during execution
    sagemaker_session=pipeline_session,
    # Base name for the training job. Hyphens instead of underscores: SageMaker job
    # names may not contain '_', and this matches the processor's base_job_name.
    base_job_name='05-full-pipeline'
)

In [9]:
# Feed the "train" output of the preprocessing step into the estimator's "train" channel
train_channel_uri = step_process.properties.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri
train_args = sklearn.fit(inputs={"train": train_channel_uri})

In [10]:
# Second pipeline step: train the model with the arguments prepared by sklearn.fit
step_train = TrainingStep(name="SimpleTrain", step_args=train_args)

In [11]:
%%writefile 05_full_pipeline/start_file.py

from __future__ import print_function

import argparse
import joblib
import os
import pandas as pd

from sklearn.linear_model import LogisticRegression

# There is no default function to load the model,
# so without this function the job will fail.
def model_fn(model_dir):
    """Load and return the fitted model stored in ``model_dir``.

    The file name must match the one used when the model was serialized
    by the training script ("model.joblib").
    """
    model_path = os.path.join(model_dir, "model.joblib")
    return joblib.load(model_path)

# The default prediction function returns the class (0/1) rather than a probability,
# so it is overridden here with a custom function.
def predict_fn(input_data, model):
    """Return the output of ``model.predict_proba`` for ``input_data``."""
    return model.predict_proba(input_data)

Overwriting 05_full_pipeline/start_file.py


In [12]:
# Model definition built from the artifacts produced by the training step
sklearn_model = SKLearnModel(
    role=role,
    sagemaker_session=pipeline_session,
    framework_version='1.2-1',  # Same scikit-learn version as the training estimator
    source_dir="05_full_pipeline",  # The folder with the inference code
    entry_point='start_file.py',  # The file with model_fn / predict_fn
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
)

In [13]:
# Third pipeline step: create the SageMaker model from the SKLearnModel definition
model_create_args = sklearn_model.create(instance_type="ml.m5.large")
step_create_model = ModelStep(name="MyModelCreationStep", step_args=model_create_args)

In [14]:
# Batch-transform configuration for the model created by the model-creation step
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.large",
    instance_count=1,
    # Plain string literal: there is nothing to interpolate, so no f-prefix is needed
    output_path="s3://sagemaker-bucket-ds/PIPELINE/05-full-pipeline/INFERENCE_OUTPUT/",
    # Use the same session as the other pipeline components instead of an implicit default one
    sagemaker_session=pipeline_session,
)

In [15]:
# Batch-transform input: the "inference" output of the preprocessing step
inference_input = TransformInput(
    data=step_process.properties.ProcessingOutputConfig.Outputs['inference'].S3Output.S3Uri,
    content_type='text/csv',  # Necessary because CSV is not the default format
    split_type='Line',  # Each line equals one observation
)

# Fourth pipeline step: run the batch transform
step_transform = TransformStep(
    name="MyTransformStep",
    transformer=transformer,
    inputs=inference_input,
)

In [16]:
# Assemble the four steps into one pipeline
pipeline_name = "05-full-pipeline"
pipeline_steps = [step_process, step_train, step_create_model, step_transform]
pipeline = Pipeline(name=pipeline_name, steps=pipeline_steps)

In [17]:
# Create the pipeline in SageMaker, or update it if it already exists; the response is displayed below
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:eu-west-1:211125740051:pipeline/05-full-pipeline',
 'ResponseMetadata': {'RequestId': '7cc90bbc-0cd6-4236-bb1d-6617b064523f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '7cc90bbc-0cd6-4236-bb1d-6617b064523f',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '84',
   'date': 'Thu, 11 Jul 2024 13:51:54 GMT'},
  'RetryAttempts': 0}}

In [18]:
# Start a run of the pipeline and keep the returned execution object
execution = pipeline.start()