# Inference Pipeline with Scikit-learn and Linear Learner
Typically, a Machine Learning (ML) process consists of a few steps: gathering data with various ETL jobs, pre-processing the data, featurizing the dataset by incorporating standard techniques or prior knowledge, and finally training an ML model using an algorithm.
In many cases, when the trained model is used to serve real-time or batch prediction requests, it receives data in a format that needs to be pre-processed (e.g. featurized) before it can be passed to the algorithm. In this notebook, we demonstrate how to build an ML pipeline using the SageMaker Scikit-learn container and the SageMaker Linear Learner algorithm. After the model is trained, we deploy the pipeline (data preprocessing and Linear Learner) as an Inference Pipeline behind a single endpoint for real-time inference, and for batch inference using Amazon SageMaker Batch Transform.

We will demonstrate this using the Abalone dataset, available from [UCI Machine Learning](https://archive.ics.uci.edu/ml/datasets/abalone); the aim of this task is to determine the age of an abalone (a kind of shellfish) from its physical measurements. We'll use SageMaker's Scikit-learn container to featurize the dataset so that it can be used for training with Linear Learner.

### Table of contents
* [Preprocessing data and training the model](#training)
 * [Upload the data for training](#upload_data)
 * [Create a Scikit-learn script to train with](#create_sklearn_script)
 * [Create SageMaker Scikit Estimator](#create_sklearn_estimator)
 * [Batch transform our training data](#preprocess_train_data)
 * [Fit a LinearLearner Model with the preprocessed data](#training_model)
* [Inference Pipeline with Scikit preprocessor and Linear Learner](#serial_inference)
 * [Set up the inference pipeline](#pipeline_setup)
 * [Make a request to our pipeline endpoint](#pipeline_inference_request)
 * [Delete Endpoint](#delete_endpoint)

Let's first create our SageMaker session and role, and define an S3 prefix to use for the notebook example.

In [1]:
import boto3, re, sys, math, json, os, sagemaker, urllib.request
from sagemaker import get_execution_role
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import Image
from IPython.display import display
from time import gmtime, strftime
from sagemaker.predictor import csv_serializer
import io
import time
import sagemaker.amazon.common as smac
%matplotlib inline

In [2]:
# S3 prefix
bucket_name = 'demo-saeed'
prefix = 'fraudcredit-pipeline'

import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()

# Get a SageMaker-compatible role used by this Notebook Instance.
role = get_execution_role()

# Preprocessing data and training the model <a class="anchor" id="training"></a>


In [3]:
s3_input_train = sagemaker.s3_input(s3_data='s3://{}/{}/raw_train'.format(bucket_name, prefix))
s3_input_train.config

{'DataSource': {'S3DataSource': {'S3DataDistributionType': 'FullyReplicated',
   'S3DataType': 'S3Prefix',
   'S3Uri': 's3://demo-saeed/fraudcredit-pipeline/raw_train'}}}

## Create SageMaker Scikit Estimator <a class="anchor" id="create_sklearn_estimator"></a>

To run our Scikit-learn training script on SageMaker, we construct a `sagemaker.sklearn.estimator.SKLearn` estimator, which accepts several constructor arguments:

* __entry_point__: The path to the Python script SageMaker runs for training and prediction.
* __role__: The ARN of the IAM role that SageMaker assumes to run the job.
* __train_instance_type__ *(optional)*: The type of SageMaker instance to use for training. __Note__: Because Scikit-learn does not natively support GPU training, SageMaker Scikit-learn does not currently support training on GPU instance types.
* __sagemaker_session__ *(optional)*: The session used to train on SageMaker.

To see the code for the SKLearn Estimator, see here: https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/sklearn
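
The featurizer script itself (`sklearn_fd_featurizer.py`) is not reproduced in this notebook. As a rough, hypothetical sketch of two of the serving hooks that an entry point for the SageMaker Scikit-learn container usually defines (`input_fn` and `output_fn`; `model_fn` and `predict_fn` are omitted), assuming CSV-only requests and plain-string responses, it might look like this. The real script may differ, and the container may expect a response object rather than a bare string.

```python
import csv
import io

CSV = "text/csv"


def input_fn(input_data, content_type):
    """Parse a CSV request body into a list of rows of floats."""
    if content_type != CSV:
        raise ValueError("Unsupported content type: {}".format(content_type))
    if isinstance(input_data, bytes):
        input_data = input_data.decode("utf-8")
    reader = csv.reader(io.StringIO(input_data))
    # Skip empty lines so a trailing newline does not produce an empty record.
    return [[float(value) for value in row] for row in reader if row]


def output_fn(prediction, accept):
    """Serialize rows of numbers back to CSV, one record per line."""
    if accept != CSV:
        raise ValueError("Unsupported accept type: {}".format(accept))
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="\n").writerows(prediction)
    return buffer.getvalue()
```

Rejecting unknown content types early keeps a misconfigured request from reaching the model with data in the wrong shape.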

In [4]:
from sagemaker.sklearn.estimator import SKLearn

script_path = 'sklearn_fd_featurizer.py'

sklearn_preprocessor = SKLearn(
    entry_point=script_path,
    role=role,
    train_instance_type="ml.c4.xlarge",
    output_path='s3://{}/{}/output/'.format(bucket_name, prefix),
    #code_location='s3://<path-to-code_location>', read locally from notebook
    sagemaker_session=sagemaker_session)




In [5]:
sklearn_preprocessor.fit({'train': s3_input_train})

2019-05-30 21:47:19 Starting - Starting the training job...
2019-05-30 21:47:23 Starting - Launching requested ML instances......
2019-05-30 21:48:29 Starting - Preparing the instances for training...
2019-05-30 21:49:13 Downloading - Downloading input data...
2019-05-30 21:49:45 Training - Training image download completed. Training in progress..
2019-05-30 21:49:45,756 sagemaker-containers INFO     Imported framework sagemaker_sklearn_container.training
2019-05-30 21:49:45,758 sagemaker-containers INFO     No GPUs detected (normal if no gpus installed)
2019-05-30 21:49:45,769 sagemaker_sklearn_container.training INFO     Invoking user training script.
2019-05-30 21:49:45,998 sagemaker-containers INFO     Module sklearn_fd_featurizer does not provide a setup.py. 
Generating setup.py
2019-05-30 21:49:45,998 sagemaker-containers INFO     Generating setup.cfg
2019-05-30 21:49:45,999 sagemaker-containers INFO     Generating MANIFE

In [6]:
sklearn_preprocessor.model_data

's3://demo-saeed/fraudcredit-pipeline/output/sagemaker-scikit-learn-2019-05-30-21-47-18-739/output/model.tar.gz'

In [7]:
sklearn_preprocessor.uploaded_code

UserCode(s3_prefix='s3://demo-saeed/sagemaker-scikit-learn-2019-05-30-21-47-18-739/source/sourcedir.tar.gz', script_name='sklearn_fd_featurizer.py')

## Batch transform our training data <a class="anchor" id="preprocess_train_data"></a>
Now that our preprocessor is properly fitted, let's go ahead and preprocess our training data. We use batch transform to preprocess the raw data directly and store the result right back in S3.

In [8]:
# Define a SKLearn Transformer from the trained SKLearn Estimator
transformer = sklearn_preprocessor.transformer(
    instance_count=1, 
    instance_type='ml.m4.xlarge',
    assemble_with = 'Line',
    output_path = 's3://{}/{}/preprocessed_model/'.format(bucket_name, prefix),
    accept = 'text/csv')

In [9]:
s3_input_train.config['DataSource']['S3DataSource']['S3Uri']

's3://demo-saeed/fraudcredit-pipeline/raw_train'

In [10]:
transformer.output_path

's3://demo-saeed/fraudcredit-pipeline/preprocessed_model/'

In [None]:
# Preprocess training input
transformer.output_path = 's3://demo-saeed/fraudcredit-pipeline/preprocessed_train/'
transformer.transform(s3_input_train.config['DataSource']['S3DataSource']['S3Uri'], content_type='text/csv', split_type='Line')
print('Waiting for transform job: ' + transformer.latest_transform_job.job_name)
transformer.wait()
preprocessed_train = transformer.output_path
s3_input_processed_train = sagemaker.session.s3_input(
    preprocessed_train, 
    distribution='FullyReplicated',
    content_type='text/csv', 
    s3_data_type='S3Prefix')
print(s3_input_processed_train.config)

Waiting for transform job: sagemaker-scikit-learn-2019-05-30-21-50-31-473
....
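
The `split_type='Line'` and `assemble_with='Line'` settings used above tell Batch Transform to treat each line of the input as one record and to join the transformed records with newlines. The following is only a conceptual, local sketch of that behaviour; `transform_by_line` and `double_fields` are hypothetical stand-ins, and the real service also groups records into requests according to its batching settings.

```python
def transform_by_line(raw_text, transform_record):
    """Split on newlines, transform each record, and rejoin with newlines."""
    records = [line for line in raw_text.splitlines() if line.strip()]
    return "".join(transform_record(record) + "\n" for record in records)


def double_fields(record):
    """Toy stand-in for the fitted featurizer: double every numeric field."""
    return ",".join(str(float(value) * 2) for value in record.split(","))


print(transform_by_line("1,2\n3,4\n", double_fields))
```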

In [None]:
data_location = 's3://{}/{}/{}/{}'.format(bucket_name, prefix,'preprocessed_train','train.csv.out')
df = pd.read_csv(data_location,header = None)
df.head(25)
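
The `train.csv.out` name read above follows from how Batch Transform names its results: each output object takes the input object's name with `.out` appended. A small sketch of that naming rule, assuming the input object sits directly under the input prefix (`transform_output_uri` is a hypothetical helper, not part of the SageMaker SDK):

```python
def transform_output_uri(output_path, input_uri):
    """Build the S3 URI where Batch Transform writes the result for one input object."""
    file_name = input_uri.rstrip("/").rsplit("/", 1)[-1]
    return "{}/{}.out".format(output_path.rstrip("/"), file_name)


print(transform_output_uri(
    "s3://demo-saeed/fraudcredit-pipeline/preprocessed_train/",
    "s3://demo-saeed/fraudcredit-pipeline/raw_train/train.csv"))
```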

In [None]:
s3_input_validation = sagemaker.s3_input(s3_data='s3://{}/{}/raw_validation'.format(bucket_name, prefix))
s3_input_validation.config
# Preprocess validation input
transformer.output_path = 's3://demo-saeed/fraudcredit-pipeline/preprocessed_validation/'
transformer.transform(s3_input_validation.config['DataSource']['S3DataSource']['S3Uri'], content_type='text/csv', split_type='Line')
print('Waiting for transform job: ' + transformer.latest_transform_job.job_name)
transformer.wait()
preprocessed_validation = transformer.output_path
s3_input_processed_validation = sagemaker.session.s3_input(
    preprocessed_validation, 
    distribution='FullyReplicated',
    content_type='text/csv', 
    s3_data_type='S3Prefix')
print(s3_input_processed_validation.config)

In [None]:
s3_input_test = sagemaker.s3_input(s3_data='s3://{}/{}/raw_test'.format(bucket_name, prefix))
s3_input_test.config
# Preprocess test input
transformer.output_path = 's3://demo-saeed/fraudcredit-pipeline/preprocessed_test/'
transformer.transform(s3_input_test.config['DataSource']['S3DataSource']['S3Uri'], content_type='text/csv', split_type='Line')
print('Waiting for transform job: ' + transformer.latest_transform_job.job_name)
transformer.wait()
preprocessed_test = transformer.output_path
s3_input_processed_test = sagemaker.session.s3_input(
    preprocessed_test, 
    distribution='FullyReplicated',
    content_type='text/csv', 
    s3_data_type='S3Prefix')
print(s3_input_processed_test.config)
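
The three preprocessing cells above repeat the same steps for the train, validation, and test splits. One possible way to factor them out is a small helper such as the hypothetical `preprocess_split` below, which relies only on the `output_path`, `transform`, and `wait` members already used in this notebook:

```python
def preprocess_split(transformer, input_uri, output_uri):
    """Run one batch transform job and return the S3 prefix holding its output."""
    transformer.output_path = output_uri
    transformer.transform(input_uri, content_type="text/csv", split_type="Line")
    transformer.wait()
    return transformer.output_path


# Hypothetical usage with the objects defined earlier in the notebook:
# preprocessed_test = preprocess_split(
#     transformer,
#     's3://{}/{}/raw_test'.format(bucket_name, prefix),
#     's3://{}/{}/preprocessed_test/'.format(bucket_name, prefix))
```

The returned prefix can then be wrapped in `sagemaker.session.s3_input` exactly as in the cells above.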

## Fit a LinearLearner Model with the preprocessed data <a class="anchor" id="training_model"></a>
Let's take the preprocessed training data and fit a Linear Learner model. SageMaker provides prebuilt algorithm containers that can be used with the Python SDK. The previous Scikit-learn job preprocessed the raw dataset into labeled, usable data that we can now use to fit a binary classifier Linear Learner model.

For more on Linear Learner see: https://docs.aws.amazon.com/sagemaker/latest/dg/linear-learner.html

# training

In [None]:
from sagemaker.amazon.amazon_estimator import get_image_uri
container = get_image_uri(boto3.Session().region_name, 'linear-learner')
container

In [None]:
output_location = 's3://{}/{}/model_output'.format(bucket_name, prefix)
print('training artifacts will be uploaded to: {}'.format(output_location))

In [None]:
s3_input_processed_train.config

In [None]:
import pandas as pd
data_location = 's3://{}/{}/raw_train/train.csv'.format(bucket_name, prefix)

data = pd.read_csv(data_location, header=None)
data.head()

In [None]:
import pandas as pd
data_location = 's3://{}/{}/preprocessed_train/train.csv.out'.format(bucket_name, prefix)

data = pd.read_csv(data_location, header=None)
data.head()

In [None]:
output_location

In [None]:
import boto3
import sagemaker

sess = sagemaker.Session()

linear = sagemaker.estimator.Estimator(container,
                                       role, 
                                       train_instance_count=1, 
                                       train_instance_type='ml.m4.2xlarge',
                                       output_path=output_location,
                                       sagemaker_session=sess)
linear.set_hyperparameters(feature_dim=30,
                           predictor_type='binary_classifier',
                           epochs = 1,
                           mini_batch_size=200)
linear.fit({'train': s3_input_processed_train,  'validation': s3_input_processed_validation, 'test': s3_input_processed_test})
#linear.fit({'train': s3_input_processed_train})

# train_max_run = 3600,
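
The `feature_dim` hyperparameter has to match the number of feature columns in the training data. For CSV input, Linear Learner treats the first column as the label, so a quick sanity check is to count the remaining columns of one preprocessed record. A minimal sketch with a made-up record (`infer_feature_dim` is a hypothetical helper):

```python
def infer_feature_dim(csv_record):
    """Number of feature columns in a CSV record whose first field is the label."""
    return len(csv_record.strip().split(",")) - 1


# Made-up record: one label followed by 30 feature values.
sample = ",".join(["0"] + ["0.5"] * 30)
print(infer_feature_dim(sample))
```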

In [None]:
linear.output_path

In [None]:
%matplotlib inline
from sagemaker.analytics import TrainingJobAnalytics

# Use the public attribute rather than the private _current_job_name
training_job_name = linear.latest_training_job.job_name
metric_name = 'validation:binary_f_beta'

metrics_dataframe = TrainingJobAnalytics(training_job_name=training_job_name, metric_names=[metric_name]).dataframe()
# Name the returned Axes `ax` so it does not shadow the matplotlib.pyplot alias `plt`
ax = metrics_dataframe.plot(kind='line', figsize=(12, 5), x='timestamp', y='value', style='b.', legend=False)
ax.set_ylabel(metric_name);
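
The plotted metric, `validation:binary_f_beta`, is an F-beta score. For reference, the standard F-beta formula combines precision and recall as sketched below. How Linear Learner chooses beta and the decision threshold is not shown in this notebook and is best checked in its documentation; the `f_beta` helper here is only an illustration.

```python
def f_beta(precision, recall, beta=1.0):
    """Standard F-beta score: weighted harmonic mean of precision and recall."""
    beta_sq = beta ** 2
    denominator = beta_sq * precision + recall
    if denominator == 0:
        # No true positives at all: define the score as 0 instead of dividing by zero.
        return 0.0
    return (1 + beta_sq) * precision * recall / denominator


print(f_beta(0.5, 0.5))
```

With `beta=1` this reduces to the familiar F1 score; larger values of beta weight recall more heavily than precision.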

In [None]:
#linear.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge')
linear_predictor = linear.deploy(initial_instance_count=1, instance_type='ml.c5.xlarge', endpoint_name='pip-model-aws-linear-learner1', update_endpoint=True)


In [None]:
linear.output_path

In [None]:
linear.latest_training_job.job_name

In [None]:
linear_predictor.content_type

In [None]:
linear_predictor.endpoint

### Testing Inference

In [None]:
data_location = 's3://{}/{}/{}/{}'.format(bucket_name, prefix,'preprocessed_train','train.csv.out')
df = pd.read_csv(data_location,header = None)
df.head(5)

In [None]:
X_test = df.iloc[0:1, 1:]
# X_test.columns =['Time', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10',
#        'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20',
#        'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'Amount']
X_test

In [None]:
from sagemaker.predictor import csv_serializer, json_deserializer

linear_predictor.content_type = 'text/csv'
linear_predictor.serializer = csv_serializer
linear_predictor.deserializer = json_deserializer
#linear_predictor.predict(X_test.iloc[0])

linear_predictor.predict(X_test.values)
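
With `json_deserializer`, the prediction comes back as a Python dictionary. Assuming the response shape that Linear Learner documents for binary classification (a `predictions` list whose entries carry `score` and `predicted_label`), the labels and scores can be pulled out as follows. The sample response is made up for illustration, and `extract_labels` is a hypothetical helper.

```python
def extract_labels(response):
    """Return (predicted_label, score) pairs from a Linear Learner JSON response."""
    return [
        (int(prediction["predicted_label"]), float(prediction["score"]))
        for prediction in response["predictions"]
    ]


# Made-up response, not output from the endpoint in this notebook.
sample_response = {"predictions": [{"score": 0.02, "predicted_label": 0}]}
print(extract_labels(sample_response))
```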


# Serial Inference Pipeline with Scikit preprocessor and Linear Learner <a class="anchor" id="serial_inference"></a>


## Set up the inference pipeline <a class="anchor" id="pipeline_setup"></a>
Setting up a Machine Learning pipeline can be done with the Pipeline Model. This sets up a list of models in a single endpoint; in this example, we configure our pipeline model with the fitted Scikit-learn inference model and the fitted Linear Learner model. Deploying the model follows the same ```deploy``` pattern in the SDK.

In [None]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
import boto3
from time import gmtime, strftime

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

scikit_learn_inference_model = sklearn_preprocessor.create_model()
linear_learner_model = linear.create_model()

model_name = 'inference-pipeline-' + timestamp_prefix
endpoint_name = 'inference-pipeline-ep-' + timestamp_prefix
# The order of the list defines the order in which the containers process a request
sm_model = PipelineModel(
    name=model_name, 
    role=role, 
    models=[
        scikit_learn_inference_model, 
        linear_learner_model])

sm_model.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', endpoint_name=endpoint_name)

In [None]:
sm_model.endpoint_name
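
Conceptually, the pipeline endpoint passes each request through its containers in the order they were listed, feeding the output of one into the next. The local sketch below illustrates only that chaining idea; `featurize` and `score` are toy stand-ins, not the fitted Scikit-learn preprocessor or the Linear Learner model.

```python
def run_pipeline(stages, payload):
    """Feed the payload through each stage in order, as a serial pipeline does."""
    for stage in stages:
        payload = stage(payload)
    return payload


def featurize(csv_row):
    """Toy preprocessing stage: parse a CSV row and halve every value."""
    return [float(value) / 2.0 for value in csv_row.split(",")]


def score(features):
    """Toy model stage: sum the features and wrap the result like a JSON response."""
    return {"predictions": [{"score": sum(features)}]}


print(run_pipeline([featurize, score], "2,4,6"))
```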

## Make a request to our pipeline endpoint <a class="anchor" id="pipeline_inference_request"></a>

Here we just grab the first line from the test data (you'll notice that the inference Python script is very particular about the ordering of the inference request data). The ```ContentType``` field configures the first container, while the ```Accept``` field configures the last container. You can also specify each container's ```Accept``` and ```ContentType``` values using environment variables.

We make our request with the payload in ```'text/csv'``` format, since that is what our script currently supports. If other formats need to be supported, this would have to be added to the ```output_fn()``` method in our entry point. Note that we set the ```Accept``` to ```application/json```, since Linear Learner does not support ```text/csv``` ```Accept```. The prediction output in this case is trying to guess the number of rings the abalone specimen would have given its other physical features; the actual number of rings is 10.

In [None]:
X_test

In [None]:
from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON
payload = X_test.values
actual_rings = 10
predictor = RealTimePredictor(
    endpoint=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=csv_serializer,
    content_type=CONTENT_TYPE_CSV,
    accept=CONTENT_TYPE_JSON)

print(predictor.predict(payload))
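
The same request can also be sent without the SageMaker SDK predictor, using the `invoke_endpoint` call of the boto3 `sagemaker-runtime` client. The sketch below only builds the request arguments, so it runs without AWS access; `build_invoke_args` is a hypothetical helper, and the commented usage assumes valid credentials and a live endpoint.

```python
def build_invoke_args(endpoint_name, rows):
    """Build keyword arguments for sagemaker-runtime invoke_endpoint from rows of values."""
    body = "\n".join(",".join(str(value) for value in row) for row in rows)
    return {
        "EndpointName": endpoint_name,
        "ContentType": "text/csv",     # read by the first container in the pipeline
        "Accept": "application/json",  # applied to the last container's response
        "Body": body.encode("utf-8"),
    }


# Usage (requires AWS credentials and a deployed endpoint):
# import boto3, json
# runtime = boto3.client("sagemaker-runtime")
# response = runtime.invoke_endpoint(**build_invoke_args(endpoint_name, X_test.values))
# print(json.loads(response["Body"].read()))
```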


## Delete Endpoint <a class="anchor" id="delete_endpoint"></a>
Once we are finished with the endpoint, we delete it to clean up the resources. The calls below are commented out; uncomment them to run the cleanup.

In [None]:
# sm_client = sagemaker_session.boto_session.client('sagemaker')
# sm_client.delete_endpoint(EndpointName=endpoint_name)