## TensorFlow 2 Complete Project Workflow in Amazon SageMaker
### Data Preprocessing
    
1. [Introduction](#Introduction)
2. [Exploratory Data Analysis](#EDA)
3. [SageMaker Processing for dataset transformation](#SageMakerProcessing)
    
## Introduction <a class="anchor" id="Introduction"></a>

If you are using TensorFlow 2, you can use the Amazon SageMaker prebuilt TensorFlow 2 container with training scripts similar to those you would use outside SageMaker. This feature is named Script Mode. Using Script Mode and other SageMaker features, you can build a complete workflow for a TensorFlow 2 project. These notebooks present such a workflow, including all key steps: preprocessing data with SageMaker Processing, prototyping code with SageMaker Local Mode training and inference, and production-ready model training and deployment with SageMaker hosted training and inference. Automatic Model Tuning in SageMaker is used to tune the model's hyperparameters. Additionally, the [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/readmelink.html) is used to automate the main training and deployment steps for use in a production workflow outside notebooks.

To enable you to run these notebooks within a reasonable time (typically less than an hour), the use case is a straightforward regression task:  predicting house prices based on the well-known Boston Housing dataset. This public dataset contains 13 features regarding housing stock of towns in the Boston area.  Features include average number of rooms, accessibility to radial highways, adjacency to the Charles River, etc.  

To begin, we'll import some necessary packages and set up directories for local training and test data.  We'll also set up a SageMaker Session to perform various operations, and specify an Amazon S3 bucket to hold input data and output.  The default bucket used here is created by SageMaker if it doesn't already exist, and named in accordance with the AWS account ID and AWS Region.  

In [None]:
import os
import sagemaker
import tensorflow as tf

sess = sagemaker.Session()
bucket = sess.default_bucket() 

data_dir = os.path.join(os.getcwd(), 'data')
os.makedirs(data_dir, exist_ok=True)

train_dir = os.path.join(os.getcwd(), 'data/train')
os.makedirs(train_dir, exist_ok=True)

test_dir = os.path.join(os.getcwd(), 'data/test')
os.makedirs(test_dir, exist_ok=True)

raw_dir = os.path.join(os.getcwd(), 'data/raw')
os.makedirs(raw_dir, exist_ok=True)
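
As a rough illustration of the bucket naming mentioned above, the sketch below assumes the default bucket name follows the pattern `sagemaker-{region}-{account_id}`. The helper `default_bucket_name` is hypothetical, and the Region and account ID are placeholders rather than values from this notebook; the value returned by `sess.default_bucket()` remains the authoritative one.

```python
def default_bucket_name(region, account_id):
    """Compose a bucket name from the Region and account ID (assumed pattern)."""
    return 'sagemaker-{}-{}'.format(region, account_id)

# Placeholder values for illustration only
example_bucket = default_bucket_name('us-east-1', '123456789012')
print(example_bucket)
```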

## Exploratory Data Analysis <a class="anchor" id="EDA"></a>
    
Here we'll explore the data a bit with some visualizations.

In [None]:
# Use the public Keras API rather than the private tensorflow.python namespace
from tensorflow.keras.datasets import boston_housing
import numpy as np
import pandas as pd

(x_train, y_train), (x_test, y_test) = boston_housing.load_data()
data = np.concatenate((x_train, x_test))
columns = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO',
           'B', 'LSTAT']
data_target = np.concatenate((y_train, y_test))
bh_df = pd.DataFrame(data=data, columns=columns)
bh_df['MEDV'] = data_target
bh_df.head()

In [None]:
bh_df.shape

In [None]:
bh_df.describe()

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
from scipy import stats

fig, axs = plt.subplots(ncols=7, nrows=2, figsize=(20, 10))
index = 0
axs = axs.flatten()
for k,v in bh_df.items():
    sns.boxplot(y=k, data=bh_df, ax=axs[index])
    index += 1
plt.tight_layout(pad=0.4, w_pad=0.5, h_pad=5.0)

Columns such as CRIM, ZN, RM, and B seem to have outliers. Let's look at the percentage of outliers in each column.

In [None]:
for k, v in bh_df.items():
    q1 = v.quantile(0.25)
    q3 = v.quantile(0.75)
    iqr = q3 - q1  # interquartile range
    v_col = v[(v <= q1 - 1.5 * iqr) | (v >= q3 + 1.5 * iqr)]
    perc = len(v_col) * 100.0 / len(bh_df)
    print("Column %s outliers = %.2f%%" % (k, perc))
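
As a small worked illustration of the interquartile-range rule used in the cell above, the sketch below applies the same 1.5 × IQR fences to a toy list. The values are made up, and `statistics.quantiles` with `method='inclusive'` is used as a stand-in for the pandas `quantile` call; it is an assumption that the two interpolate the same way for this input.

```python
from statistics import quantiles

def iqr_outliers(values, k=1.5):
    """Return the values lying on or beyond the k * IQR fences."""
    q1, _, q3 = quantiles(values, n=4, method='inclusive')
    iqr = q3 - q1
    lower, upper = q1 - k * iqr, q3 + k * iqr
    return [v for v in values if v <= lower or v >= upper]

toy = [1, 2, 3, 4, 5, 6, 7, 8, 9, 100]  # made-up data with one extreme value
outliers = iqr_outliers(toy)
percentage = 100.0 * len(outliers) / len(toy)
print("outliers = %s (%.2f%%)" % (outliers, percentage))
```

Only the extreme value falls outside the fences here, so one of the ten values is flagged.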

The original data description notes that variable #14 (MEDV) seems to be censored at 50.00 (corresponding to a median price of $50,000). Based on that, rows where MEDV sits at the 50.00 cap may not help to predict MEDV.

Let's remove the censored MEDV values (MEDV >= 50.0) before plotting more distributions.

In [None]:
bh_df = bh_df[~(bh_df['MEDV'] >= 50.0)].reset_index(drop=True)
print(np.shape(bh_df))

Now let's look at the distribution of features.

In [None]:
fig, axs = plt.subplots(ncols=7, nrows=2, figsize=(20, 10))
index = 0
axs = axs.flatten()
for k,v in bh_df.items():
    # histplot with kde=True replaces the deprecated distplot (requires seaborn >= 0.11)
    sns.histplot(v, kde=True, stat='density', ax=axs[index])
    index += 1
plt.tight_layout(pad=0.4, w_pad=0.5, h_pad=5.0)

The histograms also show that the columns CRIM, ZN, and B have highly skewed distributions. MEDV (the prediction target) looks approximately normally distributed, and the other columns appear to have normal or bimodal distributions, except CHAS (which is a discrete variable).

Now let's plot the pairwise correlations in the data.

In [None]:
plt.figure(figsize=(20, 10))
sns.heatmap(bh_df.corr().abs(),  annot=True)
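
The heatmap above shows absolute pairwise correlations. As a minimal sketch of how such a score can be computed and then used to shortlist features with a threshold, the example below implements the Pearson coefficient by hand on made-up columns. The column names and values are placeholders, not the Boston data, and the 0.5 threshold is used only for illustration.

```python
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)

# Made-up toy columns; the names are placeholders
target = [10.0, 20.0, 30.0, 40.0, 50.0]
features = {
    'rises_with_target': [1.0, 2.0, 3.0, 4.0, 5.0],
    'falls_with_target': [9.0, 7.0, 6.0, 3.0, 1.0],
    'unrelated': [2.0, -1.0, 0.0, -1.0, 2.0],
}

# Keep features whose absolute correlation with the target exceeds the threshold
selected = [name for name, col in features.items()
            if abs(pearson(col, target)) > 0.5]
print(selected)
```

Taking the absolute value keeps strongly negative relationships as well as strongly positive ones.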

From the correlation matrix, we see that TAX and RAD are highly correlated features. The columns LSTAT, INDUS, RM, TAX, NOX, and PTRATIO have a correlation score above 0.5 with MEDV, which is a good indication that they are useful predictors. Let's plot these columns against MEDV.

In [None]:
from sklearn import preprocessing

# Let's scale the columns before plotting them against MEDV
min_max_scaler = preprocessing.MinMaxScaler()
column_sels = ['LSTAT', 'INDUS', 'NOX', 'PTRATIO', 'RM', 'TAX', 'DIS', 'AGE']
x = bh_df.loc[:,column_sels]
y = bh_df['MEDV']
x = pd.DataFrame(data=min_max_scaler.fit_transform(x), columns=column_sels)
fig, axs = plt.subplots(ncols=4, nrows=2, figsize=(20, 10))
axs = axs.flatten()
for i, k in enumerate(column_sels):
    sns.regplot(y=y, x=x[k], ax=axs[i])
plt.tight_layout(pad=0.4, w_pad=0.5, h_pad=5.0)

In [None]:
bh_df.head()

Based on this analysis, we can try to predict MEDV with the 'LSTAT', 'INDUS', 'NOX', 'PTRATIO', 'RM', 'TAX', 'DIS', and 'AGE' features. Let's try to remove the skewness of the data through a log transformation.

In [None]:
y =  np.log1p(y)
for col in x.columns:
    if np.abs(x[col].skew()) > 0.3:
        x[col] = np.log1p(x[col])

So far, our analysis has given us insight into which features to choose and how to transform our data. In the next section, we'll use SageMaker Processing to choose our features and transform them.

## SageMaker Processing for dataset transformation <a class="anchor" id="SageMakerProcessing"></a>

Next, we'll import the dataset and transform it with SageMaker Processing, which can be used to process terabytes of data in a SageMaker-managed cluster separate from the instance running your notebook server. In a typical SageMaker workflow, notebooks are only used for prototyping and can be run on relatively inexpensive and less powerful instances, while processing, training and model hosting tasks are run on separate, more powerful SageMaker-managed instances.  SageMaker Processing includes off-the-shelf support for Scikit-learn, as well as a Bring Your Own Container option, so it can be used with many different data transformation technologies and tasks.    

First we'll load the Boston Housing dataset, save the raw feature data and upload it to Amazon S3 for transformation by SageMaker Processing.  We'll also save the labels for training and testing.

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split

# Remember bh_df is the original housing data where MEDV < 50
features = bh_df.drop(columns='MEDV')
target = bh_df['MEDV']

# We'll transform the target variable like we did in our analysis,
# but we'll leave the transformations of features to SageMaker Processing
target = np.log1p(target)

x_train, x_test, y_train, y_test = train_test_split(features, target, test_size=0.33, random_state=42)

np.save(os.path.join(raw_dir, 'x_train.npy'), x_train)
np.save(os.path.join(raw_dir, 'x_test.npy'), x_test)
np.save(os.path.join(train_dir, 'y_train.npy'), y_train)
np.save(os.path.join(test_dir, 'y_test.npy'), y_test)
s3_prefix = 'tf-2-workflow'
rawdata_s3_prefix = '{}/data/raw'.format(s3_prefix)
raw_s3 = sess.upload_data(path='./data/raw/', key_prefix=rawdata_s3_prefix)

To use SageMaker Processing, simply supply a Python data preprocessing script as shown below.  For this example, we're using a SageMaker prebuilt Scikit-learn container, which includes many common functions for processing data.  There are few limitations on what kinds of code and operations you can run, and only a minimal contract:  input and output data must be placed in specified directories.  If this is done, SageMaker Processing automatically loads the input data from S3 and uploads transformed data back to S3 when the job is complete.

In [None]:
%%writefile preprocessing.py

import glob
import numpy as np
import os
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

if __name__=='__main__':
    min_max_scaler = MinMaxScaler()
    input_files = glob.glob('{}/*.npy'.format('/opt/ml/processing/input'))
    print('\nINPUT FILE LIST: \n{}\n'.format(input_files))
    for file in input_files:
        raw = np.load(file)
        # Load as dataframe
        columns = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO',
                   'B', 'LSTAT']
        df = pd.DataFrame(data=raw, columns=columns)
        # Select the features we want
        column_sels = ['LSTAT', 'INDUS', 'NOX', 'PTRATIO', 'RM', 'TAX', 'DIS', 'AGE']
        df = df[column_sels]
        # Apply transformations
        for col in df.columns:
            if np.abs(df[col].skew()) > 0.3:
                df[col] = np.log1p(df[col])
        processed = min_max_scaler.fit_transform(df.to_numpy())
        if 'train' in file:
            output_path = os.path.join('/opt/ml/processing/train', 'x_train.npy')
            np.save(output_path, processed)
            print('SAVED TRANSFORMED TRAINING DATA FILE\n')
        else:
            output_path = os.path.join('/opt/ml/processing/test', 'x_test.npy')
            np.save(output_path, processed)
            print('SAVED TRANSFORMED TEST DATA FILE\n')
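
The script above fits a `MinMaxScaler` separately on each file it receives. One common alternative, sketched below under simplifying assumptions, is to learn the minimum and maximum from the training data only and reuse them for the test data. The helpers `fit_min_max` and `apply_min_max` are hypothetical and the values are made up; this is not the method used in this notebook, and it would require both files (or the learned parameters) to be available on the same instance.

```python
def fit_min_max(column):
    """Learn the scaling parameters (min, max) from training values."""
    return min(column), max(column)

def apply_min_max(column, params):
    """Scale values with previously learned parameters: (x - min) / (max - min)."""
    lo, hi = params
    return [(x - lo) / (hi - lo) for x in column]

train_col = [2.0, 4.0, 6.0, 10.0]  # made-up training values
test_col = [3.0, 12.0]             # made-up test values

params = fit_min_max(train_col)
train_scaled = apply_min_max(train_col, params)
test_scaled = apply_min_max(test_col, params)
print(train_scaled, test_scaled)
```

With this approach, test values outside the training range map outside [0, 1], and a constant training column would need special handling to avoid division by zero.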

Before starting the SageMaker Processing job, we instantiate a `SKLearnProcessor` object.  This object allows you to specify the instance type to use in the job, as well as how many instances.  Although the Boston Housing dataset is quite small, we'll use two instances to showcase how easy it is to spin up a cluster for SageMaker Processing.  

In [None]:
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=get_execution_role(),
                                     instance_type='ml.m5.xlarge',
                                     instance_count=2)

We're now ready to run the Processing job.  To distribute the data files evenly among the instances, we specify the `ShardedByS3Key` distribution type in the `ProcessingInput` object.  This ensures that if we have `n` instances, each instance receives approximately `1/n` of the files under the specified S3 prefix.  It may take around 3 minutes for the following code cell to run, mainly to set up the cluster.  At the end of the job, SageMaker automatically tears down the cluster.  

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from time import gmtime, strftime 

processing_job_name = "tf-2-workflow-{}".format(strftime("%d-%H-%M-%S", gmtime()))
output_destination = 's3://{}/{}/data'.format(bucket, s3_prefix)

sklearn_processor.run(code='preprocessing.py',
                      job_name=processing_job_name,
                      inputs=[ProcessingInput(
                        source=raw_s3,
                        destination='/opt/ml/processing/input',
                        s3_data_distribution_type='ShardedByS3Key')],
                      outputs=[ProcessingOutput(output_name='train',
                                                destination='{}/train'.format(output_destination),
                                                source='/opt/ml/processing/train'),
                               ProcessingOutput(output_name='test',
                                                destination='{}/test'.format(output_destination),
                                                source='/opt/ml/processing/test')])

preprocessing_job_description = sklearn_processor.jobs[-1].describe()

In the log output of the SageMaker Processing job above, you should be able to see logs in two different colors for the two different instances, and that each instance received different files.  Without the `ShardedByS3Key` distribution type, each instance would have received a copy of **all** files.  By spreading the data evenly among `n` instances, you should see a speedup of approximately a factor of `n` for most stateless data transformations.  After saving the job results locally, we'll move on to prototyping training and inference code with Local Mode.
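
The difference between the two distribution types can be sketched as follows. The round-robin assignment is an illustrative assumption; the order in which SageMaker actually assigns S3 keys to instances may differ. `distribute` is a hypothetical helper, and the file names mirror the ones uploaded earlier.

```python
def distribute(keys, instance_count, distribution_type):
    """Return one list of S3 keys per instance (illustrative only)."""
    if distribution_type == 'FullyReplicated':
        # Every instance receives a copy of every file
        return [list(keys) for _ in range(instance_count)]
    if distribution_type == 'ShardedByS3Key':
        # Each instance receives roughly 1/n of the files
        return [list(keys[i::instance_count]) for i in range(instance_count)]
    raise ValueError('unknown distribution type: %s' % distribution_type)

keys = ['x_test.npy', 'x_train.npy']
sharded = distribute(keys, 2, 'ShardedByS3Key')
replicated = distribute(keys, 2, 'FullyReplicated')
print(sharded)
print(replicated)
```

With two files and two instances, sharding gives each instance a single file, whereas replication gives both files to both instances.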

In [None]:
train_in_s3 = '{}/train/x_train.npy'.format(output_destination)
test_in_s3 = '{}/test/x_test.npy'.format(output_destination)
!aws s3 cp {train_in_s3} ./data/train/x_train.npy
!aws s3 cp {test_in_s3} ./data/test/x_test.npy

We'll use the data and artifacts created in this notebook in downstream notebooks for model training and deployment. Store them here for later retrieval.

In [None]:
#store results to use in later notebooks
%store bucket
%store s3_prefix
%store train_dir
%store test_dir
%store x_test
%store y_test
%store raw_s3
%store processing_job_name