In [1]:
!pip install mlflow==2.13.2 sagemaker-mlflow==0.1.0 cloudpickle==2.2.1



In [2]:
import boto3
import sagemaker
import sagemaker.session
import os
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.workflow.steps import TrainingStep
from sagemaker.sklearn.model import SKLearnModel
from sagemaker.workflow.model_step import ModelStep
from sagemaker.transformer import Transformer
from sagemaker.workflow.steps import TransformStep
from sagemaker.inputs import TransformInput
from sagemaker.workflow.pipeline import Pipeline
from sagemaker import get_execution_role
from sagemaker.s3 import s3_path_join

# NOTE(review): star import hides where names such as read_settings come from — consider explicit imports.
from scripts.functions import *

import os
import pandas as pd
import numpy as np
import mlflow
from mlflow.tracking import MlflowClient

# Project configuration (S3 bucket/prefix, job and step names, MLflow identifiers) used by the cells below.
settings = read_settings(
    'scripts/settings.json'
)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [3]:
!aws s3 rm s3://sagemaker-bucket-ds/01-churn/v1-prod --recursive

delete: s3://sagemaker-bucket-ds/01-churn/v1/data/inference_train/inference_train.csv
delete: s3://sagemaker-bucket-ds/01-churn/v1/data/test/test.csv
delete: s3://sagemaker-bucket-ds/01-churn/v1/data/train/train.csv
delete: s3://sagemaker-bucket-ds/01-churn/v1/output/inference_train/inference_train.csv.out
delete: s3://sagemaker-bucket-ds/01-churn/v1/data/valid/valid.csv


In [4]:
# AWS / SageMaker handles shared by the cells below.
boto_session = boto3.Session()
region = boto_session.region_name

sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()

# Session used while defining pipeline steps: SDK calls made through it are captured as
# step arguments (passed via `step_args=`) instead of starting jobs immediately.
pipeline_session = PipelineSession()

In [5]:
%%writefile scripts/processing.py

import os
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split

def create_random_dataframe_with_params(n_rows, n_cols, params, seed=None):
    """
    Create a DataFrame of uniform random features plus a binary target drawn from a linear score.

    Each feature is drawn from U[0, 1). The target of a row is 1 with probability
    ``dot(row, params) + 0.5`` (scores below 0 / above 1 act as probability 0 / 1).

    Parameters:
    - n_rows: int, number of rows in the DataFrame
    - n_cols: int, number of feature columns (named col_1 .. col_<n_cols>)
    - params: list or array-like of length n_cols, weight of each feature column
    - seed: int, random seed for reproducibility (default is None). When given, a private
      RandomState is used, so NumPy's global RNG is NOT reseeded as a side effect.
      When None, draws come from NumPy's global RNG.

    Returns:
    - DataFrame with shape (n_rows, n_cols+1); the first column is 'target', followed by the features.

    Raises:
    - ValueError if len(params) != n_cols.
    """
    # Validate before touching any RNG state.
    if len(params) != n_cols:
        raise ValueError("The length of params must be equal to the number of columns (n_cols).")

    # RandomState(seed) yields exactly the stream np.random.seed(seed) would, without
    # mutating global state; the np.random module itself is used for unseeded calls.
    rng = np.random if seed is None else np.random.RandomState(seed)

    feature_columns = [f'col_{i+1}' for i in range(n_cols)]
    df = pd.DataFrame(rng.rand(n_rows, n_cols), columns=feature_columns)

    # Linear score shifted by 0.5, used directly as a Bernoulli probability.
    probability = np.dot(df.values, params) + 0.5
    target = (rng.rand(n_rows) < probability).astype(int)

    # Target goes in the first position, ahead of the feature columns.
    df.insert(0, 'target', target)
    return df

