In [69]:
# install dependencies
!pip install sagemaker-experiments



In [70]:
## imports
import time
import json
from pprint import pprint

import boto3
import sagemaker
from sagemaker.sklearn import SKLearn
from sagemaker.debugger import ProfilerRule, rule_configs, DebuggerHookConfig, ProfilerConfig, FrameworkProfile
from sagemaker.inputs import TrainingInput, CreateModelInput
from sagemaker.lineage.visualizer import LineageTableVisualizer
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn import SKLearnModel
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.tuner import IntegerParameter, HyperparameterTuner, ContinuousParameter
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CacheConfig, CreateModelStep

from smexperiments.experiment import Experiment
from smexperiments.trial import Trial


### Create sagemaker client and session

This cell sets up the SageMaker session, default bucket, IAM role, AWS region, and SageMaker client. 
The session manages SageMaker interactions, the bucket stores data, the role is used for this session, 
the region is where the session runs, and the client is a low-level API for AWS SageMaker.

In [71]:
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
# role = sagemaker.get_execution_role()
role = 'arn:aws:iam::863397112005:role/service-role/AmazonSageMaker-ExecutionRole-20231109T153131'
region = boto3.Session().region_name

sagemaker_client = boto3.Session().client(service_name="sagemaker", region_name=region)


sagemaker.config INFO - Not applying SDK defaults from location: /Library/Application Support/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /Users/zenysisaccount/Library/Application Support/sagemaker/config.yaml


INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials


### Pipeline setup

In [72]:
timestamp = int(time.time())

pipeline_name = "fraud-pipeline-{}".format(timestamp)

pipeline_experiment = Experiment.create(
    experiment_name=pipeline_name,
    description="Online transaction fraud detection Pipeline Experiment",
    sagemaker_boto_client=sagemaker_client,
)

pipeline_experiment_name = pipeline_experiment.experiment_name

print("Pipeline experiment name: {}".format(pipeline_experiment_name))

Pipeline experiment name: fraud-pipeline-1700570555


### Create the Trial

In [73]:
pipeline_trial = Trial.create(
    trial_name="trial-{}".format(timestamp), experiment_name=pipeline_experiment_name, sagemaker_boto_client=sagemaker_client
)

pipeline_trial_name = pipeline_trial.trial_name
print("Trial name: {}".format(pipeline_trial_name))

Trial name: trial-1700570555


### Pipeline parameters
- processing_instance_type - The instance type of the processing job.
- processing_instance_count - The instance count of the processing job.
- train_instance_type - The instance type of the training job.
- model_approval_status - What approval status to register the trained model with for CI/CD purposes. Defaults to "PendingManualApproval".
- input_data - The URL location of the input data

In [74]:
timestamp = int(time.time())

raw_input_data_s3_uri = "s3://{}/data/".format(bucket)

input_data = ParameterString(
    name="InputData",
    default_value=f'{raw_input_data_s3_uri}online_fraud_dataset.csv',
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.large")

train_split_percentage = ParameterFloat(
    name="TrainSplitPercentage",
    default_value=0.70,
)

validation_split_percentage = ParameterFloat(
    name="ValidationSplitPercentage",
    default_value=0.15,
)

test_split_percentage = ParameterFloat(
    name="TestSplitPercentage",
    default_value=0.15,
)

feature_store_offline_prefix = ParameterString(
    name="FeatureStoreOfflinePrefix",
    default_value="fraud-feature-store-" + str(timestamp),
)

feature_group_name = ParameterString(name="FeatureGroupName", default_value="reviews-feature-group-" + str(timestamp))

### Create instance of SKLearnProcessor

In [75]:
processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={"AWS_DEFAULT_REGION": region},
)

sagemaker.config INFO - Not applying SDK defaults from location: /Library/Application Support/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /Users/zenysisaccount/Library/Application Support/sagemaker/config.yaml


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [76]:
processing_inputs = [
    ProcessingInput(
        input_name="raw-input-data",
        source=input_data,
        destination="/opt/ml/processing/input/data/"
    )
]

