# Importing Libraries and Configurations

https://github.com/aws/amazon-sagemaker-examples/issues/1207


https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost-tuning.html

In [2]:
import os
import glob
import json
import boto3
import sagemaker


# Taking pipeline building configurations from config.json.
# These are only for building and will not be available at 
# the runtime of the pipeline.
with open("config.json") as file:
    build_parameters = json.load(file)

# Setting Default Bucket and getting region and role

In [3]:

## Setting default bucket
# Method 1
from sagemaker.workflow.pipeline_context import PipelineSession
pipeline_session = PipelineSession(default_bucket = build_parameters["output_bucket"])
sagemaker_session = sagemaker.Session(default_bucket = build_parameters["output_bucket"])

# Method 2
# pipeline_session.default_bucket = build_parameters["output_bucket"]
# sagemaker_session.default_bucket = build_parameters["output_bucket"]

# Method 3
# sagemaker_session = sagemaker.Session()
# bucket = "sagemaker-output-bucket-us-east1"   

print(sagemaker_session.default_bucket)
print(pipeline_session.default_bucket)


## Getting region and role
region = boto3.Session().region_name
role = sagemaker.get_execution_role()

print(role)


<bound method Session.default_bucket of <sagemaker.session.Session object at 0x7fc1ab1a9550>>
<bound method Session.default_bucket of <sagemaker.workflow.pipeline_context.PipelineSession object at 0x7fc1ab1a9510>>
arn:aws:iam::720541911643:role/service-role/AmazonSageMaker-ExecutionRole-20230606T110107


# Input Data Location Parameters

In [4]:

# Default location for the datasets
train_data_uri = build_parameters["train_data"]
test_data_uri = build_parameters["test_data"]
evaluation_data_uri = build_parameters["evaluation_data"]
feature_selection_file_uri = build_parameters["feature_selection"]


# Parametrizing Data paths
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
train_data = ParameterString(name="TrainData", default_value = train_data_uri)
test_data = ParameterString(name="TestData", default_value = test_data_uri)
evaluation_data = ParameterString(name="EvaluationData", default_value = evaluation_data_uri)
feature_selection_file = ParameterString(name = "FeatureSelectionFile", default_value = feature_selection_file_uri)


#### Handling Output Locations
See this link to learn more about pipeline execution variables: https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.execution_variables.ExecutionVariables
pipeline_start_time is a execution vairable, so to create processig_output_path and inference_output_path we had to use sagemaker.workflow.functions.Join and we could not use Python f-strings.

In [5]:

pipeline_output_bucket = build_parameters["output_bucket"]

pipeline_start_time = sagemaker.workflow.execution_variables.ExecutionVariables.START_DATETIME

from sagemaker.workflow import functions
train_processing_output_path = functions.Join(on='/', values=["s3:/", pipeline_output_bucket, "Training_Pipeline_Output", pipeline_start_time, "TrainProcessingOutput"])
validation_processing_output_path = functions.Join(on='/', values=["s3:/", pipeline_output_bucket, "Training_Pipeline_Output", pipeline_start_time, "ValidationProcessingOutput"])


# Step 1: Preprocessing Training Data

#### 1.1 Loading preprocessing config.json file.

In [6]:
local_preprocessing_path = os.path.join("Pipeline_Component_Codes","Training","1_Preprocessing")
with open(os.path.join(local_preprocessing_path, "config.json")) as file:
    processing_build_parameters = json.load(file)

#### 1.2 Making parameter for processing machine type

In [7]:
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value=processing_build_parameters["machine_type"]
)


#### 1.3 Building the processor

In [8]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import Processor

if processing_build_parameters["processing_type"] == "sklearn":
    processor = SKLearnProcessor(
        framework_version = processing_build_parameters["framework_version"],
        instance_type = processing_build_parameters["machine_type"],
        instance_count = processing_build_parameters["machine_count"],
        base_job_name = f"{build_parameters['usecase']}-preprocessing",
        role=role
    )
elif processing_build_parameters["processing_type"] == "custom":
    processor = Processor(
        image_uri = processing_build_parameters["image_uri"],
        instance_type = processing_build_parameters["machine_type"],
        instance_count = processing_build_parameters["machine_count"],
        base_job_name = f"{build_parameters['usecase']}-preprocessing",
        role=role
    )

#### 1.4 Building preprocessing step

In [9]:

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
    

