# Define Pipeline with Sklearn Model(s)

## Load libraries

In [1]:
import os
import json
import yaml
import string
import boto3

import sagemaker
import sagemaker.session
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.functions import Join
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, TransformStep, CacheConfig
from sagemaker.estimator import Estimator
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo, ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.properties import PropertyFile

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat
)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


## Define session variables

In [2]:
with open('param_config.yaml', 'r') as config_file:
    config_params = yaml.safe_load(config_file)

In [3]:
model_description = config_params['model_description']

bucket = config_params['bucket']
prod_uri = f's3://{bucket}'

pipeline_name = config_params['use_case']
model_package_group_name = config_params['use_case']

timestamp_suffix = config_params['timestamp_suffix']

model_script = config_params['model_script']
tune_model_script = config_params['tune_model_script']
eval_script = config_params['eval_script']
processor_dir = config_params['processor_dir']

process_input_path = "/opt/ml/processing/input"
process_output_path = "/opt/ml/processing/output"

In [4]:
session = sagemaker.session.Session()
pipe_session= PipelineSession()
region = session.boto_region_name
role = sagemaker.get_execution_role()
account_id = session.account_id()

image_uri_tag = config_params['image_uri_tag']
image_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/frm-svcs:{image_uri_tag}"
# image_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/frm-svcs:"

## Define Pipeline parameters

In [5]:
param_list = []
step_list = []

In [6]:
train_size = ParameterString(name='TrainSize', default_value=config_params['train_size'])
param_list.append(train_size)

step_cache_config = CacheConfig(enable_caching=True, expire_after=config_params['step_cache_config'])
param_list.append(step_cache_config)

folder_date = config_params['folder_date']
target_name = config_params['target_name']

sample_size = config_params['sample_size']
predictability = config_params['predictability']
target_rate = config_params['target_rate']

processing_instance_count = config_params['processing_instance_count']
processing_instance_type = config_params['processing_instance_type']
training_instance_type = config_params['training_instance_type']
training_instance_count = config_params['training_instance_count']
inference_instance_type = config_params['inference_instance_type']

max_automl_runtime = config_params['max_automl_runtime'] 
model_approval_status = config_params['model_approval_status']
model_registration_metric_threshold = config_params['model_registration_metric_threshold']

model_name = config_params['model_description']
data_path = config_params['data_path']

In [7]:
tags = [
    {"Key": "PLATFORM", "Value": "FO-ML"},
    {"Key": "BUSINESS_REGION", "Value": "GLOBAL"},
    {"Key": "BUSINESS_UNIT", "Value": "MOBILITY"},
    {"Key": "CLIENT", "Value": "MULTI_TENANT"}
   ]

meta_data = {"target": target_name,
             "model-name": config_params['model_description'],
             "program-name": config_params['use_case'],
             "train_data": folder_date,
             "train_date": timestamp_suffix}

## Define Data Processor

In [8]:
data_processor = ScriptProcessor(
    image_uri=image_uri,
    role=role,
    command=['python3'],
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    sagemaker_session=pipe_session,
    base_job_name='{}-{}'.format(model_name, "processor"))

# 1a. Create Data and Split

In [9]:
step_combine_split = ProcessingStep(
    name="{}-01c_creat_split".format(model_name),
    processor=data_processor,
    outputs=[
        ProcessingOutput(
            output_name="train",
            source=Join(
                on="/",
                values=[
                    process_output_path,
                    "train"]),
            destination="{}/{}/{}".format(bucket, model_name, 'train')),
        ProcessingOutput(
            output_name="validate",
            source=Join(
                on="/",
                values=[
                    process_output_path,
                    "validate"]),
            destination="{}/{}/{}".format(bucket, model_name, 'validate')),
        ProcessingOutput(
            output_name="test",
            source=Join(
                on="/",
                values=[
                    process_output_path,
                    "test"]),
            destination="{}/{}/{}".format(bucket, model_name, 'test')),
    ],
    job_arguments=["--target-name", target_name,
                   "--target-rate", target_rate,
                   "--sample-size", sample_size,
                   '--predictability', predictability,
                   '--train-size', train_size,
                   '--input-path', process_input_path,
                   '--output-path', process_output_path],
    code="create_data.py",
    cache_config=step_cache_config)
step_list.append(step_combine_split)

## 3a. Train Model

In [10]:
with open(os.path.join(processor_dir, "algo_dict.json")) as ad:
    algo_dict = json.loads(ad.read())

In [11]:
train_processor = ScriptProcessor(
    image_uri=image_uri,
    role=role,
    command=['python3'],
    instance_type=training_instance_type,
    instance_count=processing_instance_count,
    sagemaker_session=pipe_session,
    base_job_name="{}-{}".format(model_name, 'trainer'))