processing_outputs = [
    ProcessingOutput(
        output_name="fraud-train",
        s3_upload_mode="EndOfJob",
        source="/opt/ml/processing/output/fraud/train",
    ),
    ProcessingOutput(
        output_name="fraud-validation",
        s3_upload_mode="EndOfJob",
        source="/opt/ml/processing/output/fraud/validation",
    ),
    ProcessingOutput(
        output_name="fraud-test",
        s3_upload_mode="EndOfJob",
        source="/opt/ml/processing/output/fraud/test",
    ),
]

processing_step = ProcessingStep(
    name="Processing",
    code="preprocess-fraud-dataset-feature-store.py",
    processor=processor,
    inputs=processing_inputs,
    outputs=processing_outputs,
    job_arguments=[
        "--train-split-percentage",
        str(train_split_percentage.default_value),
        "--validation-split-percentage",
        str(validation_split_percentage.default_value),
        "--test-split-percentage",
        str(test_split_percentage.default_value),
        "--feature-store-offline-prefix",
        str(feature_store_offline_prefix.default_value),
        "--feature-group-name",
        str(feature_group_name.default_value),
    ],
)

### Train step hyperparameters

In [77]:
train_instance_type = ParameterString(name="TrainInstanceType", default_value="ml.c5.2xlarge")

train_instance_count = ParameterInteger(name="TrainInstanceCount", default_value=1)

max_depth = ParameterInteger(name="MaxDepth", default_value=5)

learning_rate = ParameterFloat(name="LearningRate", default_value=0.1)

n_estimators = ParameterInteger(name="NEstimators", default_value=100)

min_child_weight = ParameterInteger(name="MinChildWeight", default_value=1)

subsample = ParameterFloat(name="Subsample", default_value=0.5)

colsample_bytree = ParameterFloat(name="ColsampleByTree", default_value=0.5)


In [78]:
estimator = SKLearn(
    entry_point='sklearn-xgboost.py',
    instance_type=train_instance_type,
    framework_version='0.23-1',
    role=role,
    sagemaker_session=sagemaker_session,
    hyperparameters={
        'n_estimators': n_estimators,
        'max_depth': max_depth,
        'learning_rate': learning_rate,
        'min_child_weight': min_child_weight,
        'subsample': subsample,
        'colsample_bytree': colsample_bytree,
    }
)



### Configure Training Step

In [79]:
# Enable caching for the model training process with a 1-hour expiration
cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")

training_step = TrainingStep(
    name="Train",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["fraud-train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "test": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["fraud-test"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
    cache_config=cache_config,
)

### Evaluation Step

In [80]:
evaluation_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type='ml.m5.large',
    instance_count=1,
    env={"AWS_DEFAULT_REGION": region},
    max_runtime_in_seconds=7200,
)

sagemaker.config INFO - Not applying SDK defaults from location: /Library/Application Support/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /Users/zenysisaccount/Library/Application Support/sagemaker/config.yaml


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [81]:
evaluation_report = PropertyFile(name="EvaluationReport", output_name="metrics", path="evaluation.json")

In [82]:
evaluation_step = ProcessingStep(
    name="EvaluateModel",
    processor=evaluation_processor,
    code="evaluate_model_metrics.py",
    inputs=[
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/input/model",
        ),
        ProcessingInput( 
            source=processing_step.properties.ProcessingOutputConfig.Outputs["fraud-validation"].S3Output.S3Uri,
            destination="/opt/ml/processing/input/data",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="metrics", s3_upload_mode="EndOfJob", source="/opt/ml/processing/output/metrics/"
        ),
    ],
    property_files=[evaluation_report],
)

In [83]:
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)



### Register Model Step

In [84]:
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

