# CI/CD Pipeline 

In [2]:
!pip install -U sagemaker
!pip install ipython

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [3]:
import os
import sys
import boto3
import sagemaker
import argparse
import requests
import tempfile
import pandas as pd
import json
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"ChurnModelPackageGroupName"

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


In [4]:
# Read csv
df = pd.read_csv('data/internet_service_churn.csv')
df.head()

Unnamed: 0,id,is_tv_subscriber,is_movie_package_subscriber,subscription_age,bill_avg,reamining_contract,service_failure_count,download_avg,upload_avg,download_over_limit,churn
0,15,1,0,11.95,25,0.14,0,8.4,2.3,0,0
1,18,0,0,8.22,0,,0,0.0,0.0,0,1
2,23,1,0,8.91,16,0.0,0,13.7,0.9,0,1
3,27,0,0,6.87,21,,1,0.0,0.0,0,1
4,34,0,0,6.39,0,,0,0.0,0.0,0,1


In [5]:
# Set aside some data for batch transform
X = df.drop(labels = ['churn','id'],axis = 1)
y = df['churn']
X_df,X_batch,y_df,y_batch = train_test_split(X,y,test_size = 0.1,random_state = 24)



X_df['churn'] = y_df
X_batch['churn'] = y_batch
X_df.insert(0, 'churn', X_df.pop('churn')) 
X_batch.insert(0, 'churn', X_batch.pop('churn')) 
# Save as dfs
X_df.to_csv("internet_churn.csv")
X_batch.to_csv("batch_churn.csv")


In [6]:
X_df.head()

Unnamed: 0,churn,is_tv_subscriber,is_movie_package_subscriber,subscription_age,bill_avg,reamining_contract,service_failure_count,download_avg,upload_avg,download_over_limit
24466,0,1,1,5.75,15,1.32,0,45.1,7.9,0
45438,0,1,1,2.69,21,1.5,0,0.2,0.0,0
63208,0,1,1,0.79,28,1.43,5,131.6,9.2,0
37070,0,1,1,3.69,20,0.19,0,103.5,10.7,0
305,1,0,0,10.09,20,,0,0.0,0.0,0


In [7]:
# Make sure to get in correct dictionary
os.getcwd()

'/root/AAI-540-Internet-Churn-Project'

### Use lab 6 code as guide

In [8]:
# Input data 
local_path = "internet_churn.csv"

s3 = boto3.resource("s3")

base_uri = f"s3://{default_bucket}/churn"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(input_data_uri)

s3://sagemaker-us-east-1-601712311111/churn/internet_churn.csv


In [9]:
# Batch data
local_path = "batch_churn.csv"

s3 = boto3.resource("s3")


base_uri = f"s3://{default_bucket}/churn"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(batch_data_uri)

s3://sagemaker-us-east-1-601712311111/churn/batch_churn.csv


In [10]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)
# Accuracy threshold
acc_threshold = ParameterFloat(name="AccThreshold", default_value=0.8)

In [11]:
!mkdir -p code

In [12]:
#Code preprocessing step

In [13]:
%%writefile code/preprocessing.py
import argparse
import requests
import tempfile
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
import os
import io
import time
from time import strftime, gmtime
if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    df = pd.read_csv(
        f"{base_dir}/input/internet_churn.csv")
    
    # Fix spelling error in column
    df = df.rename(columns = {'reamining_contract':'remaining_contract'})
    df['remaining_contract'] = df['remaining_contract'].astype(str)
    
    # Fix negative values
    df = df[df[df.columns].min(axis=1) >= 0]
    
    # Discretize column
    df['remaining_contract'].replace('nan', 'no contract', inplace=True)
    for i in df['remaining_contract']:
        try:
            if float(i) >= 0 and float(i) <1:
                df['remaining_contract'].replace(i, '0-1 years', inplace=True)
            elif float(i) >= 1 and float(i) < 2:
                df['remaining_contract'].replace(i, '1-2 years', inplace=True)
            elif float(i) >= 2 and float(i)<3:
                df['remaining_contract'].replace(i, '2-3 years', inplace=True)
        except:
            continue
            
    # Fill na with column median 
    df[['download_avg','upload_avg']] = df[['download_avg','upload_avg']].fillna(df[['download_avg','upload_avg']].median())
    
    # Get dummy variables
    df = pd.get_dummies(df, columns = ['remaining_contract'],dtype = int)
    
    # Rename columns
    df= df.rename({'remaining_contract_0-1 years':'remaining_contract_0-1_years',
                  'remaining_contract_1-2 years': 'remaining_contract_1-2_years',
                  'remaining_contract_2-3 years': 'remaining_contract_2-3_years',
                  'remaining_contract_no contract':'remaining_contract_no_contract'},axis = 1)
    train, validation, test = np.split(df, [int(0.7 * len(df)), int(0.85 * len(df))])
    
    # Scale Variables
    cols_scale = ['subscription_age','bill_avg','service_failure_count','download_avg','upload_avg','download_over_limit']
    SS = ColumnTransformer([('scaler',StandardScaler(),cols_scale)],remainder='passthrough')
    train = SS.fit_transform(pd.DataFrame(train))
    validation = SS.fit_transform(pd.DataFrame(validation))
    test = SS.fit_transform(pd.DataFrame(test))
    
    # Save to csv
    pd.DataFrame(train).to_csv(
        f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(
        f"{base_dir}/test/test.csv", header=False, index=False)
    


Overwriting code/preprocessing.py


In [14]:
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-churn-process",
    role=role,
    sagemaker_session=pipeline_session,
)