In [12]:
algo_list = list(algo_dict.keys())
alphabet = list(string.ascii_lowercase)
results_path = os.path.join(prod_uri, model_name, "train_result_metrics")
basemodel_steps = []

for i, algo in enumerate(algo_list):
    counter = alphabet[i]
    
    train_args = train_processor.run(
        code=model_script,
        # source_dir=processor_dir,
        inputs=[
            ProcessingInput(
                source=step_combine_split.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
                destination=process_input_path)],
        outputs=[
            ProcessingOutput(
                output_name="train_results",
                source=process_output_path,
                destination=results_path)],
        arguments = ['--target', target_name,
                     "--train-size", train_size,
                     "--algo-name", algo,
                     '--input-path', process_input_path,
                     '--output-path', process_output_path])
    
    train_base_model = ProcessingStep(
        name="{}-03{}_base_model_{}".format(model_name, counter, algo)[:64],
        step_args=train_args,
        cache_config=step_cache_config)
    
    step_list.append(train_base_model)
    basemodel_steps.append(train_base_model)



# 3b Eval Baseline Models

In [13]:
step_best_algo = ProcessingStep(
    name="{}-03_best_algo".format(model_name),
    processor=data_processor,
    inputs=[
        ProcessingInput(
            source=results_path,
            destination=process_input_path)],
    outputs=[
        ProcessingOutput(
            output_name="baseline_metrics",
            source=process_output_path,
            destination="{}/{}/{}".format(bucket, model_name, 'baseline_metrics'))
    ],
    code="eval_basemodels.py",
    job_arguments = ['--input-path', process_input_path,
                 '--output-path', process_output_path],
    cache_config=step_cache_config,
    depends_on=basemodel_steps)
step_list.append(step_best_algo)

# 3c. Tune Model

In [14]:
model = Estimator(
    entry_point=tune_model_script,
    image_uri=image_uri,
    role=role,
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    sagemaker_session=pipe_session,
    hyperparameters = {'target':target_name,
                       "train-size":train_size},
    output_path=prod_uri,
    max_run=259200)

step_tune_model = TrainingStep(
    name="{}-03_tune_model".format(model_name),
    estimator=model,
    inputs={
        'train_data':TrainingInput(
            s3_data=step_combine_split.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type='application/x-parquet'),
        'validate_data':TrainingInput(
            s3_data=step_combine_split.properties.ProcessingOutputConfig.Outputs["validate"].S3Output.S3Uri,            
            content_type='application/x-parquet'),
        'metric_data':TrainingInput(
            s3_data=step_best_algo.properties.ProcessingOutputConfig.Outputs["baseline_metrics"].S3Output.S3Uri,
            content_type='application/x-parquet')
    },
    cache_config=step_cache_config)
step_list.append(step_tune_model)


estimator_model = Model(
    name="{}-03b_TrainedModel".format(model_name),
    image_uri=image_uri,
    model_data=step_tune_model.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=pipe_session,
    entry_point=tune_model_script)

step_create_model = ModelStep(
    name="{}-03c".format(model_name),
    step_args=estimator_model.create(
        instance_type=inference_instance_type,
        tags=tags))
step_list.append(step_create_model)



## 4. Evaluate Model

In [15]:
evaluation_report = PropertyFile(
    name="{}-04a_evaluation".format(model_name),
    output_name="evaluation_metrics",
    path="evaluation.json")

eval_processor = ScriptProcessor(
    image_uri=image_uri,
    role=role,
    command=['python3'],
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    sagemaker_session=pipe_session)