deploy_instance_type = ParameterString(name="DeployInstanceType", default_value="ml.m4.xlarge")

deploy_instance_count = ParameterInteger(name="DeployInstanceCount", default_value=1)

In [85]:
# Create a ScriptModeModel
sklearn_model = SKLearnModel(
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,                       
    entry_point='inference.py',
    framework_version='1.0-1',
    py_version='py3',
    sagemaker_session=sagemaker_session,
)

In [86]:
register_step = RegisterModel(
    name="RegisterModel",
    model=sklearn_model,
    content_types=["text/csv"],
    response_types=["application/json"],
    inference_instances=[deploy_instance_type],
    transform_instances=[deploy_instance_type],
    model_package_group_name=f"Fraud-Detection-{timestamp}",
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)




### Create Model for Deployment Step

In [87]:
create_inputs = CreateModelInput(
    instance_type=deploy_instance_type,
)

create_step = CreateModelStep(
    name="CreateModel",
    model=sklearn_model,
    inputs=create_inputs,
)

### Define a Condition Step to Check Accuracy and Conditionally Register Model

In [88]:
min_recall_value = ParameterFloat(name="MinRecallValue", default_value=0.80)

In [89]:
minimum_accuracy_condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=evaluation_step,
        property_file=evaluation_report,
        json_path="recall",
    ),
    right=min_recall_value,  # recall
)

minimum_accuracy_condition_step = ConditionStep(
    name="AccuracyCondition",
    conditions=[minimum_accuracy_condition],
    if_steps=[register_step, create_step],  # success, continue with model registration
    else_steps=[],  # fail, end the pipeline
)

See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


### Define a Pipeline of Parameters, Steps, and Conditions

In [90]:
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        processing_instance_count,
        processing_instance_type,
        train_split_percentage,
        validation_split_percentage,
        test_split_percentage,
        feature_store_offline_prefix,
        feature_group_name,
        train_instance_type,
        train_instance_count,
        max_depth,
        learning_rate,
        n_estimators,
        min_child_weight,
        subsample,
        colsample_bytree,
        min_recall_value,
        model_approval_status,
        deploy_instance_type,
        deploy_instance_count,
    ],
    steps=[processing_step, training_step, evaluation_step, minimum_accuracy_condition_step],
    sagemaker_session=sagemaker_session,
)

In [91]:
definition = json.loads(pipeline.definition())

pprint(definition, width=200)



Using provided s3_resource




Using provided s3_resource