if __name__ == '__main__':
    # Output directories arrive as environment variables set on the processing job
    # (keys preprocessing_output_train/test/valid/inference_train).
    settings = os.environ

    # Weights of the synthetic linear score; col_4 and col_5 have zero weight and are
    # left out of the feature set below.
    params = [-1, -1, -0.5, 0, 0, 0.5, 1, 1]
    df = create_random_dataframe_with_params(n_rows=100000, n_cols=len(params), params=params, seed=42)

    features = ["col_1", "col_2", "col_3", "col_6", "col_7", "col_8"]
    labelled_columns = ['target'] + features

    # 60/20/20 split: hold out 40%, then split the hold-out evenly into test and valid.
    train, temp = train_test_split(df, test_size=0.4, random_state=42)
    test, valid = train_test_split(temp, test_size=0.5, random_state=42)

    splits = {
        "train": train[labelled_columns],
        "test": test[labelled_columns],
        "valid": valid[labelled_columns],
        # Features only (no target): input for the batch-transform step.
        "inference_train": train[features],
    }

    for split_name, frame in splits.items():
        output_dir = settings[f"preprocessing_output_{split_name}"]
        os.makedirs(output_dir, exist_ok=True)
        frame.to_csv(os.path.join(output_dir, f"{split_name}.csv"), index=False, float_format='%.5f')

Overwriting scripts/processing.py


In [6]:
# Processing job that executes scripts/processing.py. The whole `settings` mapping is forwarded
# as container environment variables; processing.py reads its output directories from it.
# NOTE(review): SageMaker expects string environment values — confirm every value in settings.json is a string.
sklearn_processor = SKLearnProcessor(
    role=role,
    sagemaker_session=pipeline_session,
    base_job_name=settings['preprocessing_job_name'],
    framework_version="0.23-1",
    instance_count=1,
    instance_type='ml.t3.medium',
    env=settings,
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [7]:
# Names of the processing outputs. Each one maps to the container directory in
# settings["preprocessing_output_<name>"] and to s3://<bucket>/<project_path_s3>/data/<name>.
processing_output_names = ["train", "test", "valid", "inference_train"]

# s3_path_join always uses "/" and strips stray slashes. os.path.join is OS-dependent and
# discards earlier parts when a component starts with "/".
processing_s3_paths = {
    name: s3_path_join("s3://", settings['bucket_name'], settings['project_path_s3'], 'data', name)
    for name in processing_output_names
}
train_s3_path = processing_s3_paths["train"]
test_s3_path = processing_s3_paths["test"]
valid_s3_path = processing_s3_paths["valid"]
inference_train_s3_path = processing_s3_paths["inference_train"]

# With a PipelineSession, run() returns step arguments instead of launching a job.
processor_args = sklearn_processor.run(
    inputs=[],
    outputs=[
        ProcessingOutput(
            output_name=name,
            source=settings[f"preprocessing_output_{name}"],
            destination=processing_s3_paths[name],
        )
        for name in processing_output_names
    ],
    code="scripts/processing.py",
)

step_process = ProcessingStep(
    name=settings["preprocessing_step_name"],
    step_args=processor_args
)



In [8]:
# Environment variables for the training container. They are read by scripts/train.py
# (not visible here; presumably for MLflow logging/registration — confirm).
environment = {
    'mlflow_arn': settings['mlflow_arn'],
    'mlflow_experiment_name': settings['mlflow_experiment_name'],
    # Overridable from settings.json; falls back to the previously hard-coded name.
    'mlflow_final_model_name': settings.get('mlflow_final_model_name', 'final-model2'),
    'mlflow_model_name': settings['mlflow_model_name'],
}

sklearn = SKLearn(
    entry_point='train.py', # The file with the training code
    source_dir='scripts', # The folder with the training code
    framework_version='1.2-1', # Version of SKLearn which will be used
    instance_type='ml.m5.large', # Instance type that will be used
    role=role, # Role that will be used during execution
    sagemaker_session=pipeline_session,
    base_job_name=settings['training_job_name'], # Name of the training job. Timestamp will be added as suffix
    environment=environment,
)

In [9]:
# Wire the processing step's "train" output (an S3 URI resolved at pipeline run time)
# into the estimator's "train" channel; with the pipeline session this yields step arguments.
train_args = sklearn.fit(
    inputs={
        "train": step_process.properties.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri,
    },
)

In [10]:
# Training step built from the captured fit() arguments.
step_train = TrainingStep(
    step_args=train_args,
    name=settings["training_step_name"],
)

In [11]:
# Point MLflow at the tracking server identified by its ARN (resolved by the sagemaker-mlflow
# plugin) and select the experiment. `role` is not re-fetched here; it is set in the session setup cell.
mlflow.set_tracking_uri(settings['mlflow_arn'])
mlflow.set_experiment(settings['mlflow_experiment_name'])
client = MlflowClient()

In [12]:
# Resolve the S3 location of the newest registered version of the MLflow model.
# NOTE(review): this runs when the notebook executes, i.e. BEFORE step_train runs, so the pipeline
# packages the version already in the registry, not the one its own training step produces.
# depends_on=[step_train] only orders steps; it does not refresh model_path. If train.py writes
# model.tar.gz to /opt/ml/model, step_train.properties.ModelArtifacts.S3ModelArtifacts may be intended — confirm.
model_name = settings['mlflow_model_name']
registered_model = client.get_registered_model(name=model_name)

# `latest_versions` holds one entry per stage, so index 0 is not guaranteed to be the newest
# version (and stage-based APIs are deprecated in MLflow 2.9+). Pick the highest version number.
model_versions = client.search_model_versions(f"name='{model_name}'")
if not model_versions:
    raise ValueError(f"No versions registered for MLflow model '{model_name}'")
latest_version = max(model_versions, key=lambda mv: int(mv.version))

run_id = latest_version.run_id
source_path = latest_version.source
# S3 keys always use "/", independent of the local OS path separator.
model_path = source_path.rstrip('/') + '/model.tar.gz'

In [13]:
# Create the SKLearnModel
sklearn_model = SKLearnModel(
    model_data=model_path,
    entry_point='inference.py', # The file with the training code
    source_dir="scripts", # The folder with the training code
    role=role,
    framework_version='1.2-1',  # Replace with the appropriate sklearn version
    sagemaker_session=pipeline_session
)

In [14]:
# Create the SageMaker model from the captured create() arguments; depends_on orders it after training.
step_create_model = ModelStep(
    name=settings["modelcreate_step_name"],
    step_args=sklearn_model.create(instance_type="ml.m5.large"),
    depends_on=[step_train],
)

In [15]:
# Batch-transform output location under the project prefix. s3_path_join always uses "/",
# unlike os.path.join, which is OS-dependent for S3 URIs.
transformer_output_path = s3_path_join(
    "s3://", settings['bucket_name'], settings['project_path_s3'], 'output', 'inference_train'
)

# Batch transformer for the model created by step_create_model; ModelName is a pipeline
# property resolved at execution time.
# NOTE(review): assemble_with is not set, so per-request responses are concatenated as-is —
# confirm inference.py ends each prediction with a newline, or consider assemble_with="Line".
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.large",
    instance_count=1,
    output_path=transformer_output_path,
    accept="text/csv",
)

