In [1]:
import boto3 
import pandas as pd 
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession 

s3_client = boto3.resource('s3') 
pipeline_name = f"sagemaker-mlops-train-pipeline" 
import sagemaker
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name 
role = sagemaker.get_execution_role() 
pipeline_session = PipelineSession() 
default_bucket = sagemaker_session.default_bucket() 
model_package_group_name = f"ChurnModelPackageGroup"

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
from sagemaker.workflow.parameters import ( 
 ParameterInteger, 
 ParameterString, 
 ParameterFloat) 

auc_score_threshold = 0.75 
base_job_prefix = "churn-example"
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString( name="ProcessingInstanceType", default_value="ml.m5.xlarge") 
training_instance_type = ParameterString( name="TrainingInstanceType", default_value="ml.m5.xlarge") 
input_data = "storedata_total.csv" 
model_approval_status = ParameterString( name="ModelApprovalStatus", default_value="PendingManualApproval")

In [3]:
!wget https://raw.githubusercontent.com/manifoldailearning/mlops-with-aws-datascientists/main/Section-16-mlops-pipeline/dataset/storedata_total.xlsx

--2025-06-04 12:03:09--  https://raw.githubusercontent.com/manifoldailearning/mlops-with-aws-datascientists/main/Section-16-mlops-pipeline/dataset/storedata_total.xlsx
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.111.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2713209 (2.6M) [application/octet-stream]
Saving to: ‘storedata_total.xlsx’


2025-06-04 12:03:10 (260 MB/s) - ‘storedata_total.xlsx’ saved [2713209/2713209]



In [5]:
! pip install openpyxl


Collecting openpyxl
  Downloading openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting et-xmlfile (from openpyxl)
  Downloading et_xmlfile-2.0.0-py3-none-any.whl.metadata (2.7 kB)
