## Use SageMaker Pipelines To Orchestrate End To End Cross Validation Model Training Workflow

Amazon SageMaker Pipelines simplifies the orchestration of ML workflows across each step of the ML process, from exploratory data analysis and preprocessing to model training and model deployment. 
With SageMaker Pipelines, you can develop a consistent, reusable workflow that integrates with a CI/CD pipeline for improved quality and fewer errors throughout the development lifecycle.

## SageMaker Pipelines
An ML workflow built with SageMaker Pipelines is made up of a series of steps defined as a directed acyclic graph (DAG). The pipeline is expressed as a JSON definition that captures the relationships between its steps. The following terminology is used in SageMaker Pipelines for defining an ML workflow.

* Pipelines - Top-level definition of a pipeline. It encapsulates a name, parameters, and steps. A pipeline is scoped to an account and region. 
* Parameters - Variables declared in the pipeline definition whose values can be provided at execution time. Parameters support string, float and integer types. 
* Pipeline Steps - Define the actions that the pipeline takes and the relationships between steps, expressed through step properties. SageMaker Pipelines supports the following step types: <b>Processing, Training, Transform, CreateModel, RegisterModel, Condition, Callback</b>.

## Notebook Overview
This notebook implements a complete cross validation ML model workflow using a custom-built Docker image, HyperparameterTuner for automatic hyperparameter optimization, 
and the SKLearn framework for the k-fold split and model training. The workflow is defined and orchestrated using SageMaker Pipelines. 
Here are the main steps involved in the end-to-end workflow:
    
<ol>
<li>Defines a list of parameters, with default values, to be used throughout the pipeline</li>
<li>Defines a ProcessingStep with an SKLearn processor to perform the k-fold cross validation splits</li>
<li>Defines a ProcessingStep that orchestrates cross validation model training with HyperparameterTuner integration</li>
<li>Defines a ConditionStep that validates the model performance against the baseline</li>
<li>Defines a TrainingStep to train the model on the entire dataset with the hyperparameters suggested by HyperparameterTuner</li>
<li>Creates a model package and defines a RegisterModel step to register the model trained in the previous step with SageMaker Model Registry</li>
</ol>

## Dataset

The Iris flower data set is a multivariate data set introduced by the British statistician, eugenicist, and biologist Ronald Fisher in his 1936 [paper](https://onlinelibrary.wiley.com/doi/abs/10.1111/j.1469-1809.1936.tb02137.x). The data set consists of 50 samples from each of 3 species of Iris:
* Iris setosa 
* Iris virginica  
* Iris versicolor

There are 4 features available in each sample: the length and the width of the sepals and petals measured in centimeters. 

Based on the combination of these four features, we are going to build a linear algorithm (SVM) to train a multiclass classification model to distinguish the species from each other.

## Defines Pipeline Parameters
With Pipeline Parameters, you can introduce variables to the pipeline that are specific to the pipeline run.  
The supported parameter types include:

* ParameterString - represents a str Python type
* ParameterInteger - represents an int Python type
* ParameterFloat - represents a float Python type

Additionally, parameters support default values, which is useful when only a subset of the defined parameters needs to change. For example, when training a model with the k-fold cross validation method, you could provide the desired k value at pipeline execution time. 
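
The interplay between default values and execution-time overrides can be illustrated without the SDK. The following is a minimal sketch in plain Python, not the SageMaker API: the helper `resolve_parameters` and the parameter name `KFold` are hypothetical and exist only for this illustration.

```python
# Plain-Python illustration of default values vs. execution-time overrides.
# Nothing here calls SageMaker; the helper and the "KFold" name are hypothetical.

DEFAULTS = {
    "KFold": 3,                              # int, comparable to ParameterInteger
    "TrainingInstanceType": "ml.m5.xlarge",  # str, comparable to ParameterString
    "BaselineModelObjectiveValue": 0.6,      # float, comparable to ParameterFloat
}


def resolve_parameters(defaults, overrides=None):
    """Return defaults updated with overrides, rejecting unknown names and type changes."""
    resolved = dict(defaults)
    for name, value in (overrides or {}).items():
        if name not in defaults:
            raise KeyError(f"unknown parameter: {name}")
        if type(value) is not type(defaults[name]):
            raise TypeError(f"{name} expects {type(defaults[name]).__name__}")
        resolved[name] = value
    return resolved


# Only k changes for this run; every other parameter keeps its default.
run_params = resolve_parameters(DEFAULTS, {"KFold": 5})
print(run_params)
```

Only the overridden entry changes, which is the behaviour that makes default values convenient when a run differs from the usual configuration in one or two settings.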

Here are the parameters for the workflow used in this notebook:

*

In [2]:
import os
import time
from datetime import datetime

import sagemaker
from sagemaker import ModelMetrics, MetricsSource, TrainingInput
from sagemaker.dataset_definition.inputs import (
    AthenaDatasetDefinition,
    DatasetDefinition,
)
from sagemaker.estimator import Estimator
from sagemaker.model import Model
from sagemaker.network import NetworkConfig
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.sklearn import SKLearnProcessor, SKLearn
from sagemaker.workflow.condition_step import JsonGet, ConditionStep
from sagemaker.workflow.conditions import (
    ConditionGreaterThanOrEqualTo,
    ConditionLessThanOrEqualTo,
)
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep

In [3]:
import boto3
import sagemaker


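def get_environment(project_name, ssm_params):
    """Collects SageMaker domain settings, environment tags and SSM values for a project."""
    sm = boto3.client("sagemaker")
    ssm = boto3.client("ssm")

    # Look up the Studio domain that created the project.
    r = sm.describe_domain(
            DomainId=sm.describe_project(
                ProjectName=project_name
                )["CreatedBy"]["DomainId"]
        )
    # Drop response fields that are not needed downstream.
    del r["ResponseMetadata"]
    del r["CreationTime"]
    del r["LastModifiedTime"]
    # Flatten DefaultUserSettings (ExecutionRole, SecurityGroups, ...) into the top level.
    r = {**r, **r["DefaultUserSettings"]}
    del r["DefaultUserSettings"]

    # Add the EnvironmentName and EnvironmentType tags of the domain.
    i = {
        **r,
        **{t["Key"]:t["Value"]
            for t in sm.list_tags(ResourceArn=r["DomainArn"])["Tags"]
            if t["Key"] in ["EnvironmentName", "EnvironmentType"]}
    }

    # Resolve each requested SSM parameter; fall back to "" when the lookup fails.
    for p in ssm_params:
        try:
            i[p["VariableName"]] = ssm.get_parameter(Name=f"{i['EnvironmentName']}-{i['EnvironmentType']}-{p['ParameterName']}")["Parameter"]["Value"]
        except Exception:
            i[p["VariableName"]] = ""

    return i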


def get_session(region, default_bucket):
    """Gets the SageMaker session and client for the given region.

    Args:
        region: the AWS region to start the session
        default_bucket: the bucket to use for storing the artifacts

    Returns:
        a tuple of (sagemaker.session.Session instance, SageMaker boto3 client)
    """

    boto_session = boto3.Session(region_name=region)

    sagemaker_client = boto_session.client("sagemaker")
    runtime_client = boto_session.client("sagemaker-runtime")

    return sagemaker.session.Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        sagemaker_runtime_client=runtime_client,
        default_bucket=default_bucket,
    ), sagemaker_client


def environment_data(project_name):
    # Dynamically load environmental SSM parameters - provide the list of the variables to load from SSM parameter store
    ssm_parameters = [
        {"VariableName": "DataBucketName", "ParameterName": "data-bucket-name"},
        {"VariableName": "ModelBucketName", "ParameterName": "model-bucket-name"},
        {"VariableName": "S3KmsKeyId", "ParameterName": "kms-s3-key-arn"},
        {"VariableName": "EbsKmsKeyArn", "ParameterName": "kms-ebs-key-arn"},
        {"VariableName": "TrustedDefaultKinesisAccount", "ParameterName": "TrustedDefaultKinesisAccount"},
    ]
    env_data = get_environment(project_name=project_name, ssm_params=ssm_parameters)
    env_data["ProcessingRole"] = env_data["ExecutionRole"]
    env_data["TrainingRole"] = env_data["ExecutionRole"]
    
    return env_data


In [4]:
import os
import json

def get_pipeline(
        region,
        project_name=None,
        # source_scripts_path="./",
        model_package_group_name="AbalonePackageGroup",
        pipeline_name="AbalonePipeline",
        base_job_prefix="Abalone",
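        revision="no-revision-provided",):
    """Gets a SageMaker ML Pipeline instance for the cross validation training workflow.

    Args:
        region: AWS region to create and run the pipeline.
        project_name: SageMaker project used to look up the environment settings.
        model_package_group_name: Model Registry group that receives the trained model.
        pipeline_name: name passed on to standard_model_pipeline.
        base_job_prefix: prefix passed on to the helper functions that define the jobs.
        revision: source revision identifier used in the S3 output paths.

    Returns:
        an instance of a pipeline
    """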

    # get env data
    env_data = environment_data(project_name)

    sagemaker_session, sagemaker_client = get_session(region, env_data["DataBucketName"])
    default_bucket = sagemaker_session.default_bucket()
    base_dir = os.getcwd()

    pipeline = standard_model_pipeline(
        base_job_prefix=base_job_prefix,
        default_bucket=default_bucket,
        env_data=env_data,
        model_package_group_name=model_package_group_name,
        pipeline_name=pipeline_name,
        region=region,
        sagemaker_session=sagemaker_session,
        base_dir=base_dir,
        # source_scripts_path=source_scripts_path,
        project=project_name,
        revision=revision)
    return pipeline


In [5]:
# def sagemaker_pipeline_parameters(data_bucket):
#     processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
#     training_instance_count = "1"
#     processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
#     training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
#     model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")
#     hpo_tuner_instance_type = ParameterString(name="HPOTunerScriptInstanceType", default_value="ml.t3.medium")

#     return model_approval_status, processing_instance_count, processing_instance_type, training_instance_type, training_instance_count, hpo_tuner_instance_type


## (step 1) Preprocessing Step
The first step in the k-fold cross validation model workflow is to randomly split the training dataset into k batches.
We are going to use the SageMaker SKLearnProcessor with a preprocessing script to perform the dataset splits and upload the results to the specified S3 bucket for the model training step. 
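
To make the idea of a random k-way split concrete, here is a minimal sketch using only the Python standard library. It is not the notebook's `code/preprocessing.py`, which is not shown here and relies on scikit-learn; the function name `kfold_indices` and the seed are assumptions made for illustration. The sample count of 150 matches the Iris dataset described above.

```python
import random


def kfold_indices(n_samples, k, seed=0):
    """Yield (train_idx, test_idx) pairs for k shuffled folds of near-equal size."""
    indices = list(range(n_samples))
    random.Random(seed).shuffle(indices)
    # Fold i takes every k-th shuffled index starting at offset i.
    folds = [indices[i::k] for i in range(k)]
    for i in range(k):
        test_idx = folds[i]
        # All remaining folds together form the training set for this split.
        train_idx = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield train_idx, test_idx


splits = list(kfold_indices(n_samples=150, k=3))
for fold_number, (train_idx, test_idx) in enumerate(splits):
    print(fold_number, len(train_idx), len(test_idx))
```

Each sample lands in exactly one test fold, so every observation is used for validation once and for training k - 1 times.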

In [6]:
def preprocessing(s3_buckets, base_job_prefix, env_data, network_config, processing_instance_count, processing_instance_type,
                  sagemaker_session, snapshot_path, training_path, test_path,
                  execution_time, framework_version):

    # Step 1: SKLearnProcessor that runs the k-fold split script (code/preprocessing.py)
    sklearn_processor = SKLearnProcessor(
        framework_version=framework_version,
        instance_type=processing_instance_type,
        instance_count=1,
        base_job_name="kfold-crossvalidation-split",
        sagemaker_session=sagemaker_session,
        role=env_data["ProcessingRole"],
        network_config=network_config,
        volume_kms_key=env_data["EbsKmsKeyArn"],
        output_kms_key=env_data["S3KmsKeyId"]
    )

    step_process = ProcessingStep(
        name="PreprocessStep",
        processor=sklearn_processor,
        # inputs=[ProcessingInput(source=f'{source_scripts_path}/preprocessing/utils/',
        #                         destination="/opt/ml/processing/input/code/utils/")],
        outputs=[
            ProcessingOutput(
                output_name="train",
                source="/opt/ml/processing/train",
                destination=training_path
            ),
            ProcessingOutput(output_name="test",
                             source="/opt/ml/processing/test",
                             destination=test_path
                             )
        ],
        code="code/preprocessing.py",
    )
    return step_process



## (step 2) Cross Validation Model Training Step 
In the cross validation model training workflow, a script processor orchestrates k training jobs in parallel; each of the k jobs trains a model on its assigned split. The script processor also uses SageMaker HyperparameterTuner to optimize the hyperparameters and passes the candidate values to the k training jobs. It monitors all training jobs and, once they complete, captures key metrics, including the training accuracy and the hyperparameters from the best training job, then uploads the results to the specified S3 location for use in the model evaluation and model selection steps.

The following components orchestrate the cross validation model training, the hyperparameter optimization and the capture of key metrics:

* PropertyFile - EvaluationReport, contains the performance metrics from the HyperparameterTuner job, expressed in JSON format.
* PropertyFile - JobInfo, contains information about the best training job and the corresponding hyperparameters used for training, expressed in JSON format.
* ScriptProcessor - Runs a Python script that orchestrates a hyperparameter tuning job for cross validation model training.
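
As an illustration of what the evaluation report might contain, the sketch below summarises per-fold accuracies into a JSON document. The key path `multiclass_classification_metrics.accuracy.value` is the one read by the condition in this notebook; the `standard_deviation` field, the function name `build_evaluation_report` and the accuracy numbers are assumptions for illustration, not output of the actual `cross_validation_with_hpo.py` script.

```python
import json
import statistics


def build_evaluation_report(fold_accuracies):
    """Summarise per-fold accuracies under the JSON path read by the condition step."""
    return {
        "multiclass_classification_metrics": {
            "accuracy": {
                "value": statistics.mean(fold_accuracies),
                "standard_deviation": statistics.pstdev(fold_accuracies),
            }
        }
    }


# Placeholder accuracies for k = 3 folds; these are not results from this notebook.
report = build_evaluation_report([0.9, 0.8, 1.0])
print(json.dumps(report, indent=2))
```

In the pipeline, a document of this kind would be written as `evaluation.json` under `/opt/ml/processing/evaluation`, the source directory of the `evaluation` processing output.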

## Custom Docker Image
To run k-fold cross validation training jobs through SageMaker automatic model tuning, we need a custom Docker image that includes both the Python script that manages the k-fold cross validation training jobs and the training script that each of the k training jobs runs. For details about adapting custom Docker containers to work with SageMaker, please follow this [link](https://docs.aws.amazon.com/sagemaker/latest/dg/docker-containers-adapt-your-own.html). The Docker image used in the pipeline was built using the [Dockerfile](code/Dockerfile) included in this project. 

The following steps cover authentication, image building and pushing the image to an [ECR](https://aws.amazon.com/ecr/) registry for SageMaker training \(follow this [link](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html) for the official AWS guidance on working with ECR\):

<b>Prerequisites</b>
* [docker](https://docs.docker.com/get-docker/) 
* [git client](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) 
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-install.html) 

<b>Note:</b>
If you use [AWS Cloud9](https://aws.amazon.com/cloud9/) as the CLI terminal, the prerequisites described above are met by default, so there is no need to install any additional tools.

<b>Steps</b>
* Open a new terminal
* git clone this project
* cd to the code directory
* Run ./build-and-push-docker.sh [aws_acct_id] [aws_region]
* Capture the ECR image URI printed by the script after a successful run. You'll need to provide this image URI at pipeline execution time. Here's an example of the format: ############.dkr.ecr.region.amazonaws.com/sagemaker-cross-validation-pipeline:latest
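
If the image URI has to be assembled by hand, its parts can be combined as in the following sketch. The helper `ecr_image_uri` is hypothetical, the account ID is a placeholder, and the `amazonaws.com` domain suffix is assumed (some AWS partitions use a different suffix).

```python
def ecr_image_uri(account_id, region, repository, tag="latest"):
    """Compose an ECR image URI in the format shown in the steps above."""
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


# 123456789012 is a placeholder account ID, not a real registry.
uri = ecr_image_uri("123456789012", "eu-north-1", "sagemaker-cross-validation-pipeline")
print(uri)
```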

In [7]:
def training_tasks(s3_buckets, base_job_prefix, env_data, image_uri, network_config, sagemaker_session,
                            training_instance_type, training_instance_count, 
                            hpo_tuner_instance_type, region, framework_version):

    # ## 2- Cross Validation Model Training Step

    evaluation_report = PropertyFile(name="EvaluationReport", output_name="evaluation", path="evaluation.json")
    jobinfo = PropertyFile(name="JobInfo", output_name="jobinfo", path="jobinfo.json")

    script_tuner = ScriptProcessor(
        image_uri=image_uri,
        command=["python3"],
        instance_type=hpo_tuner_instance_type,
        instance_count=1,
        base_job_name="KFoldCrossValidationHyperParameterTuner",
        role=env_data["TrainingRole"],
        sagemaker_session=sagemaker_session,
        volume_kms_key=env_data["EbsKmsKeyArn"],
        output_kms_key=env_data["S3KmsKeyId"],
        network_config=network_config,
    )

    k = 3
    min_c = 0
    max_c = 1
    min_gamma = 0.0001
    max_gamma = 0.001
    gamma_scaling_type = "Logarithmic"
    max_parallel_jobs = 1
    
    step_cv_train_hpo = ProcessingStep(
        name="HyperParameterTuningStep",
        processor=script_tuner,
        code="code/cross_validation_with_hpo.py",
        outputs=[
            ProcessingOutput(output_name="evaluation",
                             source="/opt/ml/processing/evaluation",
                             destination=s3_buckets['s3_bucket_base_path_evaluation']), 
            ProcessingOutput(output_name="jobinfo",
                             source="/opt/ml/processing/jobinfo",
                             destination=s3_buckets['s3_bucket_base_path_jobinfo'])
        ],
        # Container arguments are passed as strings, so numeric values are converted explicitly.
        job_arguments=["-k", str(k),
                       "--image-uri", image_uri,
                       "--train", s3_buckets['s3_bucket_base_path_train'],
                       "--test", s3_buckets['s3_bucket_base_path_test'],
                       "--instance-type", training_instance_type,
                       "--instance-count", "1",
                       "--output-path", s3_buckets['s3_bucket_base_path_output'],
                       "--max-jobs", "3",
                       "--max-parallel-jobs", str(max_parallel_jobs),
                       "--region", str(region),
                       "--subnets", env_data["SubnetIds"][0],
                       "--security_group_ids", env_data["SecurityGroups"][0],
                       "--min-c", str(min_c),
                       "--max-c", str(max_c),
                       "--min-gamma", str(min_gamma),
                       "--max-gamma", str(max_gamma),
                       "--gamma-scaling-type", gamma_scaling_type,
                      ],
        property_files=[evaluation_report],
        depends_on=['PreprocessStep']
    )
    
    # Step 3: model selection, training the final model on the full training set
    sklearn_estimator = SKLearn("scikit_learn_iris.py",
                                framework_version=framework_version,
                                instance_type=training_instance_type,
                                py_version='py3',
                                source_dir='code',
                                output_path=s3_buckets['s3_bucket_base_path_output'],
                                role=env_data["TrainingRole"],
                                subnets=[env_data["SubnetIds"][0]],
                                security_group_ids=[env_data["SecurityGroups"][0]],
                                ) 
    
    step_model_selection = TrainingStep(
        name="ModelSelectionStep",
        estimator=sklearn_estimator,
        inputs={
            "train": TrainingInput(
                s3_data=f"{s3_buckets['s3_bucket_base_path_train']}/all",
                content_type="text/csv"
            ),
            "jobinfo": TrainingInput(
                s3_data=f"{s3_buckets['s3_bucket_base_path_jobinfo']}",
                content_type="application/json"
            )
        }
    )
    
    
    return step_model_selection, step_cv_train_hpo, sklearn_estimator, evaluation_report



## (step 3) Model Selection Step
Model selection is the final step in the cross validation model training workflow. Based on the metrics and hyperparameters acquired from the cross validation steps orchestrated through the ScriptProcessor, 
a TrainingStep is defined to train a model with the same algorithm used in cross validation training, using all available training data. The model artifacts created by the training process are used 
for model registration, deployment and inference. 

Components involved in the model selection step:
    
* SKLearn Estimator - A SageMaker Estimator used to train the final model.
* TrainingStep - Workflow step that triggers the model selection process.


## (step 4) Register Model With Model Registry
Once the model selection step is complete, the trained model artifact can be registered with SageMaker Model Registry.
Model Registry catalogs the trained model to enable model versioning and to capture performance metrics and approval status. Additionally, models versioned in Model Registry can be deployed through CI/CD. For more information about Model Registry, see https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry.html

Components involved in registering a trained model with Model Registry:
* Model - Model object that contains metadata for the trained model. 
* CreateModelInput - An object that encapsulates the parameters used to create a SageMaker Model.
* CreateModelStep - Workflow step that creates a SageMaker Model.
* ModelMetrics - Captures metadata, including metrics statistics, data constraints, bias and explainability for the trained model.
* RegisterModel - Workflow step that registers the model with Model Registry.

In [8]:
def model_register_tasks(s3_buckets, evaluation_report, sklearn_estimator, step_model_selection, step_cv_train_hpo,
                              model_approval_status, baseline_model_objective_value, sagemaker_session, model_package_group_name, network_config,
                              env_data, revision):
          
    model = Model(
        image_uri=sklearn_estimator.image_uri,
        model_data=step_model_selection.properties.ModelArtifacts.S3ModelArtifacts,
        sagemaker_session=sagemaker_session,
        role=env_data['TrainingRole'],
    )

    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri="{}/evaluation.json".format(
                step_cv_train_hpo.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
            ),
            content_type="application/json",
        )
    )

    step_register_model = RegisterModel(
        name="RegisterModelStep",
        estimator=sklearn_estimator,
        model_data=step_model_selection.properties.ModelArtifacts.S3ModelArtifacts,
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
        transform_instances=["ml.m5.xlarge"],
        model_package_group_name=model_package_group_name,
        approval_status=model_approval_status,
        model_metrics=model_metrics,
    )
    # Condition Step
    cond_gte = ConditionGreaterThanOrEqualTo(
        left=JsonGet(
            step=step_cv_train_hpo,
            property_file=evaluation_report,
            json_path="multiclass_classification_metrics.accuracy.value",
        ),
        right=baseline_model_objective_value,
    )

    step_cond = ConditionStep(
        name="ModelEvaluationStep",
        conditions=[cond_gte],
        if_steps=[step_model_selection, step_register_model],
        else_steps=[],
    )


    return step_cond


## (step 5) Condition Step
SageMaker Pipelines supports condition steps, which evaluate conditions on step properties to determine the next action.
In the context of the cross validation model workflow, a condition step evaluates the model metrics captured in the Cross Validation Training Step to determine whether 
the model selection step should take place. The step evaluates a ConditionGreaterThanOrEqualTo against a given baseline model objective value to determine the next steps.

Components involved in defining a Condition Step:

* ConditionGreaterThanOrEqualTo - A condition that defines the evaluation criteria for the given model objective value and the model performance metrics captured in the evaluation report. The condition returns True if the model performance metric is greater than or equal to the baseline model objective value, and False otherwise.
* ConditionStep - Workflow step that performs the evaluation based on the criteria defined in ConditionGreaterThanOrEqualTo.
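
The evaluation logic can be sketched in plain Python as follows. This mimics, rather than uses, the SDK's JsonGet and ConditionGreaterThanOrEqualTo: the helpers `json_get` and `meets_baseline` and the sample report are assumptions for illustration, while the JSON path and the 0.6 baseline are the values used in this notebook's code.

```python
from functools import reduce


def json_get(document, json_path):
    """Follow a dot-separated path through nested dictionaries."""
    return reduce(lambda node, key: node[key], json_path.split("."), document)


def meets_baseline(report, baseline):
    """Mirror the greater-than-or-equal check: True when accuracy >= baseline."""
    accuracy = json_get(report, "multiclass_classification_metrics.accuracy.value")
    return accuracy >= baseline


# Placeholder report, not an actual result from the tuning job.
sample_report = {"multiclass_classification_metrics": {"accuracy": {"value": 0.9}}}
print(meets_baseline(sample_report, 0.6))
```

When the check passes, the pipeline runs the steps listed in `if_steps`; otherwise it runs `else_steps`, which is empty in this notebook, so the execution ends without training or registering a final model.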

## Define A Pipeline (standard_model_pipeline)
With the pipeline components defined, we can create the SageMaker pipeline by associating the parameters, steps and conditions created in this notebook.
The pipeline definition encodes the pipeline as a directed acyclic graph (DAG) with relationships between its steps. 
The structure of a pipeline's DAG is determined either by data dependencies between steps or by custom dependencies defined in the steps.
For the cross validation training pipeline, custom dependencies between the components in the DAG are specified in the depends_on attribute of the steps.

A pipeline instance is composed of a <b>name, parameters, and steps </b>.
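
To visualise how dependencies determine execution order, the sketch below models this notebook's steps as a dependency mapping and derives a valid order with the standard library. It is a simplification under stated assumptions: the step names are taken from the code in this notebook, the edges are a hand-written reading of its `depends_on`, property references and condition branches, and the actual names that SageMaker generates for the RegisterModel step collection may differ.

```python
from graphlib import TopologicalSorter  # available from Python 3.9

# Each step maps to the set of steps it depends on.
dependencies = {
    "PreprocessStep": set(),
    "HyperParameterTuningStep": {"PreprocessStep"},         # custom depends_on
    "ModelEvaluationStep": {"HyperParameterTuningStep"},    # reads the evaluation report
    "ModelSelectionStep": {"ModelEvaluationStep"},          # runs in the condition's if branch
    "RegisterModelStep": {"ModelSelectionStep"},            # consumes the model artifacts
}

execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because the graph is acyclic, a valid ordering always exists; a cycle would make `TopologicalSorter` raise `CycleError`.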

In [9]:
def standard_model_pipeline(base_job_prefix, default_bucket, env_data, model_package_group_name, pipeline_name,
                            region, sagemaker_session, base_dir, project="standard_model",
                            revision="none", purpose="p1033"):

    # parameters for pipeline execution
    # processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
    processing_instance_count = "1"
    # processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge") 
    processing_instance_type = "ml.m5.xlarge"
    # training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
    training_instance_type = "ml.m5.xlarge"
    training_instance_count = "1"
    # hpo_tuner_instance_type = ParameterString(name="HPOTunerScriptInstanceType", default_value="ml.t3.medium")
    hpo_tuner_instance_type = "ml.t3.medium"
    # model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval") 
    model_approval_status = "PendingManualApproval"
    role = "arn:aws:iam::370702650160:role/sm-mlops-env-EnvironmentI-SageMakerPipelineExecuti-1AWTL5A5UKOHN"
    
    print(f"\n\nenv_data={env_data}\n\n")
    print(f"env_data['DataBucketName'] = {env_data['DataBucketName']}")
    print(f"env_data['ModelBucketName'] = {env_data['ModelBucketName']}")

    default_bucket_data = ParameterString(name="DefaultS3BucketData", default_value=env_data['DataBucketName'])
    default_bucket_models = ParameterString(name="DefaultS3BucketModels", default_value=env_data['ModelBucketName'])

    # baseline_model_objective_value = ParameterFloat(name='BaselineModelObjectiveValue', default_value=0.6)

    image_uri = "813736554012.dkr.ecr.eu-north-1.amazonaws.com/engineering-custom-images:crossvalidation"
    framework_version = "0.23-1"
    baseline_model_objective_value = 0.6
    
    # database = ParameterString(name="DataBase", default_value="ml-test-datasets_rl")  
    # table = ParameterString(name="AbaloneTable", default_value="ml_master") 
    # filter = ParameterString(name="FilterRings", default_value="disabled")
    time_path = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    trigger_id = ParameterString(name="TriggerID", default_value="0000000000") 
    nowgmt = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
    print(f"nowgmt={nowgmt}")
    execution_time = ParameterString(name="ExecutionTime", default_value=nowgmt)

    network_config = NetworkConfig(
        enable_network_isolation=False,
        security_group_ids=env_data["SecurityGroups"],
        subnets=env_data["SubnetIds"],
        encrypt_inter_container_traffic=True)

    model_name = "svm_model_for_iris_data"
    data_base_path = "s3://{}/lifecycle/60d/{}/{}/{}/{}/output/training".format(env_data["DataBucketName"], project, revision, time_path, purpose)
    s3_buckets = {'default_bucket_data': default_bucket_data,
                  'default_bucket_models': default_bucket_models,
                  'bucket_prefix_data': data_base_path,
                  'bucket_prefix_models': "s3://{}/lifecycle/max/{}/{}/{}/{}/output/training".format(env_data["ModelBucketName"], project, revision, model_name, time_path),
                  'evaluation_path': "s3://{}/lifecycle/max/{}/{}/{}/{}/output/evaluation".format(env_data["ModelBucketName"], project, revision, model_name, time_path),
                  's3_bucket_base_path': data_base_path,
                  's3_bucket_base_path_train': f"{data_base_path}/train", 
                  's3_bucket_base_path_test': f"{data_base_path}/test",
                  's3_bucket_base_path_evaluation': f"{data_base_path}/evaluation",
                  's3_bucket_base_path_jobinfo': f"{data_base_path}/jobinfo",
                  's3_bucket_base_path_output': f"{data_base_path}/output",
                  'code_path': f"s3://{env_data['DataBucketName']}/lifecycle/max/{project}/{revision}/input/source_scripts/preprocessing",
                 }
    print(f"s3_buckets={s3_buckets}")
    
    step_process = preprocessing(
                            s3_buckets,
                            base_job_prefix=base_job_prefix,
                            env_data=env_data,
                            network_config=network_config,
                            processing_instance_count=processing_instance_count,
                            processing_instance_type=processing_instance_type,
                            sagemaker_session=sagemaker_session,
                            # source_scripts_path=source_scripts_path,
                            snapshot_path="{}/data-snapshot/".format(s3_buckets['bucket_prefix_data']),
                            training_path="{}/train".format(s3_buckets['bucket_prefix_data']),
                            test_path="{}/test".format(s3_buckets['bucket_prefix_data']),
                            # database=database,
                            # table=table,
                            # filter=filter,
                            execution_time=execution_time,
                            framework_version=framework_version,
                        )
    
    step_model_selection, step_cv_train_hpo, sklearn_estimator, evaluation_report = training_tasks(s3_buckets, base_job_prefix=base_job_prefix,
                                                    env_data=env_data,
                                                    image_uri=image_uri,
                                                    network_config=network_config,
                                                    sagemaker_session=sagemaker_session,
                                                    training_instance_type=training_instance_type,
                                                    training_instance_count=training_instance_count,
                                                    hpo_tuner_instance_type=hpo_tuner_instance_type,
                                                    region=region,
                                                    framework_version=framework_version,
                                                    # source_scripts_path=source_scripts_path,
                                                    )

    step_cond = model_register_tasks(s3_buckets,
                                          evaluation_report,
                                          sklearn_estimator,
                                          step_model_selection,
                                          step_cv_train_hpo,
                                          model_approval_status,
                                          baseline_model_objective_value,
                                          sagemaker_session,
                                          model_package_group_name,
                                          network_config,
                                          env_data,
                                          revision)

    print(f"ExecutionVariables.PIPELINE_NAME = {ExecutionVariables.PIPELINE_NAME.expr}")
    print(f"ExecutionVariables.PIPELINE_EXECUTION_ID = {ExecutionVariables.PIPELINE_EXECUTION_ID.expr}")

    # Note: this assignment overrides the pipeline_name argument passed to this function.
    pipeline_name = "CrossValidationTrainingPipeline"
    pipeline = Pipeline(
        name=pipeline_name,
        # parameters=[
        #     # processing_instance_count,
        #     # processing_instance_type,
        #     # training_instance_type,
        #     # inference_instance_type,
        #     # hpo_tuner_instance_type,
        #     # model_approval_status,
        #     # role,
        #     # default_bucket_data,
        #     # baseline_model_objective_value,
        #     # image_uri,
        #     execution_time,
        #     # database,
        #     # table,
        #     # filter,
        # ],    
        pipeline_experiment_config=PipelineExperimentConfig(
            ExecutionVariables.PIPELINE_NAME,
            ExecutionVariables.PIPELINE_EXECUTION_ID),
        steps=[step_process, step_cv_train_hpo, step_cond],
    )
    return pipeline



## Examine Pipeline Definition
Before triggering a pipeline run, it is good practice to examine the JSON pipeline definition to confirm that it is well formed.
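
As a lightweight complement to reading the full JSON, the definition can be summarized by step name and type. The following is a minimal sketch and not part of the original notebook: `summarize_definition` is a hypothetical helper, and the toy definition string stands in for the value returned by `definition()`. Its step names are made up for illustration.

```python
import json


def summarize_definition(definition_json):
    """Return (step name, step type) pairs; raise on duplicate step names."""
    # json.loads raises a ValueError subclass if the string is not valid JSON.
    definition = json.loads(definition_json)
    steps = definition.get("Steps", [])
    names = [step["Name"] for step in steps]
    duplicates = sorted({name for name in names if names.count(name) > 1})
    if duplicates:
        raise ValueError(f"Duplicate step names: {duplicates}")
    return [(step["Name"], step["Type"]) for step in steps]


# Hypothetical toy definition; the real one comes from the pipeline object.
toy_definition = json.dumps({
    "Version": "2020-12-01",
    "Parameters": [],
    "Steps": [
        {"Name": "PreprocessStep", "Type": "Processing", "Arguments": {}},
        {"Name": "ModelRegistrationCondition", "Type": "Condition", "Arguments": {}},
    ],
})

summary = summarize_definition(toy_definition)
for name, step_type in summary:
    print(f"{step_type:<12} {name}")
```

A summary like this makes it easy to spot a missing or duplicated step before scanning the much longer argument blocks.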

In [10]:
pipeline_cvu = get_pipeline(
    region='eu-north-1',
    project_name='customerone-inf',
    # source_scripts_path="/root/sagemaker-user/sagemaker-cross-validation-pipeline/code",
    model_package_group_name="cross_validation_example",
    pipeline_name="cross_validation_example_pipeline",
    base_job_prefix="cross_validation",
    revision="no-revision-provided",
)



env_data={'DomainArn': 'arn:aws:sagemaker:eu-north-1:370702650160:domain/d-tdizim9qnor9', 'DomainId': 'd-tdizim9qnor9', 'DomainName': 'mlops-dev-eu-north-1-sagemaker-domain', 'HomeEfsFileSystemId': 'fs-03fc3d37f8623fea2', 'Status': 'InService', 'AuthMode': 'IAM', 'AppNetworkAccessType': 'VpcOnly', 'SubnetIds': ['subnet-0724be5e7071e7070', 'subnet-01def51ffe7467c71'], 'Url': 'https://d-tdizim9qnor9.studio.eu-north-1.sagemaker.aws', 'VpcId': 'vpc-0459a28f3637e285c', 'KmsKeyId': 'f4664542-0f2e-42ca-b51f-2bec0ad62278', 'ExecutionRole': 'arn:aws:iam::370702650160:role/sm-mlops-env-EnvironmentIAM-SageMakerExecutionRole-14AU65MVMBUGO', 'SecurityGroups': ['sg-041054ee4500f96f6'], 'JupyterServerAppSettings': {'DefaultResourceSpec': {'SageMakerImageArn': 'arn:aws:sagemaker:eu-north-1:243637512696:image/jupyter-server-3', 'InstanceType': 'system', 'LifecycleConfigArn': 'arn:aws:sagemaker:eu-north-1:370702650160:studio-lifecycle-config/install-autoshutdown-server-extension'}, 'LifecycleConfigArn

The class JsonGet has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


ExecutionVariables.PIPELINE_NAME = {'Get': 'Execution.PipelineName'}
ExecutionVariables.PIPELINE_EXECUTION_ID = {'Get': 'Execution.PipelineExecutionId'}


In [11]:
import json
# json.loads(pipeline.definition())
parsed = json.loads(pipeline_cvu.definition())
print(json.dumps(parsed, indent=2, sort_keys=True))

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{
  "Metadata": {},
  "Parameters": [],
  "PipelineExperimentConfig": {
    "ExperimentName": {
      "Get": "Execution.PipelineName"
    },
    "TrialName": {
      "Get": "Execution.PipelineExecutionId"
    }
  },
  "Steps": [
    {
      "Arguments": {
        "AppSpecification": {
          "ContainerEntrypoint": [
            "python3",
            "/opt/ml/processing/input/code/preprocessing.py"
          ],
          "ImageUri": "662702820516.dkr.ecr.eu-north-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3"
        },
        "NetworkConfig": {
          "EnableInterContainerTrafficEncryption": true,
          "EnableNetworkIsolation": false,
          "VpcConfig": {
            "SecurityGroupIds": [
              "sg-041054ee4500f96f6"
            ],
            "Subnets": [
              "subnet-0724be5e7071e7070",
              "subnet-01def51ffe7467c71"
            ]
          }
        },
        "ProcessingInputs": [
          {
            "AppManaged": false,
     

## Pipeline Creation
Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist, or update the pipeline if it does. The role passed in is used by SageMaker Pipelines to create all of the jobs defined in the steps.

In [12]:
# pipeline.upsert(role_arn=role)
upsert_response = pipeline_cvu.upsert(
            role_arn="arn:aws:iam::370702650160:role/sm-mlops-env-EnvironmentI-SageMakerPipelineExecuti-1AWTL5A5UKOHN"
        )

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


ClientError: An error occurred (ValidationException) when calling the UpdatePipeline operation: Unable to parse pipeline definition. Property 'null' with value 'null' is not of expected type 'String'
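
The `ValidationException` above suggests that some property in the submitted definition serialized as `null`. One way to narrow this down, sketched here under the assumption that the definition has been parsed into a dictionary as in the earlier cell, is to walk the structure and report the path of every null value. `find_null_paths` is a hypothetical helper, and the sample definition is invented for illustration; it does not reproduce the actual pipeline.

```python
import json


def find_null_paths(node, path="$"):
    """Yield a path string for every None (JSON null) found in a nested structure."""
    if node is None:
        yield path
    elif isinstance(node, dict):
        for key, value in node.items():
            yield from find_null_paths(value, f"{path}.{key}")
    elif isinstance(node, list):
        for index, value in enumerate(node):
            yield from find_null_paths(value, f"{path}[{index}]")


# Invented sample with one null argument, standing in for the parsed definition.
sample_definition = json.loads(
    '{"Steps": [{"Name": "PreprocessStep", "Arguments": {"RoleArn": null}}]}'
)

null_paths = list(find_null_paths(sample_definition))
print(null_paths)
```

In the notebook, the same helper could be applied to `json.loads(pipeline_cvu.definition())`. Any reported path is a candidate for an argument that resolved to `None` when the pipeline was built.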

## Trigger Pipeline Execution
After creating a pipeline definition, you can submit it to SageMaker to start an execution, optionally providing parameter values specific to the run.

In [None]:
execution = pipeline_cvu.start()

In [None]:
# image_uri = "370702650160.dkr.ecr.eu-north-1.amazonaws.com/sagemaker-cross-validation-pipeline"

In [None]:
# image_uri = ParameterString(name="ImageURI", default_value="370702650160.dkr.ecr.eu-north-1.amazonaws.com/sagemaker-cross-validation-pipeline")

In [None]:
# image_uri.to_string()

In [None]:
# # Before triggering the pipeline, make sure to override the ImageURI parameter value with
# # the one created in the previous step.
# execution = pipeline.start(
#     parameters=dict(
#         BaselineModelObjectiveValue=0.8,
#         MinimumC=0,
#         MaximumC=1,
#         MaxTrainingJobs=3,
# #         ImageURI="370702650160.dkr.ecr.eu-north-1.amazonaws.com/sagemaker-cross-validation-pipeline"
#     ))
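
The commented-out call above passes run-specific values by parameter name. Below is a minimal sketch of how such overrides combine with declared defaults, assuming each entry in the definition's `Parameters` list carries a `Name` and an optional `DefaultValue`, and assuming overrides should only name declared parameters. `resolve_parameters` is a hypothetical helper for a local pre-check, not an SDK function, and the declared parameters are illustrative.

```python
def resolve_parameters(declared_parameters, overrides):
    """Merge run-time overrides onto declared defaults; reject undeclared names."""
    defaults = {p["Name"]: p.get("DefaultValue") for p in declared_parameters}
    unknown = sorted(set(overrides) - set(defaults))
    if unknown:
        raise KeyError(f"Parameters not declared in the pipeline: {unknown}")
    # Overrides win over defaults for any name present in both.
    return {**defaults, **overrides}


# Illustrative declarations, shaped like entries of the "Parameters" list.
declared = [
    {"Name": "BaselineModelObjectiveValue", "Type": "Float", "DefaultValue": 0.6},
    {"Name": "MaxTrainingJobs", "Type": "Integer", "DefaultValue": 3},
]

resolved = resolve_parameters(declared, {"BaselineModelObjectiveValue": 0.8})
print(resolved)
```

With the definition printed earlier, where `Parameters` is an empty list, this check would flag any override as undeclared.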

## Examine a Pipeline Execution
Examine the pipeline execution at runtime by using the SageMaker Python SDK.

In [None]:
# Added by Sarah
execution.list_steps()


In [None]:
execution.describe()
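
The list returned by `list_steps()` can be condensed into a per-step status view. This is a minimal sketch, assuming each entry is a dictionary with `StepName` and `StepStatus` keys and, for failed steps, a `FailureReason`. The sample records are invented and `summarize_steps` is a hypothetical helper.

```python
def summarize_steps(steps):
    """Map each step name to its status, appending the failure reason if present."""
    summary = {}
    for step in steps:
        status = step["StepStatus"]
        reason = step.get("FailureReason")
        summary[step["StepName"]] = f"{status}: {reason}" if reason else status
    return summary


# Invented records standing in for the output of execution.list_steps().
sample_steps = [
    {"StepName": "PreprocessStep", "StepStatus": "Succeeded"},
    {
        "StepName": "TrainStep",
        "StepStatus": "Failed",
        "FailureReason": "ClientError: example reason",
    },
]

step_summary = summarize_steps(sample_steps)
for name, status in step_summary.items():
    print(f"{name}: {status}")
```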

## Wait For The Pipeline Execution To Complete
The pipeline execution object supports waiting synchronously for the run to complete.

In [None]:
execution.wait()
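
Conceptually, a synchronous wait amounts to polling the execution status until it reaches a terminal state. The sketch below illustrates that idea only; it is not the SDK's implementation. `wait_for_terminal_status` and the stubbed status source are hypothetical, and the terminal status names are assumed to be `Succeeded`, `Failed`, and `Stopped`.

```python
import time

# Assumed set of terminal execution statuses.
TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_terminal_status(get_status, delay_seconds=30, max_attempts=60):
    """Poll get_status() until it returns a terminal status or attempts run out."""
    for _ in range(max_attempts):
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(delay_seconds)
    raise TimeoutError(f"No terminal status after {max_attempts} attempts")


# Stubbed status source so the sketch runs without an AWS account.
fake_statuses = iter(["Executing", "Executing", "Succeeded"])
final_status = wait_for_terminal_status(lambda: next(fake_statuses), delay_seconds=0)
print(final_status)
```

In the notebook, `get_status` could be `lambda: execution.describe()["PipelineExecutionStatus"]`, which reads the status from the same `describe()` call used earlier.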