# Assignment 3: Create a SageMaker pipeline
In this assignment you create an end-to-end ML workflow using [SageMaker Pipelines](https://aws.amazon.com/sagemaker/pipelines/).

Refer to the notebook [`03-sagemaker-pipeline.ipynb`](../03-sagemaker-pipeline.ipynb) for code snippets and general guidance for the exercises in this assignment.

## Import packages

In [4]:
import pandas as pd
import json
import boto3
import pathlib
import io
import sagemaker
from time import gmtime, strftime, sleep
from sagemaker.deserializers import CSVDeserializer
from sagemaker.serializers import CSVSerializer

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import (
    ProcessingInput, 
    ProcessingOutput, 
    ScriptProcessor
)
from sagemaker.inputs import TrainingInput

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import (
    ProcessingStep, 
    TrainingStep, 
    CreateModelStep
)
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.parameters import (
    ParameterInteger, 
    ParameterFloat, 
    ParameterString, 
    ParameterBoolean
)
from sagemaker.workflow.clarify_check_step import (
    ModelBiasCheckConfig, 
    ClarifyCheckStep, 
    ModelExplainabilityCheckConfig
)
from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.conditions import (
    ConditionGreaterThan,
    ConditionGreaterThanOrEqualTo
)
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import (
    Join,
    JsonGet
)
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda

from sagemaker.model_metrics import (
    MetricsSource, 
    ModelMetrics, 
    FileSource
)
from sagemaker.drift_check_baselines import DriftCheckBaselines

from sagemaker.image_uris import retrieve

sagemaker.__version__

'2.145.0'

## Set constants
You need some fixed string literals to simplify coding. Create these literals here.

In [5]:
%store -r 

%store

try:
    initialized
except NameError:
    print("+++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN 00-start-here notebook   ")
    print("+++++++++++++++++++++++++++++++++++++++++++++++++")

Stored variables and their in-db values:
athena_table_name                      -> 'sagemaker_workshop_e2e_churn_1686747619'
baseline_s3_url                        -> 's3://sagemaker-us-east-1-531485126105/from-idea-t
bucket                                 -> 'sagemaker-studio-us-east-1-531485126105'
bucket_name                            -> 'sagemaker-us-east-1-531485126105'
bucket_prefix                          -> 'from-idea-to-prod/xgboost'
churn_feature_group_name               -> 'sagemaker-workshop-e2e-churn'
docker_image_name                      -> '683313688378.dkr.ecr.us-east-1.amazonaws.com/sage
domain_id                              -> 'd-1qvmpqvqiuve'
evaluation_s3_url                      -> 's3://sagemaker-us-east-1-531485126105/from-idea-t
experiment_name                        -> 'from-idea-to-prod-experiment-19-09-26-44'
framework_version                      -> '1.3-1'
initialized                            -> True
input_s3_url                           -> 's3://sag

In [6]:
# Set names of pipeline objects
project = "from-idea-to-prod"

pipeline_name = f"{project}-pipeline"
pipeline_model_name = f"{project}-model-xgb"
model_package_group_name = f"{project}-model-group"
endpoint_config_name = f"{project}-endpoint-config"
endpoint_name = f"{project}-endpoint"

# Set instance types and counts
process_instance_type = "ml.c5.xlarge"
train_instance_count = 1
train_instance_type = "ml.m5.xlarge"

# Set S3 urls for processed data
# train_s3_url = 
# validation_s3_url = 
# test_s3_url = 
# baseline_s3_url = 
# evaluation_s3_url = 

In [7]:
# Set S3 urls for processed data
train_s3_url = f"s3://{bucket_name}/{bucket_prefix}/train"
validation_s3_url = f"s3://{bucket_name}/{bucket_prefix}/validation"
test_s3_url = f"s3://{bucket_name}/{bucket_prefix}/test"
baseline_s3_url = f"s3://{bucket_name}/{bucket_prefix}/baseline"

evaluation_s3_url = f"s3://{bucket_name}/{bucket_prefix}/evaluation"
prediction_baseline_s3_url = f"s3://{bucket_name}/{bucket_prefix}/prediction_baseline"

output_s3_url = f"s3://{bucket_name}/{bucket_prefix}/output"

In [8]:
%store train_s3_url
%store validation_s3_url
%store test_s3_url
%store baseline_s3_url
%store model_package_group_name
%store evaluation_s3_url
%store prediction_baseline_s3_url
%store output_s3_url

Stored 'train_s3_url' (str)
Stored 'validation_s3_url' (str)
Stored 'test_s3_url' (str)
Stored 'baseline_s3_url' (str)
Stored 'model_package_group_name' (str)
Stored 'evaluation_s3_url' (str)
Stored 'prediction_baseline_s3_url' (str)
Stored 'output_s3_url' (str)


In [9]:
print(f"Train S3 url: {train_s3_url}")
print(f"Validation S3 url: {validation_s3_url}")
print(f"Test S3 url: {test_s3_url}")
print(f"Data baseline S3 url: {baseline_s3_url}")
print(f"Evaluation metrics S3 url: {evaluation_s3_url}")
print(f"Model prediction baseline S3 url: {prediction_baseline_s3_url}")

Train S3 url: s3://sagemaker-us-east-1-531485126105/from-idea-to-prod/xgboost/train
Validation S3 url: s3://sagemaker-us-east-1-531485126105/from-idea-to-prod/xgboost/validation
Test S3 url: s3://sagemaker-us-east-1-531485126105/from-idea-to-prod/xgboost/test
Data baseline S3 url: s3://sagemaker-us-east-1-531485126105/from-idea-to-prod/xgboost/baseline
Evaluation metrics S3 url: s3://sagemaker-us-east-1-531485126105/from-idea-to-prod/xgboost/evaluation
Model prediction baseline S3 url: s3://sagemaker-us-east-1-531485126105/from-idea-to-prod/xgboost/prediction_baseline


## Exercise 1: Create pipeline
Follow the steps to create a SageMaker pipeline with your ML workflow:
- Set up pipeline [parameters](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#parameters)
- Build the pipeline [steps](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#steps)
- Construct the [pipeline](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline)
- [Upsert](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.upsert) the pipeline

Configure outputs and inputs for each pipeline step.
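
SageMaker Pipelines derives the execution order from the data dependencies between steps: a step that consumes another step's output runs after it. The sketch below is plain Python, not the SageMaker SDK. The step names and the `dependencies` mapping are illustrative assumptions that mirror the workflow built in this exercise.

```python
from graphlib import TopologicalSorter

# Hypothetical step names. Each step maps to the set of steps whose outputs it consumes.
dependencies = {
    "preprocess": set(),
    "train": {"preprocess"},                # reads the train and validation datasets
    "evaluate": {"preprocess", "train"},    # reads the test dataset and the model artifact
    "check-test-score": {"evaluate"},       # reads the evaluation report
}

# A topological sort yields an order in which every step runs after its inputs exist.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

This is why the exercise asks you to wire each step's inputs to the previous step's output properties: those references are what define the graph.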

In [10]:
# Setup pipeline parameters
# Set S3 url for input dataset
# input_s3_url_param = ParameterString()


In [11]:
session = PipelineSession()

In [12]:
# Set processing instance type
process_instance_type_param = ParameterString(
    name="ProcessingInstanceType",
    default_value=process_instance_type,
)

# Set training instance type
train_instance_type_param = ParameterString(
    name="TrainingInstanceType",
    default_value=train_instance_type,
)

# Set training instance count
train_instance_count_param = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=train_instance_count
)

# Set model approval param
model_approval_status_param = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

# Minimal threshold for model performance on the test dataset
test_score_threshold_param = ParameterFloat(
    name="TestScoreThreshold", 
    default_value=0.75
)

# Set S3 url for input dataset
input_s3_url_param = ParameterString(
    name="InputDataUrl",
    default_value=input_s3_url,
)

In [13]:
# Processing step
# sklearn_processor = SKLearnProcessor()

# processing_inputs=[]

# processing_outputs=[]

# processor_args = sklearn_processor.run()

# step_process = ProcessingStep()

In [14]:
%%writefile preprocessing.py

import pandas as pd
import numpy as np
import argparse
import os

def _parse_args():
    
    parser = argparse.ArgumentParser()
    # Input and output paths inside the processing container
    parser.add_argument('--filepath', type=str, default='/opt/ml/processing/input/')
    parser.add_argument('--filename', type=str, default='bank-additional-full.csv')
    parser.add_argument('--outputpath', type=str, default='/opt/ml/processing/output/')
    
    return parser.parse_known_args()


if __name__=="__main__":
    # Process arguments
    args, _ = _parse_args()
    
    target_col = "y"
    
    # Load data
    df_data = pd.read_csv(os.path.join(args.filepath, args.filename), sep=";")

    # Indicator variable to capture when pdays takes a value of 999
    df_data["no_previous_contact"] = np.where(df_data["pdays"] == 999, 1, 0)

    # Indicator for individuals not actively employed
    df_data["not_working"] = np.where(
        np.in1d(df_data["job"], ["student", "retired", "unemployed"]), 1, 0
    )

    # remove unnecessary data
    df_model_data = df_data.drop(
        ["duration", "emp.var.rate", "cons.price.idx", "cons.conf.idx", "euribor3m", "nr.employed"],
        axis=1,
    )

    df_model_data = pd.get_dummies(df_model_data)  # Convert categorical variables to sets of indicators

    # Replace "y_no" and "y_yes" with a single label column, and bring it to the front:
    df_model_data = pd.concat(
        [
            df_model_data["y_yes"].rename(target_col),
            df_model_data.drop(["y_no", "y_yes"], axis=1),
        ],
        axis=1,
    )

    # Shuffle and split the dataset into train (70%), validation (20%), and test (10%) sets
    train_data, validation_data, test_data = np.split(
        df_model_data.sample(frac=1, random_state=1729),
        [int(0.7 * len(df_model_data)), int(0.9 * len(df_model_data))],
    )

    print(f"Data split > train:{train_data.shape} | validation:{validation_data.shape} | test:{test_data.shape}")
    
    # Save datasets locally
    train_data.to_csv(os.path.join(args.outputpath, 'train/train.csv'), index=False, header=False)
    validation_data.to_csv(os.path.join(args.outputpath, 'validation/validation.csv'), index=False, header=False)
    test_data[target_col].to_csv(os.path.join(args.outputpath, 'test/test_y.csv'), index=False, header=False)
    test_data.drop([target_col], axis=1).to_csv(os.path.join(args.outputpath, 'test/test_x.csv'), index=False, header=False)
    
    # Save the baseline dataset for model monitoring
    df_model_data.drop([target_col], axis=1).to_csv(os.path.join(args.outputpath, 'baseline/baseline.csv'), index=False, header=False)
    
    print("## Processing complete. Exiting.")

Overwriting preprocessing.py
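
The preprocessing script cuts the shuffled rows at 70% and 90% of the dataset, which yields roughly 70/20/10 train, validation, and test sets. A minimal sketch of that index arithmetic in plain Python follows. The helper `split_sizes` is hypothetical and not part of the script.

```python
def split_sizes(n_rows):
    """Return (train, validation, test) row counts for cut points at 70% and 90%."""
    train_end = int(0.7 * n_rows)        # first cut point, same expression as in the script
    validation_end = int(0.9 * n_rows)   # second cut point
    return train_end, validation_end - train_end, n_rows - validation_end


print(split_sizes(1000))
```

Because `int()` truncates, the three counts always add up to the number of rows, and any remainder lands in the test set.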


In [15]:
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=sm_role,
    instance_type=process_instance_type_param,
    instance_count=1,
    base_job_name=f"{pipeline_name}/preprocess",
    sagemaker_session=session,
)
    
processing_inputs=[
    ProcessingInput(source=input_s3_url_param, destination="/opt/ml/processing/input")
]

processing_outputs=[
    ProcessingOutput(output_name="train_data", source="/opt/ml/processing/output/train", 
                     destination=train_s3_url),
    ProcessingOutput(output_name="validation_data", source="/opt/ml/processing/output/validation",
                     destination=validation_s3_url),
    ProcessingOutput(output_name="test_data", source="/opt/ml/processing/output/test",
                     destination=test_s3_url),
    ProcessingOutput(output_name="baseline_data", source="/opt/ml/processing/output/baseline", 
                     destination=baseline_s3_url),
]

processor_args = sklearn_processor.run(
    inputs=processing_inputs,
    outputs=processing_outputs,
    code='preprocessing.py',
    # arguments = ['arg1', 'arg2'],
)
    
# Define processing step
step_process = ProcessingStep(
    name=f"{pipeline_name}-preprocess-data",
    step_args=processor_args,
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


In [16]:
# xgboost_image_uri = sagemaker.image_uris.retrieve("xgboost", region=region, version="1.5-1")
linear_image_uri = sagemaker.image_uris.retrieve("linear-learner", region=region, version="1")

In [17]:
# Training step
# estimator = sagemaker.estimator.Estimator()

# estimator.set_hyperparameters()

# training_inputs = {}

# training_args = estimator.fit(training_inputs)

# step_train = TrainingStep()

In [18]:
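# Instantiate an XGBoost estimator object.
# The hyperparameters below and the evaluation script are XGBoost-specific,
# so the estimator uses the built-in XGBoost image.
estimator = sagemaker.estimator.Estimator(
    image_uri=sagemaker.image_uris.retrieve("xgboost", region=region, version="1.5-1"),
    role=sm_role,
    instance_type=train_instance_type_param,
    instance_count=train_instance_count_param,
    output_path=output_s3_url,
    sagemaker_session=session,
    base_job_name=f"{pipeline_name}/train",
)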

# Define algorithm hyperparameters
estimator.set_hyperparameters(
    num_round=150, # the number of rounds to run the training
    max_depth=5, # maximum depth of a tree
    eta=0.5, # step size shrinkage used in updates to prevent overfitting
    alpha=2.5, # L1 regularization term on weights
    objective="binary:logistic",
    eval_metric="auc", # evaluation metrics for validation data
    subsample=0.8, # subsample ratio of the training instance
    colsample_bytree=0.8, # subsample ratio of columns when constructing each tree
    min_child_weight=3, # minimum sum of instance weight (hessian) needed in a child
    early_stopping_rounds=10, # the model trains until the validation score stops improving
    verbosity=1, # verbosity of printing messages
)

training_inputs = {
    "train": TrainingInput(
        s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
            "train_data"
        ].S3Output.S3Uri,
        content_type="text/csv",
    ),
    "validation": TrainingInput(
        s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
            "validation_data"
        ].S3Output.S3Uri,
        content_type="text/csv",
    ),
}

training_args = estimator.fit(training_inputs)

# Define training step
step_train = TrainingStep(
    name=f"{pipeline_name}-train",
    step_args=training_args
)

Create an executable Python script for the evaluation step.

In [19]:
%%writefile evaluation_assignment.py

import json
import pathlib
import pickle as pkl
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost as xgb
import datetime as dt
from sklearn.metrics import roc_curve, auc

if __name__ == "__main__":   
    # All paths are local for the processing container
        
    # Read model tar file

    # Load model
    
    # Read test data

    # Run predictions

    # Evaluate predictions

    # Save evaluation report

    pass  # placeholder so the template is valid Python until the steps above are implemented

Overwriting evaluation_assignment.py


In [20]:
%%writefile evaluation.py

import json
import os
import pathlib
import pickle as pkl
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost as xgb
import datetime as dt
from sklearn.metrics import roc_curve, auc

if __name__ == "__main__":   
    
    # All paths are local for the processing container
    model_path = "/opt/ml/processing/model/model.tar.gz"
    test_x_path = "/opt/ml/processing/test/test_x.csv"
    test_y_path = "/opt/ml/processing/test/test_y.csv"
    output_dir = "/opt/ml/processing/evaluation"
    output_prediction_path = "/opt/ml/processing/output/"
        
    # Read model tar file
    with tarfile.open(model_path, "r:gz") as t:
        t.extractall(path=".")
    
    # Load model
    model = xgb.Booster()
    model.load_model("xgboost-model")
    
    # Read test data
    X_test = xgb.DMatrix(pd.read_csv(test_x_path, header=None).values)
    y_test = pd.read_csv(test_y_path, header=None).to_numpy()

    # Run predictions
    probability = model.predict(X_test)

    # Evaluate predictions
    fpr, tpr, thresholds = roc_curve(y_test, probability)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }

    # Save evaluation report
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    with open(f"{output_dir}/evaluation.json", "w") as f:
        f.write(json.dumps(report_dict))
    
    # Save prediction baseline file - we need it later for the model quality monitoring
    pd.DataFrame({"prediction":np.array(np.round(probability), dtype=int),
                  "probability":probability,
                  "label":y_test.squeeze()}
                ).to_csv(os.path.join(output_prediction_path, 'prediction_baseline/prediction_baseline.csv'), index=False, header=True)

Overwriting evaluation.py


Use this script to set up a [ScriptProcessor](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.ScriptProcessor) object. Don't forget to pass the [PropertyFile](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.properties.PropertyFile), which points to the file where the evaluation script writes the metrics, to the [ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) constructor.

In [21]:
# Evaluation step
# script_processor = ScriptProcessor()

# eval_inputs=[]

# eval_outputs=[]

# eval_args = script_processor.run()

# evaluation_report = PropertyFile()

# step_eval = ProcessingStep()

In [24]:
%%writefile evaluation.py

import json
import os
import pathlib
import pickle as pkl
import tarfile
import joblib
import numpy as np
import pandas as pd
import xgboost as xgb
import datetime as dt
from sklearn.metrics import roc_curve, auc

if __name__ == "__main__":   
    
    # All paths are local for the processing container
    model_path = "/opt/ml/processing/model/model.tar.gz"
    test_x_path = "/opt/ml/processing/test/test_x.csv"
    test_y_path = "/opt/ml/processing/test/test_y.csv"
    output_dir = "/opt/ml/processing/evaluation"
    output_prediction_path = "/opt/ml/processing/output/"
        
    # Read model tar file
    with tarfile.open(model_path, "r:gz") as t:
        t.extractall(path=".")
    
    # Load model
    model = xgb.Booster()
    model.load_model("xgboost-model")
    
    # Read test data
    X_test = xgb.DMatrix(pd.read_csv(test_x_path, header=None).values)
    y_test = pd.read_csv(test_y_path, header=None).to_numpy()

    # Run predictions
    probability = model.predict(X_test)

    # Evaluate predictions
    fpr, tpr, thresholds = roc_curve(y_test, probability)
    auc_score = auc(fpr, tpr)
    report_dict = {
        "classification_metrics": {
            "auc_score": {
                "value": auc_score,
            },
        },
    }

    # Save evaluation report
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    with open(f"{output_dir}/evaluation.json", "w") as f:
        f.write(json.dumps(report_dict))
    
    # Save prediction baseline file - we need it later for the model quality monitoring
    pd.DataFrame({"prediction":np.array(np.round(probability), dtype=int),
                  "probability":probability,
                  "label":y_test.squeeze()}
                ).to_csv(os.path.join(output_prediction_path, 'prediction_baseline/prediction_baseline.csv'), index=False, header=True)

Overwriting evaluation.py
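
The evaluation script reports the area under the ROC curve (AUC). For intuition, AUC equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counting as one half. The sketch below computes it that way in plain Python. It illustrates the metric only and is not how `sklearn.metrics.auc` is implemented. The function name `pairwise_auc` and the sample data are made up for this example.

```python
def pairwise_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs that are ranked correctly."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative label")

    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0      # positive ranked above negative
            elif p == n:
                wins += 0.5      # a tie counts as half a win
    return wins / (len(positives) * len(negatives))


# Illustrative labels and predicted probabilities
print(pairwise_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))
```

A score of 0.5 corresponds to random ranking and 1.0 to perfect separation, which is useful context when choosing the `TestScoreThreshold` parameter.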


In [25]:
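# evaluation.py imports xgboost, pandas, and scikit-learn,
# so run it in the built-in XGBoost image, which provides these packages.
script_processor = ScriptProcessor(
    image_uri=sagemaker.image_uris.retrieve("xgboost", region=region, version="1.5-1"),
    role=sm_role,
    command=["python3"],
    instance_type=process_instance_type_param,
    instance_count=1,
    base_job_name=f"{pipeline_name}/evaluate",
    sagemaker_session=session,
)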

eval_inputs=[
    ProcessingInput(source=step_train.properties.ModelArtifacts.S3ModelArtifacts, 
                    destination="/opt/ml/processing/model"),
    ProcessingInput(source=step_process.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri, 
                    destination="/opt/ml/processing/test"),
]

eval_outputs=[
    ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation", 
                     destination=evaluation_s3_url),
    ProcessingOutput(output_name="prediction_baseline_data", source="/opt/ml/processing/output/prediction_baseline", 
                     destination=prediction_baseline_s3_url),
]

eval_args = script_processor.run(
    inputs=eval_inputs,
    outputs=eval_outputs,
    code="evaluation.py",
)
    
evaluation_report = PropertyFile(
    name="ModelEvaluationReport", output_name="evaluation", path="evaluation.json"
)

step_eval = ProcessingStep(
    name=f"{pipeline_name}-evaluate-model",
    step_args=eval_args,
    property_files=[evaluation_report]
)

Use the model artifact from the training step and the property file from the evaluation step to create [Model](https://sagemaker.readthedocs.io/en/stable/api/inference/model.html#sagemaker.model.Model) and [ModelMetrics](https://sagemaker.readthedocs.io/en/stable/api/inference/model_monitor.html#sagemaker.model_metrics.ModelMetrics) objects. Use these objects to construct a [ModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.model_step.ModelStep).

In [None]:
# Register step
# model = Model()

# model_metrics = ModelMetrics()

# register_args = model.register()

# step_register = ModelStep()

In [27]:
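model = Model(
    # Built-in XGBoost image, matching the XGBoost hyperparameters and evaluation script in this notebook
    image_uri=sagemaker.image_uris.retrieve("xgboost", region=region, version="1.5-1"),
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    name=pipeline_model_name,
    sagemaker_session=session,
    role=sm_role,
)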

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge", "ml.m5.large"],
    transform_instances=["ml.m5.xlarge", "ml.m5.large"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status_param,
    model_metrics=model_metrics,
)

step_register = ModelStep(
    name=f"{pipeline_name}-register",
    step_args=register_args
)

Add a [FailStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.fail_step.FailStep) to stop the pipeline execution if the model performance metric doesn't meet the specified threshold. 

In [None]:
# Fail step
# step_fail = FailStep()

In [28]:
step_fail = FailStep(
    name=f"{pipeline_name}-fail",
    # This step runs when the AUC score is NOT greater than the threshold
    error_message=Join(
        on=" ",
        values=["Execution failed because the AUC score is not greater than", test_score_threshold_param],
    ),
)

Use [Conditions](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#conditions) and `JsonGet` to construct a condition and the condition step.

In [None]:
# Condition step
# cond_lte = ConditionGreaterThan()

# step_cond = ConditionStep()

In [29]:
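# The condition holds when the AUC score read from the evaluation report
# is greater than the threshold parameter
cond_gt = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=test_score_threshold_param,
)

step_cond = ConditionStep(
    name=f"{pipeline_name}-check-test-score",
    conditions=[cond_gt],
    if_steps=[step_register],   # condition holds: register the model
    else_steps=[step_fail],     # otherwise: stop the execution
)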
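
To see what the condition evaluates, the sketch below mimics it locally: it reads a report shaped like `evaluation.json`, follows the dotted path used in `JsonGet`, and compares the value with the threshold. It is plain Python rather than the SageMaker SDK. The helpers `resolve_path` and `next_step` and the sample AUC value are illustrative assumptions.

```python
import json


def resolve_path(document, dotted_path):
    """Follow a dotted path such as 'a.b.c' through nested dictionaries."""
    value = document
    for key in dotted_path.split("."):
        value = value[key]
    return value


def next_step(report_json, threshold):
    """Return which branch a greater-than condition on the AUC score would take."""
    report = json.loads(report_json)
    auc_score = resolve_path(report, "classification_metrics.auc_score.value")
    return "register" if auc_score > threshold else "fail"


# Sample report with a made-up AUC value, in the structure written by evaluation.py
sample_report = json.dumps({"classification_metrics": {"auc_score": {"value": 0.8}}})

print(next_step(sample_report, 0.75))
print(next_step(sample_report, 0.9))
```

Note that a score exactly equal to the threshold takes the fail branch, because the comparison is strictly greater than.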

Use the list of steps to create a [Pipeline](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline) object.

In [None]:
# Create pipeline
# pipeline = Pipeline()

In [30]:
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        process_instance_type_param,
        train_instance_type_param,
        train_instance_count_param,
        model_approval_status_param,
        test_score_threshold_param,
        input_s3_url_param,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=session,
)

In [None]:
# Create a new or update existing Pipeline
# pipeline.upsert(role_arn=sm_role)

In [31]:
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sm_role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:531485126105:pipeline/from-idea-to-prod-pipeline',
 'ResponseMetadata': {'RequestId': '60a668a7-33d8-4b9f-b08b-c92ad49bf681',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '60a668a7-33d8-4b9f-b08b-c92ad49bf681',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '94',
   'date': 'Thu, 22 Jun 2023 05:31:33 GMT'},
  'RetryAttempts': 0}}

In [None]:
# Print the pipeline definition
# pipeline_definition = json.loads(pipeline.describe()['PipelineDefinition'])
# pipeline_definition

In [32]:
pipeline_definition = json.loads(pipeline.describe()['PipelineDefinition'])
pipeline_definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.c5.xlarge'},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'TrainingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'TestScoreThreshold', 'Type': 'Float', 'DefaultValue': 0.75},
  {'Name': 'InputDataUrl',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-531485126105/from-idea-to-prod/xgboost/input/bank-additional-full.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'from-idea-to-prod-pipeline-preprocess-data',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInsta

## Exercise 2: Execute the pipeline

In [None]:
# Start an execution
# execution = pipeline.start()

In [33]:
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType=process_instance_type,
        TrainingInstanceType=train_instance_type,
        TrainingInstanceCount=train_instance_count,
        ModelApprovalStatus="PendingManualApproval",
        TestScoreThreshold=0.75,
        InputDataUrl=input_s3_url
    )
)
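
Values passed to `pipeline.start()` override the parameter defaults for that execution only, and parameters you omit keep their defaults. Below is a small plain-Python sketch of that merge, using the defaults shown in the pipeline definition above. The helper `resolve_parameters` is hypothetical, and rejecting undefined names is a design choice of this sketch rather than a statement about the service's behavior.

```python
# Defaults as listed in the pipeline definition (InputDataUrl omitted for brevity)
defaults = {
    "ProcessingInstanceType": "ml.c5.xlarge",
    "TrainingInstanceType": "ml.m5.xlarge",
    "TrainingInstanceCount": 1,
    "ModelApprovalStatus": "PendingManualApproval",
    "TestScoreThreshold": 0.75,
}


def resolve_parameters(defaults, overrides):
    """Merge per-execution overrides into the defaults; overrides take precedence."""
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"Parameters not defined in the pipeline: {sorted(unknown)}")
    return {**defaults, **overrides}


# Raise the quality bar for a single execution without changing the pipeline
effective = resolve_parameters(defaults, {"TestScoreThreshold": 0.8})
print(effective["TestScoreThreshold"], effective["TrainingInstanceType"])
```

Passing every parameter explicitly, as in the cell above, is equivalent to starting the execution with no overrides when the values match the defaults.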

In [None]:
# Uncomment this call if you want the notebook to wait until the pipeline execution finishes
# The execution of this pipeline takes about 13 minutes
# execution.wait()

In [None]:
# List execution steps
# execution.list_steps()

In [34]:
execution.list_steps()

[{'StepName': 'from-idea-to-prod-pipeline-preprocess-data',
  'StartTime': datetime.datetime(2023, 6, 22, 5, 32, 47, 589000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:531485126105:processing-job/pipelines-zcwjjqgvanqt-from-idea-to-prod-pi-iUccvrTm3B'}}}]

In [35]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>

## Continue with assignment 4
Navigate to the [assignment 4](04-assignment-sagemaker-project.ipynb) notebook.