# Assignment 2: use SageMaker processing and training jobs
In this assignment you move your data processing, feature enginering, and model training code to SageMaker jobs.

The following diagram shows an anatomy of a SageMaker container:

![](../img/container-anatomy.png)

Refer to the notebook [`02-sagemaker-containers.ipynb`](../02-sagemaker-containers.ipynb) for code snippets and a general guidance for the exercises in this assignment.

## Import packages

In [2]:
import time
import boto3
import botocore
import numpy as np  
import pandas as pd  
import sagemaker
from time import gmtime, strftime, sleep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sklearn.metrics import roc_auc_score
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent
from smexperiments.tracker import Tracker

sagemaker.__version__

'2.165.0'

In [3]:
session = sagemaker.Session()
sm = session.sagemaker_client

## [Optional] Load an existing or create a new experiment
Load an existing or create a new experiment to track parameters, metrics, and artifacts in this notebook.

In [4]:
# Load experiment based on the a name
# experiment = Experiment.load("mohammadexperiment", sagemaker_boto_client=sm)

In [5]:
# Alternatively, create a new experiment
experiment_name = f"from-idea-to-prod-experiment-{strftime('%d-%H-%M-%S', gmtime())}"
experiment = Experiment.create(
   experiment_name=experiment_name,
   description="Direct marketing binary classification",
   sagemaker_boto_client=sm,
)