step_process = ProcessingStep(
    name = "preprocessing_full_data",
    description = "Data preprocessing and splitting into train and test set",
    processor=processor,
    inputs=[
        ProcessingInput(source = train_data, destination="/opt/ml/processing/input/data"),  
        ProcessingInput(source=feature_selection_file, destination="/opt/ml/processing/input/feature_selection")
    ],
    outputs=[
        # Train
        ProcessingOutput(output_name = "train", source="/opt/ml/processing/train", 
                         destination = train_processing_output_path
                        ),
        # Test
        ProcessingOutput(output_name = "test", source="/opt/ml/processing/test", 
                         destination = train_processing_output_path
                        ),
        # Logs
        ProcessingOutput(output_name = "logs", source="/opt/ml/processing/logss", 
                         destination = train_processing_output_path
                        ),
    ],
    code=os.path.join(local_preprocessing_path, processing_build_parameters["entry_point"]),
    job_arguments = ["--train_data_location", "/opt/ml/processing/input/data", 
                     "--feature_selection_file_location", "/opt/ml/processing/input/feature_selection/Feature_Selection.csv", 
                     "--target_column", "Churn",
                     "--preprocessed_train_data_location", "/opt/ml/processing/train", 
                     "--preprocessed_test_data_location", "/opt/ml/processing/test", 
                     "--log_location", "/opt/ml/processing/logss"
                    ]
)


# Step 2: Processing Evaluation Data

#### 2.1 Loading preprocessing config.json file.

In [10]:
local_val_processing_path = os.path.join("Pipeline_Component_Codes","Training","11_Validation_Processing")
with open(os.path.join(local_val_processing_path, "config.json")) as file:
    val_processing_build_parameters = json.load(file)

#### 2.2 Making parameter for processing machine type

In [11]:
val_processing_instance_type = ParameterString(
    name="ValProcessingInstanceType",
    default_value=val_processing_build_parameters["machine_type"]
)


#### 2.3 Building the processor

In [12]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import Processor

if val_processing_build_parameters["processing_type"] == "sklearn":
    processor = SKLearnProcessor(
        framework_version = val_processing_build_parameters["framework_version"],
        instance_type = val_processing_build_parameters["machine_type"],
        instance_count = val_processing_build_parameters["machine_count"],
        base_job_name = f"{build_parameters['usecase']}-preprocessing",
        role=role
    )
elif processing_build_parameters["processing_type"] == "custom":
    processor = Processor(
        image_uri = val_processing_build_parameters["image_uri"],
        instance_type = val_processing_build_parameters["machine_type"],
        instance_count = val_processing_build_parameters["machine_count"],
        base_job_name = f"{build_parameters['usecase']}-preprocessing",
        role=role
    )

#### 2.4 Building preprocessing step

In [13]:


step_val_process = ProcessingStep(
    name = "preprocessing_validation_data",
    description = "Validation data preprocessing.",
    processor=processor,
    inputs=[
        ProcessingInput(source = evaluation_data, destination="/opt/ml/processing/input/data"),  
        ProcessingInput(source = feature_selection_file, destination="/opt/ml/processing/input/feature_selection")
    ],
    outputs=[
        # Evaluation data
        ProcessingOutput(output_name = "evaluation", source="/opt/ml/processing/test", 
                         destination = validation_processing_output_path
                        ),
        # Logs
        ProcessingOutput(output_name = "logs", source="/opt/ml/processing/logss", 
                         destination = validation_processing_output_path
                        ),
    ],
    code=os.path.join(local_val_processing_path, val_processing_build_parameters["entry_point"])
)


# Step 3: Hyperparameter Tuning

#### 3.1 Getting models from Local Path

In [14]:

local_models_path = os.path.join("Pipeline_Component_Codes", "Training", "2_Models_HPTune")
models = []
for directory in os.listdir(local_models_path):
    if '.' not in directory: # Avoiding .ipynb_checkpoints
        models.append(directory)

models = models[1:]
print(models)

['Logistic_Regression', 'Decision_Tree']


#### 3.2 Reading config.json Files

In [15]:
model_details = {}
for model in models:
    with open(os.path.join(local_models_path, model, "config.json")) as file:
        model_build_parameters = json.load(file)
    model_details[model] = model_build_parameters

# print(model_details)

#### 3.2 Input Parameters