In [16]:
# Score the features-only "inference_train" processing output with the batch transformer.
step_transform = TransformStep(
    name=settings["transformer_step_name"],
    transformer=transformer,
    inputs=TransformInput(
        data=step_process.properties.ProcessingOutputConfig.Outputs['inference_train'].S3Output.S3Uri,
        content_type='text/csv',  # Declared explicitly: CSV is not the default input format
        split_type='Line',  # Each line of the input file is one observation
    ),
)

In [17]:
# Plain string: the original f-string had no placeholders.
pipeline_name = "01-churn-deploy-model"
# Step order is listed for readability; execution order follows data and depends_on links.
pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_process, step_train, step_create_model, step_transform],
)

In [18]:
# Create the pipeline definition in SageMaker, or update it if it already exists.
pipeline.upsert(
    role_arn=role,
)



{'PipelineArn': 'arn:aws:sagemaker:eu-west-1:211125740051:pipeline/01-churn-deploy-model',
 'ResponseMetadata': {'RequestId': 'd935880a-4400-46db-8c82-9a8d8e28d22a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'd935880a-4400-46db-8c82-9a8d8e28d22a',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '89',
   'date': 'Mon, 15 Jul 2024 12:24:43 GMT'},
  'RetryAttempts': 0}}

In [19]:
# Launch a run of the upserted pipeline; keep the handle to inspect it later (e.g. execution.describe()).
execution = pipeline.start(
)