eval_args = eval_processor.run(
    code="evaluate.py",
    inputs=[
        ProcessingInput(
            source=step_tune_model.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/input/model"),
        ProcessingInput(
            source=step_combine_split.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/input/test")],
    outputs=[
        ProcessingOutput(
            output_name="evaluation_metrics",
            source="/opt/ml/processing/evaluation",
            destination="{}/{}/{}".format(bucket, model_name, 'evaluation'))
    ],
    arguments = ["--target", target_name,
                 # '--input-path', process_input_path,
                 # '--output-path', process_output_path
                ])
    
step_evaluation = ProcessingStep(
    name="{}-04b_evalStep".format(model_name),
    step_args=eval_args,
    property_files=[evaluation_report],
    cache_config=step_cache_config)
step_list.append(step_evaluation)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=Join(
            on='/',
            values=[
                step_evaluation.arguments["ProcessingOutputConfig"]["Outputs"][0]['S3Output']['S3Uri'],
                'evaluation.json']),
        content_type='application/json'),
    explainability=MetricsSource(
        s3_uri=step_tune_model.properties.ModelArtifacts.S3ModelArtifacts,
        content_type="application/json"))



## 5. Register Model on Conditional

In [16]:
step_register_model = RegisterModel(
    name=model_name,
    model=estimator_model,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=[inference_instance_type],
    transform_instances=[inference_instance_type],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    customer_metadata_properties=meta_data,
    model_metrics=model_metrics,
    description=model_description)
# step_list.append(step_register_model)

step_conditional_registration = ConditionStep(
    name="{}-05a_ConditionalRegistration".format(model_name),
    conditions=[
        ConditionGreaterThanOrEqualTo(
            left=JsonGet(
                step_name=step_evaluation.name,
                property_file=evaluation_report,
                json_path="binary_classification_metrics.f1.value"),
            right=model_registration_metric_threshold)],
    if_steps=[step_register_model],
    else_steps=[])
step_list.append(step_conditional_registration)

## Define Pipeline

In [17]:
pipeline = Pipeline(
    name=pipeline_name,
    parameters=param_list,
    steps=step_list,
    sagemaker_session=pipe_session)

In [21]:
pipeline

<sagemaker.workflow.pipeline.Pipeline at 0x7fec091dce50>

In [22]:
pipeline_name

'smpl'

In [23]:
param_list

[ParameterString(name='TrainSize', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='0.8'),
 CacheConfig(enable_caching=True, expire_after='PT24H')]

In [24]:
step_list

[<sagemaker.workflow.steps.ProcessingStep at 0x7fec0a465a90>,
 <sagemaker.workflow.steps.ProcessingStep at 0x7fec08352cd0>,
 <sagemaker.workflow.steps.ProcessingStep at 0x7fec08a76b10>,
 <sagemaker.workflow.steps.ProcessingStep at 0x7fec08a88a10>,
 <sagemaker.workflow.steps.ProcessingStep at 0x7fec08a8a590>,
 <sagemaker.workflow.steps.ProcessingStep at 0x7fec08a9c610>,
 <sagemaker.workflow.steps.ProcessingStep at 0x7fec08a9ded0>,
 <sagemaker.workflow.steps.ProcessingStep at 0x7fec08aaccd0>,
 <sagemaker.workflow.steps.ProcessingStep at 0x7fec08aaedd0>,
 <sagemaker.workflow.steps.ProcessingStep at 0x7fec08ab8a10>,
 <sagemaker.workflow.steps.ProcessingStep at 0x7fec08aba910>,
 <sagemaker.workflow.steps.TrainingStep at 0x7fec091ed350>,
 ModelStep(name='smpl10000samples20241211-03c', steps=[<sagemaker.workflow.steps.CreateModelStep object at 0x7fec0a732e10>], depends_on=None),
 <sagemaker.workflow.steps.ProcessingStep at 0x7fec08c32850>,
 <sagemaker.workflow.condition_step.ConditionStep at 

In [26]:
pipeline.definition()



'{"Version": "2020-12-01", "Metadata": {}, "Parameters": [{"Name": "TrainSize", "Type": "String", "DefaultValue": "0.8"}], "PipelineExperimentConfig": {"ExperimentName": {"Get": "Execution.PipelineName"}, "TrialName": {"Get": "Execution.PipelineExecutionId"}}, "Steps": [{"Name": "smpl10000samples20241211-01c_creat_split", "Type": "Processing", "Arguments": {"ProcessingResources": {"ClusterConfig": {"InstanceType": "ml.m5.large", "InstanceCount": 1, "VolumeSizeInGB": 30}}, "AppSpecification": {"ImageUri": "707031497630.dkr.ecr.us-east-1.amazonaws.com/frm-svcs:fit-fraudml-sagemaker-collab-20241121174423-54-76a346d", "ContainerArguments": ["--target-name", "target", "--target-rate", 0.5, "--sample-size", 10000, "--predictability", 0.8, "--train-size", {"Get": "Parameters.TrainSize"}, "--input-path", "/opt/ml/processing/input", "--output-path", "/opt/ml/processing/output"], "ContainerEntrypoint": ["python3", "/opt/ml/processing/input/code/create_data.py"]}, "RoleArn": "arn:aws:iam::8889628

## Upload Pipeline and start execution

In [18]:
pipeline.upsert(role_arn=role, tags=tags)



ClientError: An error occurred (ValidationException) when calling the CreatePipeline operation: Unable to parse pipeline definition. Property 'null' with value 'null' is not of expected type 'String'

In [None]:
pipeline.start(
    execution_display_name=model_description
)

In [None]:
print("Done!")