In [16]:
# Machine types
training_instances = []
for model in models:
    training_instance_type = ParameterString(
        name=f"{model}_InstanceType",
        default_value="ml.m5.xlarge"
    )
    training_instances.append(training_instance_type)

# Objective metric
# objective_metric_name = ParameterString(name = "ObjectiveMetric", default_value = build_parameters["objective_metric"])
# metric_definitions = [{"Name": objective_metric_name, "Regex": "accuracy:([0-9\\.]+)"}]

objective_metric_name = "validation:accuracy"
metric_definitions = [{'Name': "validation:accuracy", 'Regex': "accuracy:([0-9\\.]+)"}]

#### 3.3 Creating the Estimators on which Tuning Will Happen

Sagemaker provides us docker conatiners for all the popular algorithms like they have scikit learn image for all the Scikit learn models, they have XGBoost image and they also have deep learning images as well. Here for the demo purpose we have used only three models, Logistic Regression, Decision Tree and XGBoost. So our need was not to build our own image. If any other Python library is needed we can mention those in **requirements.txt** file. We do not have to do anything more, pipeline will automatically install those in respective containers. 

In [17]:
from sagemaker.sklearn import SKLearn
from sagemaker.xgboost.estimator import XGBoost

from sagemaker.tuner import ContinuousParameter, IntegerParameter, CategoricalParameter, HyperparameterTuner
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TuningStep

from sagemaker.workflow import functions



tuning_steps = []
index = -1
for model in models:
    index = index + 1
    current_model_details = model_details[model]
    
    output_path = functions.Join(on='/', values=["s3:/", pipeline_output_bucket, "Training_Pipeline_Output", pipeline_start_time, "HPTuningOutputs", model])
    
    # Creating the Estimators on which Tuning Will Happen
    if current_model_details['model_type'] == 'sklearn_model':
        estimator = SKLearn(
                            source_dir = os.path.join(local_models_path, model),
                            entry_point = current_model_details["entry_point"], 
                            instance_type = training_instances[index], 
                            instance_count = 1,
                            framework_version = current_model_details["framework_version"], 
                            role = role,
                            output_path = output_path
                            )
    elif current_model_details['model_type'] == 'xgboost_model':
        estimator = XGBoost(
                            source_dir = os.path.join(local_models_path, model),
                            entry_point = current_model_details["entry_point"],
                            instance_type = training_instances[index],
                            instance_count = 1,
                            framework_version = current_model_details["framework_version"],
                            role=role,
                            output_path = output_path
                            )
    
    
    # Getting the hyperparameters
    hyperparameters = current_model_details["hyperparameters"]
    
    hyperparameter_ranges = {}
    for hyperparameter in hyperparameters:
        if hyperparameters[hyperparameter]["type"] == "continuous":
            hyperparameter_ranges[hyperparameter] = ContinuousParameter(min_value = hyperparameters[hyperparameter]["min_value"],
                                                                        max_value = hyperparameters[hyperparameter]["max_value"],
                                                                        scaling_type = hyperparameters[hyperparameter]["scaling_type"])
        elif hyperparameters[hyperparameter]["type"] == "categorical":
            hyperparameter_ranges[hyperparameter] = CategoricalParameter(hyperparameters[hyperparameter]["values"])
        elif hyperparameters[hyperparameter]["type"] == "integer":
            hyperparameter_ranges[hyperparameter] = IntegerParameter(min_value = hyperparameters[hyperparameter]["min_value"],
                                                                     max_value = hyperparameters[hyperparameter]["max_value"])
    
    
    
    # Making the hyperparameter tuner
    tuner = HyperparameterTuner(
        estimator = estimator,
        objective_metric_name = objective_metric_name,
        hyperparameter_ranges = hyperparameter_ranges,
        metric_definitions = metric_definitions,
        max_jobs=1,
        max_parallel_jobs=1,
        strategy = current_model_details["tuning_strategy"],
        base_tuning_job_name = current_model_details["model_name"]
        )
    
    
    # Building the tuning step
    step_tuning = TuningStep(
        name = f"hptuning_{current_model_details['model_name']}",
        tuner = tuner,
        inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
                content_type="text/csv"
            ),
            "test": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
                content_type="text/csv",
            ),
        }
    )
    
    tuning_steps.append(step_tuning)
        

instance_type is a PipelineVariable (<class 'sagemaker.workflow.parameters.ParameterString'>). Its interpreted value in execution time should not be of GPU types since GPU training is not supported for Scikit-Learn.
The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.
instance_type is a PipelineVariable (<class 'sagemaker.workflow.parameters.ParameterString'>). Its interpreted value in execution time should not be of GPU types since GPU training is not supported for Scikit-Learn.
The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure 