## Excercise 1: Process data
- Use SageMaker session object to [upload](https://sagemaker.readthedocs.io/en/stable/api/utility/session.html#sagemaker.session.Session.upload_data)  the dataset to an Amazon S3 bucket. Use a SageMaker [default bucket](https://sagemaker.readthedocs.io/en/stable/api/utility/session.html#sagemaker.session.Session.default_bucket)
- Move data processing code from the previous notebook to a Python executable script. You can pass any parameters to your script to parametrize the data processing
- Set the Amazon S3 paths for the output datasets
- Use [SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/overview.html) [`SKLearnProcessor`](https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/sagemaker.sklearn.html#sagemaker.sklearn.processing.SKLearnProcessor) class to setup a processing job. 
- Configure processing job's [inputs](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.ProcessingInput) and [outputs](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.ProcessingOutput) to point the processing job to Amazon S3 locations
- [Run](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.ScriptProcessor.run) the processing job

### Python SDK processor classes
Use the most suitable class to implement a processor for your use case:
    
![](../img/python-sdk-processors.png)

In [6]:
session = sagemaker.Session()

In [7]:
bucket_name = sagemaker.Session().default_bucket()
bucket_prefix = "from-idea-to-prod/xgboost"  

In [8]:
sm_role = sagemaker.get_execution_role()
region = session.boto_session.region_name
sm_client = boto3.client("sagemaker",region_name = region)

In [9]:
# Write data upload code
# S3 key to the full dataset
input_s3_url = session.upload_data(
    path="data/bank-additional/bank-additional-full.csv",
    bucket=bucket_name,
    key_prefix=f"{bucket_prefix}/input"
)


In [10]:
%%writefile preprocessing_assignment.py

# Write executable data processing code here
import pandas as pd
import numpy as np
import argparse
import os

def _parse_args():
    
    parser = argparse.ArgumentParser()
    # Data, model, and output directories
    # model_dir is always passed in from SageMaker. By default this is a S3 path under the default bucket.
    parser.add_argument('--filepath', type=str, default='/opt/ml/processing/input/')
    parser.add_argument('--filename', type=str, default='bank-additional-full.csv')
    parser.add_argument('--outputpath', type=str, default='/opt/ml/processing/output/')
    
    return parser.parse_known_args()


if __name__=="__main__":
    # Process arguments
    args, _ = _parse_args()
    
    print("Data processing and feature engineering start")
    df_data = pd.read_csv(os.path.join(args.filepath, args.filename), sep=";")
    df_data["no_previous_contact"] = np.where(df_data["pdays"] == 999, 1, 0)
    df_data["not_working"] = np.where(np.in1d(df_data["job"], ["student", "retired", "unemployed"]), 1, 0)
    df_model_data = df_data.drop(["duration", "emp.var.rate", "cons.price.idx", "cons.conf.idx", "euribor3m", "nr.employed"],axis=1,)
    df_model_data = pd.get_dummies(df_model_data)
    target_col = "y"
    df_model_data = pd.concat(
    [
        df_model_data["y_yes"].rename(target_col),
        df_model_data.drop(["y_no", "y_yes"], axis=1),
    ],
    axis=1,
    )
    train_data, validation_data, test_data = np.split(df_model_data.sample(frac = 1, random_state=1234),[int(0.7 * len(df_model_data)),int(0.9*len(df_model_data))],)
    # Save the datasets (train, validation, test, baseline) locally
    train_data.to_csv(os.path.join(args.outputpath, 'train/train.csv'), index=False, header=False)
    validation_data.to_csv(os.path.join(args.outputpath, 'validation/validation.csv'), index=False, header=False)
    test_data[target_col].to_csv(os.path.join(args.outputpath, 'test/test_y.csv'), index=False, header=False)
    test_data.drop([target_col], axis=1).to_csv(os.path.join(args.outputpath, 'test/test_x.csv'), index=False, header=False)
    print("## Processing complete. Exiting.")

Overwriting preprocessing_assignment.py


In [11]:
# Set the Amazon S3 paths for the output datasets
train_s3_url = f"s3://{bucket_name}/{bucket_prefix}/train"
validation_s3_url = f"s3://{bucket_name}/{bucket_prefix}/validation"
test_s3_url = f"s3://{bucket_name}/{bucket_prefix}/test"
baseline_s3_url = f"s3://{bucket_name}/{bucket_prefix}/baseline"


### [Optional] Create a trail
If you use an experiment, you must create a trial to capture processing and training output from this notebook. 

Use [`Trial`](https://sagemaker-experiments.readthedocs.io/en/latest/trial.html) class to interact with trials and the [`Tracker`](https://sagemaker-experiments.readthedocs.io/en/latest/tracker.html) class to record information to a trial component. 

SageMaker processing and training jobs automatically handle trial components and save metrics, parameters, metadata, and artifacts in the trial components if you provide an `experiment_config` in `Processor.run()` or `Estimator.fit()` calls.

In [12]:
# trial = experiment.create_trial(trial_name_prefix="Container-training")

In [13]:
# with Tracker.create(display_name="Preprocessing-split", sagemaker_boto_client=sm) as tracker:
#    tracker.log_parameters()
#    tracker.log_input()

In [14]:
# Create experiment config to use in the processing and training jobs
#experiment_config = {
#    "ExperimentName": experiment.experiment_name,
#    "TrialName": trial.trial_name,
#    "TrialComponentDisplayName": "Preprocessing",
#}

### Create a processor

In [15]:
# Create SKLearnProcessor
framework_version = "0.23-1"
processing_instance_type = "ml.m5.large"
processing_instance_count = 1

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    role=sm_role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count, 
    base_job_name='from-idea-to-prod-processing',
    sagemaker_session=session,
)


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [16]:
# Define procesing inputs and outputs
processing_inputs = [ProcessingInput(
            source=input_s3_url, 
            destination="/opt/ml/processing/input",
            s3_input_mode="File",
            s3_data_distribution_type="ShardedByS3Key"
        )] # use input_s3_url as pointer to the full dataset

processing_outputs = [ProcessingOutput(
            output_name="train_data", 
            source="/opt/ml/processing/output/train",
            destination=train_s3_url,
        ),
        ProcessingOutput(
            output_name="validation_data", 
            source="/opt/ml/processing/output/validation", 
            destination=validation_s3_url
        ),
        ProcessingOutput(
            output_name="test_data", 
            source="/opt/ml/processing/output/test", 
            destination=test_s3_url
        ),
        ProcessingOutput(
            output_name="baseline_data", 
            source="/opt/ml/processing/output/baseline", 
            destination=baseline_s3_url
        ),] 
# map local directories in the processing container to Amazon S3 locations

In [17]:
# Start the processing job, pass an experiment_config parameter if you use experiments
sklearn_processor.run(
        inputs=processing_inputs,
        outputs=processing_outputs,
        code='preprocessing_assignment.py',
        wait=False,
        # experiment_config=experiment_config,
    )

INFO:sagemaker:Creating processing-job with name from-idea-to-prod-processing-2023-06-18-06-44-05-259


In [18]:
!aws s3 ls {bucket_name}/{bucket_prefix} --recursive

2023-06-18 06:44:05    5834924 from-idea-to-prod/xgboost/input/bank-additional-full.csv
2023-06-18 06:30:53          0 from-idea-to-prod/xgboost/output/from-idea-to-prod-training-2023-06-18-06-28-40-237/debug-output/claim.smd
2023-06-18 06:30:53       6274 from-idea-to-prod/xgboost/output/from-idea-to-prod-training-2023-06-18-06-28-40-237/debug-output/collections/000000000/worker_0_collections.json
2023-06-18 06:30:53        214 from-idea-to-prod/xgboost/output/from-idea-to-prod-training-2023-06-18-06-28-40-237/debug-output/events/000000000000/000000000000_worker_0.tfevents
2023-06-18 06:30:53        220 from-idea-to-prod/xgboost/output/from-idea-to-prod-training-2023-06-18-06-28-40-237/debug-output/events/000000000010/000000000010_worker_0.tfevents
2023-06-18 06:30:53        220 from-idea-to-prod/xgboost/output/from-idea-to-prod-training-2023-06-18-06-28-40-237/debug-output/events/000000000020/000000000020_worker_0.tfevents
2023-06-18 06:30:53        220 from-idea-to-prod/xgboost/outp

## Exercise 2: Model training
- Get a container image URI for the used built-in SageMaker ML algorithm using SageMaker SDK [helper](https://sagemaker.readthedocs.io/en/stable/api/utility/image_uris.html#sagemaker.image_uris.retrieve)
- Configure data [input channels](https://sagemaker.readthedocs.io/en/stable/api/utility/inputs.html#sagemaker.inputs.TrainingInput) for the training job
- Use [`Estimator`](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.Estimator) class to setup a training job
- Set [hyperparameters](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.Estimator.set_hyperparameters)
- [Run](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.EstimatorBase.fit) the training job

In [19]:
# Write code to retrieve a container image URI
training_image = sagemaker.image_uris.retrieve("xgboost", region=region, version="1.5-1")
print(training_image)

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.5-1


In [20]:
# Set the input data channels
s3_input_train = sagemaker.inputs.TrainingInput(train_s3_url, content_type='csv')
s3_input_validation = sagemaker.inputs.TrainingInput(validation_s3_url, content_type='csv')

In [21]:
# Set an Amazon S3 path for a model artifact
output_s3_url = f"s3://{bucket_name}/{bucket_prefix}/output"

### Python SDK estimator classes
SageMaker Python SDK contains corresponding [`EstimatorBase`](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.EstimatorBase)-derived classes to access each of the built-in algorithms. You can extend [`Framework`](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.Framework) class to implement a training with a custom framework.

![](../img/python-sdk-estimators.png)

In [22]:
# Create an estimator
train_instance_count = 1
train_instance_type = "ml.m5.xlarge"

estimator = sagemaker.estimator.Estimator(
    image_uri=training_image, 
    instance_type=train_instance_type,  
    instance_count=train_instance_count,  
    role=sm_role,  
    max_run=20 * 60, 
    output_path=output_s3_url, 
    sagemaker_session=session, 
    base_job_name="from-idea-to-prod-training",
)

In [23]:
# Set hyperparameters for the estimator algorithm
estimator.set_hyperparameters(
    num_round=150, 
    max_depth=3, 
    eta=0.5, 
    alpha=2.5, 
    objective="binary:logistic",
    eval_metric="auc", 
    subsample=0.8, 
    colsample_bytree=0.8,
    min_child_weight=3, 
    early_stopping_rounds=10, 
    verbosity=1, 
)

In [24]:
# Set the training inputs
training_inputs =  {'train': s3_input_train, 'validation': s3_input_validation}

In [25]:
# Run the training job, optionally use an experiment_config parameter
estimator.fit(training_inputs)

INFO:sagemaker:Creating training-job with name: from-idea-to-prod-training-2023-06-18-06-44-07-007


2023-06-18 06:44:07 Starting - Starting the training job...
2023-06-18 06:44:22 Starting - Preparing the instances for training......
2023-06-18 06:45:28 Downloading - Downloading input data...
2023-06-18 06:46:03 Training - Downloading the training image...
2023-06-18 06:46:34 Uploading - Uploading generated training model.[34m[2023-06-18 06:46:30.294 ip-10-0-67-208.ec2.internal:7 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34m[2023-06-18 06:46:30.367 ip-10-0-67-208.ec2.internal:7 INFO profiler_config_parser.py:111] User has disabled profiler.[0m
[34m[2023-06-18:06:46:30:INFO] Imported framework sagemaker_xgboost_container.training[0m
[34m[2023-06-18:06:46:30:INFO] Failed to parse hyperparameter eval_metric value auc to Json.[0m
[34mReturning the value itself[0m
[34m[2023-06-18:06:46:30:INFO] Failed to parse hyperparameter objective value binary:logistic to Json.[0m
[34mReturning the value itself[0m
[34m[2023-06-18:06:46:30:INFO] No GPUs detected (normal i

Wait until the training job is done.

In [26]:
# Describe the training job
training_job_name = estimator._current_job_name
boto3.client("sagemaker", region_name=region).describe_training_job(TrainingJobName=training_job_name)

{'TrainingJobName': 'from-idea-to-prod-training-2023-06-18-06-44-07-007',
 'TrainingJobArn': 'arn:aws:sagemaker:us-east-1:948851557155:training-job/from-idea-to-prod-training-2023-06-18-06-44-07-007',
 'ModelArtifacts': {'S3ModelArtifacts': 's3://sagemaker-us-east-1-948851557155/from-idea-to-prod/xgboost/output/from-idea-to-prod-training-2023-06-18-06-44-07-007/output/model.tar.gz'},
 'TrainingJobStatus': 'Completed',
 'SecondaryStatus': 'Completed',
 'HyperParameters': {'alpha': '2.5',
  'colsample_bytree': '0.8',
  'early_stopping_rounds': '10',
  'eta': '0.5',
  'eval_metric': 'auc',
  'max_depth': '3',
  'min_child_weight': '3',
  'num_round': '150',
  'objective': 'binary:logistic',
  'subsample': '0.8',
  'verbosity': '1'},
 'AlgorithmSpecification': {'TrainingImage': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.5-1',
  'TrainingInputMode': 'File',
  'MetricDefinitions': [{'Name': 'train:mae',
    'Regex': '.*\\[[0-9]+\\].*#011train-mae:([-+]?[0-9]*\\.?[0-9]+

In [27]:
metrics = None
while not metrics:
    metrics = sm.describe_training_job(
        TrainingJobName=training_job_name
        ).get("FinalMetricDataList")

    if not metrics:
        print(f"Training job {training_job_name} hasn't finished yet!")
        time.sleep(10)
    
train_auc = float([m['Value'] for m in metrics if m['MetricName'] == 'train:auc'][0])
validate_auc = float([m['Value'] for m in metrics if m['MetricName'] == 'validation:auc'][0])

print(f"Train-auc:{train_auc:.2f}, Validate-auc:{validate_auc:.2f}")


Train-auc:0.80, Validate-auc:0.77


## Exercise 3: Validate model
To validate the model, you use the model artifact from the training job to run predictions on the test dataset. You can either create a [real-time inference endpoint](https://docs.aws.amazon.com/sagemaker/latest/dg/realtime-endpoints.html) or create a [batch transform](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html).

### Option 1: Real-time inference
- Use [Estimator.deploy](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.EstimatorBase.deploy) function to provision a real-time inference endpoint
- Load test dataset
- Send the test dataset to the endpoint. Use [Predictor.predict](https://sagemaker.readthedocs.io/en/stable/api/inference/predictors.html#sagemaker.predictor.Predictor.predict) function
- Evaluate the predictions

In [29]:
# Create a predictor
# Remember, the training job saved the test dataset in the specified S3 location

predictor = estimator.deploy(initial_instance_count=1,
        instance_type="ml.m5.large", wait=False)

INFO:sagemaker:Creating model with name: from-idea-to-prod-training-2023-06-18-06-47-38-003
INFO:sagemaker:Creating endpoint-config with name from-idea-to-prod-training-2023-06-18-06-47-38-003
INFO:sagemaker:Creating endpoint with name from-idea-to-prod-training-2023-06-18-06-47-38-003


In [30]:
# Load the test dataset
test_x = pd.read_csv("tmp/test_x.csv", names=[f'{i}' for i in range(59)])
test_y = pd.read_csv("tmp/test_y.csv", names=['y'])

FileNotFoundError: [Errno 2] No such file or directory: 'tmp/test_x.csv'

In [None]:
# Predict
predictions = np.array(predictor.predict(test_x.values), dtype=float).squeeze()
predictions

In [28]:
# Evaluate predictions
# Compare the predicted label to ground truth label
test_results = pd.concat(
    [
        pd.Series(predictions, name="y_pred", index=test_x.index),
        test_x,
    ],
    axis=1,
)
pd.crosstab(
    index=test_y['y'].values,
    columns=np.round(predictions), 
    rownames=['actuals'], 
    colnames=['predictions']
)
test_auc = roc_auc_score(test_y, test_results["y_pred"])
print(f"Test-auc: {test_auc:.2f}")

NameError: name 'predictions' is not defined

### Option 2: Batch transform
For an asynchronous inference you can use a SageMaker [transform job](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html).
- Use [Estimator.tranformer](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.EstimatorBase.transformer) function to create a transformer
- [Run](https://sagemaker.readthedocs.io/en/stable/api/inference/transformer.html#sagemaker.transformer.Transformer.transform) a tranform job
- Download the dataset from an S3 output location
- Evalute the predictions

In [None]:
# Create a transformer
# transformer = estimator.transformer()

In [None]:
# Run a transform, use an experiment_config parameter
# transformer.transform()

Wait until the transform job is done.

The transformer outputs the prediction probabilities and stores them as a `csv` file in the specified S3 location. The S3 path is stored in `transformer.output_path`. To compare the predictions with the ground truth labels, you must download the dataset from S3 and load it into a Pandas DataFrame.

In [None]:
# Download the predictions and the ground truth labels from S3


In [None]:
# Load the output dataset and the ground truth label
# predictions = pd.read_csv()
# test_y = pd.read_csv()

In [None]:
# Show the confusion matrix
# pd.crosstab()


In [None]:
# Calculate AUC
# test_auc = roc_auc_score(test_y, predictions)
#  print(f"Test-auc: {test_auc:.2f}")

### [Optional] build ROC and precision-recall curves
You can create various charts using [`sklearn.metrics`](https://scikit-learn.org/stable/modules/model_evaluation.html) package.

### [Optional] Save charts to the trial component
You can use the Tracker class to save various charts to a trial component of the trial of your experiment.

A Jupyter notebook trick: Press `Ctrl` + `/` to comment or uncomment all selected lines in the sell.

In [None]:
# Find a trial component name of the current trial based on the display name
#batch_transform_trail_component = [
#    tc for tc in trial.list_trial_components() 
#    if tc.display_name == <DISPLAY NAME OF THE TRIAL COMPONENT>][0]

In [None]:
# Add charts
# with Tracker.load(
#    trial_component_name=batch_transform_trail_component.trial_component_name,
#    sagemaker_boto_client=sm
# ) as tracker:
#    tracker.log_precision_recall()
#    tracker.log_confusion_matrix()
#    tracker.log_roc_curve()

### [Optional] Explore experiments, trials, and trial components in Studio
In **SageMaker resources** select **Experiment and trials**, choose **Open in trial component list** from the context menu:

<img src="../img/experiment-and-trials-context-menu.png" width="400"/>

## Exercise 4: [Optional] hyperparameter optimization (HPO)
- Use [HyperparameterTuner](https://sagemaker.readthedocs.io/en/stable/api/training/tuner.html#sagemaker.tuner.HyperparameterTuner) to run a HPO job
- Specify hyperparameters ranges and tuning strategy
- [Run](https://sagemaker.readthedocs.io/en/stable/api/training/tuner.html#sagemaker.tuner.HyperparameterTuner.fit) the tuning job
- Compare performance of the tuned and non-tuned models

In [None]:
# import required HPO objects
from sagemaker.tuner import (
    CategoricalParameter,
    ContinuousParameter,
    HyperparameterTuner,
    IntegerParameter,
)

In [None]:
# set up hyperparameter ranges
# hp_ranges = {}


In [None]:
# set up the objective metric
objective = "validation:auc"

In [None]:
# instantiate a HPO object
# tuner = HyperparameterTuner()

In [None]:
# evaluate performance

## Clean-up
Remove all real-time endpoints you created

In [None]:
# predictor.delete_endpoint(delete_endpoint_config=True)


In [None]:
# run if you created a tuned predictor after HPO
# hpo_predictor.delete_endpoint(delete_endpoint_config=True)


## Continue with the assignment 3
Navigate to the [assignment 3](03-assignment-sagemaker-pipeline.ipynb) notebook.