{'Metadata': {},
 'Parameters': [{'DefaultValue': 's3://sagemaker-us-east-1-863397112005/data/online_fraud_dataset.csv', 'Name': 'InputData', 'Type': 'String'},
                {'DefaultValue': 1, 'Name': 'ProcessingInstanceCount', 'Type': 'Integer'},
                {'DefaultValue': 'ml.m5.large', 'Name': 'ProcessingInstanceType', 'Type': 'String'},
                {'DefaultValue': 0.7, 'Name': 'TrainSplitPercentage', 'Type': 'Float'},
                {'DefaultValue': 0.15, 'Name': 'ValidationSplitPercentage', 'Type': 'Float'},
                {'DefaultValue': 0.15, 'Name': 'TestSplitPercentage', 'Type': 'Float'},
                {'DefaultValue': 'fraud-feature-store-1700570556', 'Name': 'FeatureStoreOfflinePrefix', 'Type': 'String'},
                {'DefaultValue': 'reviews-feature-group-1700570556', 'Name': 'FeatureGroupName', 'Type': 'String'},
                {'DefaultValue': 'ml.c5.2xlarge', 'Name': 'TrainInstanceType', 'Type': 'String'},
                {'DefaultValue': 1, 'Nam

In [92]:
pipeline.upsert(role_arn=role)



Using provided s3_resource




Using provided s3_resource




{'PipelineArn': 'arn:aws:sagemaker:us-east-1:863397112005:pipeline/fraud-pipeline-1700570555',
 'ResponseMetadata': {'RequestId': 'c6711932-e614-4816-a143-02924750952b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'c6711932-e614-4816-a143-02924750952b',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '93',
   'date': 'Tue, 21 Nov 2023 12:42:59 GMT'},
  'RetryAttempts': 0}}

In [93]:
execution = pipeline.start(
    parameters=dict(
        InputData=f'{raw_input_data_s3_uri}online_fraud_dataset.csv',
        ProcessingInstanceCount=1,
        ProcessingInstanceType="ml.t3.xlarge",
        TrainSplitPercentage=0.70,
        ValidationSplitPercentage=0.10,
        TestSplitPercentage=0.20,
        FeatureStoreOfflinePrefix="fraud-feature-store-" + str(timestamp),
        FeatureGroupName="fraud-feature-group-" + str(timestamp),
        TrainInstanceType="ml.c5.2xlarge",
        TrainInstanceCount=1,
        MinRecallValue=0.99,
        ModelApprovalStatus="PendingManualApproval",
        DeployInstanceType="ml.m5.xlarge",
        DeployInstanceCount=1,
        MaxDepth=7,
        LearningRate=0.16466967895118775,
        NEstimators=119,
        MinChildWeight=6,
        Subsample=0.9758503388962246,
        ColsampleByTree=0.5146998846523689,
    )
)

In [94]:
execution_run = execution.describe()
pprint(execution_run)

{'CreatedBy': {},
 'CreationTime': datetime.datetime(2023, 11, 21, 15, 43, 1, 130000, tzinfo=tzlocal()),
 'LastModifiedBy': {},
 'LastModifiedTime': datetime.datetime(2023, 11, 21, 15, 43, 1, 130000, tzinfo=tzlocal()),
 'PipelineArn': 'arn:aws:sagemaker:us-east-1:863397112005:pipeline/fraud-pipeline-1700570555',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:863397112005:pipeline/fraud-pipeline-1700570555/execution/whn6xqt9rinf',
 'PipelineExecutionDisplayName': 'execution-1700570581192',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'fraud-pipeline-1700570555',
                              'TrialName': 'whn6xqt9rinf'},
 'ResponseMetadata': {'HTTPHeaders': {'content-length': '514',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Tue, 21 Nov 2023 12:43:00 GMT',
                                      'x-amzn-requestid': 'a269ddf5-5bfd-4352-afb3-aa1808ca21d1

In [95]:
# Giving the first step time to start up
time.sleep(30)

execution.list_steps()

[{'StepName': 'Processing',
  'StartTime': datetime.datetime(2023, 11, 21, 15, 43, 2, 344000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'AttemptCount': 1,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:863397112005:processing-job/pipelines-whn6xqt9rinf-Processing-neo42d8niy'}}}]

In [96]:
%%time
executions_response = sagemaker_client.list_pipeline_executions(PipelineName=pipeline_name)["PipelineExecutionSummaries"]
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
print(pipeline_execution_status)

while pipeline_execution_status == "Executing":
    try:
        executions_response = sagemaker_client.list_pipeline_executions(PipelineName=pipeline_name)["PipelineExecutionSummaries"]
        pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
    except Exception as e:
        print("Please wait...")
        time.sleep(30)

pprint(executions_response)

Executing
[{'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:863397112005:pipeline/fraud-pipeline-1700570555/execution/whn6xqt9rinf',
  'PipelineExecutionDisplayName': 'execution-1700570581192',
  'PipelineExecutionStatus': 'Succeeded',
  'StartTime': datetime.datetime(2023, 11, 21, 15, 43, 1, 130000, tzinfo=tzlocal())}]
CPU times: user 18.6 s, sys: 2 s, total: 20.6 s
Wall time: 21min 35s


In [97]:
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
print(pipeline_execution_status)

Succeeded
