In [5]:
!pip install --upgrade boto3



In [6]:
import boto3
import pandas as pd
from sagemaker.workflow.pipeline_context import PipelineSession

  from pandas.core.computation.check import NUMEXPR_INSTALLED


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


In [7]:
import sagemaker
pipeline_name = f"sagemaker-mlops-train-pipeline"

sagemaker_session = sagemaker.Session()
s3_client = boto3.resource('s3')
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket= sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroup"


In [8]:
from sagemaker.workflow.parameters import (
ParameterInteger,
ParameterString,
ParameterFloat)

auc_score_threshold = 0.75
base_job_prefix = 'churn-example'
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString( name="ProcessingInstanceType", default_value="ml.m5.xlarge") 
training_instance_type = ParameterString( name="TrainingInstanceType", default_value="ml.m5.xlarge") 
model_approval_status = ParameterString( name="ModelApprovalStatus", default_value="PendingManualApproval")

## Data Collection

In [9]:
!wget https://raw.githubusercontent.com/manifoldailearning/mlops-with-aws-datascientists/main/Section-16-mlops-pipeline/dataset/storedata_total.xlsx


--2024-02-26 22:11:46--  https://raw.githubusercontent.com/manifoldailearning/mlops-with-aws-datascientists/main/Section-16-mlops-pipeline/dataset/storedata_total.xlsx
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.110.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2713209 (2.6M) [application/octet-stream]
Saving to: ‘storedata_total.xlsx.1’


2024-02-26 22:11:46 (155 MB/s) - ‘storedata_total.xlsx.1’ saved [2713209/2713209]



In [10]:
store_data = pd.read_excel('storedata_total.xlsx')
store_data.to_csv("storedata_total.csv")

  for idx, row in parser.parse():


## Processing step

In [11]:
!wget https://raw.githubusercontent.com/manifoldailearning/mlops-with-aws-datascientists/main/Section-16-mlops-pipeline/preprocess-churn.py


--2024-02-26 22:11:54--  https://raw.githubusercontent.com/manifoldailearning/mlops-with-aws-datascientists/main/Section-16-mlops-pipeline/preprocess-churn.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.111.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2156 (2.1K) [text/plain]
Saving to: ‘preprocess-churn.py.1’


2024-02-26 22:11:54 (38.8 MB/s) - ‘preprocess-churn.py.1’ saved [2156/2156]



In [12]:
!pygmentize "preprocess-churn.py"