# Step: 4: Getting the best model from each hyperparameter tuning job

Follow this link to get example of how to write lambda step https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/lambda-step/sagemaker-pipelines-lambda-step.ipynb

#### 4.1 Building the Lambda Function

In [18]:
from sagemaker.lambda_helper import Lambda


func = Lambda(
    function_name = "get_best_model_from_hptune_job",
    execution_role_arn=build_parameters["role_given_to_lambda"],
#     execution_role_arn = role,
#     execution_role_arn = lambda_role,
    script = os.path.join("Pipeline_Component_Codes", "Training", "3_HPTune_Best_Model", "main.py"),
    handler="main.main",
)


#### 4.2 Building the Lambdastep
See this link to get the idea on how tuning step name is being fetched (tuning_steps[i].properties.HyperParameterTuningJobName): https://boto3.amazonaws.com/v1/documentation/api/1.9.46/reference/services/sagemaker.html#SageMaker.Client.describe_hyper_parameter_tuning_job 
In the same way other properties can also be accessed.

In [19]:
from sagemaker.workflow.lambda_step import LambdaOutput, LambdaStep, LambdaOutputTypeEnum

best_model_steps = []
for i in range(len(models)):
    # Building the outputs
    output_param_1 = LambdaOutput(output_name=f"best_model_location", output_type=LambdaOutputTypeEnum.String)
    output_param_2 = LambdaOutput(output_name=f"best_metric_value", output_type=LambdaOutputTypeEnum.String)
    output_param_3 = LambdaOutput(output_name=f"best_model", output_type=LambdaOutputTypeEnum.String)
    output_param_4 = LambdaOutput(output_name=f"best_training_job_container", output_type=LambdaOutputTypeEnum.String)
    output_param_5 = LambdaOutput(output_name=f"best_instance_type", output_type=LambdaOutputTypeEnum.String)
    output_param_6 = LambdaOutput(output_name=f"best_metrics_path", output_type=LambdaOutputTypeEnum.String)
    
    # Building the Lambdastep
    step_deploy_lambda = LambdaStep(
        name=f"get_best_{models[i]}_model",
        lambda_func=func,
        inputs={
            "tuning_job_name": tuning_steps[i].properties.HyperParameterTuningJobName
        },
#         outputs=[output_param_1, output_param_2],
        outputs=[output_param_1, output_param_2, output_param_3, output_param_4, output_param_5, output_param_6]
    )
    
    best_model_steps.append(step_deploy_lambda)

# Step 5: Evaluating the best models from each hyperparameter tuning job

#### Building Property Files

In [20]:
from sagemaker.workflow.properties import PropertyFile

evaluation_reports = []

for i in range(len(models)):
    evaluation_report = PropertyFile(
        name = f"evaluating_best_{models[i]}_model",
        output_name = "evaluation",
        path = "evaluation.json"
    )
    
    evaluation_reports.append(evaluation_report)

In [21]:

    

