### Main pipeline notebook: defines and runs the SageMaker pipeline that collects and processes the data, trains and evaluates the model, and registers it in the SageMaker Model Registry. Training and evaluation parameters and metrics are tracked in MLflow experiments.

In [24]:
%pip install stepfunctions

import pandas as pd
import json
import boto3
import pathlib
import io
import sagemaker
from time import gmtime, strftime, sleep
from sagemaker.deserializers import CSVDeserializer
from sagemaker.serializers import CSVSerializer
import sagemaker
from sagemaker.experiments.run import Run, load_run
import time
from datetime import datetime
from sagemaker import image_uris
from sagemaker.estimator import Estimator

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.inputs import TrainingInput

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterFloat,
    ParameterString,
    ParameterBoolean,
)
from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.fail_step import FailStep

from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import Join, JsonGet

from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig
from sagemaker.workflow.function_step import step
from sagemaker.workflow.step_outputs import get_step
from sagemaker.workflow.properties import PropertyFile

from sagemaker.model_metrics import (
    MetricsSource, 
    ModelMetrics, 
    FileSource
)

from sagemaker.workflow.conditions import (
    ConditionGreaterThan,
)

from stepfunctions.steps.sagemaker import EndpointConfigStep, EndpointStep

sagemaker.__version__

Note: you may need to restart the kernel to use updated packages.


'2.219.0'

#### Variables and constants

In [47]:
# names of pipeline objects
project = "index-predictor"
bucket_name = "team1-index-predictor-bucket"
version = "v21"
feature_group_name = "index-predictor-feature-group-v7"
tracking_server_arn = (
    "arn:aws:sagemaker:eu-central-1:567821811420:mlflow-tracking-server/wildfire-mj"
)

# NOTE(review): not referenced by any cell visible here — confirm it is still needed.
current_timestamp = strftime("%d-%H-%M", gmtime())
pipeline_name = f"{project}-pipeline"
pipeline_model_name = f"{project}-model-xgb"
model_package_group_name = f"{project}-model-group-{version}"
endpoint_config_name = f"{project}-endpoint-config"
# Assigned exactly once; the endpoint name carries the pipeline version.
endpoint_name = f"{project}-endpoint-{version}"
model_approval_status = "PendingManualApproval"

# Set instance types and counts (these are also the defaults of the pipeline parameters)
process_instance_type = "ml.m5.large"
process_instance_count = 1
train_instance_type = "ml.m5.large"
train_instance_count = 1
model_endpoint_instance_type = "ml.t2.large"
model_endpoint_instance_count = 1

# Container versions used for the image lookups below.
sklearn_image_version = "1.2-1"
xgboost_image_version = "1.7-1"
# NOTE(review): differs from sklearn_image_version and is not used by the image
# lookups below — confirm which version is intended before relying on it.
skprocessor_framework_version = "1.0-1"

session = PipelineSession()
# Resolve the region once so every later cell uses the same value.
region = session.boto_region_name

# Thresholds: the model check requires each metric to be strictly greater than these.
cumulative_return_threshold = -0.02
accuracy_threshold = 0.45

sklearn_image_uri = image_uris.retrieve(
    framework="sklearn", region=region, version=sklearn_image_version
)
xgboost_container = image_uris.retrieve(
    "xgboost", region, version=xgboost_image_version
)

In [48]:
# Pipeline parameters. The `name` of each one is the key accepted by
# pipeline.start(parameters=...); defaults come from the config cell constants.
# NOTE(review): "Version" is exposed here, but job arguments and S3 paths are built
# from the Python constant `version`, so overriding it at start time changes nothing.

# --- processing ---
process_instance_type_param = ParameterString(
    name="ProcessingInstanceType", default_value=process_instance_type
)

# --- training ---
train_instance_type_param = ParameterString(
    name="TrainingInstanceType", default_value=train_instance_type
)
train_instance_count_param = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=train_instance_count,
)

# --- processing (count) ---
process_instance_count_param = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=process_instance_count,
)

# --- registration / bookkeeping ---
model_approval_status_param = ParameterString(
    name="ModelApprovalStatus",
    default_value=model_approval_status,
)
version_param = ParameterString(
    name="Version",
    default_value=version,
)

#### Data collector

In [49]:
# Collection job: runs ../src/data/collector.py in the scikit-learn container and
# uploads whatever the script writes to /opt/ml/processing/output/data/raw to
# s3://<bucket_name>/data/raw/<version>. The output is named "raw_data" and is read
# by the processing step.
collection_processor = ScriptProcessor(
    command=["python3"],
    image_uri=sklearn_image_uri,
    role=sagemaker.get_execution_role(),
    # Pipeline parameters (defaulting to the config-cell constants), so that
    # ProcessingInstanceType / ProcessingInstanceCount given to pipeline.start()
    # actually take effect.
    instance_count=process_instance_count_param,
    instance_type=process_instance_type_param,
)