Downloading openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)
Downloading et_xmlfile-2.0.0-py3-none-any.whl (18 kB)
Installing collected packages: et-xmlfile, openpyxl
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2/2[0m [openpyxl]1/2[0m [openpyxl]
[1A[2KSuccessfully installed et-xmlfile-2.0.0 openpyxl-3.1.5


In [6]:
store_data = pd.read_excel("storedata_total.xlsx") 
store_data.to_csv("storedata_total.csv")

  warn(msg)


In [7]:
!wget https://raw.githubusercontent.com/manifoldailearning/mlops-with-aws-datascientists/main/Section-16-mlops-pipeline/preprocess-churn.py

--2025-06-04 12:05:10--  https://raw.githubusercontent.com/manifoldailearning/mlops-with-aws-datascientists/main/Section-16-mlops-pipeline/preprocess-churn.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.109.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2156 (2.1K) [text/plain]
Saving to: ‘preprocess-churn.py’


2025-06-04 12:05:10 (42.8 MB/s) - ‘preprocess-churn.py’ saved [2156/2156]



In [8]:
!pygmentize "preprocess-churn.py"

[34mimport[39;49;00m[37m [39;49;00m[04m[36mos[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m[37m [39;49;00m[04m[36mtempfile[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m[37m [39;49;00m[04m[36mnumpy[39;49;00m[37m [39;49;00m[34mas[39;49;00m[37m [39;49;00m[04m[36mnp[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m[37m [39;49;00m[04m[36mpandas[39;49;00m[37m [39;49;00m[34mas[39;49;00m[37m [39;49;00m[04m[36mpd[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m[37m [39;49;00m[04m[36mdatetime[39;49;00m[37m [39;49;00m[34mas[39;49;00m[37m [39;49;00m[04m[36mdt[39;49;00m[37m[39;49;00m
[34mif[39;49;00m [31m__name__[39;49;00m == [33m"[39;49;00m[33m__main__[39;49;00m[33m"[39;49;00m:[37m[39;49;00m
    base_dir = [33m"[39;49;00m[33m/opt/ml/processing[39;49;00m[33m"[39;49;00m[37m[39;49;00m
    [37m#Read Data[39;49;00m[37m[39;49;00m
    df = pd.read_csv([37m[39;49;00m
        [33mf[39;49;00m[33m"[39;49;00m[33m{[39;

In [9]:
# Define Processing Step for Feature Engineering
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

framework_version = "1.0-1"
sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)
processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",\
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",\
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",\
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"preprocess-churn.py",
)
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)



In [10]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.tuner import (
    IntegerParameter,
    CategoricalParameter,
    ContinuousParameter,
    HyperparameterTuner,
)
from sagemaker.workflow.steps import TuningStep

model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)
fixed_hyperparameters = {
"eval_metric":"auc",
"objective":"binary:logistic",
"num_round":"100",
"rate_drop":"0.3",
"tweedie_variance_power":"1.4"
}
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    hyperparameters=fixed_hyperparameters,
    output_path=model_path,
    base_job_name=f"churn-train",
    sagemaker_session=pipeline_session,
    role=role)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is interpreted in pipeline execution time only. As the function needs to evaluate the argument value in SDK compile time, the default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


In [11]:
hyperparameter_ranges = {
"eta": ContinuousParameter(0, 1),
"min_child_weight": ContinuousParameter(1, 10),
"alpha": ContinuousParameter(0, 2),
"max_depth": IntegerParameter(1, 10),
}
objective_metric_name = "validation:auc"

tuner = HyperparameterTuner(
    xgb_train,
    objective_metric_name,
    hyperparameter_ranges,
    max_jobs=2,
    max_parallel_jobs=2,
)

hpo_args = tuner.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

step_tuning = TuningStep(
    name="ChurnHyperParameterTuning",
    step_args=hpo_args,
)



In [12]:
!wget https://raw.githubusercontent.com/manifoldailearning/mlops-with-aws-datascientists/main/Section-16-mlops-pipeline/evaluate-churn.py

--2025-06-04 12:06:39--  https://raw.githubusercontent.com/manifoldailearning/mlops-with-aws-datascientists/main/Section-16-mlops-pipeline/evaluate-churn.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.110.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1309 (1.3K) [text/plain]
Saving to: ‘evaluate-churn.py’


2025-06-04 12:06:39 (107 MB/s) - ‘evaluate-churn.py’ saved [1309/1309]



In [13]:
!pygmentize "evaluate-churn.py"

[34mimport[39;49;00m[37m [39;49;00m[04m[36mjson[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m[37m [39;49;00m[04m[36mpathlib[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m[37m [39;49;00m[04m[36mpickle[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m[37m [39;49;00m[04m[36mtarfile[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m[37m [39;49;00m[04m[36mjoblib[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m[37m [39;49;00m[04m[36mnumpy[39;49;00m[37m [39;49;00m[34mas[39;49;00m[37m [39;49;00m[04m[36mnp[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m[37m [39;49;00m[04m[36mpandas[39;49;00m[37m [39;49;00m[34mas[39;49;00m[37m [39;49;00m[04m[36mpd[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m[37m [39;49;00m[04m[36mxgboost[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m[37m [39;49;00m[04m[36mdatetime[39;49;00m[37m [39;49;00m[34mas[39;49;00m[37m [39;49;00m[04m[36mdt[39;49;00m[37m[39;49;00m
[34mfrom[39;49;00m[37m 

In [14]:
# define model evaluation step to evaluate the trained model
from sagemaker.processing import ScriptProcessor
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)
eval_args = script_eval.run(
     inputs=[
            ProcessingInput(
                source=step_tuning.get_top_model_s3_uri(top_k=0,s3_bucket=default_bucket,prefix="output"),
                destination="/opt/ml/processing/model"
            ),
            ProcessingInput(
                source=step_process.properties.ProcessingOutputConfig.Outputs[
                    "test"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/test"
            )
        ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",\
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"evaluate-churn.py",
)
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)



In [15]:
from sagemaker import Model
from sagemaker.workflow.model_step import ModelStep

model = Model(
    image_uri=image_uri,
    model_data=step_tuning.get_top_model_s3_uri(top_k=0,s3_bucket=default_bucket,prefix="output"),
    sagemaker_session=pipeline_session,
    role=role,
)
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)



In [16]:
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)
step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register],
)

In [17]:
import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        model_approval_status,
        input_data,
        auc_score_threshold,
    ],
    steps=[step_process, step_tuning, step_eval, step_cond],
) 
definition = json.loads(pipeline.definition())
print(definition)



{'Version': '2020-12-01', 'Metadata': {}, 'Parameters': [{'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1}, {'Name': 'ProcessingInstanceType', 'Type': 'String', 'DefaultValue': 'ml.m5.xlarge'}, {'Name': 'TrainingInstanceType', 'Type': 'String', 'DefaultValue': 'ml.m5.xlarge'}, {'Name': 'ModelApprovalStatus', 'Type': 'String', 'DefaultValue': 'PendingManualApproval'}], 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'}, 'TrialName': {'Get': 'Execution.PipelineExecutionId'}}, 'Steps': [{'Name': 'ChurnModelProcess', 'Type': 'Processing', 'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge', 'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'}, 'VolumeSizeInGB': 30}}, 'AppSpecification': {'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.0-1-cpu-py3', 'ContainerEntrypoint': ['python3', '/opt/ml/processing/input/code/preprocess-churn.py']}, 'RoleArn': 'arn:aws:i

In [18]:
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=role)
# start Pipeline execution
pipeline.start()



_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:640168443051:pipeline/sagemaker-mlops-train-pipeline/execution/t4ke8qrizpta', sagemaker_session=<sagemaker.session.Session object at 0x7fe615a3d340>)