Step 1: Import Packages and Declare Constants

In [41]:
import boto3
import sagemaker
import datetime as dt
import pandas as pd

In [42]:
# Replace this value with the name of the S3 bucket you created
default_bucket = "sagemaker-studio-975049910566-xx42t2smsb"

In [43]:
region = boto3.Session().region_name
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
sklearn_processor_version = "0.23-1"
model_package_group_name = "ChurnModelPackageGroup"
pipeline_name = "ChurnModelSMPipeline"
clarify_image = sagemaker.image_uris.retrieve(framework="sklearn", version=sklearn_processor_version, region=region)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


Step 2: Generate Baseline Dataset

The baseline data is used by the SageMaker Clarify step to generate SHAP values.

In [44]:
def preprocess_data(file_path):
    df = pd.read_csv(file_path)
    ## Convert to datetime columns
    df["firstorder"]=pd.to_datetime(df["firstorder"],errors='coerce')
    df["lastorder"] = pd.to_datetime(df["lastorder"],errors='coerce')
    ## Drop Rows with null values
    df = df.dropna()
    ## Create Column which gives the days between the last order and the first order
    df["first_last_days_diff"] = (df['lastorder']-df['firstorder']).dt.days
    ## Create Column which gives the days between when the customer record was created and the first order
    df['created'] = pd.to_datetime(df['created'])
    df['created_first_days_diff']=(df['created']-df['firstorder']).dt.days
    ## Drop Columns
    df.drop(['custid','created','firstorder','lastorder'],axis=1,inplace=True)
    ## Apply one hot encoding on favday and city columns
    df = pd.get_dummies(df,prefix=['favday','city'],columns=['favday','city'])
    return df

Make sure to upload "storedata_total - data.csv" to the data subdirectory of the current working directory.

In [45]:
baseline_data = preprocess_data("data/storedata_total - data.csv")
baseline_data.pop("retained")
baseline_sample = baseline_data.sample(frac=0.0002)

In [46]:
baseline_sample.to_csv("data/baseline.csv", header=False, index=False)

Confirm in the File Browser that "baseline.csv" was created in the data directory.
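
The same check can be done from a cell instead of the File Browser. This is a minimal sketch using only the standard library; `summarize_csv` is a hypothetical helper name, not part of the project, and it assumes the file was written without a header row as in the cell above.

```python
import csv
from pathlib import Path


def summarize_csv(path):
    """Return (row_count, column_count) for a headerless CSV file."""
    path = Path(path)
    if not path.is_file():
        raise FileNotFoundError(f"{path} was not created")
    with path.open(newline="") as handle:
        rows = list(csv.reader(handle))
    if not rows:
        # An empty file can happen when the sample fraction selects zero rows.
        raise ValueError(f"{path} is empty")
    widths = {len(row) for row in rows}
    if len(widths) != 1:
        raise ValueError(f"{path} has rows of differing widths: {sorted(widths)}")
    return len(rows), widths.pop()
```

Calling `summarize_csv("data/baseline.csv")` in the notebook should return a small row count and the number of feature columns left after dropping "retained".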

Step 3: Generate Batch Dataset

In [47]:
batch_data = preprocess_data("data/storedata_total - data.csv")
batch_data.pop("retained")
batch_sample = batch_data.sample(frac=0.2)

In [48]:
batch_sample.to_csv("data/batch.csv", header=False, index=False)

Step 4: Copy Data and Scripts to S3 Bucket

In [49]:
s3_client = boto3.resource('s3')
s3_client.Bucket(default_bucket).upload_file("data/storedata_total - data.csv","data/storedata_total.csv")
s3_client.Bucket(default_bucket).upload_file("data/batch.csv","data/batch/batch.csv")
s3_client.Bucket(default_bucket).upload_file("data/baseline.csv","input/baseline/baseline.csv")

In [50]:
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/preprocess.py","input/code/preprocess.py")
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/evaluate.py","input/code/evaluate.py")
s3_client.Bucket(default_bucket).upload_file("pipelines/customerchurn/generate_config.py","input/code/generate_config.py")
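
The six uploads above can also be driven from one list, which makes it easier to see at a glance what lands where in the bucket. This is only a sketch: `UPLOADS` and `upload_all` are hypothetical names, and the helper assumes it is handed an object with an `upload_file(local_path, key)` method, such as the `Bucket` resource used above.

```python
# (local path, S3 key) pairs copied from the two cells above.
UPLOADS = [
    ("data/storedata_total - data.csv", "data/storedata_total.csv"),
    ("data/batch.csv", "data/batch/batch.csv"),
    ("data/baseline.csv", "input/baseline/baseline.csv"),
    ("pipelines/customerchurn/preprocess.py", "input/code/preprocess.py"),
    ("pipelines/customerchurn/evaluate.py", "input/code/evaluate.py"),
    ("pipelines/customerchurn/generate_config.py", "input/code/generate_config.py"),
]


def upload_all(bucket, manifest):
    """Upload every (local_path, key) pair and return the keys that were sent."""
    uploaded = []
    for local_path, key in manifest:
        bucket.upload_file(local_path, key)
        uploaded.append(key)
    return uploaded


# In the notebook (needs AWS credentials):
# upload_all(boto3.resource("s3").Bucket(default_bucket), UPLOADS)
```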

Step 5: Get the Pipeline Instance

In [51]:
from pipelines.customerchurn.pipeline import get_pipeline

pipeline = get_pipeline(
    region=region,
    role=role,
    default_bucket=default_bucket,
    model_package_group_name=model_package_group_name,
    pipeline_name=pipeline_name,
    custom_image_uri=clarify_image,
    sklearn_processor_version=sklearn_processor_version
)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is interpreted in pipeline execution time only. As the function needs to evaluate the argument value in SDK compile time, the default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.




The class JsonGet has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


In [52]:
pipeline.definition()

Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'HyperParameterTuningJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
No finished train

'{"Version": "2020-12-01", "Metadata": {}, "Parameters": [{"Name": "ProcessingInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"}, {"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 1}, {"Name": "TrainingInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"}, {"Name": "InputData", "Type": "String", "DefaultValue": "s3://sagemaker-studio-975049910566-xx42t2smsb/data/storedata_total.csv"}, {"Name": "BatchData", "Type": "String", "DefaultValue": "s3://sagemaker-studio-975049910566-xx42t2smsb/data/batch/batch.csv"}], "PipelineExperimentConfig": {"ExperimentName": {"Get": "Execution.PipelineName"}, "TrialName": {"Get": "Execution.PipelineExecutionId"}}, "Steps": [{"Name": "ChurnModelProcess", "Type": "Processing", "Arguments": {"ProcessingResources": {"ClusterConfig": {"InstanceType": {"Get": "Parameters.ProcessingInstanceType"}, "InstanceCount": {"Get": "Parameters.ProcessingInstanceCount"}, "VolumeSizeInGB": 30}}, "AppSpecification": {"ImageUri":

Step 6: Submit the pipeline to SageMaker and start execution

In [53]:
pipeline.upsert(role_arn=role)

Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'HyperParameterTuningJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'ProcessingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
No finished train

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:975049910566:pipeline/ChurnModelSMPipeline',
 'ResponseMetadata': {'RequestId': 'a1332a2e-f759-4392-8910-df0c37fe257b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'a1332a2e-f759-4392-8910-df0c37fe257b',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '88',
   'date': 'Tue, 20 Feb 2024 21:00:46 GMT'},
  'RetryAttempts': 0}}

In [54]:
execution = pipeline.start()

Now we describe the execution instance to find out more about the run.

In [55]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:975049910566:pipeline/ChurnModelSMPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:975049910566:pipeline/ChurnModelSMPipeline/execution/fcmf0b13y2h1',
 'PipelineExecutionDisplayName': 'execution-1708462854079',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'churnmodelsmpipeline',
  'TrialName': 'fcmf0b13y2h1'},
 'CreationTime': datetime.datetime(2024, 2, 20, 21, 0, 54, 11000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 2, 20, 21, 0, 54, 11000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:975049910566:user-profile/d-pzzjmlf5cqyf/default-20240210t162844',
  'UserProfileName': 'default-20240210t162844',
  'DomainId': 'd-pzzjmlf5cqyf'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:975049910566:user-profile/d-pzzjmlf5cqyf/default-20240210t162844',
  'UserProfileName': 'default-20240210t162844',
  'DomainId': 'd-
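
Since the status above is still 'Executing', it can be handy to poll until the run reaches a terminal state instead of re-running the cell by hand. The sketch below is an illustration, not part of the project: `wait_for_status` is a hypothetical helper that takes any zero-argument callable returning a dict with a 'PipelineExecutionStatus' key, which is the shape shown in the output above. To my knowledge the SageMaker SDK's execution object also offers a built-in `wait()` method that serves the same purpose.

```python
import time

# Statuses after which a pipeline execution no longer changes.
TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_status(describe, poll_seconds=30, max_polls=120, sleep=time.sleep):
    """Call describe() repeatedly until the execution reaches a terminal status."""
    for _ in range(max_polls):
        status = describe()["PipelineExecutionStatus"]
        if status in TERMINAL_STATUSES:
            return status
        sleep(poll_seconds)
    raise TimeoutError(f"execution still running after {max_polls} polls")


# In the notebook:
# final_status = wait_for_status(execution.describe)
```

Passing `sleep` as a parameter is only there so the helper can be exercised without real waiting.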

We can list the execution steps to check their status and artifacts:

In [60]:
execution.list_steps()

[{'StepName': 'ClarifyProcessingStep',
  'StartTime': datetime.datetime(2024, 2, 20, 21, 19, 26, 81000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 2, 20, 21, 36, 51, 809000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:975049910566:processing-job/pipelines-fcmf0b13y2h1-ClarifyProcessingSte-4JgtRy214n'}},
  'AttemptCount': 1},
 {'StepName': 'ChurnModelConfigFile',
  'StartTime': datetime.datetime(2024, 2, 20, 21, 14, 27, 396000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 2, 20, 21, 19, 25, 540000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:975049910566:processing-job/pipelines-fcmf0b13y2h1-ChurnModelConfigFile-EgZV3y8c8K'}},
  'AttemptCount': 1},
 {'StepName': 'ChurnTransform',
  'StartTime': datetime.datetime(2024, 2, 20, 21, 14, 27, 396000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 2, 20, 21, 19, 50, 7

Judging from the failure reason, we need to run our compute on an instance type that is actually offered to us, not ml.m5.xlarge, in order to get the pipeline working. We'll need to go into our pipeline.py file and modify the instance type wherever appropriate. We changed every mention of "ml.m5.xlarge" to "ml.t3.medium" in that file. Now we need to reinstantiate the pipeline at the beginning of Step 5 and rerun all code after that point. We will see whether we still get a failure and, if so, what the reason is this time.
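
The step list is long, so it helps to filter it down to the failed steps and their reasons. A minimal sketch, assuming each entry is a dict shaped like the `execution.list_steps()` output above, with a 'FailureReason' key present on failed steps; `failed_steps` is a hypothetical helper name.

```python
def failed_steps(steps):
    """Return (StepName, FailureReason) for every step whose StepStatus is 'Failed'."""
    return [
        (step["StepName"], step.get("FailureReason", "<no reason reported>"))
        for step in steps
        if step.get("StepStatus") == "Failed"
    ]


# In the notebook:
# for name, reason in failed_steps(execution.list_steps()):
#     print(f"{name}: {reason}")
```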

We are still getting the same error, so we probably need to edit some other file(s). I will investigate this and report back.

I didn't find any mention of the "ml.m5.xlarge" instance type in any of the other files in our project. One option would be to create or modify the SDK configuration files mentioned in the output of instantiating the pipeline back in Step 5, in order to set the instance type(s) used by the SDK there; that might fix the issue. The other option is to heed the advice of the FailureReason returned by execution.list_steps() and request an increase of our ml.m5.xlarge quota through AWS Service Quotas or AWS Support. I will do the latter, since it would be nice to use the same instances as the creators of the project demo and to have the pipeline run as quickly as possible. The only downside is that it may take a number of days for the AWS support team to respond, but I will find other things to work on in the meantime. I will also undo the changes I made to the instance types declared in pipeline.py, since they had no effect.
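
The quota request can also be made programmatically rather than through the console. The sketch below is illustrative and was not what we ran: the helper names are hypothetical, it assumes the boto3 "service-quotas" client with the "sagemaker" service code, and it assumes the relevant quota names contain the instance type as a substring. Check the quota list in your own account before requesting anything.

```python
def matching_quotas(quotas, instance_type):
    """Keep the quota entries whose QuotaName mentions the given instance type."""
    return [q for q in quotas if instance_type in q.get("QuotaName", "")]


def fetch_sagemaker_quotas(region_name=None):
    """List all SageMaker quotas for the account (needs AWS credentials)."""
    import boto3  # imported lazily so matching_quotas works without AWS access

    client = boto3.client("service-quotas", region_name=region_name)
    quotas = []
    for page in client.get_paginator("list_service_quotas").paginate(ServiceCode="sagemaker"):
        quotas.extend(page["Quotas"])
    return quotas


def request_increase(quota_code, desired_value, region_name=None):
    """Submit a quota increase request for one SageMaker quota code."""
    import boto3

    client = boto3.client("service-quotas", region_name=region_name)
    return client.request_service_quota_increase(
        ServiceCode="sagemaker", QuotaCode=quota_code, DesiredValue=desired_value
    )


# In the notebook:
# for q in matching_quotas(fetch_sagemaker_quotas(region), "ml.m5.xlarge"):
#     print(q["QuotaCode"], q["QuotaName"], q["Value"])
```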

We have come back after a few days, and AWS has increased our quota of these ml.m5.xlarge instances from 0 to 8. I also noticed that I ran up a very heavy bill because SageMaker kept running: I failed to stop this particular instance when I paused a few days ago. :( I'm going to call AWS customer service, and hopefully they can let me off the hook from having to pay almost $600.

I just executed the pipeline and it seems like things are running smoothly. It has successfully completed a number of steps and is now on the ClarifyProcessingStep.

After checking again, everything completed successfully, including the Clarify step. We should check our S3 bucket and verify that there are artifacts from running this pipeline. We did in fact generate new files and folders, including the ChurnTransform and clarify-output folders, and an evaluation.json file within output/evaluation, which reports an AUC score of 0.979.
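
To read the metrics without opening the file by hand, evaluation.json can be downloaded and flattened into a simple name-to-value mapping. This is a sketch under assumptions: the exact layout of evaluation.json is not shown here, so the helper below makes no assumption about key names and just walks whatever nested JSON it finds. `flatten_numbers` and `load_metrics` are hypothetical names.

```python
import json


def flatten_numbers(obj, prefix=""):
    """Yield (dotted_path, value) for every numeric leaf in a nested JSON object."""
    if isinstance(obj, dict):
        for key, value in obj.items():
            yield from flatten_numbers(value, f"{prefix}.{key}" if prefix else key)
    elif isinstance(obj, list):
        for index, value in enumerate(obj):
            yield from flatten_numbers(value, f"{prefix}[{index}]")
    elif isinstance(obj, (int, float)) and not isinstance(obj, bool):
        yield prefix, obj


def load_metrics(path):
    """Read a local JSON file and return its numeric leaves as a flat dict."""
    with open(path) as handle:
        return dict(flatten_numbers(json.load(handle)))


# In the notebook, after downloading the file from the bucket:
# print(load_metrics("evaluation.json"))
```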

We have demonstrated that this pipeline example does work, though there were some gotchas and a few hoops to jump through, like requesting a higher quota for a larger compute instance, to get things working.