collection_step = ProcessingStep(
    name=f"{project}-collection-{version}",
    processor=collection_processor,
    code="../src/data/collector.py",
    inputs=[],
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output/data/raw",
            destination=f"s3://{bucket_name}/data/raw/{version}",
            output_name="raw_data",
        )
    ],
    job_arguments=[
        "--mode",
        "train-val-test",
        "--version",
        version,
    ],
)

#### Data processor

In [50]:
# Processing job: runs ../src/data/processor.py on the collector's "raw_data" output,
# which is mounted at /opt/ml/processing/input. The whole /opt/ml/processing/output
# directory is published as "dataset_sizes" under s3://<bucket_name>/data/processed/<version>;
# both the training and evaluation steps read it.
processing_processor = ScriptProcessor(
    command=["python3"],
    image_uri=sklearn_image_uri,
    role=sagemaker.get_execution_role(),
    # Pipeline parameters so per-execution overrides are honoured (defaults are unchanged).
    instance_count=process_instance_count_param,
    instance_type=process_instance_type_param,
)

processing_step = ProcessingStep(
    name=f"{project}-processing-{version}",
    processor=processing_processor,
    code="../src/data/processor.py",
    inputs=[
        ProcessingInput(
            source=collection_step.properties.ProcessingOutputConfig.Outputs[
                "raw_data"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/input",
        )
    ],
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output",
            destination=f"s3://{bucket_name}/data/processed/{version}",
            output_name="dataset_sizes",
        )
    ],
    job_arguments=[
        "--mode",
        "feature_store",
        # assumes the collector writes data.csv at the root of its raw_data output — confirm
        "--raw_data_filename",
        "/opt/ml/processing/input/data.csv",
        "--output_path",
        "/opt/ml/processing/output",
        "--version",
        version,
        "--feature_group_name",
        feature_group_name,
        "--region",
        region,
    ],
)

#### Model training

In [51]:
# Training job: runs ../src/models/trainer.py in the XGBoost container and writes the
# model artifacts under s3://<bucket_name>/models/<version>.
estimator = Estimator(
    image_uri=xgboost_container,
    role=sagemaker.get_execution_role(),
    # Previously hardcoded to 1 / "ml.m5.large"; the pipeline parameters keep those
    # defaults but let pipeline.start() override them.
    instance_count=train_instance_count_param,
    instance_type=train_instance_type_param,
    output_path=f"s3://{bucket_name}/models/{version}",
    sagemaker_session=sagemaker.Session(),
    entry_point="../src/models/trainer.py",
)

estimator.set_hyperparameters(
    mode="feature_store",
    # The "dataset_sizes" channel below is mounted under /opt/ml/input/data/dataset_sizes.
    dataset_sizes_path="/opt/ml/input/data/dataset_sizes/dataset_sizes.json",
    data_version=version,
    target_column="close_target",
    columns_to_drop="write_time,api_invocation_time,is_deleted,datetime,type,version",
    model_output_path="/opt/ml/model",
    num_trials=10,
    feature_group_name=feature_group_name,
    bucket_name=bucket_name,
    region=region,
    tracking_server_arn=tracking_server_arn,
    experiment_name=f"{project}-training-pipeline",
)

# Feed the processing step's "dataset_sizes" output to the training job.
dataset_sizes_input = TrainingInput(
    s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
        "dataset_sizes"
    ].S3Output.S3Uri
)

training_step = TrainingStep(
    name=f"{project}-training-{version}",
    estimator=estimator,
    inputs={"dataset_sizes": dataset_sizes_input},
)

#### Model evaluation

In [52]:
# Evaluation job: runs ../src/models/evaluator.py in the XGBoost container against
# the processed data and the trained model artifacts. Everything the script writes to
# /opt/ml/processing/output is uploaded to s3://<bucket_name>/evaluation_results/<version>
# under the output name "evaluation_result".
evaluation_processor = ScriptProcessor(
    command=["python3"],
    image_uri=xgboost_container,
    role=sagemaker.get_execution_role(),
    # Pipeline parameters so per-execution overrides are honoured (defaults are unchanged).
    instance_count=process_instance_count_param,
    instance_type=process_instance_type_param,
)

evaluation_step = ProcessingStep(
    name=f"{project}-evaluation-{version}",
    processor=evaluation_processor,
    code="../src/models/evaluator.py",
    inputs=[
        ProcessingInput(
            source=processing_step.properties.ProcessingOutputConfig.Outputs[
                "dataset_sizes"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/input",
        ),
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
    ],
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output",
            destination=f"s3://{bucket_name}/evaluation_results/{version}",
            output_name="evaluation_result",
        )
    ],
    job_arguments=[
        "--mode",
        "feature_store",
        "--input_path",
        "/opt/ml/processing/input",
        "--dataset_sizes_path",
        "/opt/ml/processing/input/dataset_sizes.json",
        "--data_version",
        version,
        "--target_column",
        "close_target",
        # Must match the columns dropped by the training hyperparameters.
        "--columns_to_drop",
        "write_time,api_invocation_time,is_deleted,datetime,type,version",
        "--model_path",
        "/opt/ml/processing/model",
        "--output_path",
        "/opt/ml/processing/output",
        "--feature_group_name",
        feature_group_name,
        "--region",
        region,
        "--bucket_name",
        bucket_name,
        "--tracking_server_arn",
        tracking_server_arn,
        "--experiment_name",
        f"{project}-evaluation-pipeline",
    ],
)

