# Iris Training and Prediction with SageMaker Scikit-learn
This tutorial shows you how to use [Scikit-learn](https://scikit-learn.org/stable/) with SageMaker by utilizing the pre-built container. Scikit-learn is a popular Python machine learning framework. It includes a number of different algorithms for classification, regression, clustering, dimensionality reduction, and data/feature pre-processing. 

The [SageMaker boto3 client](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html) is a low-level client for the Amazon SageMaker service. It provides APIs for creating and managing SageMaker resources. We will show how to train a model on the Iris dataset and generate a set of predictions. For more information about the Scikit-learn container, see the [sagemaker-scikit-learn-containers](https://github.com/aws/sagemaker-scikit-learn-container) repository and the [sagemaker-python-sdk](https://github.com/aws/sagemaker-python-sdk) repository.

## Runtime

This notebook takes approximately 15 minutes to run.

## Contents
* [Upload the data for training](#upload_data)
* [Create a Scikit-learn script to train with](#create_sklearn_script)
* [Create the SageMaker Scikit Training Job](#create_sklearn_estimator)
* [Train the SKLearn Estimator on the Iris data](#train_sklearn)
* [Use the trained model to make inference requests](#inference)
 * [Create a model from the training job](#deploy)
* [Batch Transform](#batch_transform)
 * [Prepare Input Data](#prepare_input_data)
 * [Run Transform Job](#run_transform_job)
 * [Check Output Data](#check_output_data)

In [None]:
%pip install -U sagemaker
%pip install tqdm

First, let's create our SageMaker session and role, and create an S3 prefix to use for the notebook example.

In [None]:
import boto3
import numpy as np
import pandas as pd
import os
import datetime
from pathlib import Path
import utils
import re

import sagemaker
from sagemaker import get_execution_role
from sagemaker.image_uris import retrieve
from sagemaker.s3 import S3Uploader, s3_path_join

import itertools
import glob

# S3 prefix
prefix = "sagemaker/DEMO-scikit-iris"

sagemaker_session = sagemaker.Session()
role = get_execution_role()
sm_client = boto3.client('sagemaker')
bucket = sagemaker_session.default_bucket()
region = sagemaker_session.boto_region_name

## Upload the data for training <a class="anchor" id="upload_data"></a>

When training large models with huge amounts of data, you may use big data tools like Amazon Athena, AWS Glue, or Amazon EMR to process your data backed by S3. For the purposes of this example, we're using a sample of the classic [Iris dataset](https://archive.ics.uci.edu/ml/datasets/iris). We load the dataset, write it locally, then upload it to S3.

In [None]:
os.makedirs("./data", exist_ok=True)

s3_client = boto3.client("s3")
s3_client.download_file(
    "sagemaker-sample-files", "datasets/tabular/iris/iris.data", "./data/iris.csv"
)

df_iris = pd.read_csv("./data/iris.csv", header=None)
df_iris[4] = df_iris[4].map({"Iris-setosa": 0, "Iris-versicolor": 1, "Iris-virginica": 2})
iris = df_iris[[4, 0, 1, 2, 3]].to_numpy()
np.savetxt("./data/iris.csv", iris, delimiter=",", fmt="%1.1f, %1.3f, %1.3f, %1.3f, %1.3f")

Once we have the data locally, we can use the tools provided by the SageMaker Python SDK to upload the data to a default bucket.

In [None]:
WORK_DIRECTORY = "data"

train_input = sagemaker_session.upload_data(
    WORK_DIRECTORY, key_prefix="{}/{}".format(prefix, WORK_DIRECTORY)
)

## Create a Scikit-learn script for training <a class="anchor" id="create_sklearn_script"></a>
SageMaker can run a scikit-learn script using the `SKLearn` estimator. When run on SageMaker, a number of helpful environment variables are available to access properties of the training environment, such as:

* `SM_MODEL_DIR`: A string representing the path to the directory to write model artifacts to. Any artifacts saved in this folder are uploaded to S3 for model hosting after the training job completes.
* `SM_OUTPUT_DATA_DIR`: A string representing the file system path to write output artifacts to. Output artifacts may include checkpoints, graphs, and other files to save, not including model artifacts. These artifacts are compressed and uploaded to S3 to the same S3 prefix as the model artifacts.

Supposing two input channels, 'train' and 'test', were used in the call to the `SKLearn` estimator's `fit()` method, the following environment variables are set, following the format `SM_CHANNEL_[channel_name]`:

* `SM_CHANNEL_TRAIN`: A string representing the path to the directory containing data in the 'train' channel.
* `SM_CHANNEL_TEST`: Same as above, but for the 'test' channel.
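
As a minimal sketch of the naming convention above, the helper below derives the variable name for a channel and reads it with a fallback. The function names `channel_env_var` and `channel_dir`, and the use of a fallback path for local testing, are illustrative assumptions rather than part of the SageMaker API.

```python
import os


def channel_env_var(channel_name: str) -> str:
    """Return the environment variable name for an input channel."""
    return "SM_CHANNEL_" + channel_name.upper()


def channel_dir(channel_name: str, environ=os.environ) -> str:
    """Look up a channel directory, falling back to a default path.

    The fallback is an assumption made for this sketch so that the
    function also works when the variable is not set.
    """
    default = os.path.join("/opt/ml/input/data", channel_name)
    return environ.get(channel_env_var(channel_name), default)


print(channel_env_var("train"))
print(channel_dir("test", environ={"SM_CHANNEL_TEST": "/tmp/test"}))
```

Passing `environ` explicitly keeps the lookup easy to exercise outside a training container.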

A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to the `model_dir` so that it can be hosted later. Hyperparameters are passed to your script as arguments and can be retrieved with an `argparse.ArgumentParser` instance. For example, the script that we run in this notebook is below:

```python
from __future__ import print_function

import argparse
import joblib
import os
import pandas as pd

from sklearn import tree


if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    # Hyperparameters are described here. In this simple example we are just including one hyperparameter.
    parser.add_argument('--max_leaf_nodes', type=int, default=-1)

    # SageMaker-specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])

    args = parser.parse_args()

    # Take the set of files and read them all into a single pandas dataframe
    input_files = [ os.path.join(args.train, file) for file in os.listdir(args.train) ]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(args.train, "train"))
    raw_data = [ pd.read_csv(file, header=None, engine="python") for file in input_files ]
    train_data = pd.concat(raw_data)

    # labels are in the first column
    train_y = train_data.iloc[:, 0]
    train_X = train_data.iloc[:, 1:]

    # Here we support a single hyperparameter, 'max_leaf_nodes'. Note that you can add as many
    # as your training may require in the ArgumentParser above.
    max_leaf_nodes = args.max_leaf_nodes

    # Now use scikit-learn's decision tree classifier to train the model.
    clf = tree.DecisionTreeClassifier(max_leaf_nodes=max_leaf_nodes)
    clf = clf.fit(train_X, train_y)

    # Save the trained classifier to the model directory
    joblib.dump(clf, os.path.join(args.model_dir, "model.joblib"))


def model_fn(model_dir):
    """Deserialize and return the fitted model.

    Note that the file name must match the one used to serialize the model in the main block.
    """
    clf = joblib.load(os.path.join(model_dir, "model.joblib"))
    return clf
```

Because the Scikit-learn container imports your training script, you should always put your training code in a main guard `(if __name__=='__main__':)` so that the container does not inadvertently run your training code at the wrong point in execution.
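
To make the hyperparameter handling concrete, here is a small sketch of how string-valued hyperparameters can reach a script as command-line flags and come back typed through `argparse`. The `to_cli_args` and `parse_hyperparameters` helpers are hypothetical and only approximate what happens when the container launches the script.

```python
import argparse


def to_cli_args(hyperparameters: dict) -> list:
    """Flatten a {"name": "value"} mapping into ["--name", "value", ...]."""
    args = []
    for name, value in hyperparameters.items():
        args.extend([f"--{name}", str(value)])
    return args


def parse_hyperparameters(argv: list) -> argparse.Namespace:
    """Parse the flags this sketch knows about and ignore the rest."""
    parser = argparse.ArgumentParser()
    # type=int converts the string "30" back into an integer.
    parser.add_argument("--max_leaf_nodes", type=int, default=None)
    known, _unknown = parser.parse_known_args(argv)
    return known


if __name__ == "__main__":
    # The main guard keeps this demo from running when the module is imported.
    argv = to_cli_args({"max_leaf_nodes": "30"})
    print(argv)
    print(parse_hyperparameters(argv).max_leaf_nodes)
```

Using `parse_known_args` is a design choice for the sketch: it tolerates extra flags that the script does not declare.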

For more information about training environment variables, please visit https://github.com/aws/sagemaker-containers.

## Create a SageMaker SKLearn Training Job <a class="anchor" id="create_sklearn_estimator"></a>

To run our Scikit-learn training script on SageMaker, we submit a SageMaker training job using the boto3 API [create_training_job()](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_training_job) and provide the parameters required by the training job.

In [None]:
image = retrieve(framework="sklearn", region=boto3.Session().region_name, version="0.23-1")
image

In [None]:
tar_file = "sourcedir.tar.gz"
utils.create_tar(tar_file, Path("code"))
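
The `utils` module is a local helper that is not shown in this notebook. As a rough idea of what a function like `create_tar` might do, the sketch below packs every file in a directory into a gzip-compressed archive with the standard `tarfile` module. The name `create_tar_sketch` and its behavior are assumptions, not the actual helper.

```python
import tarfile
import tempfile
from pathlib import Path


def create_tar_sketch(tar_path: str, source_dir: Path) -> list:
    """Archive the files under source_dir at the root of a .tar.gz file."""
    added = []
    with tarfile.open(tar_path, "w:gz") as tar:
        for path in sorted(source_dir.rglob("*")):
            if path.is_file():
                # Store paths relative to source_dir so the entry point
                # sits at the top level of the archive.
                arcname = path.relative_to(source_dir).as_posix()
                tar.add(str(path), arcname=arcname)
                added.append(arcname)
    return added


# Demonstrate on a throwaway directory instead of the real "code" folder.
with tempfile.TemporaryDirectory() as tmp:
    code_dir = Path(tmp) / "code"
    code_dir.mkdir()
    (code_dir / "scikit_learn_iris.py").write_text("print('hello')\n")
    archive = Path(tmp) / "sourcedir.tar.gz"
    members = create_tar_sketch(str(archive), code_dir)
    with tarfile.open(str(archive), "r:gz") as tar:
        names = tar.getnames()
    print(members, names)
```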

In [None]:
sagemaker_job_name = f"sklearn-demo-boto3-{datetime.datetime.now():%Y-%m-%d-%H-%M-%S}"

sourcedir_path = s3_path_join("s3://", bucket, f"{prefix}/{sagemaker_job_name}/source_code")
print(f"Uploading source code to {sourcedir_path}")
sourcedir_uri = S3Uploader.upload(tar_file, sourcedir_path)
print(f"Uploaded source code to {sourcedir_uri}")

In [None]:
create_training_params = {
    "TrainingJobName": sagemaker_job_name,
    "AlgorithmSpecification": {
        "TrainingImage": image,
        "TrainingInputMode": "File"
    },
    "RoleArn": role,
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": train_input,  # location of the data uploaded earlier
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
        }
    ],
    "OutputDataConfig": {
        "S3OutputPath": f"s3://{bucket}/{prefix}/output",
    },
    "ResourceConfig": {
        "InstanceType": "ml.m5.xlarge",
        "InstanceCount": 1,
        "VolumeSizeInGB": 10
    },
    "StoppingCondition": {
        "MaxRuntimeInSeconds": 3600,
        "MaxWaitTimeInSeconds": 3600
    },
    "EnableManagedSpotTraining": True,
    "Environment": {
        "USE_SMDEBUG": "0",
    },
    "HyperParameters": {
        "max_leaf_nodes": "30",
        "sagemaker_program": "scikit_learn_iris.py",
        "sagemaker_submit_directory": sourcedir_uri,
    },
}
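
Two details of this request are easy to get wrong: the `HyperParameters` map accepts only string values, and with managed spot training `MaxWaitTimeInSeconds` must be at least `MaxRuntimeInSeconds`. The sketch below checks both locally before a request is sent. The helper names `stringify_hyperparameters` and `check_spot_stopping_condition` are hypothetical conveniences, not boto3 functions.

```python
def stringify_hyperparameters(hyperparameters: dict) -> dict:
    """Convert every hyperparameter name and value to a string."""
    return {str(k): str(v) for k, v in hyperparameters.items()}


def check_spot_stopping_condition(params: dict) -> None:
    """Raise ValueError if spot training is on and the wait time is too short."""
    if not params.get("EnableManagedSpotTraining", False):
        return
    stopping = params.get("StoppingCondition", {})
    max_run = stopping.get("MaxRuntimeInSeconds")
    max_wait = stopping.get("MaxWaitTimeInSeconds")
    if max_run is None or max_wait is None:
        return  # nothing to compare in this sketch
    if max_wait < max_run:
        raise ValueError("MaxWaitTimeInSeconds must be >= MaxRuntimeInSeconds")


# A trimmed-down request used only for illustration.
example = {
    "EnableManagedSpotTraining": True,
    "StoppingCondition": {"MaxRuntimeInSeconds": 3600, "MaxWaitTimeInSeconds": 3600},
    "HyperParameters": stringify_hyperparameters({"max_leaf_nodes": 30}),
}
check_spot_stopping_condition(example)  # passes silently
print(example["HyperParameters"])
```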

## Train a sklearn model on Iris data <a class="anchor" id="train_sklearn"></a>
Training is straightforward: create a training job from the parameters prepared above. This starts a SageMaker training job that downloads the data, invokes our scikit-learn code (in the provided script file), and saves any model artifacts that the script creates.

In [None]:
sm_client.create_training_job(**create_training_params)
status = sm_client.describe_training_job(TrainingJobName=sagemaker_job_name)["TrainingJobStatus"]
print(status)

In [None]:
try:
    sm_client.get_waiter("training_job_completed_or_stopped").wait(TrainingJobName=sagemaker_job_name)
finally:
    status = sm_client.describe_training_job(TrainingJobName=sagemaker_job_name)["TrainingJobStatus"]
    print("Training job ended with status: " + status)
    if status == "Failed":
        message = sm_client.describe_training_job(TrainingJobName=sagemaker_job_name)["FailureReason"]
        print("Training failed with the following error: {}".format(message))
        raise Exception("Training job failed")
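
For readers curious about what waiting on a job involves, the sketch below shows a comparable polling loop: call a describe function repeatedly until the status reaches a terminal value. The `wait_for_status` helper, its parameters, and the fake `describe` function are illustrative assumptions; the real waiter is provided by boto3 and is configured differently.

```python
import time

TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_status(describe, delay_seconds=0.0, max_attempts=10):
    """Poll describe() until it reports a terminal status or attempts run out."""
    for _ in range(max_attempts):
        status = describe()["TrainingJobStatus"]
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(delay_seconds)
    raise TimeoutError("job did not reach a terminal status in time")


# A stand-in for sm_client.describe_training_job that finishes on the third call.
_responses = iter(["InProgress", "InProgress", "Completed"])


def fake_describe():
    return {"TrainingJobStatus": next(_responses)}


final_status = wait_for_status(fake_describe)
print(final_status)
```

Injecting the describe function keeps the loop testable without AWS credentials.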

## Use the trained model to make inference requests <a class="anchor" id="inference"></a>

### Let's create a model based on our training job <a class="anchor" id="deploy"></a>

The cell below creates a model in SageMaker based on the training job we just executed. The model can later be deployed using SageMaker hosting services or, as in our case, used in a Batch Transform job.

In [None]:
model_name = sagemaker_job_name
print(model_name)

info = sm_client.describe_training_job(TrainingJobName=sagemaker_job_name)
model_data = info["ModelArtifacts"]["S3ModelArtifacts"]

primary_container = {
    "Image": image,
    "ModelDataUrl": model_data,
    "Environment": {
        "SAGEMAKER_CONTAINER_LOG_LEVEL": "20",
        "SAGEMAKER_REGION": region,
        "SAGEMAKER_SUBMIT_DIRECTORY": sourcedir_uri,
        "SAGEMAKER_PROGRAM": "scikit_learn_iris.py"
    },
}

create_model_response = sm_client.create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer=primary_container,
)

print(create_model_response["ModelArn"])

## Batch Transform <a class="anchor" id="batch_transform"></a>
In SageMaker Batch Transform, we introduced a new attribute called DataProcessing. In the cells below, we use the [Boto3 SDK](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_transform_job) to kick off several Batch Transform jobs using different configurations of DataProcessing. Please refer to [Associate Prediction Results with Input Records](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform-data-processing.html) to learn more about how to use the **DataProcessing** attribute.

### Prepare Input Data <a class="anchor" id="prepare_input_data"></a>
We extract 10 random samples of 100 rows from the training data, split the features (X) from the labels (Y), and upload the input data to a given location in S3.

In [None]:
%%bash
# Randomly sample the iris dataset 10 times, then split X and Y
mkdir -p batch_data/XY batch_data/X batch_data/Y
for i in {0..9}; do
    cat data/iris.csv | shuf -n 100 > batch_data/XY/iris_sample_${i}.csv
    cat batch_data/XY/iris_sample_${i}.csv | cut -d',' -f2- > batch_data/X/iris_sample_X_${i}.csv
    cat batch_data/XY/iris_sample_${i}.csv | cut -d',' -f1 > batch_data/Y/iris_sample_Y_${i}.csv
done
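
The same sampling and splitting can be sketched in Python with the standard library. This version works on in-memory rows so it runs without the data files; `sample_and_split` is a hypothetical helper, and unlike `shuf` it takes an optional seed for reproducibility.

```python
import random


def sample_and_split(rows, n, seed=None):
    """Sample n rows without replacement and split label from features.

    Each row is a CSV line whose first field is the label.
    """
    rng = random.Random(seed)
    sample = rng.sample(rows, n)
    labels = [row.split(",", 1)[0] for row in sample]
    features = [row.split(",", 1)[1] for row in sample]
    return sample, features, labels


# Made-up rows in the same "label, f1, f2, f3, f4" layout as data/iris.csv.
rows = [f"{i % 3}.0, 5.1, 3.5, 1.4, 0.{i}" for i in range(6)]
xy, x, y = sample_and_split(rows, n=4, seed=0)
for line_xy, line_x, line_y in zip(xy, x, y):
    print(line_xy, "|", line_x, "|", line_y)
```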

In [None]:
# Upload input data from local file system to S3
batch_input_s3_noID = sagemaker_session.upload_data("batch_data/X", key_prefix=f"{prefix}/batch_input_noID")
batch_input_s3_withID = sagemaker_session.upload_data("batch_data/XY", key_prefix=f"{prefix}/batch_input_withID")

### 1. Without data processing
Let's first set the data processing fields to null and inspect the inference results. We'll use it as a baseline to compare to the results with data processing.

In [None]:
batch_job_name_noID = f"Batch-Transform-noID-{datetime.datetime.now():%Y-%m-%d-%H-%M-%S}"
input_location = batch_input_s3_noID # use input data without ID column
output_location = f"s3://{bucket}/{prefix}/output/{batch_job_name_noID}"

request = {
    "TransformJobName": batch_job_name_noID,
    "ModelName": sagemaker_job_name,
    "TransformOutput": {
        "S3OutputPath": output_location,
        "Accept": "text/csv",
        "AssembleWith": "Line",
    },
    "TransformInput": {
        "DataSource": {"S3DataSource": {"S3DataType": "S3Prefix", "S3Uri": input_location}},
        "ContentType": "text/csv",
        "SplitType": "Line",
        "CompressionType": "None",
    },
    "TransformResources": {"InstanceType": "ml.m4.xlarge", "InstanceCount": 1},
}

sm_client.create_transform_job(**request)
print("Created Transform job with name: ", batch_job_name_noID)

# Wait until the job finishes
try:
    sm_client.get_waiter("transform_job_completed_or_stopped").wait(TransformJobName=batch_job_name_noID)
finally:
    response = sm_client.describe_transform_job(TransformJobName=batch_job_name_noID)
    status = response["TransformJobStatus"]
    print(f"Transform job ended with status: {status}")
    if status == "Failed":
        message = response["FailureReason"]
        print(f"Transform failed with the following error: {message}")
        raise Exception("Transform job failed")

Let's inspect the output of the Batch Transform job in S3. It should list the predicted label for each input row.

### Check Output Data  <a class="anchor" id="check_output_data"></a>
After the transform job has completed, download the output data from S3. For each file "f" in the input data, we have a corresponding file "f.out" containing the predicted labels from each input row. We can compare the predicted labels to the true labels saved earlier.

In [None]:
# Download the output data from S3 to local file system
response = sm_client.describe_transform_job(TransformJobName=batch_job_name_noID)
batch_output = response["TransformOutput"]["S3OutputPath"]
!mkdir -p batch_data/output
!aws s3 cp --recursive $batch_output/ batch_data/output/
# Head to see what the batch output looks like
!head batch_data/output/*

In [None]:
%%bash
# For each sample file, compare the predicted labels from batch output to the true labels
for i in {0..9}; do
    diff -s batch_data/Y/iris_sample_Y_${i}.csv \
        <(cat batch_data/output/iris_sample_X_${i}.csv.out | sed 's/[["]//g' | sed 's/, \|]/\n/g') \
        | sed "s/\/dev\/fd\/63/batch_data\/output\/iris_sample_X_${i}.csv.out/"
done

### 2. Join the input and the prediction results
Now, let's use the DataProcessing attribute to associate the prediction results with their corresponding input records. We can also use InputFilter to exclude the ID column, so there is no need to keep a separate file in S3.

- Set InputFilter to "\$[1:]": indicates that we are excluding column 0 (the 'ID') before processing the inferences and keeping everything from column 1 to the last column (all the features or predictors)
- Set JoinSource to "Input": indicates our desire to join the input data with the inference results
- Leave OutputFilter at its default ("$"), indicating that the joined input and inference results will be saved as output.
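
To illustrate the effect of these settings on a single CSV record, the sketch below mimics them in plain Python: drop column 0 before inference, then append the prediction to the original record. It is a simplified model of the behavior described above, not the service's implementation; `apply_input_filter`, `join_with_input`, and the stand-in `predict` function are hypothetical.

```python
def apply_input_filter(record: str) -> str:
    """Mimic InputFilter "$[1:]": keep every column except the first."""
    return ",".join(record.split(",")[1:])


def join_with_input(record: str, prediction: str) -> str:
    """Mimic JoinSource "Input": append the prediction to the input record."""
    return f"{record},{prediction}"


def predict(features: str) -> str:
    """Stand-in model: a made-up rule on the third feature, for illustration only."""
    third_feature = float(features.split(",")[2])
    return "0.0" if third_feature < 2.5 else "1.0"


record = "0.0,5.1,3.5,1.4,0.2"  # label (used as the ID) followed by four features
features = apply_input_filter(record)
joined = join_with_input(record, predict(features))
print(features)
print(joined)
```

In the joined record the first column holds the original label and the last column holds the prediction, which is the layout the final comparison in this notebook relies on.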


In [None]:
batch_job_name_ID = f"Batch-Transform-withID-{datetime.datetime.now():%Y-%m-%d-%H-%M-%S}"
input_location = batch_input_s3_withID  # use input data with the ID column because InputFilter will filter it out
output_location = f"s3://{bucket}/{prefix}/output/{batch_job_name_ID}"

request["TransformJobName"] = batch_job_name_ID
request["TransformInput"]["DataSource"]["S3DataSource"]["S3Uri"] = input_location
request["TransformOutput"]["S3OutputPath"] = output_location

request["DataProcessing"] = {
    "InputFilter": "$[1:]",  # exclude the ID column (index 0)
    "JoinSource": "Input",  # join the input with the inference results
}

sm_client.create_transform_job(**request)
print("Created Transform job with name: ", batch_job_name_ID)

# Wait until the job finishes
try:
    sm_client.get_waiter("transform_job_completed_or_stopped").wait(TransformJobName=batch_job_name_ID)
finally:
    response = sm_client.describe_transform_job(TransformJobName=batch_job_name_ID)
    status = response["TransformJobStatus"]
    print("Transform job ended with status: " + status)
    if status == "Failed":
        message = response["FailureReason"]
        print("Transform failed with the following error: {}".format(message))
        raise Exception("Transform job failed")

In [None]:
# Download the output data from S3 to local file system
response = sm_client.describe_transform_job(TransformJobName=batch_job_name_ID)
batch_output = response["TransformOutput"]["S3OutputPath"]
!mkdir -p batch_data/output_ID
!aws s3 cp --recursive $batch_output/ batch_data/output_ID/
# Head to see what the batch output looks like
!head batch_data/output_ID/*

In [None]:
def load_data(file_list: list):
    # Concat input files with select columns
    dfs = []
    for file in file_list:
        df = pd.read_csv(file, header=None)
        dfs.append(df)
    return pd.concat(dfs, ignore_index=True, axis=0)

output_dir = 'batch_data/output_ID'
output_file_list = glob.glob(f"{output_dir}/*.out")
data_df = load_data(output_file_list)

assert data_df.iloc[:,0].values.tolist() == data_df.iloc[:,-1].values.tolist(), \
        "outputs are different from input"