In [15]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocessing.py",
)

step_process = ProcessingStep(name="ChurnProcess", step_args=processor_args)



In [16]:
# Training step definition
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/ChurnTrain"

image = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

# xgb_train = Estimator(
#     image,
#     role,
#     instance_count=1,
#     instance_type="ml.m5.xlarge",
#     volume_size=50,
#     input_mode="File",
#     output_path=model_path,
#     sagemaker_session=sagemaker_session,
# )

# TODO: Estimator must use pipeline_session so that fit() is not evaluated until pipeline runs
xgb_train = Estimator(
    image_uri=image,
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size=50,
    output_path=model_path,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    eval_metric = 'accuracy',
    objective='binary:logistic',
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

# TODO: Review Changes
train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

In [17]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


# TODO: Training step must use result of fit() as input to step_args
step_train = TrainingStep(
    name="ChurnTrain",
    step_args=train_args,
)

# step_train = TrainingStep(
#     name="ChurnTrain",
#     estimator = xgb_train,
#     inputs = [TrainingInput(
#             s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
#             content_type="text/csv",
#         ), TrainingInput(
#             s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
#                 "validation"
#             ].S3Output.S3Uri,
#             content_type="text/csv",
#         )]
# )

In [18]:
%%writefile code/evaluation.py
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import accuracy_score,precision_score,recall_score,f1_score


if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    model = pickle.load(open("xgboost-model", "rb"))

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    y_test = df['churn'].to_numpy()
    df.drop(df['churn'], axis=1, inplace=True)

    X_test = xgboost.DMatrix(df.values)

    predictions = model.predict(X_test)

    acc = accuracy_score(y_test, predictions)
    precision = precision_score(y_test, predictions)
    recall = recall_score(y_test, predictions)
    f1 = f1_score(y_test, predictions)

    report_dict = {
        "regression_metrics": {
            "Accuracy": {"Value": acc},
            "Precision": {"Value":precision},
            "Recall": {"Value":recall},
            "F1 Score": {"Value":f1}
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting code/evaluation.py


In [19]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="script-churn-eval",
    role=role,
    sagemaker_session=pipeline_session,
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluation.py",
)

In [20]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="ChurnEval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

In [21]:
# Create model
from sagemaker.model import Model

model = Model(
    image_uri=image,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

In [22]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep

step_create_model = ModelStep(
    name="ChurnCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

In [23]:
# Transform step
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/ChurnTransform",
)

In [24]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
    name="ChurnTransform", transformer=transformer, inputs=TransformInput(data=batch_data)
)


In [25]:
# Register model
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="ChurnRegisterModel", step_args=register_args)



In [26]:
# Fail step
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="ChurnAccFail",
    error_message=Join(on=" ", values=["Execution failed due to Accuracy <", acc_threshold]),
)

In [27]:
# Condition to register model -> Greater than accuracy threshold
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.Accuracy.Value",
    ),
    right=acc_threshold,
)