evaluation_steps = []
for i in range(len(models)):
    
    # Building evaluator
    current_model_details = model_details[models[i]]
    print(current_model_details['model_type'])
    
    if current_model_details['model_type'] == 'sklearn_model':
        evaluator = SKLearnProcessor(
        framework_version = current_model_details["framework_version"],
        instance_type = training_instances[i],
        instance_count = 1,
        role=role
        )
    elif current_model_details['model_type'] != 'sklearn_model':
        # Logic is not complete
        evaluator = Processor(
            image_uri = processing_build_parameters["image_uri"],
            instance_type = processing_build_parameters["machine_tyLLpe"],
            instance_count = processing_build_parameters["machine_count"],
            base_job_name = f"{build_parameters['usecase']}-preprocessing",
            role=role
        )
    
    output_path = functions.Join(on='/', values=["s3:/", pipeline_output_bucket, "Training_Pipeline_Output", pipeline_start_time, "EvaluationOutputs", models[i]])
    
    # Building the evaluation step
    step_evaluation = ProcessingStep(
        name = f"evaluating_best_{models[i]}_model",
        description = f"Evaluating best {models[i]} model.",
        processor = evaluator,
        inputs=[
            # Inputs will be 1. evaluation data and 2. model to be evaluated.
            ProcessingInput(source = step_val_process.properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri, destination="/opt/ml/processing/input/data"), # 1
            ProcessingInput(source = best_model_steps[i].properties.Outputs["best_model_location"], destination="/opt/ml/processing/input/model") # 2
        ],
        outputs=[
            # Three output files will be generated.
            # 1. A file containing all the model evaluation metrics that will be shown in the Model Registry.
            ProcessingOutput(output_name="metrics",
                             source="/opt/ml/processing/metrics",
                             destination = output_path
                            ),
            # 2. A file containing evaluation outputs that will be sent to.
            ProcessingOutput(output_name="evaluation",
                             source="/opt/ml/processing/test",
                             destination = output_path
                            ),
            # 2. Logs
            ProcessingOutput(output_name = "logs", source="/opt/ml/processing/logss", 
                             destination = output_path
                            )
        ],
        code = os.path.join("Pipeline_Component_Codes", "Training", "4_Model_Evaluation", "main.py"),
        property_files=[evaluation_reports[i]],
        job_arguments = [
            "--objective_metric", build_parameters["objective_metric"],  # This can be accuracy, precision etc. Models will be evaluated based on this metric.
            "--model_name", models[i],  # This argument will not be used in the code, this will be written in the output 1 file as it is.
            "--best_model_location", best_model_steps[i].properties.Outputs["best_model_location"],  # This argument will not be used in the code, this will be written in the output 1 file as it is.
            "--best_training_job_container", best_model_steps[i].properties.Outputs["best_training_job_container"],  # This argument will not be used in the code, this will be written in the output 1 file as it is.
            "--best_instance_type", best_model_steps[i].properties.Outputs["best_instance_type"]  # This argument will not be used in the code, this will be written in the output 1 file as it is.
            ]
    )
    
    evaluation_steps.append(step_evaluation)


sklearn_model


The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


sklearn_model


The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


# Step 6: Getting the Final Model

#### Building the Lambda Function
We could have develop another Lambda function to accomplish this job, but we have written the logic of this task in the previous lambda function (where we are getting the final model from each hyperparameter tuning job), so that we have to maintain only one Lambda function.

In [22]:

# get_final_model_func = Lambda(
#     function_name = "get_final_model",
#     execution_role_arn=build_parameters["role_given_to_lambda"],
# #     execution_role_arn = role,
# #     execution_role_arn = lambda_role,
#     script = os.path.join("Pipeline_Component_Codes", "Training", "3_HPTune_Best_Model", "main.py"),
#     handler="main.main",
# )


#### Building the Lambdastep

In [22]:

from sagemaker.workflow.functions import JsonGet

# output_param_1 = LambdaOutput(output_name=f"final_model_location", output_type=LambdaOutputTypeEnum.String)
# output_param_2 = LambdaOutput(output_name=f"final_metric_value", output_type=LambdaOutputTypeEnum.String)
# output_param_3 = LambdaOutput(output_name=f"final_model", output_type=LambdaOutputTypeEnum.String)

output_param_1 = LambdaOutput(output_name=f"best_model_location", output_type=LambdaOutputTypeEnum.String)
output_param_2 = LambdaOutput(output_name=f"best_metric_value", output_type=LambdaOutputTypeEnum.String)
output_param_3 = LambdaOutput(output_name=f"best_model", output_type=LambdaOutputTypeEnum.String)
output_param_4 = LambdaOutput(output_name=f"best_training_job_container", output_type=LambdaOutputTypeEnum.String)
output_param_5 = LambdaOutput(output_name=f"best_instance_type", output_type=LambdaOutputTypeEnum.String)
output_param_6 = LambdaOutput(output_name=f"best_metrics_path", output_type=LambdaOutputTypeEnum.String)


lambda_inputs = {'n':len(models)}
for i in range(len(models)):
    best_model_location = JsonGet(step_name = evaluation_steps[i].name, property_file = evaluation_reports[i], json_path="best_model_location")
    metrics = JsonGet(step_name = evaluation_steps[i].name, property_file = evaluation_reports[i], json_path="best_metric_value")
    model = JsonGet(step_name = evaluation_steps[i].name, property_file = evaluation_reports[i], json_path="best_model")
    
    best_training_job_container = JsonGet(step_name = evaluation_steps[i].name, property_file = evaluation_reports[i], json_path="best_training_job_container")
    best_instance_type = JsonGet(step_name = evaluation_steps[i].name, property_file = evaluation_reports[i], json_path="best_instance_type")
    
    
    
    lambda_inputs[f"best_model_location{i}"] = best_model_location
    lambda_inputs[f"metrics{i}"] = metrics
    lambda_inputs[f"model{i}"] = model
    
    lambda_inputs[f"best_training_job_container{i}"] = best_training_job_container
    lambda_inputs[f"best_instance_type{i}"] = best_instance_type
    lambda_inputs[f"metrics_path{i}"] = evaluation_steps[i].properties.ProcessingOutputConfig.Outputs["metrics"].S3Output.S3Uri


    
