# Inference Pipeline with Scikit-learn and Linear Learner
Typically, a Machine Learning (ML) process consists of a few steps: data gathering with various ETL jobs, pre-processing the data, featurizing the dataset by incorporating standard techniques or prior knowledge, and finally training an ML model using an algorithm. 
In many cases, when the trained model is used for processing real-time or batch prediction requests, the model receives data in a format that needs to be pre-processed (e.g. featurized) before it can be passed to the algorithm. In this notebook, we demonstrate how to build an ML pipeline using the SageMaker Scikit-learn container and the SageMaker Linear Learner algorithm. After the model is trained, we deploy the pipeline (data preprocessing and Linear Learner) as an Inference Pipeline behind a single endpoint for real-time inference, and for batch inference using Amazon SageMaker Batch Transform.

We will demonstrate this using the Abalone dataset to predict the age of an abalone from its physical features. The dataset is available from [UCI Machine Learning](https://archive.ics.uci.edu/ml/datasets/abalone); the aim of this task is to determine the age of an abalone (a kind of shellfish) from its physical measurements. We'll use SageMaker's Scikit-learn container to featurize the dataset so that it can be used for training with Linear Learner.

### Table of contents
* [Preprocessing data and training the model](#training)
 * [Upload the data for training](#upload_data)
 * [Create a Scikit-learn script to train with](#create_sklearn_script)
 * [Create SageMaker Scikit Estimator](#create_sklearn_estimator)
 * [Batch transform our training data](#preprocess_train_data)
 * [Fit a LinearLearner Model with the preprocessed data](#training_model)
* [Inference Pipeline with Scikit preprocessor and Linear Learner](#serial_inference)
 * [Set up the inference pipeline](#pipeline_setup)
 * [Make a request to our pipeline endpoint](#pipeline_inference_request)
 * [Delete Endpoint](#delete_endpoint)

Let's first create our SageMaker session and role, and create an S3 prefix to use for the notebook example.

In [1]:
import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()

# Get a SageMaker-compatible role used by this Notebook Instance.
role = get_execution_role()

# Default bucket and S3 prefix used throughout this example.
bucket = sagemaker_session.default_bucket()
prefix = 'Scikit-LinearLearner-pipeline-abalone-example'


In [2]:
import boto3

# Preprocessing data and training the model <a class="anchor" id="training"></a>
## Downloading dataset <a class="anchor" id="download_data"></a>
The SageMaker team has downloaded the dataset from UCI and uploaded it to one of the S3 buckets in our account.

In [3]:
!wget --directory-prefix=./abalone_data https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv

--2020-04-27 03:12:19--  https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv
Resolving s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)... 52.218.253.48
Connecting to s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)|52.218.253.48|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 191873 (187K) [binary/octet-stream]
Saving to: ‘./abalone_data/abalone.csv.4’


2020-04-27 03:12:19 (2.72 MB/s) - ‘./abalone_data/abalone.csv.4’ saved [191873/191873]



## Upload the data for training <a class="anchor" id="upload_data"></a>

When training large models with huge amounts of data, you'll typically use big data tools, like Amazon Athena, AWS Glue, or Amazon EMR, to create your data in S3. For this small dataset, we can use the tools provided by the SageMaker Python SDK to upload the data to the default bucket. 

In [4]:
WORK_DIRECTORY = 'abalone_data'

train_input = sagemaker_session.upload_data(
    path='{}/{}'.format(WORK_DIRECTORY, 'abalone.csv'), 
    bucket=bucket,
    key_prefix='{}/{}'.format(prefix, 'train'))
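
The value returned by `upload_data` is an S3 URI that later cells pass to `fit()` and to the batch transform job. As a rough sketch, assuming the SDK returns a URI of the form `s3://{bucket}/{key_prefix}/{file name}` when `path` points to a single file, the result can be predicted as follows. The helper `expected_s3_uri` and the bucket name `my-bucket` are hypothetical and only serve the illustration.

```python
import posixpath


def expected_s3_uri(bucket, key_prefix, local_path):
    """Build the S3 URI we expect for a single uploaded file (assumed format)."""
    file_name = posixpath.basename(local_path)
    return 's3://{}/{}/{}'.format(bucket, key_prefix, file_name)


# Hypothetical bucket name; in the notebook it comes from default_bucket().
print(expected_s3_uri(
    'my-bucket',
    'Scikit-LinearLearner-pipeline-abalone-example/train',
    'abalone_data/abalone.csv'))
```

Printing `train_input` in the notebook is the reliable way to confirm the actual location.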

## Create a Scikit-learn script to train with <a class="anchor" id="create_sklearn_script"></a>
To run Scikit-learn on SageMaker, use the `SKLearn` Estimator with a script as an entry point. The training script is very similar to a training script you might run outside of SageMaker, but you can access useful properties about the training environment through various environment variables, such as:

* SM_MODEL_DIR: A string representing the path to the directory to write model artifacts to. These artifacts are uploaded to S3 for model hosting.
* SM_OUTPUT_DATA_DIR: A string representing the filesystem path to write output artifacts to. Output artifacts may include checkpoints, graphs, and other files to save, not including model artifacts. These artifacts are compressed and uploaded to S3 to the same S3 prefix as the model artifacts.

Supposing two input channels, 'train' and 'test', were used in the call to the SKLearn estimator's fit() method, the following will be set, following the format SM_CHANNEL_[channel_name]:

* SM_CHANNEL_TRAIN: A string representing the path to the directory containing data in the 'train' channel
* SM_CHANNEL_TEST: Same as above, but for the 'test' channel.
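
The way these environment variables feed the script's arguments can be sketched in isolation. The snippet below is a minimal illustration, not the notebook's script: `parse_args`, the `--epochs` hyperparameter, and the fallback paths (chosen so the sketch also runs outside a SageMaker container) are assumptions made for the example.

```python
import argparse
import os


def parse_args(argv=None, environ=None):
    """Read SageMaker paths from the environment, allowing CLI overrides."""
    environ = os.environ if environ is None else environ
    parser = argparse.ArgumentParser()
    # Fallback values are assumptions for running the sketch locally.
    parser.add_argument('--model-dir', type=str,
                        default=environ.get('SM_MODEL_DIR', '/opt/ml/model'))
    parser.add_argument('--train', type=str,
                        default=environ.get('SM_CHANNEL_TRAIN', '/opt/ml/input/data/train'))
    # Hypothetical hyperparameter, passed to the script as a command-line flag.
    parser.add_argument('--epochs', type=int, default=10)
    return parser.parse_args(argv)


args = parse_args(['--epochs', '3'], environ={'SM_MODEL_DIR': '/tmp/model'})
print(args.model_dir, args.train, args.epochs)
```

Using `environ.get` with a fallback, rather than indexing `os.environ` directly, avoids a `KeyError` when the script is tested on a machine where the `SM_*` variables are not set.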

A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to model_dir so that it can be hosted later. Hyperparameters are passed to your script as arguments and can be retrieved with an argparse.ArgumentParser instance. For example, the script run by this notebook:

```python
from __future__ import print_function

import time
import sys
from io import StringIO
import os
import shutil

import argparse
import csv
import json
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
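# sklearn.externals.joblib is available in the scikit-learn 0.20 container used here;
# newer scikit-learn releases require a plain `import joblib` instead.
from sklearn.externals import joblib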
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import Binarizer, StandardScaler, OneHotEncoder

from sagemaker_containers.beta.framework import (
    content_types, encoders, env, modules, transformer, worker)

# Since we get a headerless CSV file we specify the column names here.
feature_columns_names = [
    'sex', # M, F, and I (infant)
    'length', # Longest shell measurement
    'diameter', # perpendicular to length
    'height', # with meat in shell
    'whole_weight', # whole abalone
    'shucked_weight', # weight of meat
    'viscera_weight', # gut weight (after bleeding)
    'shell_weight'] # after being dried

label_column = 'rings'

feature_columns_dtype = {
    'sex': str,
    'length': np.float64,
    'diameter': np.float64,
    'height': np.float64,
    'whole_weight': np.float64,
    'shucked_weight': np.float64,
    'viscera_weight': np.float64,
    'shell_weight': np.float64}

label_column_dtype = {'rings': np.float64} # +1.5 gives the age in years

def merge_two_dicts(x, y):
    z = x.copy()   # start with x's keys and values
    z.update(y)    # modifies z with y's keys and values & returns None
    return z

if __name__ == '__main__':

    parser = argparse.ArgumentParser()

    # Sagemaker specific arguments. Defaults are set in the environment variables.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])

    args = parser.parse_args()

    # Take the set of files and read them all into a single pandas dataframe
    input_files = [ os.path.join(args.train, file) for file in os.listdir(args.train) ]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(args.train, "train"))
    
    raw_data = [ pd.read_csv(
        file, 
        header=None, 
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)) for file in input_files ]
    concat_data = pd.concat(raw_data)
    
    # This section is adapted from the scikit-learn example of using preprocessing pipelines:
    #
    # https://scikit-learn.org/stable/auto_examples/compose/plot_column_transformer_mixed_types.html
    #
    # We will fit our preprocessor on the following features:
    # Numeric Features:
    # - length:  Longest shell measurement
    # - diameter: Diameter perpendicular to length
    # - height:  Height with meat in shell
    # - whole_weight: Weight of whole abalone
    # - shucked_weight: Weight of meat
    # - viscera_weight: Gut weight (after bleeding)
    # - shell_weight: Weight after being dried
    # Categorical Features:
    # - sex: categories encoded as strings {'M', 'F', 'I'} where 'I' is Infant
    numeric_features = list(feature_columns_names)
    numeric_features.remove('sex')
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())])

    categorical_features = ['sex']
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)],
        remainder="drop")
    
    preprocessor.fit(concat_data)

    joblib.dump(preprocessor, os.path.join(args.model_dir, "model.joblib"))

    print("saved model!")
    
    
def input_fn(input_data, content_type):
    """Parse input data payload
    
    We currently only take csv input. Since we need to process both labelled
    and unlabelled data we first determine whether the label column is present
    by looking at how many columns were provided.
    """
    if content_type == 'text/csv':
        # Read the raw input data as CSV.
        df = pd.read_csv(StringIO(input_data), 
                         header=None)
        
        if len(df.columns) == len(feature_columns_names) + 1:
            # This is a labelled example, includes the ring label
            df.columns = feature_columns_names + [label_column]
        elif len(df.columns) == len(feature_columns_names):
            # This is an unlabelled example.
            df.columns = feature_columns_names
            
        return df
    else:
        raise ValueError("{} not supported by script!".format(content_type))
        

def output_fn(prediction, accept):
    """Format prediction output
    
    The default accept/content-type between containers for serial inference is JSON.
    We also want to set the ContentType or mimetype as the same value as accept so the next
    container can read the response payload correctly.
    """
    if accept == "application/json":
        instances = []
        for row in prediction.tolist():
            instances.append({"features": row})

        json_output = {"instances": instances}

        return worker.Response(json.dumps(json_output), accept, mimetype=accept)
    elif accept == 'text/csv':
        return worker.Response(encoders.encode(prediction, accept), accept, mimetype=accept)
    else:
        raise RuntimeError("{} accept type is not supported by this script.".format(accept))


def predict_fn(input_data, model):
    """Preprocess input data
    
    We implement this because the default predict_fn uses .predict(), but our model is a preprocessor
    so we want to use .transform().

    The output is returned in the following order:
    
        label (if present in the input) as the first column, followed by the
        features, either standardized (numeric) or one-hot encoded (categorical)
    """
    features = model.transform(input_data)
    
    if label_column in input_data:
        # Return the label (as the first column) and the set of features.
        return np.insert(features, 0, input_data[label_column], axis=1)
    else:
        # Return only the set of features
        return features
    

def model_fn(model_dir):
    """Deserialize fitted model
    """
    preprocessor = joblib.load(os.path.join(model_dir, "model.joblib"))
    return preprocessor
```
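
To make the hand-off between the two containers concrete, the snippet below is a minimal, standalone sketch of the JSON document that `output_fn` builds when `accept` is `application/json`. It uses plain Python lists instead of a NumPy array and leaves out the `worker.Response` wrapper; `to_instances_json` is a hypothetical name used only for this illustration.

```python
import json


def to_instances_json(rows):
    """Wrap each feature row as {"features": [...]} under an "instances" key."""
    instances = [{"features": list(row)} for row in rows]
    return json.dumps({"instances": instances})


# Two made-up, already-transformed rows with three features each.
body = to_instances_json([[0.1, -1.2, 1.0], [0.4, 0.3, 0.0]])
print(body)
```

Each element of `instances` corresponds to one input record, so the next container in the pipeline receives the rows in the same order in which they were sent.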

In [5]:
import pandas as pd
split_df = pd.read_csv('abalone_data/abalone_split_1.csv', header=None)
split_columns = [
    'ID',
    'sex', # M, F, and I (infant)
    'length', # Longest shell measurement
    'diameter', # perpendicular to length
    'height' # with meat in shell
]
split_df.columns = split_columns
split_df.ID.to_list()

[1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55,
 56,
 57,
 58,
 59,
 60,
 61,
 62,
 63,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76,
 77,
 78,
 79,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89,
 90,
 91,
 92,
 93,
 94,
 95,
 96,
 97,
 98,
 99,
 100,
 101,
 102,
 103,
 104,
 105,
 106,
 107,
 108,
 109,
 110,
 111,
 112,
 113,
 114,
 115,
 116,
 117,
 118,
 119,
 120,
 121,
 122,
 123,
 124,
 125,
 126,
 127,
 128,
 129,
 130,
 131,
 132,
 133,
 134,
 135,
 136,
 137,
 138,
 139,
 140,
 141,
 142,
 143,
 144,
 145,
 146,
 147,
 148,
 149,
 150,
 151,
 152,
 153,
 154,
 155,
 156,
 157,
 158,
 159,
 160,
 161,
 162,
 163,
 164,
 165,
 166,
 167,
 168,
 169,
 170,
 171,
 172,
 173,
 174,
 175,
 176,
 177,
 178,
 179,
 180,
 181,
 182,
 183,
 184,
 185

## Create SageMaker Scikit Estimator <a class="anchor" id="create_sklearn_estimator"></a>

To run our Scikit-learn training script on SageMaker, we construct a `sagemaker.sklearn.estimator.SKLearn` estimator, which accepts several constructor arguments:

* __entry_point__: The path to the Python script SageMaker runs for training and prediction.
* __role__: Role ARN
* __train_instance_type__ *(optional)*: The type of SageMaker instances for training. __Note__: Because Scikit-learn does not natively support GPU training, SageMaker Scikit-learn does not currently support training on GPU instance types.
* __sagemaker_session__ *(optional)*: The session used to train on SageMaker.

To see the code for the SKLearn Estimator, see here: https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/sklearn

In [8]:
from sagemaker.sklearn.estimator import SKLearn

script_path = 'sklearn_abalone_featurizer.py'

sklearn_preprocessor = SKLearn(
    entry_point=script_path,
    role=role,
    train_instance_type="ml.c4.xlarge",
    sagemaker_session=sagemaker_session)


In [9]:
sklearn_preprocessor.fit({'train': train_input})

2020-04-27 03:18:00 Starting - Starting the training job...
2020-04-27 03:18:01 Starting - Launching requested ML instances......
2020-04-27 03:19:04 Starting - Preparing the instances for training...
2020-04-27 03:19:47 Downloading - Downloading input data...
2020-04-27 03:20:18 Training - Downloading the training image..
2020-04-27 03:20:38,653 sagemaker-containers INFO     Imported framework sagemaker_sklearn_container.training
2020-04-27 03:20:38,655 sagemaker-containers INFO     No GPUs detected (normal if no gpus installed)
2020-04-27 03:20:38,666 sagemaker_sklearn_container.training INFO     Invoking user training script.
2020-04-27 03:20:38,935 sagemaker-containers INFO     Module sklearn_abalone_featurizer does not provide a setup.py. 
Generating setup.py
2020-04-27 03:20:38,935 sagemaker-containers INFO     Generating setup.cfg
2020-04-27 03:20:38,936 sagemaker-containers INFO     Generating MANIFEST.in
2020-0

In [10]:
import boto3
import json
import decimal

from botocore.exceptions import ClientError
dynamodb = boto3.resource("dynamodb", region_name='us-west-1')
table = dynamodb.Table('feature_store')

class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            if abs(o) % 1 > 0:
                return float(o)
            else:
                return int(o)
        return super(DecimalEncoder, self).default(o)
    
try:
    response = dynamodb.batch_get_item(
        RequestItems={
            'feature_store': {
                'Keys': [
                    {
                        'ID': '1234'
                    }
                ]
            }
        }
    )
except ClientError as e:
    print(e.response['Error']['Message'])
else:
    print(json.dumps(response['Responses']['feature_store'], indent=4, cls=DecimalEncoder))
    #item = response['Item']
    #print("GetItem succeeded:")
    #print(json.dumps(item, indent=4, cls=DecimalEncoder))

[
    {
        "shell_weight": "6",
        "shucked_weight": "0.035",
        "ID": "1234",
        "whole_weight": "0.1095",
        "viscera_weight": "0.062"
    }
]
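
The lookup above fetches a single key. When many keys are requested, `batch_get_item` can return only part of them and report the remainder under `UnprocessedKeys`, and DynamoDB caps the number of keys per call (100 at the time of writing). The following is a minimal sketch of a loop that splits the IDs into batches and re-requests unprocessed keys. The function names are hypothetical, and `batch_get` stands for any callable with the same keyword interface as `dynamodb.batch_get_item`, which keeps the sketch runnable without AWS access.

```python
def chunked(items, size):
    """Split a list into consecutive slices of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def fetch_all(ids, batch_get, table_name='feature_store', batch_size=25):
    """Fetch items for all ids, retrying keys reported as unprocessed."""
    results = []
    for batch in chunked(ids, batch_size):
        request = {table_name: {'Keys': [{'ID': str(i)} for i in batch]}}
        while request:
            response = batch_get(RequestItems=request)
            results.extend(response.get('Responses', {}).get(table_name, []))
            # An empty or missing UnprocessedKeys entry ends the retry loop.
            request = response.get('UnprocessedKeys') or None
    return results


def fake_batch_get(RequestItems):
    """Stand-in for DynamoDB that serves at most two keys per call."""
    keys = RequestItems['feature_store']['Keys']
    served, left = keys[:2], keys[2:]
    response = {'Responses': {'feature_store': [{'ID': k['ID']} for k in served]}}
    response['UnprocessedKeys'] = {'feature_store': {'Keys': left}} if left else {}
    return response


print(fetch_all([1, 2, 3, 4, 5], fake_batch_get, batch_size=3))
```

In a real workload the retry would normally include a backoff delay between attempts; it is omitted here to keep the sketch short.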


## Batch transform our training data <a class="anchor" id="preprocess_train_data"></a>
Now that our preprocessor is properly fitted, let's go ahead and preprocess our training data. Let's use batch transform to directly preprocess the raw data and store it right back into S3.

In [11]:
# Define a SKLearn Transformer from the trained SKLearn Estimator
transformer = sklearn_preprocessor.transformer(
    instance_count=1, 
    instance_type='ml.m4.xlarge',
    assemble_with = 'Line',
    accept = 'text/csv')

In [12]:
# Preprocess training input
transformer.transform(train_input, content_type='text/csv')
print('Waiting for transform job: ' + transformer.latest_transform_job.job_name)
transformer.wait()
preprocessed_train = transformer.output_path
print(preprocessed_train)

Waiting for transform job: sagemaker-scikit-learn-2020-04-27-03-21-11-929
.....................Processing /opt/ml/code
Building wheels for collected packages: sklearn-abalone-featurizer
  Building wheel for sklearn-abalone-featurizer (setup.py): started
  Building wheel for sklearn-abalone-featurizer (setup.py): finished with status 'done'
  Created wheel for sklearn-abalone-featurizer: filename=sklearn_abalone_featurizer-1.0.0-py2.py3-none-any.whl size=11523 sha256=c343dcdcab9368919413a279cb4d2c623cef90bacf85ed9fcb4c0df671e79836
  Stored in directory: /tmp/pip-ephem-wheel-cache-qg63w6pk/wheels/35/24/16/37574d11bf9bde50616c67372a334f94fa8356bc7164af8ca3
Successfully built sklearn-abalone-featurizer
Installing collected packages: sklearn-abalone-featurizer
Successfully installed sklearn-abalone-featurizer-1.0.0
  import imp
[2020-04-27 03:24:17 +0000] [39] [INFO] Starting gunicorn 19.9.0
[2020-04-27 03:24:17 +0000] [39] [INF

In [13]:
#print("{}/{}/output/model.tar.gz".format(s3_ll_output_location, ll_estimator.latest_training_job.name))
print(sklearn_preprocessor.output_path)
print(sklearn_preprocessor.latest_training_job.name)
print("{}/{}/output/model.tar.gz".format(sklearn_preprocessor.output_path,sklearn_preprocessor.latest_training_job.name))

s3://sagemaker-us-west-1-305752278501/
sagemaker-scikit-learn-2020-04-27-03-17-59-725
s3://sagemaker-us-west-1-305752278501//sagemaker-scikit-learn-2020-04-27-03-17-59-725/output/model.tar.gz


## Fit a LinearLearner Model with the preprocessed data <a class="anchor" id="training_model"></a>
Let's take the preprocessed training data and fit a LinearLearner Model. SageMaker provides prebuilt algorithm containers that can be used with the Python SDK. The previous Scikit-learn job preprocessed the raw abalone dataset into labeled, usable data that we can now use to fit a Linear Learner regression model.

For more on Linear Learner see: https://docs.aws.amazon.com/sagemaker/latest/dg/linear-learner.html
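
The `feature_dim=10` hyperparameter used in this section follows from the preprocessing step: seven standardized numeric columns plus three one-hot columns for `sex`. The sketch below reproduces that layout in plain Python for a single row. It is an illustration rather than the fitted scikit-learn pipeline: imputation is skipped, the means and standard deviations are placeholders, and the category order `F, I, M` assumes the encoder sorts the categories it saw during fitting.

```python
NUMERIC_FEATURES = ['length', 'diameter', 'height', 'whole_weight',
                    'shucked_weight', 'viscera_weight', 'shell_weight']
SEX_CATEGORIES = ['F', 'I', 'M']  # assumed (sorted) one-hot column order


def encode_row(row, means, stds, label=None):
    """Standardize numeric values, one-hot encode sex, optionally prepend the label."""
    scaled = [(row[name] - means[name]) / stds[name] for name in NUMERIC_FEATURES]
    one_hot = [1.0 if row['sex'] == category else 0.0 for category in SEX_CATEGORIES]
    features = scaled + one_hot
    return features if label is None else [float(label)] + features


row = {'sex': 'M', 'length': 0.44, 'diameter': 0.365, 'height': 0.125,
       'whole_weight': 0.516, 'shucked_weight': 0.2155,
       'viscera_weight': 0.114, 'shell_weight': 0.155}
# Placeholder statistics; the real values are learned by StandardScaler.
means = {name: 0.0 for name in NUMERIC_FEATURES}
stds = {name: 1.0 for name in NUMERIC_FEATURES}

print(len(encode_row(row, means, stds)))            # feature columns only
print(len(encode_row(row, means, stds, label=10)))  # label column + features
```

For training, the label occupies the first column of each CSV row, so the file has one more column than `feature_dim`.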

In [14]:
import boto3
from sagemaker.amazon.amazon_estimator import get_image_uri
ll_image = get_image_uri(boto3.Session().region_name, 'linear-learner')

In [15]:
s3_ll_output_key_prefix = "ll_training_output"
s3_ll_output_location = 's3://{}/{}/{}/{}'.format(bucket, prefix, s3_ll_output_key_prefix, 'll_model')

ll_estimator = sagemaker.estimator.Estimator(
    ll_image,
    role, 
    train_instance_count=1, 
    train_instance_type='ml.m4.2xlarge',
    train_volume_size = 20,
    train_max_run = 3600,
    input_mode= 'File',
    output_path=s3_ll_output_location,
    sagemaker_session=sagemaker_session)

ll_estimator.set_hyperparameters(feature_dim=10, predictor_type='regressor', mini_batch_size=32)

ll_train_data = sagemaker.session.s3_input(
    preprocessed_train, 
    distribution='FullyReplicated',
    content_type='text/csv', 
    s3_data_type='S3Prefix')

data_channels = {'train': ll_train_data}
ll_estimator.fit(inputs=data_channels, logs=True)

2020-04-27 03:25:24 Starting - Starting the training job...
2020-04-27 03:25:25 Starting - Launching requested ML instances......
2020-04-27 03:26:27 Starting - Preparing the instances for training...
2020-04-27 03:27:19 Downloading - Downloading input data...
2020-04-27 03:27:49 Training - Downloading the training image..[34mDocker entrypoint called with argument(s): train[0m
[34mRunning default environment configuration script[0m
[34m[04/27/2020 03:28:04 INFO 140374852036416] Reading default configuration from /opt/amazon/lib/python2.7/site-packages/algorithm/resources/default-input.json: {u'loss_insensitivity': u'0.01', u'epochs': u'15', u'feature_dim': u'auto', u'init_bias': u'0.0', u'lr_scheduler_factor': u'auto', u'num_calibration_samples': u'10000000', u'accuracy_top_k': u'3', u'_num_kv_servers': u'auto', u'use_bias': u'true', u'num_point_for_scaler': u'10000', u'_log_level': u'info', u'quantile': u'0.5', u'bias_lr_mult': u'auto', u'lr_scheduler_step': u'auto', u'init_metho


2020-04-27 03:28:02 Training - Training image download completed. Training in progress.[34m#metrics {"Metrics": {"train_mse_objective": {"count": 1, "max": 0.7001717660576106, "sum": 0.7001717660576106, "min": 0.7001717660576106}}, "EndTime": 1587958090.878585, "Dimensions": {"model": 0, "Host": "algo-1", "Operation": "training", "Algorithm": "Linear Learner", "epoch": 1}, "StartTime": 1587958090.878521}
[0m
[34m#metrics {"Metrics": {"train_mse_objective": {"count": 1, "max": 0.6973204490083914, "sum": 0.6973204490083914, "min": 0.6973204490083914}}, "EndTime": 1587958090.878666, "Dimensions": {"model": 1, "Host": "algo-1", "Operation": "training", "Algorithm": "Linear Learner", "epoch": 1}, "StartTime": 1587958090.878652}
[0m
[34m#metrics {"Metrics": {"train_mse_objective": {"count": 1, "max": 0.7065225136108123, "sum": 0.7065225136108123, "min": 0.7065225136108123}}, "EndTime": 1587958090.878705, "Dimensions": {"model": 2, "Host": "algo-1", "Operation": "training", "Algorithm":

[34m#metrics {"Metrics": {"train_mse_objective": {"count": 1, "max": 0.5715400433884217, "sum": 0.5715400433884217, "min": 0.5715400433884217}}, "EndTime": 1587958100.738694, "Dimensions": {"model": 0, "Host": "algo-1", "Operation": "training", "Algorithm": "Linear Learner", "epoch": 5}, "StartTime": 1587958100.738631}
[0m
[34m#metrics {"Metrics": {"train_mse_objective": {"count": 1, "max": 0.5749786387842435, "sum": 0.5749786387842435, "min": 0.5749786387842435}}, "EndTime": 1587958100.738783, "Dimensions": {"model": 1, "Host": "algo-1", "Operation": "training", "Algorithm": "Linear Learner", "epoch": 5}, "StartTime": 1587958100.738762}
[0m
[34m#metrics {"Metrics": {"train_mse_objective": {"count": 1, "max": 0.5752306820681462, "sum": 0.5752306820681462, "min": 0.5752306820681462}}, "EndTime": 1587958100.738852, "Dimensions": {"model": 2, "Host": "algo-1", "Operation": "training", "Algorithm": "Linear Learner", "epoch": 5}, "StartTime": 1587958100.738832}
[0m
[34m#metrics {"Met

[34m#metrics {"Metrics": {"train_mse_objective": {"count": 1, "max": 0.527829533471511, "sum": 0.527829533471511, "min": 0.527829533471511}}, "EndTime": 1587958110.612201, "Dimensions": {"model": 0, "Host": "algo-1", "Operation": "training", "Algorithm": "Linear Learner", "epoch": 9}, "StartTime": 1587958110.612137}
[0m
[34m#metrics {"Metrics": {"train_mse_objective": {"count": 1, "max": 0.5396614836385617, "sum": 0.5396614836385617, "min": 0.5396614836385617}}, "EndTime": 1587958110.612282, "Dimensions": {"model": 1, "Host": "algo-1", "Operation": "training", "Algorithm": "Linear Learner", "epoch": 9}, "StartTime": 1587958110.612269}
[0m
[34m#metrics {"Metrics": {"train_mse_objective": {"count": 1, "max": 0.5298901002280987, "sum": 0.5298901002280987, "min": 0.5298901002280987}}, "EndTime": 1587958110.612348, "Dimensions": {"model": 2, "Host": "algo-1", "Operation": "training", "Algorithm": "Linear Learner", "epoch": 9}, "StartTime": 1587958110.612336}
[0m
[34m#metrics {"Metric


2020-04-27 03:28:55 Uploading - Uploading generated training model
2020-04-27 03:28:55 Completed - Training job completed
[34m#metrics {"Metrics": {"train_mse_objective": {"count": 1, "max": 0.5081580022493234, "sum": 0.5081580022493234, "min": 0.5081580022493234}}, "EndTime": 1587958120.661292, "Dimensions": {"model": 0, "Host": "algo-1", "Operation": "training", "Algorithm": "Linear Learner", "epoch": 13}, "StartTime": 1587958120.661227}
[0m
[34m#metrics {"Metrics": {"train_mse_objective": {"count": 1, "max": 0.5286210498557641, "sum": 0.5286210498557641, "min": 0.5286210498557641}}, "EndTime": 1587958120.661374, "Dimensions": {"model": 1, "Host": "algo-1", "Operation": "training", "Algorithm": "Linear Learner", "epoch": 13}, "StartTime": 1587958120.66136}
[0m
[34m#metrics {"Metrics": {"train_mse_objective": {"count": 1, "max": 0.5094138070081289, "sum": 0.5094138070081289, "min": 0.5094138070081289}}, "EndTime": 1587958120.661422, "Dimensions": {"model": 2, "Host": "algo-1", "

Training seconds: 96
Billable seconds: 96


# Serial Inference Pipeline with Scikit preprocessor and Linear Learner <a class="anchor" id="serial_inference"></a>


## Set up the inference pipeline <a class="anchor" id="pipeline_setup"></a>
Setting up a Machine Learning pipeline can be done with the Pipeline Model. This sets up a list of models in a single endpoint; in this example, we configure our pipeline model with the fitted Scikit-learn inference model and the fitted Linear Learner model. Deploying the model follows the same ```deploy``` pattern in the SDK.

In [16]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
import boto3
from time import gmtime, strftime

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

scikit_learn_inference_model = sklearn_preprocessor.create_model()
linear_learner_model = ll_estimator.create_model()

model_name = 'inference-pipeline-' + timestamp_prefix
endpoint_name = 'inference-pipeline-ep-' + timestamp_prefix
sm_model = PipelineModel(
    name=model_name, 
    role=role, 
    models=[
        scikit_learn_inference_model, 
        linear_learner_model])

sm_model.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', endpoint_name=endpoint_name)

---------------!

In [17]:
import numpy as np
split_1_feature_columns_names = [
    'ID',
    'sex', # M, F, and I (infant)
    'length', # Longest shell measurement
    'diameter', # perpendicular to length
    'height' # with meat in shell
]

split_2_feature_columns_names = [
    'ID',
    'whole_weight', # whole abalone
    'shucked_weight', # weight of meat
    'viscera_weight', # gut weight (after bleeding)
    'shell_weight'] # after being dried

split_1_feature_columns_dtype = {
    'ID': np.int64,
    'sex': str,
    'length': np.float64,
    'diameter': np.float64,
    'height': np.float64}

split_2_feature_columns_dtype = {
    'ID': np.int64,
    'whole_weight': np.float64,
    'shucked_weight': np.float64,
    'viscera_weight': np.float64,
    'shell_weight': np.float64}

split_1_df = pd.read_csv('abalone_data/abalone_split_1.csv', header=None, dtype=split_1_feature_columns_dtype)
split_1_df.columns = split_1_feature_columns_names
id_array = split_1_df.ID.tolist()
batch_size = 25
id_array_batched = [id_array[i * batch_size:(i + 1) * batch_size] for i in range((len(id_array) + batch_size - 1) // batch_size )]
#print(id_array_batched)

"""
        RequestItems={
            'feature_store': {
                'Keys': [
                    {
                        'ID': '1234'
                    }
                ]
            }
        }
"""

def ddb_request_transform(item_ids):
    # Use item_id rather than id to avoid shadowing the built-in id().
    keys = [{'ID': str(item_id)} for item_id in item_ids]
    return {'Keys': keys}

print(ddb_request_transform(id_array_batched[0]))

import boto3
import json
import decimal
import time

from botocore.exceptions import ClientError
dynamodb = boto3.resource("dynamodb", region_name='us-west-1')
table = dynamodb.Table('feature_store')

responses = []

class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            if abs(o) % 1 > 0:
                return float(o)
            else:
                return int(o)
        return super(DecimalEncoder, self).default(o)
    
try:
    for arr in id_array_batched:
        response = dynamodb.batch_get_item(
            RequestItems={
                'feature_store': ddb_request_transform(arr)
            }
        )
        responses.extend(response['Responses']['feature_store'])
        time.sleep(0.5)
        
        
except ClientError as e:
    print(e.response['Error']['Message'])
else:
    #print(json.dumps(response['Responses']['feature_store'], indent=4, cls=DecimalEncoder))
    #item = response['Item']
    #print("GetItem succeeded:")
    #print(json.dumps(item, indent=4, cls=DecimalEncoder))    
    print(json.dumps(responses, indent=4, cls=DecimalEncoder))


{'Keys': [{'ID': '1'}, {'ID': '2'}, {'ID': '3'}, {'ID': '4'}, {'ID': '5'}, {'ID': '6'}, {'ID': '7'}, {'ID': '8'}, {'ID': '9'}, {'ID': '10'}, {'ID': '11'}, {'ID': '12'}, {'ID': '13'}, {'ID': '14'}, {'ID': '15'}, {'ID': '16'}, {'ID': '17'}, {'ID': '18'}, {'ID': '19'}, {'ID': '20'}, {'ID': '21'}, {'ID': '22'}, {'ID': '23'}, {'ID': '24'}, {'ID': '25'}]}
[
    {
        "shell_weight": "12",
        "shucked_weight": "0.133",
        "ID": "16",
        "whole_weight": "0.258",
        "viscera_weight": "0.24"
    },
    {
        "shell_weight": "10",
        "shucked_weight": "0.301",
        "ID": "25",
        "whole_weight": "0.513",
        "viscera_weight": "0.305"
    },
    {
        "shell_weight": "11",
        "shucked_weight": "0.062",
        "ID": "21",
        "whole_weight": "0.0955",
        "viscera_weight": "0.075"
    },
    {
        "shell_weight": "8",
        "shucked_weight": "0.0775",
        "ID": "6",
        "whole_weight": "0.141",
        "viscera_weight": "0

In [18]:
#split_1_df.head()
#split_1_df.dtypes
split_2_df = pd.DataFrame(responses, columns=split_2_feature_columns_names).astype(split_2_feature_columns_dtype)
#split_2_df = pd.DataFrame(responses)
#split_2_df.head()
split_2_df.dtypes



ID                  int64
whole_weight      float64
shucked_weight    float64
viscera_weight    float64
shell_weight      float64
dtype: object

In [19]:
#split_1_df.dtypes
#split_1_df.set_index('ID')
#split_2_df.set_index('ID')
joined_df = split_1_df.merge(split_2_df, left_on='ID', right_on='ID').drop('ID', axis=1)
joined_df.head()

Unnamed: 0,sex,length,diameter,height,whole_weight,shucked_weight,viscera_weight,shell_weight
0,M,0.455,0.365,0.095,0.2245,0.101,0.15,15.0
1,M,0.35,0.265,0.09,0.0995,0.0485,0.07,7.0
2,F,0.53,0.42,0.135,0.2565,0.1415,0.21,9.0
3,M,0.44,0.365,0.125,0.2155,0.114,0.155,10.0
4,I,0.33,0.255,0.08,0.0895,0.0395,0.055,7.0
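
The `merge` call above performs an inner join on `ID`, so any row whose ID is missing from either split is dropped without warning. As a minimal sketch of that behaviour in plain Python, assuming IDs are unique within each split (`inner_join_on_id` is a hypothetical helper, and the rows are made up):

```python
def inner_join_on_id(left_rows, right_rows, key='ID'):
    """Join two lists of dicts on `key`, keeping only IDs present in both."""
    right_by_id = {row[key]: row for row in right_rows}
    joined = []
    for row in left_rows:
        match = right_by_id.get(row[key])
        if match is None:
            continue  # no counterpart in the right split: row is dropped
        merged = {**row, **match}
        merged.pop(key)  # mirror .drop('ID', axis=1)
        joined.append(merged)
    return joined


split_1 = [{'ID': 1, 'sex': 'M', 'length': 0.455},
           {'ID': 2, 'sex': 'M', 'length': 0.35}]
split_2 = [{'ID': 1, 'whole_weight': 0.2245}]  # ID 2 is missing here

print(inner_join_on_id(split_1, split_2))
```

Comparing `len(joined_df)` with `len(split_1_df)` is a quick check that no records were lost in the join.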


In [20]:
sklearn_image = sklearn_preprocessor.image_name
print(sklearn_image)
print(ll_image)

746614075791.dkr.ecr.us-west-1.amazonaws.com/sagemaker-scikit-learn:0.20.0-cpu-py3
632365934929.dkr.ecr.us-west-1.amazonaws.com/linear-learner:1


In [21]:
print("{}/output/model.tar.gz".format(preprocessed_train))
print("{}/{}/output/model.tar.gz".format(s3_ll_output_location, ll_estimator.latest_training_job.name))
print("{}{}/output/model.tar.gz".format(sklearn_preprocessor.output_path,sklearn_preprocessor.latest_training_job.name))


s3://sagemaker-us-west-1-305752278501/sagemaker-scikit-learn-2020-04-27-03-21-11-929/output/model.tar.gz
s3://sagemaker-us-west-1-305752278501/Scikit-LinearLearner-pipeline-abalone-example/ll_training_output/ll_model/linear-learner-2020-04-27-03-25-24-330/output/model.tar.gz
s3://sagemaker-us-west-1-305752278501/sagemaker-scikit-learn-2020-04-27-03-17-59-725/output/model.tar.gz


## Make a request to our pipeline endpoint <a class="anchor" id="pipeline_inference_request"></a>

Here we use the first line of the test data as the request payload (note that the inference Python script is very particular about the ordering of the fields in the request). The ```ContentType``` field configures the first container, while the ```Accept``` field configures the last container. You can also specify each container's ```Accept``` and ```ContentType``` values using environment variables.

We make our request with the payload in ```'text/csv'``` format, since that is the only request format our script currently supports. To support other request formats, they would have to be handled in the ```input_fn()``` function in our entry point, which parses the incoming request. Note that we set ```Accept``` to ```application/json```, since Linear Learner does not support ```text/csv``` as an ```Accept``` type. The prediction is an estimate of the number of rings the abalone specimen has, given its other physical features; the actual number of rings is 10.
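Because the field order matters, it can help to build the CSV payload from named values instead of typing it by hand. The following is only a sketch: the helper `to_csv_payload` and the `FEATURE_COLUMNS` list are hypothetical names, and the column order is an assumption taken from the columns of `joined_df` shown earlier.

```python
# Hypothetical helper: serialize one record to a CSV line in a fixed column order.
# The order below is an assumption based on the joined DataFrame's columns.
FEATURE_COLUMNS = [
    "sex", "length", "diameter", "height",
    "whole_weight", "shucked_weight", "viscera_weight", "shell_weight",
]


def to_csv_payload(record, columns=FEATURE_COLUMNS):
    """Return the record's values as one comma-separated line, ordered by `columns`."""
    missing = [name for name in columns if name not in record]
    if missing:
        raise ValueError("record is missing fields: {}".format(missing))
    return ",".join(str(record[name]) for name in columns)


# Same values as the request sent below, given in arbitrary key order.
example_record = {
    "shell_weight": 0.155, "sex": "M", "length": 0.44, "diameter": 0.365,
    "height": 0.125, "whole_weight": 0.516, "shucked_weight": 0.2155,
    "viscera_weight": 0.114,
}
example_payload = to_csv_payload(example_record)
print(example_payload)
```

Raising on missing fields is a deliberate choice here: a silently shortened line would shift every later value into the wrong column.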

In [22]:
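from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON

# First record of the test data; the field order must match what the preprocessing script expects
payload = 'M, 0.44, 0.365, 0.125, 0.516, 0.2155, 0.114, 0.155'
actual_rings = 10  # true label for this record, kept for comparison with the prediction

# Send CSV to the first container and request JSON back from the last container
predictor = RealTimePredictor(
    endpoint=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=csv_serializer,
    content_type=CONTENT_TYPE_CSV,
    accept=CONTENT_TYPE_JSON)

print(predictor.predict(payload))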

b'{"predictions": [{"score": 9.528051376342773}]}'
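The predictor returns the response as raw JSON bytes, so the score has to be decoded before it can be compared with the true label. A minimal sketch, using the response bytes printed above; `extract_scores` is a hypothetical helper name and the response layout is assumed to match that output:

```python
import json

# Response bytes copied from the output above.
raw_response = b'{"predictions": [{"score": 9.528051376342773}]}'


def extract_scores(response_bytes):
    """Decode a JSON response of the form shown above and return its scores as a list."""
    body = json.loads(response_bytes.decode("utf-8"))
    return [prediction["score"] for prediction in body["predictions"]]


scores = extract_scores(raw_response)
true_rings = 10  # the actual number of rings for this record
absolute_error = abs(scores[0] - true_rings)
print("predicted rings: {:.2f}, actual rings: {}, absolute error: {:.2f}".format(
    scores[0], true_rings, absolute_error))
```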


In [32]:
id_payload = '1,M,0.455,0.365,0.095'
#id_payload = 'M,0.455,0.365,0.095,0.514,0.2245,0.101,0.15'
print(predictor.predict(id_payload))
#print(joined_df.head())

b'{"predictions": [{"score": 8.982425689697266}]}'



## Delete Endpoint <a class="anchor" id="delete_endpoint"></a>
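Once we are finished with the endpoint, we clean up the resources. The delete calls in the cell below are commented out; uncomment them to actually delete the endpoint.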

In [24]:
#sm_client = sagemaker_session.boto_session.client('sagemaker')
#sm_client.delete_endpoint(EndpointName=endpoint_name)

### Create SageMaker endpoint with pipeline
import time  # needed for the status polling loop at the end of this cell

from botocore.exceptions import ClientError

# Image locations are published at: https://github.com/aws/sagemaker-sparkml-serving-container
sparkml_images = {
    'us-west-1': '746614075791.dkr.ecr.us-west-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'us-west-2': '246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'us-east-1': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'us-east-2': '257758044811.dkr.ecr.us-east-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-northeast-1': '354813040037.dkr.ecr.ap-northeast-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-northeast-2': '366743142698.dkr.ecr.ap-northeast-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-southeast-1': '121021644041.dkr.ecr.ap-southeast-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-southeast-2': '783357654285.dkr.ecr.ap-southeast-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-south-1': '720646828776.dkr.ecr.ap-south-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'eu-west-1': '141502667606.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'eu-west-2': '764974769150.dkr.ecr.eu-west-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'eu-central-1': '492215442770.dkr.ecr.eu-central-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ca-central-1': '341280168497.dkr.ecr.ca-central-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'us-gov-west-1': '414596584902.dkr.ecr.us-gov-west-1.amazonaws.com/sagemaker-sparkml-serving:2.2'
}



try:
    sparkml_image = sparkml_images[region]

    response = sagemaker.create_model(
        ModelName='pipeline-xgboost',
        Containers=[
            {
                'Image': sparkml_image,
                'ModelDataUrl': 's3://{}/model/model.tar.gz'.format(bucket_name),
                'Environment': {
                    'SAGEMAKER_SPARKML_SCHEMA': '{"input":[{"type":"string","name":"buying"},{"type":"string","name":"maint"},{"type":"string","name":"doors"},{"type":"string","name":"persons"},{"type":"string","name":"lug_boot"},{"type":"string","name":"safety"}],"output":{"type":"double","name":"features","struct":"vector"}}'
                }
            },
            {
                'Image': training_image,
                'ModelDataUrl': 's3://{}/xgb/{}/output/model.tar.gz'.format(bucket_name, training_job_name)
            },
            {
                'Image': sparkml_image,
                'ModelDataUrl': 's3://{}/model/postprocess.tar.gz'.format(bucket_name),
                'Environment': {
                    'SAGEMAKER_SPARKML_SCHEMA': '{"input": [{"type": "double", "name": "label"}], "output": {"type": "string", "name": "cat"}}'
                }

            },
        ],
        ExecutionRoleArn=role
    )

    print('{}\n'.format(response))
    
except ClientError as e:
    # The model may already exist; print the error so other failures stay visible
    print('Model not created ({}), continuing...'.format(e))


try:
    response = sagemaker.create_endpoint_config(
        EndpointConfigName='pipeline-xgboost',
        ProductionVariants=[
            {
                'VariantName': 'DefaultVariant',
                'ModelName': 'pipeline-xgboost',
                'InitialInstanceCount': 1,
                'InstanceType': 'ml.m4.xlarge',
            },
        ],
    )
    print('{}\n'.format(response))

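except ClientError as e:
    # The endpoint config may already exist; print the error so other failures stay visible
    print('Endpoint config not created ({}), continuing...'.format(e))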


try:
    response = sagemaker.create_endpoint(
        EndpointName='pipeline-xgboost',
        EndpointConfigName='pipeline-xgboost',
    )
    print('{}\n'.format(response))

except ClientError as e:
    # The endpoint may already exist; print the error so other failures stay visible
    print('Endpoint not created ({}), continuing...'.format(e))


# Poll every 30 seconds until the endpoint reaches a terminal status
endpoint_status = sagemaker.describe_endpoint(EndpointName='pipeline-xgboost')['EndpointStatus']
print(endpoint_status)
while endpoint_status not in ('OutOfService', 'InService', 'Failed'):
    time.sleep(30)
    endpoint_status = sagemaker.describe_endpoint(EndpointName='pipeline-xgboost')['EndpointStatus']
    print(endpoint_status)