[34mimport[39;49;00m [04m[36mos[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36mtempfile[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36mnumpy[39;49;00m [34mas[39;49;00m [04m[36mnp[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36mpandas[39;49;00m [34mas[39;49;00m [04m[36mpd[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36mdatetime[39;49;00m [34mas[39;49;00m [04m[36mdt[39;49;00m[37m[39;49;00m
[34mif[39;49;00m [31m__name__[39;49;00m == [33m"[39;49;00m[33m__main__[39;49;00m[33m"[39;49;00m:[37m[39;49;00m
    base_dir = [33m"[39;49;00m[33m/opt/ml/processing[39;49;00m[33m"[39;49;00m[37m[39;49;00m
    [37m#Read Data[39;49;00m[37m[39;49;00m
    df = pd.read_csv([37m[39;49;00m
        [33mf[39;49;00m[33m"[39;49;00m[33m{[39;49;00mbase_dir[33m}[39;49;00m[33m/input/storedata_total.csv[39;49;00m[33m"[39;49;00m[37m[39;49;00m
    )[37m[39;49;00m
    [37m# convert created column to

In [13]:
input_data = "storedata_total.csv" 


In [14]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

In [15]:
framework_version = "1.0-1"
sklearn_processor = SKLearnProcessor(
        framework_version=framework_version,
        instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)

processor_args = sklearn_processor.run(
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train",\
                         destination=f"s3://{default_bucket}/output/train" ),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",\
                        destination=f"s3://{default_bucket}/output/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test",\
                        destination=f"s3://{default_bucket}/output/test")
    ],
    code=f"preprocess-churn.py",
)



In [16]:
step_process = ProcessingStep(name="ChurnModelProcess", step_args=processor_args)

## Hyperparam Tuning

In [17]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.tuner import (
        IntegerParameter,
    CategoricalParameter,
    ContinuousParameter,
    HyperparameterTuner)
from sagemaker.workflow.steps import TuningStep


In [18]:
model_path = f"s3://{default_bucket}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is interpreted in pipeline execution time only. As the function needs to evaluate the argument value in SDK compile time, the default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


In [19]:
fixed_hyperparameters = {
"eval_metric":"auc",
"objective":"binary:logistic",
"num_round":"100",
"rate_drop":"0.3",
"tweedie_variance_power":"1.4"
}

In [20]:
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    hyperparameters=fixed_hyperparameters,
    output_path=model_path,
    base_job_name=f"churn-train",
    sagemaker_session=pipeline_session,
    role=role,
)

In [21]:
hyperparameter_ranges = {
"eta": ContinuousParameter(0, 1),
"min_child_weight": ContinuousParameter(1, 10),
"alpha": ContinuousParameter(0, 2),
"max_depth": IntegerParameter(4, 10),
}

In [22]:
objective_metric_name = "validation:auc"

tuner = HyperparameterTuner(
    xgb_train,
    objective_metric_name,
    hyperparameter_ranges,
    max_jobs=2,
    max_parallel_jobs=2,
)


In [23]:
hpo_args = tuner.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

In [24]:
step_tuning = TuningStep(
    name="ChurnHyperParameterTuning",
    step_args=hpo_args,
)

## Evaluation step

In [25]:
!wget https://raw.githubusercontent.com/manifoldailearning/mlops-with-aws-datascientists/main/Section-16-mlops-pipeline/evaluate-churn.py


--2024-02-26 22:11:55--  https://raw.githubusercontent.com/manifoldailearning/mlops-with-aws-datascientists/main/Section-16-mlops-pipeline/evaluate-churn.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.108.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1309 (1.3K) [text/plain]
Saving to: ‘evaluate-churn.py.1’


2024-02-26 22:11:55 (69.4 MB/s) - ‘evaluate-churn.py.1’ saved [1309/1309]



In [26]:
from sagemaker.processing import ScriptProcessor
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session)

In [27]:
eval_args = script_eval.run(
     inputs=[
            ProcessingInput(
                source=step_tuning.get_top_model_s3_uri(top_k=0,s3_bucket=default_bucket,prefix="output"),
                destination="/opt/ml/processing/model"
            ),
            ProcessingInput(
                source=step_process.properties.ProcessingOutputConfig.Outputs[
                    "test"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/test"
            )
        ],
    outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation",\
                             destination=f"s3://{default_bucket}/output/evaluation"),
        ],
    code=f"evaluate-churn.py",
)

In [28]:
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name="ChurnEvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEvalModel",
    step_args=eval_args,
    property_files=[evaluation_report])

## Define a register model step

In [29]:
from sagemaker import Model
from sagemaker.workflow.model_step import ModelStep

model = Model(image_uri= image_uri,
    model_data=step_tuning.get_top_model_s3_uri(top_k=0,s3_bucket=default_bucket,prefix="output"),
    sagemaker_session=pipeline_session,
    role=role,
)

In [30]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)



In [31]:
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)

## Condition step to check AUC

In [33]:
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.auc_score.value",
    ),
    right=auc_score_threshold,
)

step_cond = ConditionStep(
    name="CheckAUCScoreChurnEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register],
)

## Build and Trigger the pipeline run

In [41]:
import json
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(name= pipeline_name,
                   parameters=[
                       processing_instance_count,
                       processing_instance_type,
                       training_instance_type,
                       model_approval_status,
                       input_data,
                       auc_score_threshold,
                   ],
                    steps=[step_process, step_tuning, step_eval, step_cond],)
definition  = json.loads(pipeline.definition())
print(definition)




{'Version': '2020-12-01', 'Metadata': {}, 'Parameters': [{'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1}, {'Name': 'ProcessingInstanceType', 'Type': 'String', 'DefaultValue': 'ml.m5.xlarge'}, {'Name': 'TrainingInstanceType', 'Type': 'String', 'DefaultValue': 'ml.m5.xlarge'}, {'Name': 'ModelApprovalStatus', 'Type': 'String', 'DefaultValue': 'PendingManualApproval'}], 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'}, 'TrialName': {'Get': 'Execution.PipelineExecutionId'}}, 'Steps': [{'Name': 'ChurnModelProcess', 'Type': 'Processing', 'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge', 'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'}, 'VolumeSizeInGB': 30}}, 'AppSpecification': {'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.0-1-cpu-py3', 'ContainerEntrypoint': ['python3', '/opt/ml/processing/input/code/preprocess-churn.py']}, 'RoleArn': 'arn:aws:i

In [42]:
pipeline.upsert(role_arn=role)
# start Pipeline execution
pipeline.start



<bound method Pipeline.start of <sagemaker.workflow.pipeline.Pipeline object at 0x7ff4b8d94760>>