step_final_model = LambdaStep(
    name=f"get_final_model",
    lambda_func=func,
    inputs = lambda_inputs,
    outputs=[output_param_1, output_param_2, output_param_3, output_param_4, output_param_5, output_param_6]
)



# Step 8: Register best model in SageMaker model registry
Yes, we have to build step 8 before step 7. Step 7 will be a condition step and to define a condition step we have to first define what is the step which will be executed, if the condition in condition step is true and which is the step that will be if the condition is false.

In [23]:
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker import image_uris
from sagemaker.model_metrics import ModelMetrics, MetricsSource, FileSource
from sagemaker.workflow.functions import Join


from sagemaker.estimator import Estimator

estimator = Estimator(
    entry_point = "",
    # instance_type = step_final_model.properties.Outputs["best_instance_type"],
    image_uri = step_final_model.properties.Outputs["best_training_job_container"],
    role = role
)

model_metrics = ModelMetrics(
        model_statistics = MetricsSource(
            # s3_uri = evaluation_steps[i].properties.ProcessingOutputConfig.Outputs["evaluation"].S3Output.S3Uri,
            s3_uri = Join(on="/", values=[step_final_model.properties.Outputs["best_metrics_path"], "metrics.json"]),
            # s3_uri = "s3://demo-output-bucket/Training_Pipeline_Output/2023-08-03T11:33:25.698Z/EvaluationOutputs/Decision_Tree/metrics.json",
            # s3_uri = Join(on="/", values=[step_evaluate_model.arguments["Outputs"][0]["S3Output"]["S3Uri"], "evaluation.json"])
            content_type = "application/json"
        )
)

    
register_best_model_step = RegisterModel(
    name=f"RegisterFinalModel",
    estimator = estimator, 
    # model_data=step_final_model.properties.Outputs["final_model_location"],
    model_data=step_final_model.properties.Outputs["best_model_location"],
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=[step_final_model.properties.Outputs["best_instance_type"]],
    transform_instances=[step_final_model.properties.Outputs["best_instance_type"]],
    model_package_group_name = build_parameters["model_package_group_name"],
    # image_uri = step_final_model.properties.Outputs["best_training_job_container"],
    # approval_status="Approved",
    role=role,
    depends_on = [],
    model_metrics = model_metrics
    )

# Building the Pipeline
#### Arranging the steps inside pipeline

In [24]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = f"{build_parameters['usecase']}-training"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[train_data, test_data, evaluation_data, feature_selection_file, 
                processing_instance_type] + training_instances + [objective_metric_name],
    # steps = [step_process, step_val_process] + tuning_steps + best_model_steps + evaluation_steps + [step_final_model] + condition_steps,
    steps = [step_process, step_val_process] + tuning_steps + best_model_steps + evaluation_steps + [step_final_model] + [register_best_model_step],
    sagemaker_session = pipeline_session
)

#### Uploading the pipeline

In [25]:
pipeline.upsert(role_arn=role)
# execution = pipeline.start()

Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'HyperParameterTuningJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.


Using provided s3_resource


No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'HyperParameterTuningJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.


Using provided s3_resource


Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.
Popping out 'ModelPackageName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'ProcessingJobName' from the pip

Using provided s3_resource
Using provided s3_resource


Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.
Popping out 'ModelPackageName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.


{'PipelineArn': 'arn:aws:sagemaker:ap-south-1:720541911643:pipeline/churn-training',
 'ResponseMetadata': {'RequestId': 'eade4705-9c7e-4baa-ae78-32e27a1002b5',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'eade4705-9c7e-4baa-ae78-32e27a1002b5',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '83',
   'date': 'Tue, 22 Aug 2023 04:02:31 GMT'},
  'RetryAttempts': 0}}

In [19]:
# execution = pipeline.start()