step_cond = ConditionStep(
    name="ChurnAccCond",
    conditions=[cond_gte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

In [28]:
# Define a pipeline with above steps
from sagemaker.workflow.pipeline import Pipeline

input_data_input = ProcessingInput(input_name="input_data", source=input_data)
batch_data_input = ProcessingInput(input_name="batch_data", source=batch_data)


# TODO: Pipeline must receive only parameter* objects as inputs for parameters
pipeline_name = f"ChurnPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        batch_data,
        acc_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

In [29]:
# TODO: Review pipeline before you upsert pipeline

import json


definition = json.loads(pipeline.definition())
definition



{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-601712311111/churn/internet_churn.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-601712311111/churn/batch_churn.csv'},
  {'Name': 'AccThreshold', 'Type': 'Float', 'DefaultValue': 0.8}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'ChurnProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.Proces

In [30]:
# Start pipeline execution
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:601712311111:pipeline/ChurnPipeline',
 'ResponseMetadata': {'RequestId': '679764e6-bb11-4d9b-a536-413c3f598fe7',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '679764e6-bb11-4d9b-a536-413c3f598fe7',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '81',
   'date': 'Wed, 28 Feb 2024 02:08:18 GMT'},
  'RetryAttempts': 0}}

In [31]:
execution = pipeline.start()

In [32]:
# Describe execution
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:601712311111:pipeline/ChurnPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:601712311111:pipeline/ChurnPipeline/execution/ci1ivwnq02mv',
 'PipelineExecutionDisplayName': 'execution-1709086099552',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2024, 2, 28, 2, 8, 19, 490000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 2, 28, 2, 8, 19, 490000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:601712311111:user-profile/d-3d9xspmqlffg/ltaylor',
  'UserProfileName': 'ltaylor',
  'DomainId': 'd-3d9xspmqlffg'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:601712311111:user-profile/d-3d9xspmqlffg/ltaylor',
  'UserProfileName': 'ltaylor',
  'DomainId': 'd-3d9xspmqlffg'},
 'ResponseMetadata': {'RequestId': '7fd6444f-d6e2-43fc-8e06-a29bedc1128e',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '7fd6444f-d6e2-43fc-8e06-a29

In [33]:
# Wait for execution to complete
execution.wait()

WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"

In [34]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:601712311111:pipeline/ChurnPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:601712311111:pipeline/ChurnPipeline/execution/ci1ivwnq02mv',
 'PipelineExecutionDisplayName': 'execution-1709086099552',
 'PipelineExecutionStatus': 'Failed',
 'PipelineExperimentConfig': {'ExperimentName': 'churnpipeline',
  'TrialName': 'ci1ivwnq02mv'},
 'FailureReason': 'Step failure: One or multiple steps failed.',
 'CreationTime': datetime.datetime(2024, 2, 28, 2, 8, 19, 490000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 2, 28, 2, 15, 46, 792000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:601712311111:user-profile/d-3d9xspmqlffg/ltaylor',
  'UserProfileName': 'ltaylor',
  'DomainId': 'd-3d9xspmqlffg'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:601712311111:user-profile/d-3d9xspmqlffg/ltaylor',
  'UserProfileName': 'ltaylor',
  'DomainId': 'd-3d9xspmqlffg'},
 'Resp

In [35]:
execution.list_steps()

[{'StepName': 'ChurnTrain',
  'StartTime': datetime.datetime(2024, 2, 28, 2, 13, 37, 566000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 2, 28, 2, 15, 46, 462000, tzinfo=tzlocal()),
  'StepStatus': 'Failed',
  'AttemptCount': 1,
  'FailureReason': 'ClientError: AlgorithmError: framework error: \nTraceback (most recent call last):\n  File "/miniconda3/lib/python3.7/site-packages/sagemaker_xgboost_container/algorithm_mode/train.py", line 226, in train_job\n    verbose_eval=False)\n  File "/miniconda3/lib/python3.7/site-packages/xgboost/training.py", line 209, in train\n    xgb_model=xgb_model, callbacks=callbacks)\n  File "/miniconda3/lib/python3.7/site-packages/xgboost/training.py", line 74, in _train_internal\n    bst.update(dtrain, i, obj)\n  File "/miniconda3/lib/python3.7/site-packages/xgboost/core.py", line 1248, in update\n    dtrain.handle))\n  File "/miniconda3/lib/python3.7/site-packages/xgboost/core.py", line 189, in _check_call\n    raise XGBoostError(py_str(_LIB.

In [None]:
# Veiw evaluation results
from pprint import pprint


evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))

In [None]:
# Artifact lineage
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

In [None]:
execution = pipeline.start(
    parameters=dict(
        ModelApprovalStatus="Approved",
    )
)

In [None]:
execution.wait()

In [None]:
execution.list_steps()