#### Model registration

In [53]:
# Model wrapping the trained artifacts in the XGBoost serving image.
model = Model(
    image_uri=xgboost_container,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    name=f"{project}-model-{version}",
    sagemaker_session=session,
    role=sagemaker.get_execution_role(),
)

# ModelMetrics needs the URI of the metrics JSON file itself, not the output prefix.
# The evaluator writes evaluation_report.json at the root of its "evaluation_result"
# output; the model check cell reads it from that same location.
evaluation_report_metrics_uri = Join(
    on="/",
    values=[
        evaluation_step.properties.ProcessingOutputConfig.Outputs[
            "evaluation_result"
        ].S3Output.S3Uri,
        "evaluation_report.json",
    ],
)

register_step = RegisterModel(
    name=f"{project}-register-{version}",
    model=model,
    content_types=["text/csv"],
    response_types=["application/json"],
    inference_instances=["ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    # Pipeline parameter (default "PendingManualApproval") so the value passed
    # as ModelApprovalStatus to pipeline.start() is honoured.
    approval_status=model_approval_status_param,
    model_metrics=ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri=evaluation_report_metrics_uri,
            content_type="application/json",
        )
    ),
    description=f"XGBoost model for index prediction, trained on pipeline {version}",
)

In [54]:
# Failure branch of the model check. The check uses strict ">" comparisons,
# so a metric exactly equal to its threshold also lands here.
step_fail = FailStep(
    name=f"{project}-fail-{version}",
    error_message=(
        f"Execution failed due to Cumulative Return at or below {cumulative_return_threshold} "
        f"or Accuracy at or below {accuracy_threshold}"
    ),
)

In [55]:
# S3 location of the report written by the evaluation step. It mirrors the
# destination of that step's "evaluation_result" output.
evaluation_report_uri = Join(
    on="/",
    values=[
        f"s3://{bucket_name}",
        "evaluation_results",
        version,
        "evaluation_report.json",
    ],
)


def _evaluation_metric(json_path: str) -> JsonGet:
    """Build a JsonGet that reads ``json_path`` from the evaluation report at runtime.

    Args:
        json_path: JSON path of the metric inside evaluation_report.json.

    Returns:
        A pipeline variable resolved when the condition step executes.
    """
    # JsonGet.step is documented as a Step object, not a step name.
    return JsonGet(step=evaluation_step, s3_uri=evaluation_report_uri, json_path=json_path)


cond_cumulative_return = ConditionGreaterThan(
    left=_evaluation_metric("cumulative_return"),
    right=cumulative_return_threshold,
)

cond_accuracy = ConditionGreaterThan(
    left=_evaluation_metric("test_accuracy"),
    right=accuracy_threshold,
)

# Both conditions must hold to register the model; otherwise the pipeline fails.
check_model_step = ConditionStep(
    name=f"{project}-check-{version}",
    conditions=[cond_cumulative_return, cond_accuracy],
    if_steps=[register_step],
    else_steps=[step_fail],
    depends_on=[evaluation_step],
)

#### Pipeline definition and execution

In [56]:
pipeline_def_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

# Every parameter listed here can be overridden per execution via pipeline.start().
pipeline_parameters = [
    process_instance_type_param,
    process_instance_count_param,
    train_instance_type_param,
    train_instance_count_param,
    model_approval_status_param,
    version_param,
]

# register_step and step_fail are not listed: they are reached through
# check_model_step's if/else branches.
pipeline_steps = [
    collection_step,
    processing_step,
    training_step,
    evaluation_step,
    check_model_step,
]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
    sagemaker_session=session,
    pipeline_definition_config=pipeline_def_config,
)

# Create or update the pipeline definition; the response is displayed below.
pipeline.upsert(role_arn=sagemaker.get_execution_role())

{'PipelineArn': 'arn:aws:sagemaker:eu-central-1:567821811420:pipeline/index-predictor-pipeline',
 'ResponseMetadata': {'RequestId': '175e2c54-86a5-4d80-98f5-130db87a821a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '175e2c54-86a5-4d80-98f5-130db87a821a',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '95',
   'date': 'Sat, 22 Jun 2024 17:25:13 GMT'},
  'RetryAttempts': 0}}

In [57]:
# Start a pipeline execution. Values come from the config-cell constants; any
# parameter not listed here falls back to its declared default.
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType=process_instance_type,
        ProcessingInstanceCount=process_instance_count,
        TrainingInstanceType=train_instance_type,
        TrainingInstanceCount=train_instance_count,
        ModelApprovalStatus=model_approval_status,
    )
)