<a href="https://colab.research.google.com/github/nraptis/Machine-Learning-Notebooks/blob/main/SageMaker_Pipelines_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Step 1: Import Packages and Declare Constants

In [None]:
import boto3
import sagemaker
import datetime as dt
import pandas as pd

sagemaker.config INFO - Fetched defaults config from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix


In [None]:
default_bucket = "sagemaker-us-east-2-005200800801"
region = boto3.Session().region_name
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
sklearn_processor_version="0.23-1"
model_package_group_name="ChurnModelPackageGroup"
pipeline_name= "ChurnModelSMPipeline"
clarify_image = sagemaker.image_uris.retrieve(framework='sklearn',version=sklearn_processor_version,region=region)

sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix


## Step 2: Generate Baseline Dataset

Baseline Data will be used as part of SageMaker Clarify Step to generate SHAP Values

In [None]:
def preprocess_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df

In [None]:
baseline_data = preprocess_data("data/storedata_total.csv")
baseline_data.pop("retained")
baseline_sample = baseline_data.sample(frac=0.0002)

In [None]:
pd.DataFrame(baseline_sample).to_csv("data/baseline.csv",header=False,index=False)

## Step 3: Generate Batch Dataset

In [None]:
batch_data = preprocess_data("data/storedata_total.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)

In [None]:
pd.DataFrame(batch_sample).to_csv("data/batch.csv",header=False,index=False)

## Step 4: Copy Data and Scripts to S3 Bucket

In [None]:
s3_client = boto3.resource('s3')
s3_client.Bucket(default_bucket).upload_file("data/storedata_total.csv","data/storedata_total.csv")
s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/baseline.csv","input/baseline/baseline.csv")

In [None]:
s3_client.Bucket(default_bucket).upload_file("/mnt/custom-file-systems/s3/shared/pipelines/customerchurn/preprocess.py","input/code/preprocess.py")
s3_client.Bucket(default_bucket).upload_file("/mnt/custom-file-systems/s3/shared/pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
s3_client.Bucket(default_bucket).upload_file("/mnt/custom-file-systems/s3/shared/pipelines/customerchurn/generate_config.py","input/code/generate_config.py")

## Step 5: Get the Pipeline Instance

In [None]:
import sys
from pathlib import Path

ROOT = Path("/mnt/custom-file-systems/s3/shared")
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
print("sys.path[0] =", sys.path[0])

from pipelines.customerchurn.pipeline import get_pipeline

pipeline = get_pipeline(
    region = region,
    role=role,
    default_bucket=default_bucket,
    model_package_group_name=model_package_group_name,
    pipeline_name=pipeline_name,
    custom_image_uri=clarify_image,
    sklearn_processor_version=sklearn_processor_version
)

sys.path[0] = /mnt/custom-file-systems/s3/shared
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.Environment
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix


In [None]:
pipeline.definition()

'{"Version": "2020-12-01", "Metadata": {}, "Parameters": [{"Name": "ProcessingInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"}, {"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 1}, {"Name": "TrainingInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"}, {"Name": "InputData", "Type": "String", "DefaultValue": "s3://sagemaker-us-east-2-005200800801/data/storedata_total.csv"}, {"Name": "BatchData", "Type": "String", "DefaultValue": "s3://sagemaker-us-east-2-005200800801/data/batch/batch.csv"}], "PipelineExperimentConfig": {"ExperimentName": {"Get": "Execution.PipelineName"}, "TrialName": {"Get": "Execution.PipelineExecutionId"}}, "Steps": [{"Name": "ChurnModelProcess", "Type": "Processing", "Arguments": {"ProcessingResources": {"ClusterConfig": {"InstanceType": {"Get": "Parameters.ProcessingInstanceType"}, "InstanceCount": {"Get": "Parameters.ProcessingInstanceCount"}, "VolumeSizeInGB": 30}}, "AppSpecification": {"ImageUri": "257758044811.d

## Step 5: Submit the pipeline to SageMaker and start execution

In [None]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-2:005200800801:pipeline/ChurnModelSMPipeline',
 'ResponseMetadata': {'RequestId': 'bde65160-d7b9-4c66-b939-d704faa77a05',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'bde65160-d7b9-4c66-b939-d704faa77a05',
   'strict-transport-security': 'max-age=47304000; includeSubDomains',
   'x-frame-options': 'DENY',
   'content-security-policy': "frame-ancestors 'none'",
   'cache-control': 'no-cache, no-store, must-revalidate',
   'x-content-type-options': 'nosniff',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '88',
   'date': 'Mon, 15 Dec 2025 22:41:43 GMT'},
  'RetryAttempts': 0}}

Start Pipeline Execution

In [None]:
execution = pipeline.start(parameters={
    "ProcessingInstanceType": "ml.m5.large",   # or ml.t3.medium
    "ProcessingInstanceCount": 1
})
print(execution.arn)

arn:aws:sagemaker:us-east-2:005200800801:pipeline/ChurnModelSMPipeline/execution/8iz8pjtkklem


Now we describe execution instance and list the steps in the execution to find out more about the execution.

In [None]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-2:005200800801:pipeline/ChurnModelSMPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-2:005200800801:pipeline/ChurnModelSMPipeline/execution/8iz8pjtkklem',
 'PipelineExecutionDisplayName': 'execution-1765838585787',
 'PipelineExecutionStatus': 'Failed',
 'PipelineExperimentConfig': {'ExperimentName': 'ChurnModelSMPipeline',
  'TrialName': '8iz8pjtkklem'},
 'FailureReason': 'Step failure: One or multiple steps failed.',
 'CreationTime': datetime.datetime(2025, 12, 15, 22, 43, 5, 655000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2025, 12, 15, 22, 43, 8, 119000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-2:005200800801:user-profile/d-lzsyji8vg5he/7425b343-bb8d-484c-b1b3-7036868d29a9',
  'UserProfileName': '7425b343-bb8d-484c-b1b3-7036868d29a9',
  'DomainId': 'd-lzsyji8vg5he',
  'IamIdentity': {'Arn': 'arn:aws:sts::005200800801:assumed-role/AmazonSageMakerAdminIAMExecutionRole/SageMa

We can list the execution steps to check out the status and artifacts:

In [None]:
execution.list_steps()

[{'StepName': 'ChurnModelProcess',
  'StartTime': datetime.datetime(2025, 12, 15, 22, 43, 6, 609000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 12, 15, 22, 43, 7, 815000, tzinfo=tzlocal()),
  'StepStatus': 'Failed',
  'FailureReason': "ClientError: Failed to invoke sagemaker:CreateProcessingJob. Error Details: The account-level service limit 'ml.m5.large for processing job usage' is 0 Instances, with current utilization of 0 Instances and a request delta of 1 Instances. Please use AWS Service Quotas to request an increase for this quota. If AWS Service Quotas is not available, contact AWS support to request an increase for this quota.\nRetry not appropriate on execution of step with PipelineExecutionArn arn:aws:sagemaker:us-east-2:005200800801:pipeline/churnmodelsmpipeline/execution/8iz8pjtkklem and StepId ChurnModelProcess. No retry policy configured for the exception type SAGEMAKER_RESOURCE_LIMIT.",
  'Metadata': {},
  'AttemptCount': 1}]