# Build a Machine Learning Workflow Using SageMaker Pipelines

Amazon SageMaker Model Building Pipelines is a tool for building **machine learning pipelines**. You can define a pipeline once and use it to automate the orchestration of your ML workflow.

Key features of SageMaker Pipelines are as follows:

## SageMaker Integration

SageMaker Pipelines is integrated directly with SageMaker, so you don't need to interact with any other AWS services. You also don't need to manage any resources because SageMaker Pipelines is a fully managed service, which means that it creates and manages resources for you.

## SageMaker Python SDK Integration

Because SageMaker Pipelines is integrated with the SageMaker Python SDK, you can create your pipelines programmatically using a high-level Python interface that you might already be familiar with. To view the SageMaker Python SDK API reference, see Pipelines. For SageMaker Python SDK code examples, see Amazon SageMaker Model Building Pipelines.

## SageMaker Studio Integration

SageMaker Studio offers an environment to manage the end-to-end SageMaker Pipelines experience. Using Studio, you can bypass the AWS console for your entire workflow management. For more information on managing SageMaker Pipelines from SageMaker Studio, see View, Track, and Execute SageMaker Pipelines in SageMaker Studio.

## Data Lineage Tracking

With SageMaker Pipelines you can track the history of your data within the pipeline execution. Amazon SageMaker ML Lineage Tracking lets you analyze where the data came from, where it was used as an input, and the outputs that were generated from it. For example, you can view the models created from an individual dataset, and you can view the datasets that went into creating an individual model. For more information, see Amazon SageMaker ML Lineage Tracking.

## Step Reuse

With SageMaker Pipelines, you can designate steps for caching. When a step is cached, it is indexed for reuse later if the same step is executed again. As a result, you can reuse the output from previous step executions of the same step in the same pipeline without having to run the step again. For more information on step caching, see Caching Pipeline Steps.
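As a rough mental model of step caching, the sketch below keys a cache on a hash of the step name and its arguments and lets entries expire after a fixed period. This is a conceptual illustration only, not how SageMaker implements caching: the `StepCache` class, its key scheme, and the 30-day expiry are assumptions made for the example.

```python
import hashlib
import json
from datetime import datetime, timedelta


class StepCache:
    """Toy cache keyed on a step's name and arguments (illustrative only)."""

    def __init__(self, expire_after):
        self.expire_after = expire_after
        self._entries = {}  # cache key -> (created_at, output)

    @staticmethod
    def _key(step_name, arguments):
        # Sorting the keys makes the hash independent of argument order.
        payload = json.dumps({"step": step_name, "args": arguments}, sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def run(self, step_name, arguments, func, now):
        """Return (output, cache_hit); call func only on a miss or after expiry."""
        key = self._key(step_name, arguments)
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self.expire_after:
            return entry[1], True
        output = func(**arguments)
        self._entries[key] = (now, output)
        return output, False


calls = []


def preprocess(input_prefix):
    # Stand-in for an expensive step; records each real execution.
    calls.append(input_prefix)
    return f"{input_prefix}/processed"


cache = StepCache(expire_after=timedelta(days=30))
t0 = datetime(2023, 1, 1)
first = cache.run("PreProcess", {"input_prefix": "raw"}, preprocess, now=t0)
second = cache.run("PreProcess", {"input_prefix": "raw"}, preprocess, now=t0 + timedelta(days=1))
changed = cache.run("PreProcess", {"input_prefix": "raw-v2"}, preprocess, now=t0 + timedelta(days=1))
expired = cache.run("PreProcess", {"input_prefix": "raw"}, preprocess, now=t0 + timedelta(days=31))
print(first, second, changed, expired)
```

In this toy model, the second call reuses the stored output, while changing an argument or exceeding the expiry period forces the step to run again.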


In [26]:
!pip install "xlrd >=1.0.0" sagemaker -U


# Create a SageMaker Pipeline to orchestrate an ML workflow
In this notebook, we'll define a customer churn ML workflow as a series of steps using SageMaker Pipelines. Specifically, these are the components involved in the end-to-end workflow:

* Pipelines - A DAG defined in Python with steps and conditions to orchestrate SageMaker jobs and resource creation. Supports pipeline parameters for individual pipeline executions.
* Processing job Step - A PySpark job step that performs feature engineering and data validation.
* Data Quality Check Step - A processing step that performs a data quality check and establishes a baseline for monitoring data drift in model monitoring processes.
* Training job Step - An iterative process that teaches a model to make predictions by presenting examples from a training dataset.
* Model quality check Step - A processing step that performs a model quality check and establishes a baseline for monitoring model drift in model monitoring processes.
* Model evaluation Step - Performs model evaluation, creates an evaluation metrics report, and uploads it to an S3 bucket to be referenced in the condition and model registration steps.
* Condition Step - Evaluates the model performance against the target threshold. Fails the pipeline if the metrics do not meet the threshold requirements.
* Model registration Step - Registers the model version with the metrics captured in the previous steps.
* Parametrized Pipeline executions - Enables variation in pipeline executions according to specified parameters.

Here's a diagram that summarizes the major steps described above:

![sm pipeline](img/sagemakerpipeline-churn.png)
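To make the DAG idea concrete, here is a small sketch that derives one valid execution order from step dependencies using Python's standard `graphlib` module (Python 3.9+). The step names and edges are taken from the `depends_on` arguments and property references in the step definitions later in this notebook; the condition and registration steps are left out. The dictionary is our own plain-Python representation, not something the SageMaker SDK produces.

```python
from graphlib import TopologicalSorter

# Step name -> set of steps it depends on.
dependencies = {
    "PreProcess": set(),
    "DataQualityCheckStep": {"PreProcess"},
    "DataBiasCheckStep": {"DataQualityCheckStep"},
    "TrainModel": {"PreProcess"},
    "CreateModel": {"TrainModel"},
    "BatchTransform": {"TrainModel", "CreateModel"},
    "ModelQualityCheckStep": {"BatchTransform"},
    "ModelBiasCheckStep": {"ModelQualityCheckStep", "CreateModel"},
    "ModelExplainabilityCheckStep": {"ModelBiasCheckStep", "CreateModel"},
    "EvaluateModel": {"CreateModel", "TrainModel"},
}

# static_order() yields every step after all of its dependencies.
execution_order = list(TopologicalSorter(dependencies).static_order())
position = {name: index for index, name in enumerate(execution_order)}
print(execution_order)
```

Several orders can satisfy the same constraints; steps with no path between them, such as the data checks and the training branch, are free to run in parallel.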


# Setup

First, we import the Python libraries and frameworks required to run the workflow.
We also define variables that are referenced throughout the workflow.

In [27]:
import boto3
import pandas as pd
import sagemaker
import os
import json
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.clarify_check_step import (
    DataBiasCheckConfig,
    ClarifyCheckStep,
    ModelBiasCheckConfig,
    ModelPredictedLabelConfig,
    ModelExplainabilityCheckConfig,
    SHAPConfig,
)
from sagemaker.workflow.quality_check_step import (
    DataQualityCheckConfig,
    ModelQualityCheckConfig,
    QualityCheckStep,
)
from sagemaker.workflow.functions import Join
from sagemaker.model_monitor import DatasetFormat, model_monitoring
from sagemaker.clarify import BiasConfig, DataConfig, ModelConfig
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, TransformStep
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
    ParameterBoolean
)
from sagemaker.xgboost.model import XGBoostModel
from sagemaker.inputs import CreateModelInput, TransformInput
from sagemaker.workflow.model_step import ModelStep
from sagemaker.transformer import Transformer
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.workflow.properties import PropertyFile
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.model_metrics import MetricsSource, ModelMetrics, FileSource
from sagemaker.drift_check_baselines import DriftCheckBaselines

In [28]:
s3_client = boto3.resource('s3')
pipeline_name = "StreamingServiceChurnModelPipeline"
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()

s3_processing_input_prefix = "data/kkbox-customer-churn-model/raw"
s3_code_prefix = "data/kkbox-customer-churn-model/code"
s3_preprocessing_output_prefix = "data/kkbox-customer-churn-model/processed"
s3_batch_transform_output_prefix = "data/kkbox-customer-churn-model/batch-transformed"
s3_logs_prefix = "data/kkbox-customer-churn-model/logs"
s3_model_prefix = "data/kkbox-customer-churn-model/output"
s3_quality_prefx = "data/kkbox-customer-churn-model/quality"
s3_bias_prefx = "data/kkbox-customer-churn-model/bias"
s3_explainability_prefix = "data/kkbox-customer-churn-model/explainability"
s3_model_evaluation_prefix = "data/kkbox-customer-churn-model/evaluation"
auc_score_threshold = 0.75
base_job_prefix = "kkbox-customer-churn-model"
model_package_group_name = "kkbox-customer-churn-model-packages"
pyspark_cluster_instance_type = "ml.m5.4xlarge"
cache_config = CacheConfig(enable_caching=True, expire_after="P1M")

# SageMaker Pipeline Parameters
You can introduce variables into your pipeline definition using parameters. 
Parameters have a default value, which you can override by specifying parameter values when starting a pipeline execution. 
Here are the supported Parameter types:
    
* ParameterString – Representing a string parameter.
* ParameterInteger – Representing an integer parameter.
* ParameterFloat – Representing a float parameter.
* ParameterBoolean – Representing a Boolean Python type.

Following is a list of parameters that we'll define for our project:

* processing_instance_count - Number of instances needed to carry out the PySpark processing job.
* processing_instance_type - SageMaker ML instance type to use for the processing job.
* skip_check_data_quality - Specifies whether to skip the data quality check.
* register_new_baseline_data_quality - Specifies whether to create a new data baseline in the specified pipeline execution.
* supplied_baseline_constraints_data_bias - An option to manually provide the constraint baseline for the bias check (S3 URI).
* supplied_baseline_statistics_data_quality - An option to manually provide the statistics baseline for the data quality check (S3 URI).
* supplied_baseline_constraints_data_quality - An option to manually provide the constraint baseline for the data quality check (S3 URI).
* skip_check_data_bias - Whether to skip the data bias check.
* register_new_baseline_data_bias - Whether to create and register a new data bias baseline.
* training_instance_type - SageMaker ML instance type to use for the training job.
* inference_instance_type - SageMaker ML instance type to use for the inference job.
* input_data_prefix - S3 key prefix that identifies the input data.
* model_approval_status - Default model approval status when registering a model with SageMaker Model Registry.
* skip_check_model_quality - Whether to skip the model quality check.
* register_new_baseline_model_quality - Whether to create and register a new model quality baseline.
* supplied_baseline_statistics_model_quality - Manually provide a statistics baseline for the model quality check (S3 URI).
* supplied_baseline_constraints_model_quality - Manually provide a constraint baseline for the model quality check (S3 URI).
* skip_check_model_bias - Whether to skip the model bias check.
* register_new_baseline_model_bias - Whether to create and register a new model bias baseline.
* supplied_baseline_constraints_model_bias - Manually provide a constraint baseline for the model bias check (S3 URI).
* skip_check_model_explainability - Whether to skip the model explainability check.
* register_new_baseline_model_explainability - Whether to create and register a new explainability baseline.
* supplied_baseline_constraints_model_explainability - Manually provide a constraint baseline for the model explainability check (S3 URI).

In [29]:
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount",default_value=4)
processing_instance_type = ParameterString(name="ProcessingInstanceType",default_value="ml.m5.4xlarge")
skip_check_data_quality = ParameterBoolean(name="SkipDataQualityCheck", default_value=True)
register_new_baseline_data_quality = ParameterBoolean(name="RegisterNewDataQualityBaseline", default_value=True)
supplied_baseline_constraints_data_bias = ParameterString(name="DataBiasSuppliedBaselineConstraints", default_value="")
supplied_baseline_statistics_data_quality = ParameterString(name="DataQualitySuppliedStatistics", default_value="")
supplied_baseline_constraints_data_quality = ParameterString(name="DataQualitySuppliedConstraints", default_value="")
skip_check_data_bias = ParameterBoolean(name="SkipDataBiasCheck", default_value=True)
register_new_baseline_data_bias = ParameterBoolean(name="RegisterNewDataBiasBaseline", default_value=True)
training_instance_type = ParameterString(name="TrainingInstanceType",default_value="ml.m5.2xlarge")
inference_instance_type = ParameterString(name="InferenceInstanceType",default_value="ml.m5.2xlarge")
input_data_prefix = ParameterString(name="InputDataPrefix",default_value=s3_processing_input_prefix,)
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")
skip_check_model_quality = ParameterBoolean(name="SkipModelQualityCheck", default_value=True)
register_new_baseline_model_quality = ParameterBoolean(name="RegisterNewModelQualityBaseline", default_value=True)
supplied_baseline_statistics_model_quality = ParameterString(name="ModelQualitySuppliedStatistics", default_value="")
supplied_baseline_constraints_model_quality = ParameterString(name="ModelQualitySuppliedConstraints", default_value="")
skip_check_model_bias = ParameterBoolean(name="SkipModelBiasCheck", default_value=True)
register_new_baseline_model_bias = ParameterBoolean(name="RegisterNewModelBiasBaseline", default_value=True)
supplied_baseline_constraints_model_bias = ParameterString(name="ModelBiasSuppliedStatistics", default_value="")
skip_check_model_explainability = ParameterBoolean(name="SkipModelExplainabilityCheck", default_value=True)
register_new_baseline_model_explainability = ParameterBoolean(name="RegisterNewModelExplainabilityBaseline", default_value=True)
supplied_baseline_constraints_model_explainability = ParameterString(name="ModelExplainabilitySuppliedStatistics", default_value="")
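To illustrate how per-execution overrides relate to the defaults defined above, here is a plain-Python sketch that merges overrides into a dictionary of defaults. The `resolve_parameters` helper and its validation rules are assumptions made for this example; the checks SageMaker itself applies may differ.

```python
# Defaults mirror a few of the pipeline parameters defined above.
DEFAULTS = {
    "ProcessingInstanceCount": 4,
    "ProcessingInstanceType": "ml.m5.4xlarge",
    "SkipDataQualityCheck": True,
    "ModelApprovalStatus": "PendingManualApproval",
}


def resolve_parameters(defaults, overrides=None):
    """Merge per-execution overrides into the defaults.

    Rejects unknown names and values whose type differs from the default's type.
    """
    resolved = dict(defaults)
    for name, value in (overrides or {}).items():
        if name not in defaults:
            raise KeyError(f"Unknown pipeline parameter: {name}")
        expected = type(defaults[name])
        if type(value) is not expected:
            raise TypeError(
                f"{name} expects {expected.__name__}, got {type(value).__name__}"
            )
        resolved[name] = value
    return resolved


resolved = resolve_parameters(
    DEFAULTS,
    {"ProcessingInstanceCount": 2, "SkipDataQualityCheck": False},
)
print(resolved)
```

With the SDK, overrides of this kind are passed as a dictionary keyed by parameter name through the `parameters` argument of `Pipeline.start`.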

In [30]:
# Upload processing script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/cust_churn_prediction/preprocess.py", f"{s3_code_prefix}/preprocess.py")

# PreProcessing Step

In this step, we'll use a PySpark SageMaker Processing job to perform the feature engineering. The SageMaker Processing job uses an ephemeral cluster to run the given PySpark script, and the cluster shuts down automatically when the script completes.

In [31]:
# Run the processing job
spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    max_runtime_in_seconds=12000,
    sagemaker_session = pipeline_session
)

processor_args = spark_processor.run(
    submit_app=f"s3://{default_bucket}/{s3_code_prefix}/preprocess.py",
    arguments=[
        "--s3_input_bucket",
        default_bucket,
        "--s3_input_key_prefix",
        input_data_prefix,
        "--s3_output_bucket",
        default_bucket,
        "--s3_output_key_prefix",
        s3_preprocessing_output_prefix,
    ],
    spark_event_logs_s3_uri=f"s3://{default_bucket}/{s3_logs_prefix}/spark_event_logs",
    logs=False,
)

step_process = ProcessingStep(name="PreProcess", step_args=processor_args, cache_config=cache_config)



In [32]:
s3 = boto3.client("s3")
response = s3.list_objects_v2(Bucket=default_bucket, Prefix=s3_preprocessing_output_prefix)
train_data_s3_uri_prefix = [ x['Key'] for x in response['Contents'] if f"{s3_preprocessing_output_prefix}/train" in x['Key']][0]
# Build the URI with explicit "/" separators; os.path.join uses the OS path separator
train_data_s3_uri = f"s3://{default_bucket}/{train_data_s3_uri_prefix}"
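The lookup above assumes that the preprocessing output already exists in S3 when the pipeline is defined. Here is a defensive sketch of the same selection logic that runs against a response-shaped dictionary instead of a live bucket. The helper names, the bucket name `example-bucket`, and the file name `part-00000.csv` are hypothetical.

```python
def first_key_containing(response, prefix):
    """Return the first object key in a list_objects_v2-style response
    that contains `prefix`, or None when there is no such key."""
    # "Contents" is absent from the response when no objects match.
    for obj in response.get("Contents", []):
        if prefix in obj["Key"]:
            return obj["Key"]
    return None


def to_s3_uri(bucket, key):
    # Build the URI with explicit "/" separators rather than os.path.join,
    # whose separator depends on the operating system.
    return f"s3://{bucket}/{key}"


processed_prefix = "data/kkbox-customer-churn-model/processed"

# Hand-written stand-in for a list_objects_v2 response (hypothetical keys).
fake_response = {
    "KeyCount": 2,
    "Contents": [
        {"Key": f"{processed_prefix}/test/part-00000.csv"},
        {"Key": f"{processed_prefix}/train/part-00000.csv"},
    ],
}

train_key = first_key_containing(fake_response, f"{processed_prefix}/train")
train_uri = to_s3_uri("example-bucket", train_key)
missing = first_key_containing({"KeyCount": 0}, f"{processed_prefix}/train")
print(train_uri, missing)
```

Returning `None` for an empty listing lets the caller decide whether to fail with a clear message or run the preprocessing step first.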

# Data Quality Check Step

**CheckJobConfig** is a helper class that defines the job configuration used by the **QualityCheckStep**. Because the job configuration is separate from the step parameters, the same **CheckJobConfig** can be reused across multiple quality check steps.

The **DataQualityCheckConfig** defines the quality check job itself. It specifies the dataset used to calculate the baseline (here, the training dataset from the data processing step), the dataset format (here, a CSV file with no headers), and the output path for the results of the data quality check.

In [33]:
check_job_config = CheckJobConfig(
    role=role,
    instance_count=1,
    instance_type="ml.c5.xlarge",
    volume_size_in_gb=120,
    sagemaker_session=sagemaker_session,
)

data_quality_check_config = DataQualityCheckConfig(
    baseline_dataset=train_data_s3_uri,
    dataset_format=DatasetFormat.csv(header=False, output_columns_position="START"),
    output_s3_uri=Join(
        on="/",
        values=[
            "s3:/",
            default_bucket,
            s3_quality_prefx,
            ExecutionVariables.PIPELINE_EXECUTION_ID,
            "dataqualitycheckstep",
        ],
    ),
)

data_quality_check_step = QualityCheckStep(
    name="DataQualityCheckStep",
    skip_check=skip_check_data_quality,
    register_new_baseline=register_new_baseline_data_quality,
    quality_check_config=data_quality_check_config,
    check_job_config=check_job_config,
    supplied_baseline_statistics=supplied_baseline_statistics_data_quality,
    supplied_baseline_constraints=supplied_baseline_constraints_data_quality,
    model_package_group_name=model_package_group_name,
    cache_config=cache_config,
    depends_on=[step_process]
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
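The `Join` call in the cell above starts its values with `"s3:/"`, with a single slash, because joining on `"/"` supplies the second one. The plain-Python analogue below shows the string that results once every value is known. In the pipeline, `ExecutionVariables.PIPELINE_EXECUTION_ID` is only resolved at execution time, so the bucket name and execution ID used here are placeholder values.

```python
def join(on, values):
    """Plain-Python analogue of the pipeline Join function for resolved values."""
    return on.join(str(value) for value in values)


output_uri = join(
    "/",
    [
        "s3:/",  # single slash: the join adds the second one
        "example-bucket",  # placeholder for default_bucket
        "data/kkbox-customer-churn-model/quality",
        "example-execution-id",  # placeholder for the execution ID
        "dataqualitycheckstep",
    ],
)
print(output_uri)
```

Because the execution ID is part of the path, each pipeline execution writes its check results to a separate S3 location.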


# Data Bias Check

The job configuration from the previous step is used here and the **DataConfig** class is used to define how the **ClarifyCheckStep** should compute the data bias. The training dataset is used again for the bias evaluation, the column representing the label is specified through the label parameter, and a BiasConfig is provided.

In the BiasConfig, we specify a facet name (the column that is the focal point of the bias calculation), the value of the facet that determines the range of values it can hold, and the threshold value for the label.
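To give a feel for what a facet and a label threshold mean in practice, the sketch below computes two common pre-training bias measures on a toy dataset: class imbalance (CI) and the difference in proportions of labels (DPL). The data is invented for illustration, the `pretraining_bias` helper is our own, and Clarify reports additional metrics beyond these two.

```python
def pretraining_bias(rows, facet_value, positive_label):
    """Compute CI and DPL from (facet, label) pairs.

    Facet d is the group whose facet equals `facet_value`;
    facet a is everyone else.
    """
    d_labels = [label for facet, label in rows if facet == facet_value]
    a_labels = [label for facet, label in rows if facet != facet_value]
    n_d, n_a = len(d_labels), len(a_labels)
    if n_d == 0 or n_a == 0:
        raise ValueError("Both facet groups must contain at least one row")
    # Proportion of positive labels within each group.
    q_d = sum(label == positive_label for label in d_labels) / n_d
    q_a = sum(label == positive_label for label in a_labels) / n_a
    return {
        "CI": (n_a - n_d) / (n_a + n_d),  # imbalance in group sizes
        "DPL": q_a - q_d,  # gap in positive-label rates
    }


# Toy (gender, is_churn) pairs: six rows with gender 1, four with gender 0.
rows = [
    (1, 1), (1, 1), (1, 1), (1, 0), (1, 0), (1, 0),
    (0, 1), (0, 0), (0, 0), (0, 0),
]
metrics = pretraining_bias(rows, facet_value=0, positive_label=1)
print(metrics)
```

The arguments mirror the `BiasConfig` used in this notebook, where the facet is `gender` with facet value `0` and the positive label value is `1`.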


In [34]:
headers = ['msno', 'is_churn', 'regist_trans', 'mst_frq_plan_days',
           'revenue', 'regist_cancels', 'bd', 'tenure', 'num_25',
           'num_50', 'num_75', 'num_985', 'num_100', 'num_unq',
           'total_secs', 'city', 'gender', 'registered_via',
           'qtr_trans', 'mst_frq_pay_met', 'is_auto_renew']

data_bias_analysis_cfg_output_path = (
    f"s3://{default_bucket}/{s3_quality_prefx}/databiascheckstep/analysis_cfg"
)

data_bias_data_config = DataConfig(
    s3_data_input_path=train_data_s3_uri,
    s3_output_path=Join(
        on="/",
        values=[
            "s3:/",
            default_bucket,
            s3_quality_prefx,
            ExecutionVariables.PIPELINE_EXECUTION_ID,
            "databiascheckstep",
        ],
    ),
    label=1,
    excluded_columns=[0],
    dataset_type="text/csv",
    headers=headers,
    s3_analysis_config_output_path=data_bias_analysis_cfg_output_path,
)


data_bias_config = BiasConfig(
    label_values_or_threshold=[1], facet_name="gender", facet_values_or_threshold=[0], group_name="bd")

data_bias_check_config = DataBiasCheckConfig(
    data_config=data_bias_data_config,
    data_bias_config=data_bias_config,
)

data_bias_check_step = ClarifyCheckStep(
    name="DataBiasCheckStep",
    clarify_check_config=data_bias_check_config,
    check_job_config=check_job_config,
    skip_check=skip_check_data_bias,
    register_new_baseline=register_new_baseline_data_bias,
    supplied_baseline_constraints=supplied_baseline_constraints_data_bias,
    model_package_group_name=model_package_group_name,
    cache_config=cache_config,
    depends_on=[data_quality_check_step]
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: 1.0.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker.model_monitor.clarify_model_monitoring:Uploading analysis config to {s3_uri}.
INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: 1.0.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


# Training Step

Train a binary classification model using the XGBoost algorithm. The training step defines the hyperparameters and fits a model using a SageMaker Training job.

In [35]:
model_path = f"s3://{default_bucket}/{s3_model_prefix}"

hyperparameters = {
    "max_depth":5,
    "eta":0.2,
    "gamma":4,
    "min_child_weight":6,
    "subsample":0.7,
    "n_estimators":50,
    "region" : region,
    "sm_experiment" : ExecutionVariables.PIPELINE_NAME,
    "sm_run" : ExecutionVariables.PIPELINE_EXECUTION_ID}

xgb_train = XGBoost(entry_point = "pipelines/cust_churn_prediction/train.py", 
                    framework_version='1.5-1',
                    hyperparameters=hyperparameters,
                    role=role,
                    instance_count=1,
                    instance_type=training_instance_type,
                    volume_size =10,
                    output_path=model_path,
                    sagemaker_session=pipeline_session)

train_args = xgb_train.fit(
    inputs={
            "train": TrainingInput(
                s3_data=f"s3://{default_bucket}/{s3_preprocessing_output_prefix}/train",
                content_type="text/csv"
            ),
            "test": TrainingInput(
                s3_data=f"s3://{default_bucket}/{s3_preprocessing_output_prefix}/test",
                content_type="text/csv"
            ),
        },
)

step_train = TrainingStep(
    name="TrainModel",
    step_args=train_args,
    cache_config=cache_config,
    depends_on=[step_process])

INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.2xlarge.
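In script mode, the hyperparameters defined above reach the entry-point script as command-line arguments, and the input channels are exposed through environment variables such as `SM_CHANNEL_TRAIN`. The `train.py` used in this notebook is not shown here, so the following is only a hypothetical sketch of how such a script might read its arguments. The example region value is a placeholder.

```python
import argparse
import os


def parse_args(argv=None):
    """Parse hyperparameters and data locations for a training script (sketch)."""
    parser = argparse.ArgumentParser()
    # Hyperparameters arrive as "--name value" pairs.
    parser.add_argument("--max_depth", type=int, default=5)
    parser.add_argument("--eta", type=float, default=0.2)
    parser.add_argument("--gamma", type=float, default=4.0)
    parser.add_argument("--min_child_weight", type=float, default=6.0)
    parser.add_argument("--subsample", type=float, default=0.7)
    parser.add_argument("--n_estimators", type=int, default=50)
    # Channel and model directories come from the training container's environment.
    parser.add_argument("--model_dir", default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
    parser.add_argument("--train", default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"))
    parser.add_argument("--test", default=os.environ.get("SM_CHANNEL_TEST", "/opt/ml/input/data/test"))
    # Ignore arguments this sketch does not model (region, sm_experiment, sm_run).
    args, _unknown = parser.parse_known_args(argv)
    return args


args = parse_args(
    ["--max_depth", "5", "--eta", "0.2", "--n_estimators", "50", "--region", "us-east-1"]
)
print(args.max_depth, args.eta, args.n_estimators)
```

Using `parse_known_args` keeps the script tolerant of extra hyperparameters, which helps when the same dictionary also carries values meant for experiment tracking.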


# Create Model Step

After the model is trained, we define a Model object that encapsulates information about the model, such as the instance type for inference and the S3 location of the trained model.

A create model step requires model artifacts and information about the SageMaker instance type that you need to use to create the model. The following cell defines the step with `ModelStep` from `sagemaker.workflow.model_step`. For more information on CreateModel step requirements, see the sagemaker.workflow.steps.CreateModelStep documentation.

In [36]:
model = XGBoostModel(
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
    entry_point="pipelines/cust_churn_prediction/inference.py",
    framework_version="1.5-1"
)
step_create_model = ModelStep(
    name="CreateModel",
    step_args=model.create(instance_type=inference_instance_type),
)

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.2xlarge.


# Transform Step

You use a transform step for batch transformation to run inference on an entire dataset.

In this example, we define a transform step to run inference on the test dataset stored in the S3 bucket. The output from the transform step is used for the model quality check.


In [37]:
model_client_config = { "InvocationsTimeoutInSeconds" : 10, "InvocationsMaxRetries" : 3 }

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.2xlarge",
    instance_count=1,
    accept="text/csv",
    assemble_with="Line",
    output_path=f"s3://{default_bucket}/{s3_batch_transform_output_prefix}",
)

step_transform = TransformStep(
    name="BatchTransform",
    transformer=transformer,
    inputs=TransformInput(
        data=f"s3://{default_bucket}/{s3_preprocessing_output_prefix}/test",
        input_filter="$[2:]",
        join_source="Input",
        output_filter="$[1,-1]",
        content_type="text/csv",
        split_type="Line",
        model_client_config = model_client_config
    ),
    cache_config=cache_config,
    depends_on=[step_train]
)
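The three filter settings in `TransformInput` are easier to follow on a single record. The sketch below emulates them in plain Python for one CSV row, assuming the processed data keeps the ID in column 0 and the `is_churn` label in column 1. The sample row and the prediction value are invented, and the real filtering is performed by the batch transform job, not by code like this.

```python
def apply_filters(record, prediction):
    """Emulate input_filter="$[2:]", join_source="Input" and
    output_filter="$[1,-1]" for a single CSV record."""
    columns = record.split(",")
    # input_filter "$[2:]": drop the ID and label, send only features to the model.
    model_input = columns[2:]
    # join_source "Input": append the prediction to the original input record.
    joined = columns + [str(prediction)]
    # output_filter "$[1,-1]": keep the label (column 1) and the prediction (last).
    output = [joined[1], joined[-1]]
    return ",".join(model_input), ",".join(output)


# Hypothetical shortened row: id, is_churn, then three feature values.
model_input, output = apply_filters("user123,1,5,30,99.0", 0.83)
print(model_input)
print(output)
```

Each output line therefore holds the ground truth first and the prediction second, which is why the model quality check that follows refers to them as `_c0` and `_c1`.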

# Model Quality Check

In the **QualityCheckStep**, we calculate the baselines for statistics and constraints using the predictions that the model generates from the test dataset (output from the TransformStep). 

We define the problem type as **BinaryClassification** in the ModelQualityCheckConfig along with specifying the columns which represent the input and output. Since the dataset has no headers, _c0, _c1 are auto-generated header names that should be used in the **ModelQualityCheckConfig**.

In [38]:
model_quality_check_config = ModelQualityCheckConfig(
    baseline_dataset=step_transform.properties.TransformOutput.S3OutputPath,
    dataset_format=DatasetFormat.csv(header=False),
    output_s3_uri=Join(
        on="/",
        values=[
            "s3:/",
            default_bucket,
            s3_quality_prefx,
            ExecutionVariables.PIPELINE_EXECUTION_ID,
            "modelqualitycheckstep",
        ],
    ),
    problem_type="BinaryClassification",
    inference_attribute="_c1",  # use auto-populated headers since we don't have headers in the dataset
    ground_truth_attribute="_c0",  # use auto-populated headers since we don't have headers in the dataset
)

model_quality_check_step = QualityCheckStep(
    name="ModelQualityCheckStep",
    skip_check=skip_check_model_quality,
    register_new_baseline=register_new_baseline_model_quality,
    quality_check_config=model_quality_check_config,
    check_job_config=check_job_config,
    supplied_baseline_statistics=supplied_baseline_statistics_model_quality,
    supplied_baseline_constraints=supplied_baseline_constraints_model_quality,
    model_package_group_name=model_package_group_name,
    cache_config=cache_config,
    depends_on = [step_transform]
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: .
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


# Model Bias Step

Similar to the Data Bias check step, we'll calculate the model bias using the training dataset and the model trained in the previous step.

In [39]:
model_bias_analysis_cfg_output_path = (
    f"s3://{default_bucket}/{s3_bias_prefx}/modelbiascheckstep/analysis_cfg"
)

model_bias_data_config = DataConfig(
    s3_data_input_path=f"s3://{default_bucket}/{s3_preprocessing_output_prefix}/train",
    s3_output_path=Join(
        on="/",
        values=[
            "s3:/",
            default_bucket,
            s3_bias_prefx,
            ExecutionVariables.PIPELINE_EXECUTION_ID,
            "modelbiascheckstep",
        ],
    ),
    s3_analysis_config_output_path=model_bias_analysis_cfg_output_path,
    label=1,
    dataset_type="text/csv",
    excluded_columns=[0]
)

model_config = ModelConfig(
    model_name=step_create_model.properties.ModelName,
    instance_count=1,
    instance_type="ml.m5.xlarge",
)

# This bias config tells Clarify to measure bias on column index 16, which is `gender` in the `headers` list defined for the data bias check
model_bias_config = BiasConfig(label_values_or_threshold=[1], facet_name=[16], facet_values_or_threshold=[[1]])

model_bias_check_config = ModelBiasCheckConfig(
    data_config=model_bias_data_config,
    data_bias_config=model_bias_config,
    model_config=model_config,
    model_predicted_label_config=ModelPredictedLabelConfig(),
)

model_bias_check_step = ClarifyCheckStep(
    name="ModelBiasCheckStep",
    clarify_check_config=model_bias_check_config,
    check_job_config=check_job_config,
    skip_check=skip_check_model_bias,
    register_new_baseline=register_new_baseline_model_bias,
    supplied_baseline_constraints=supplied_baseline_constraints_model_bias,
    model_package_group_name=model_package_group_name,
    cache_config=cache_config,
    depends_on=[model_quality_check_step]
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: 1.0.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker.model_monitor.clarify_model_monitoring:Uploading analysis config to {s3_uri}.
INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: 1.0.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


# Model Explainability

SageMaker Clarify uses a model-agnostic feature attribution approach, which you can use to understand why a model made a prediction after training and to provide per-instance explanation during inference. The implementation includes a scalable and efficient implementation of SHAP, based on the concept of a Shapley value from the field of cooperative game theory that assigns each feature an importance value for a particular prediction.

For Model Explainability, Clarify requires an explainability configuration. In this example, we use SHAPConfig. For more information on explainability_config, visit the [Clarify documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/clarify-model-explainability.html).
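To make the Shapley value concrete, the sketch below computes exact Shapley values for one prediction of a tiny toy model by enumerating every coalition of features. Features outside a coalition are set to a baseline value. Exact enumeration is exponential in the number of features, so it is only practical for a handful of them and is not how Clarify computes attributions at scale. The toy model and inputs are invented for illustration.

```python
from itertools import combinations
from math import factorial


def shapley_values(model, x, baseline):
    """Exact Shapley values for one prediction.

    Features outside a coalition take their baseline value.
    """
    n = len(x)

    def value(coalition):
        point = [x[i] if i in coalition else baseline[i] for i in range(n)]
        return model(point)

    attributions = []
    for i in range(n):
        others = [j for j in range(n) if j != i]
        phi = 0.0
        for size in range(n):
            # Weight of a coalition of this size in the Shapley average.
            weight = factorial(size) * factorial(n - size - 1) / factorial(n)
            for subset in combinations(others, size):
                members = set(subset)
                # Marginal contribution of feature i to this coalition.
                phi += weight * (value(members | {i}) - value(members))
        attributions.append(phi)
    return attributions


def toy_model(features):
    # Hypothetical score: one additive term plus an interaction term.
    return 2.0 * features[0] + features[1] * features[2]


x = [1.0, 2.0, 3.0]
baseline = [0.0, 0.0, 0.0]
phis = shapley_values(toy_model, x, baseline)
print(phis)
```

The attributions sum to the difference between the prediction and the baseline prediction, and the interaction term is split evenly between the two features that produce it.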

In [40]:
model_explainability_analysis_cfg_output_path = "s3://{}/{}/{}/{}".format(
    default_bucket, s3_explainability_prefix, "modelexplainabilitycheckstep", "analysis_cfg"
)

model_explainability_data_config = DataConfig(
    s3_data_input_path=f"s3://{default_bucket}/{s3_preprocessing_output_prefix}/validation",
    s3_output_path=Join(
        on="/",
        values=[
            "s3:/",
            default_bucket,
            s3_explainability_prefix,
            ExecutionVariables.PIPELINE_EXECUTION_ID,
            "modelexplainabilitycheckstep",
        ],
    ),
    s3_analysis_config_output_path=model_explainability_analysis_cfg_output_path,
    label=1,
    excluded_columns=[0],
    dataset_type="text/csv",
)
shap_config = SHAPConfig(seed=123, num_samples=10, num_clusters=2)
model_explainability_check_config = ModelExplainabilityCheckConfig(
    data_config=model_explainability_data_config,
    model_config=model_config,
    explainability_config=shap_config,
)
model_explainability_check_step = ClarifyCheckStep(
    name="ModelExplainabilityCheckStep",
    clarify_check_config=model_explainability_check_config,
    check_job_config=check_job_config,
    skip_check=skip_check_model_explainability,
    register_new_baseline=register_new_baseline_model_explainability,
    supplied_baseline_constraints=supplied_baseline_constraints_model_explainability,
    model_package_group_name=model_package_group_name,
    cache_config=cache_config,
    depends_on=[model_bias_check_step]
)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: 1.0.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker.model_monitor.clarify_model_monitoring:Uploading analysis config to {s3_uri}.
INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: 1.0.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


# Evaluation Step

The evaluation step evaluates the model by calculating performance metrics such as precision, recall, and F1 score.
The step is executed as a processing job. The output from the step is used in the Condition Step to determine whether the model should be registered.

In [41]:
#Upload the evaluation script to S3
s3_client.Bucket(default_bucket).upload_file("pipelines/cust_churn_prediction/evaluate.py", f"{s3_code_prefix}/evaluate.py")

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.5-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

# define model evaluation step to evaluate the trained model
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="model-evaluation",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=f"s3://{default_bucket}/{s3_preprocessing_output_prefix}/validation",
            destination="/opt/ml/processing/validation",
        ),
    ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",
                             destination=f"s3://{default_bucket}/{s3_model_evaluation_prefix}"),
        ],
    code=f"s3://{default_bucket}/{s3_code_prefix}/evaluate.py",
)

evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="EvaluateModel",
    step_args=eval_args,
    property_files=[evaluation_report],
    depends_on=[step_create_model]
)

INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.xlarge.


# Model Metrics

Define the metrics to be registered with the model in the Model Registry.

In [42]:
model_metrics = ModelMetrics(
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_check_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_check_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
    bias_pre_training=MetricsSource(
        s3_uri=data_bias_check_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
    model_statistics=MetricsSource(
        s3_uri=model_quality_check_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    model_constraints=MetricsSource(
        s3_uri=model_quality_check_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
    bias_post_training=MetricsSource(
        s3_uri=model_bias_check_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
    explainability=MetricsSource(
        s3_uri=model_explainability_check_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
)

drift_check_baselines = DriftCheckBaselines(
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_check_step.properties.BaselineUsedForDriftCheckStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_check_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    bias_pre_training_constraints=MetricsSource(
        s3_uri=data_bias_check_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    bias_config_file=FileSource(
        s3_uri=model_bias_check_config.monitoring_analysis_config_uri,
        content_type="application/json",
    ),
    model_statistics=MetricsSource(
        s3_uri=model_quality_check_step.properties.BaselineUsedForDriftCheckStatistics,
        content_type="application/json",
    ),
    model_constraints=MetricsSource(
        s3_uri=model_quality_check_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    bias_post_training_constraints=MetricsSource(
        s3_uri=model_bias_check_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    explainability_constraints=MetricsSource(
        s3_uri=model_explainability_check_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    explainability_config_file=FileSource(
        s3_uri=model_explainability_check_config.monitoring_analysis_config_uri,
        content_type="application/json",
    ),
)

# Register Model with Model Registry

In the model registration step, we include the data and model baselines and the model metrics to register a model version.

These parameters give users a way to configure the baselines associated with a model so that they can be used in drift checks or model monitoring jobs. Each time a pipeline is executed, users can choose to update the drift_check_baselines with newly calculated baselines. The model_metrics can be used to register the newly calculated baselines or any other metrics associated with the model.

Calculating a new baseline does not require updating the baselines used for drift checks. In some cases, users may keep an older version of the baseline file for drift checks and not register the new baselines calculated in the pipeline run.
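
The choice between keeping an older baseline and adopting a newly calculated one can be sketched in plain Python. This is a simplified model of how the `skip_check_*` and `register_new_baseline_*` pipeline parameters are understood to interact, not the SageMaker implementation; the function name and the example S3 URIs are hypothetical, and the exact behaviour should be confirmed against the current SageMaker documentation.

```python
def resolve_baseline(skip_check, register_new_baseline, supplied_baseline, calculated_baseline):
    """Simplified model of which baseline ends up as the drift-check baseline.

    Assumption: the drift check runs unless skip_check is True, and the
    baseline carried forward is the newly calculated one only when
    register_new_baseline is True; otherwise the supplied (older) one is kept.
    """
    baseline_used = calculated_baseline if register_new_baseline else supplied_baseline
    return {
        "drift_check_runs": not skip_check,
        "baseline_used_for_drift_check": baseline_used,
    }


# Hypothetical S3 locations, for illustration only.
old = "s3://example-bucket/baselines/v1/constraints.json"
new = "s3://example-bucket/baselines/v2/constraints.json"

# First run: nothing to compare against, so skip the check and register the new baseline.
print(resolve_baseline(True, True, supplied_baseline="", calculated_baseline=new))

# Later run: check for drift against the older baseline and keep it.
print(resolve_baseline(False, False, supplied_baseline=old, calculated_baseline=new))
```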

In [43]:
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    drift_check_baselines=drift_check_baselines
)
step_register = ModelStep(name="RegisterModel", step_args=register_args, depends_on=[model_explainability_check_step])

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.xlarge.


# Define Failed Step

Define a FailStep to stop an Amazon SageMaker Model Building Pipelines execution when a desired condition or state is not achieved and to mark that pipeline's execution as failed. The FailStep also allows you to enter a custom error message, indicating the cause of the pipeline's execution failure.

In [44]:
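# The fail step runs when the AUC score is NOT greater than the threshold,
# so the message reports the score as being at or below it.
step_fail = FailStep(
    name="EvalScoreFail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score <=", auc_score_threshold]),
)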

# Condition Check

We define a condition check that lets the pipeline continue only when the evaluation score (AUC) is greater than the specified threshold. Otherwise, the fail step stops the pipeline execution.

In [49]:
cond_gte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckEvaluationScore",
    conditions=[cond_gte],
    if_steps=[
        step_transform,
        model_quality_check_step,
        model_bias_check_step,
        model_explainability_check_step,
        step_register,
    ],
    else_steps=[step_fail]
)
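
The routing performed by the condition step can be mimicked locally to check how a given report would be handled. The sketch below is a plain-Python stand-in, not the SageMaker implementation: `json_get` and `choose_branch` are hypothetical helpers, and `json_get` only handles simple dot-separated keys rather than full JsonPath expressions.

```python
def json_get(document, json_path):
    """Walk a dot-separated path through nested dictionaries."""
    value = document
    for key in json_path.split("."):
        value = value[key]
    return value


def choose_branch(report, threshold, json_path="classification_metrics.auc_score.value"):
    """Return which branch a strictly-greater-than condition would select."""
    auc = json_get(report, json_path)
    return "if_steps" if auc > threshold else "else_steps"


# Made-up report in the layout that the JSON path above implies.
report = {"classification_metrics": {"auc_score": {"value": 0.75}}}

print(choose_branch(report, threshold=0.70))
print(choose_branch(report, threshold=0.75))
```

Because the comparison is strictly greater than, a score exactly equal to the threshold takes the `else_steps` branch and fails the pipeline.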

# Create a Pipeline

In [51]:
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        skip_check_data_quality,
        register_new_baseline_data_quality,
        supplied_baseline_constraints_data_bias,
        supplied_baseline_statistics_data_quality,
        supplied_baseline_constraints_data_quality,
        skip_check_data_bias,
        register_new_baseline_data_bias,
        training_instance_type,
        inference_instance_type,
        skip_check_model_quality,
        register_new_baseline_model_quality,
        supplied_baseline_statistics_model_quality,
        supplied_baseline_constraints_model_quality,
        skip_check_model_bias,
        register_new_baseline_model_bias,
        supplied_baseline_constraints_model_bias,
        skip_check_model_explainability,
        register_new_baseline_model_explainability,
        supplied_baseline_constraints_model_explainability,
        input_data_prefix,
        model_approval_status,
        auc_score_threshold,
    ],
    steps=[
        step_process,
        data_quality_check_step,
        data_bias_check_step,
        step_train,
        step_create_model,
        step_eval,
        step_cond,
    ],
)

definition = json.loads(pipeline.definition())
print(definition)

{'Version': '2020-12-01', 'Metadata': {}, 'Parameters': [{'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 4}, {'Name': 'ProcessingInstanceType', 'Type': 'String', 'DefaultValue': 'ml.m5.4xlarge'}, {'Name': 'SkipDataQualityCheck', 'Type': 'Boolean', 'DefaultValue': True}, {'Name': 'RegisterNewDataQualityBaseline', 'Type': 'Boolean', 'DefaultValue': True}, {'Name': 'DataBiasSuppliedBaselineConstraints', 'Type': 'String', 'DefaultValue': ''}, {'Name': 'DataQualitySuppliedStatistics', 'Type': 'String', 'DefaultValue': ''}, {'Name': 'DataQualitySuppliedConstraints', 'Type': 'String', 'DefaultValue': ''}, {'Name': 'SkipDataBiasCheck', 'Type': 'Boolean', 'DefaultValue': True}, {'Name': 'RegisterNewDataBiasBaseline', 'Type': 'Boolean', 'DefaultValue': True}, {'Name': 'TrainingInstanceType', 'Type': 'String', 'DefaultValue': 'ml.m5.2xlarge'}, {'Name': 'InferenceInstanceType', 'Type': 'String', 'DefaultValue': 'ml.m5.2xlarge'}, {'Name': 'SkipModelQualityCheck', 'Type': 'Boo
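
The printed definition is long, so it can help to reduce it to parameter names with defaults and a list of steps. The sketch below works on a small hand-written sample in the same shape as the output above. The `Steps` entries, with `Name` and `Type` keys, are an assumption about the part of the definition that is cut off here, and both helper functions are hypothetical.

```python
def summarize_definition(definition):
    """Return ({parameter name: default}, [(step name, step type), ...])."""
    params = {p["Name"]: p.get("DefaultValue") for p in definition.get("Parameters", [])}
    steps = [(s["Name"], s["Type"]) for s in definition.get("Steps", [])]
    return params, steps


def unknown_overrides(definition, overrides):
    """List override names that are not declared as pipeline parameters."""
    known = {p["Name"] for p in definition.get("Parameters", [])}
    return sorted(set(overrides) - known)


# Hand-written sample; the real dictionary comes from json.loads(pipeline.definition()).
sample = {
    "Version": "2020-12-01",
    "Parameters": [
        {"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 4},
        {"Name": "SkipDataQualityCheck", "Type": "Boolean", "DefaultValue": True},
    ],
    "Steps": [
        {"Name": "EvaluateModel", "Type": "Processing"},
        {"Name": "CheckEvaluationScore", "Type": "Condition"},
    ],
}

params, steps = summarize_definition(sample)
print(params)
print(steps)
print(unknown_overrides(sample, {"SkipDataQualityCheck": True, "SkipDataQualtyCheck": True}))
```

Checking override names this way before starting an execution catches misspelled parameter names early.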

# Start a Pipeline Execution

In [52]:
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:602900100639:pipeline/streamingservicechurnmodelpipeline',
 'ResponseMetadata': {'RequestId': 'aeba6276-ae66-41a0-bc90-cb741d6eaf9e',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'aeba6276-ae66-41a0-bc90-cb741d6eaf9e',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '102',
   'date': 'Thu, 02 Feb 2023 19:15:07 GMT'},
  'RetryAttempts': 0}}

In [53]:
pipeline.start(parameters=dict(
    SkipDataQualityCheck=True,
    RegisterNewDataQualityBaseline=True,
    SkipDataBiasCheck=True,
    RegisterNewDataBiasBaseline=True))


_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:602900100639:pipeline/streamingservicechurnmodelpipeline/execution/59twe0l2n79i', sagemaker_session=<sagemaker.session.Session object at 0x7f73275ac590>)
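
To follow the run from the notebook, the object returned by `pipeline.start(...)` can be kept in a variable and queried. The sketch below assumes it was assigned to a variable named `execution`, which the cell above does not do, and relies on the `describe()` and `list_steps()` methods of the SageMaker Python SDK execution object. The helper name `summarize_execution` is hypothetical.

```python
def summarize_execution(execution):
    """Return the overall execution status and a {step name: step status} map."""
    status = execution.describe()["PipelineExecutionStatus"]
    steps = {s["StepName"]: s["StepStatus"] for s in execution.list_steps()}
    return status, steps


# Assumed usage inside the notebook (not run here):
#   execution = pipeline.start(parameters=dict(SkipDataQualityCheck=True))
#   execution.wait()   # blocks until the execution finishes
#   status, steps = summarize_execution(execution)
#   print(status, steps)
```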