# EMR Notebook SageMaker Custom Abalone Ring Estimator

1. [Setup](#Setup)
2. [Load the Data](#Load-the-Data)
3. [Train the Model](#Train-the-Model)
4. [Inference Results](#Inference-Results)
5. [Wrap-Up](#Wrap-Up)

## Setup
Each EMR notebook is launched with its own Spark context (variable sc). EMR notebooks come with a default set of libraries for data processing, and you can see which libraries are installed on the notebook by calling the SparkContext's list_packages() function. After that, we install the additional Python packages used throughout the notebook.

In [1]:
sc.list_packages()

Starting Spark application


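| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|----|---------------------|------|-------|----------|------------|------------------|
| 17 | application_1573168723671_0018 | pyspark | idle | Link | Link | ✔ |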


SparkSession available as 'spark'.


Package                    Version
-------------------------- -------
beautifulsoup4             4.8.0  
boto                       2.49.0 
jmespath                   0.9.4  
lxml                       4.4.1  
mysqlclient                1.4.4  
nltk                       3.4.5  
nose                       1.3.4  
numpy                      1.14.5 
pip                        19.3.1 
py-dateutil                2.2    
python36-sagemaker-pyspark 1.2.4  
pytz                       2019.2 
PyYAML                     3.11   
setuptools                 41.6.0 
six                        1.12.0 
soupsieve                  1.9.3  
wheel                      0.33.6 
windmill                   1.6

To communicate with SageMaker, we need to install notebook-scoped libraries. These libraries are available only during the notebook session; after the session ends, they are deleted.

We install [boto3 (the AWS SDK for Python)](https://aws.amazon.com/sdk-for-python/) and the [high-level SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/).

In [2]:
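# Later cells use SageMaker Python SDK v1 argument names (image_name, s3_input, ...);
# with SDK 2.x or later, pin a 1.x release instead, e.g. 'sagemaker==<1.x version>'.
sc.install_pypi_package("boto3");
sc.install_pypi_package('sagemaker');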

Collecting boto3
  Downloading https://files.pythonhosted.org/packages/9b/11/7e6470f5d7d7d23fc5eaae64f8a3f4b844ca08234cc3207df027267b65c4/boto3-1.10.26-py2.py3-none-any.whl (128kB)
Collecting s3transfer<0.3.0,>=0.2.0
  Using cached https://files.pythonhosted.org/packages/16/8a/1fc3dba0c4923c2a76e1ff0d52b305c44606da63f718d14d3231e21c51b0/s3transfer-0.2.1-py2.py3-none-any.whl
Collecting botocore<1.14.0,>=1.13.26
  Downloading https://files.pythonhosted.org/packages/8a/93/ea2ec042794dfda186348df02c6057223a8bbc21c055124fbe3e16925441/botocore-1.13.26-py2.py3-none-any.whl (5.6MB)
Collecting docutils<0.16,>=0.10
  Using cached https://files.pythonhosted.org/packages/22/cd/a6aa959dca619918ccb55023b4cb151949c64d4d5d55b3f4ffd7eee0c6e8/docutils-0.15.2-py3-none-any.whl
Collecting python-dateutil<2.8.1,>=2.1; python_version >= "2.7"
  Using cached https://files.pythonhosted.org/packages/41/17/c62faccbfbd163c7f57f3844689e3a78bae1f403648a6afb1d0866d87fbb/python_dateutil-2.8.0-py2.py3-none-any.whl
...

Now we specify the user-specific parameters. Make sure to enter the ARN of the SageMaker execution role that you created earlier in the IAM console.

In [3]:
# Define user-specific parameters.
region = 'us-west-2'
source_bucket = 's3://emr-lab-income-dataset/'
# The ARN of the IAM role that SageMaker will use to access other AWS resources,
# e.g. 'arn:aws:iam::<account-id>:role/<role-name>'.
sagemaker_execution_role = ''
# The number of EMR nodes to process the data (also used as the number of Spark partitions).
num_workers = 12

if region and source_bucket and sagemaker_execution_role and num_workers:
    print('All necessary user parameters are entered.')
else:
    print('Please make sure you entered all required parameters!')

All necessary user parameters are entered.
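The check above only confirms that each parameter is non-empty, so a mistyped or incomplete role ARN would still pass. A slightly stricter sketch is shown below. The helper name looks_like_role_arn is hypothetical, and the pattern assumes the standard IAM role ARN layout arn:<partition>:iam::<12-digit account ID>:role/<optional path>/<role name>. It checks only the shape of the string, not whether the role exists or trusts SageMaker.

```python
import re

# Hypothetical helper: assumes the standard IAM role ARN layout
# arn:<partition>:iam::<12-digit account ID>:role/<optional path/><role name>
ROLE_ARN_PATTERN = re.compile(r"^arn:aws[a-z-]*:iam::\d{12}:role/[\w+=,.@/-]+$")


def looks_like_role_arn(value):
    """Return True if value has the shape of an IAM role ARN (shape only)."""
    return bool(ROLE_ARN_PATTERN.match(value))


print(looks_like_role_arn(""))  # False: the empty default in the cell above
print(looks_like_role_arn("arn:aws:iam::123456789012:role/service-role/MySageMakerRole"))  # True
```

You could replace the non-empty test on sagemaker_execution_role with a call like this to fail early, before any SageMaker job is started.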

In [8]:
import boto3
import sagemaker

#We initiate a session for the boto3 and sagemaker APIs. The session includes information necessary to call the
#AWS APIs, such as AWS credentials and default AWS region. For this lab we will leverage the IAM role attached to
#the EMR notebook, so we only need to provide a region.
boto_sess = boto3.Session(region_name=region)
sage_sdk_session = sagemaker.Session(boto_session=boto_sess)
bucket = sage_sdk_session.default_bucket()

print('A SageMaker session was initiated! You are using {} as your S3 bucket for intermediate files.'.format(bucket))

A SageMaker session was initiated! You are using sagemaker-us-west-2-883624334343 as your S3 bucket for intermediate files.
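The bucket name in the output above follows the naming convention used by sagemaker.Session.default_bucket(): "sagemaker-<region>-<AWS account ID>". The method also creates that bucket if it does not exist yet. The helper below is a hypothetical sketch that only reproduces the name; in the notebook you could obtain the account ID with boto_sess.client('sts').get_caller_identity()['Account'].

```python
# Hypothetical helper illustrating the default bucket naming convention
# "sagemaker-<region>-<account ID>"; it does not create anything.
def expected_default_bucket(region, account_id):
    return "sagemaker-{}-{}".format(region, account_id)


# Region and account ID as they appear in the output above.
print(expected_default_bucket("us-west-2", "883624334343"))
# sagemaker-us-west-2-883624334343
```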

## Load the Data

We will use the public abalone data set from the [UC Irvine Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Abalone)
to train and test a regression model.

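The table below gives each attribute's name, data type, measurement unit, and a brief description. The number of rings is the value to predict, either as a continuous value (regression, as in this notebook) or as a classification problem.

An abalone's age in years is its number of rings plus 1.5. Without a model, researchers must cut through the abalone shell and count the rings under a microscope. Predicting the ring count from physical measurements avoids this time-consuming process.

The units listed are those of the original measurements. In the UCI version of the data, the continuous values were scaled by dividing by 200, which is why the lengths and weights in the rows below are small fractions.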

| Name           | Data type  | Unit  | Description                 |
|----------------|------------|-------|-----------------------------|
| Rings          | integer    |       | +1.5 gives the age in years |
| Length         | continuous | mm    | Longest shell measurement   |
| Diameter       | continuous | mm    | Perpendicular to length     |
| Height         | continuous | mm    | With meat in shell          |
| Whole_weight   | continuous | grams | Whole abalone               |
| Shucked_weight | continuous | grams | Weight of meat              |
| Viscera_weight | continuous | grams | Gut weight (after bleeding) |
| Shell_weight   | continuous | grams | After being dried           |
| Male           | integer    |       | 1 encodes true, 0 false     |
| Female         | integer    |       | 1 encodes true, 0 false     |
| Infant         | integer    |       | 1 encodes true, 0 false     |
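To connect the scaled values back to the table's units, a minimal sketch follows. It assumes the UCI divide-by-200 scaling of the continuous columns and uses the age rule from the text (rings + 1.5). The helper names are hypothetical, and the values come from the first sample row printed by abalone_data.show(n=5) below.

```python
SCALE = 200  # assumption: UCI scaled the continuous columns by dividing by 200


def rings_to_age(rings):
    """Age in years is the ring count plus 1.5."""
    return rings + 1.5


def unscale(value):
    """Convert a scaled measurement back to mm (lengths) or grams (weights)."""
    return value * SCALE


# First sample row shown below: Rings=9, Length=0.665, Shucked_weight=0.5985
print(rings_to_age(9))              # 10.5 years
print(round(unscale(0.665), 3))     # 133.0 mm
print(round(unscale(0.5985), 3))    # 119.7 grams
```

The model in this notebook works directly on the scaled values, so no conversion is needed for training; the sketch only helps when interpreting individual rows.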

In [4]:
# Load the cleaned dataset from S3 into a Spark DataFrame, spread across num_workers partitions.
abalone_data = spark.read.load(source_bucket + 'clean/', format='csv', inferSchema=True, header=True).repartition(num_workers)
abalone_data.show(n=5)

+-----+------+--------+------+------------+--------------+--------------+------------+----+------+------+
|Rings|Length|Diameter|Height|Whole_weight|Shucked_weight|Viscera_weight|Shell_weight|Male|Female|Infant|
+-----+------+--------+------+------------+--------------+--------------+------------+----+------+------+
|    9| 0.665|   0.505| 0.165|       1.349|        0.5985|        0.3175|        0.36|   0|     1|     0|
|    7| 0.505|    0.39| 0.185|      0.6125|         0.267|         0.142|       0.172|   0|     0|     1|
|   13| 0.575|   0.475|  0.17|       0.967|        0.3775|         0.284|       0.275|   0|     0|     1|
|    9| 0.455|   0.355| 0.105|       0.372|         0.138|        0.0765|       0.135|   0|     0|     1|
|   17|  0.52|   0.425| 0.155|      0.7735|         0.297|         0.123|       0.255|   1|     0|     0|
+-----+------+--------+------+------------+--------------+--------------+------------+----+------+------+
only showing top 5 rows

Now that the data is in Spark, we can use Spark to modify and enhance it. For example, including all four abalone weights may be unnecessary, since they capture much of the same information. Let's drop all weight columns except the shucked weight.

In [6]:
abalone_data = abalone_data.drop('Whole_weight', 'Viscera_weight', 'Shell_weight')
abalone_data.show(n=5)

+-----+------+--------+------+--------------+----+------+------+
|Rings|Length|Diameter|Height|Shucked_weight|Male|Female|Infant|
+-----+------+--------+------+--------------+----+------+------+
|   10|  0.73|   0.555|  0.18|        0.6555|   0|     1|     0|
|    8| 0.435|    0.34|  0.11|        0.1495|   0|     0|     1|
|   16| 0.565|   0.465|  0.15|         0.377|   1|     0|     0|
|    9|  0.61|    0.47|  0.16|         0.449|   1|     0|     0|
|   13|   0.6|    0.48| 0.175|        0.4125|   1|     0|     0|
+-----+------+--------+------+--------------+----+------+------+
only showing top 5 rows
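The weight columns dropped above were assumed to carry overlapping information. A quick way to sanity-check that assumption is pairwise Pearson correlation. The plain-Python sketch below uses only the five sample rows printed by the earlier abalone_data.show(n=5) call, so it illustrates the calculation rather than proving anything about the full data set.

```python
from itertools import combinations
from math import sqrt

# The four weight columns from the five sample rows printed before the drop.
weights = {
    "Whole_weight":   [1.349, 0.6125, 0.967, 0.372, 0.7735],
    "Shucked_weight": [0.5985, 0.267, 0.3775, 0.138, 0.297],
    "Viscera_weight": [0.3175, 0.142, 0.284, 0.0765, 0.123],
    "Shell_weight":   [0.36, 0.172, 0.275, 0.135, 0.255],
}


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    vx = sum((x - mx) ** 2 for x in xs)
    vy = sum((y - my) ** 2 for y in ys)
    return cov / sqrt(vx * vy)


# Print the correlation for every pair of weight columns.
for a, b in combinations(weights, 2):
    print("{} vs {}: {:.2f}".format(a, b, pearson(weights[a], weights[b])))
```

On the full DataFrame, Spark can compute the same statistic before the drop, for example abalone_data.stat.corr('Whole_weight', 'Shucked_weight'), which uses Pearson correlation across all rows.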

In [10]:
# Split the DataFrame into training and test data.
# The training data will be used to fit our model.
# The test data will be used to measure the model's accuracy.
train_data, test_data = abalone_data.randomSplit([.75, .25])

s3_train = 's3://' + bucket + '/train/'
s3_test = 's3://' + bucket + '/test/'
data_format = 'csv'

# Save the data to S3 for training by SageMaker (Spark's CSV writer adds no header row by default).
train_data.write.save(s3_train, format=data_format, mode='overwrite')
test_data.write.save(s3_test, format=data_format, mode='overwrite')

print('Training dataset saved in {} format to {}!'.format(data_format, s3_train))
print('Testing dataset saved in {} format to {}!'.format(data_format, s3_test))

Training dataset saved in csv format to s3://sagemaker-us-west-2-883624334343/train/!
Testing dataset saved in csv format to s3://sagemaker-us-west-2-883624334343/test/!
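randomSplit does not produce an exact 75/25 split: each row is assigned independently at random, so the split sizes only approximate the weights (the training log further down reports 3107 training rows). It is also not reproducible across runs unless you pass a seed, e.g. abalone_data.randomSplit([.75, .25], seed=42). The plain-Python sketch below is a simplified illustration of the idea, not Spark's implementation, which samples each partition separately; the function name random_split is hypothetical.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split by drawing a uniform number in [0, 1).

    Weights are normalized, and each row falls into the split whose
    cumulative-weight interval contains its random draw.
    """
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding at 1.0.
            if r < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(10000), [0.75, 0.25], seed=42)
print(len(train), len(test))  # close to 7500 and 2500, but not exact
```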

## Train the Model
SageMaker provides several common built-in algorithms. For this lab you can choose either the [LinearLearner](https://docs.aws.amazon.com/sagemaker/latest/dg/linear-learner.html) or the [XGBoost](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) built-in algorithm.

In [11]:
# To use the LinearLearner algorithm, comment out the XGBoost line and uncomment the LinearLearner line.
model = 'XGBoost'
#model = 'LinearLearner'

print('The SageMaker {} model will be used.'.format(model))

The SageMaker XGBoost model will be used.

The following cell defines the hyperparameters for each algorithm. You may leave them at their defaults, but if you are interested, try changing a few to see whether they improve model performance.

In [12]:
# Set the regularization weights. Increasing these reduces how closely the model fits the training data.
l1 = .25
l2 = .25

# Each SageMaker algorithm has a container that SageMaker uses for training/inference.
# These are the paths to the public container images for the SageMaker built-in algorithms.
# The registry account IDs below are the ones for us-west-2; other regions use different
# account IDs (SDK v1's sagemaker.amazon.amazon_estimator.get_image_uri can look them up).
images = {
    'XGBoost': '433757028032.dkr.ecr.{}.amazonaws.com/xgboost:latest'.format(region),
    'LinearLearner': '174872318107.dkr.ecr.{}.amazonaws.com/linear-learner:latest'.format(region)
}

#Hyperparameters for XGBoost algorithm
xgboost_params = {
    'num_round':100,
    'objective': 'reg:linear',
    'alpha': l1,
    'lambda': l2
}

#Hyperparameters for LinearLearner algorithm
linear_params = {
    'feature_dim':len(abalone_data.columns)-1,
    'predictor_type': 'regressor',
    'loss': 'squared_loss',
    'l1': l1,
    'wd': l2
}

hyperparams = {
    'LinearLearner': linear_params,
    'XGBoost': xgboost_params
}

print('All model parameters have been set!')

All model parameters have been set!
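Both algorithms add penalty terms, weighted by l1 and l2, to their training objective. The sketch below shows the textbook idea for a linear model: mean squared error plus an L1 penalty (sum of absolute coefficients) and an L2 penalty (sum of squared coefficients). It is an illustration under that assumption, not the exact objective of either SageMaker algorithm, which scale and apply these terms in their own ways (XGBoost, for example, penalizes the leaf weights of its trees rather than linear coefficients). The data and coefficients are hypothetical.

```python
def penalized_squared_error(X, y, w, b, l1, l2):
    """Mean squared error plus L1 (sum |w|) and L2 (sum w^2) penalties.

    X is a list of feature rows, w the coefficient list and b the
    intercept; the intercept is not penalized.
    """
    n = len(y)
    mse = sum((sum(wi * xi for wi, xi in zip(w, row)) + b - target) ** 2
              for row, target in zip(X, y)) / n
    l1_term = l1 * sum(abs(wi) for wi in w)
    l2_term = l2 * sum(wi * wi for wi in w)
    return mse + l1_term + l2_term


X = [[1.0, 2.0], [2.0, 0.5], [3.0, 1.0]]
y = [5.0, 4.5, 7.0]
w, b = [1.5, 1.0], 1.0  # hypothetical coefficients
print(penalized_squared_error(X, y, w, b, l1=0.0, l2=0.0))    # plain MSE
print(penalized_squared_error(X, y, w, b, l1=0.25, l2=0.25))  # MSE + penalties
```

Because the penalty grows with the size of the coefficients, larger l1 or l2 values push training toward smaller coefficients, which is the "reduce how closely the model fits" effect described in the cell's comment.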

In [13]:
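# image_name, train_instance_count and train_instance_type are SageMaker Python SDK v1
# argument names; SDK v2 renamed them to image_uri, instance_count and instance_type.
estimator = sagemaker.estimator.Estimator(
    image_name=images[model],
    role=sagemaker_execution_role,
    train_instance_count=1,
    train_instance_type='ml.m5.large',
    sagemaker_session=sage_sdk_session,
    hyperparameters=hyperparams[model]
)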

print('The SageMaker model was constructed with parameters: {}.'.format(estimator.hyperparameters()))

The SageMaker model was constructed with parameters: {'num_round': 100, 'objective': 'reg:linear', 'alpha': 0.25, 'lambda': 0.25}.

Now that the estimator is initialized, we can train the model by calling its fit() function. After fit() is called, SageMaker launches a training instance, trains the model on it, saves the model artifacts to S3, and then shuts the training instance down.

This usually takes about 3 minutes. 

(**Optional**) While you wait, you may check the model training progress through the SageMaker console by following these instructions:  
a.	Open the SageMaker console in AWS.  
b.	In the left panel, scroll until you see ‘Training jobs’ under the ‘Training’ section.  
c.	Click the job to examine further details; wait until the status changes to ‘Completed’.


In [19]:
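# The 'part' prefix selects Spark's part-* data files and skips its _SUCCESS marker file.
train_channel = sagemaker.session.s3_input(s3_train + 'part', content_type='text/csv')
# No validation set was created above, so only the training channel is passed.
estimator.fit({'train': train_channel})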

2019-11-22 22:24:19 Starting - Starting the training job...
2019-11-22 22:24:23 Starting - Launching requested ML instances......
2019-11-22 22:25:25 Starting - Preparing the instances for training...
2019-11-22 22:26:15 Downloading - Downloading input data...
2019-11-22 22:26:49 Training - Downloading the training image...
2019-11-22 22:27:15 Uploading - Uploading generated training model
2019-11-22 22:27:15 Completed - Training job completed
Arguments: train
[2019-11-22:22:27:03:INFO] Running standalone xgboost training.
[2019-11-22:22:27:03:INFO] File size need to be processed in the node: 0.12mb. Available memory size in the node: 275.8mb
[2019-11-22:22:27:03:INFO] Determined delimiter of CSV input is ','
[22:27:03] S3DistributionType set as FullyReplicated
[22:27:03] 3107x7 matrix with 21749 entries loaded from /opt/ml/input/data/train?format=csv&label_column=0&delimiter=,
[2019-11-22:22:27:03:INFO] Determined delimiter of CSV input is ','
[22:27:03] S3DistributionType set as Full
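For CSV input, the built-in XGBoost and LinearLearner algorithms expect the target in the first column and no header row (the log above shows label_column=0). That is why Rings stays the first column of abalone_data and why the Spark write does not add a header. The hypothetical helper below sketches that layout for dict records; it is only an illustration of the format, not part of the SageMaker SDK.

```python
import csv
import io


def to_training_csv(records, label):
    """Write dict records as header-less CSV with the label column first."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    for rec in records:
        features = [v for k, v in rec.items() if k != label]
        writer.writerow([rec[label]] + features)
    return buf.getvalue()


# A sample row from this notebook after the weight columns were dropped.
row = {"Length": 0.52, "Diameter": 0.425, "Height": 0.155, "Shucked_weight": 0.297,
       "Male": 1, "Female": 0, "Infant": 0, "Rings": 17}
print(to_training_csv([row], label="Rings"), end="")
# 17,0.52,0.425,0.155,0.297,1,0,0
```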

## Inference Results

How well did our model perform? Let's see how it does on the test data set we saved to S3 earlier. We'll use [SageMaker batch transform](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html) to run our test data set through our model. Batch transform creates a SageMaker instance, deploys the model, runs the dataset through the model, then takes down the instance. 

In [15]:
# Build the path directly; str.replace would also rewrite any 'train' in the bucket name.
s3_inference = 's3://' + bucket + '/inference/'

transformer = estimator.transformer(
    instance_count = 1,
    instance_type = 'ml.m5.large',
    strategy = 'MultiRecord',
    output_path = s3_inference,
    assemble_with= 'Line',
    accept=('text/'+data_format)
)

print('SageMaker batch transform initialized with the following parameters:')
for key in transformer.__dict__:
    print('{}:{}'.format(key, transformer.__dict__[key]))

SageMaker batch transform initialized with the following parameters:
model_name:xgboost-2019-11-22-21-57-53-853
strategy:MultiRecord
env:None
output_path:s3://sagemaker-us-west-2-883624334343/inference/
output_kms_key:None
accept:text/csv
assemble_with:Line
instance_count:1
instance_type:ml.m5.large
volume_kms_key:None
max_concurrent_transforms:None
max_payload:None
tags:None
base_transform_job_name:None
_current_job_name:None
latest_transform_job:None
_reset_output_path:False
sagemaker_session:<sagemaker.session.Session object at 0x7f6b9fe02da0>

The transform() function starts the SageMaker batch transform job. SageMaker creates an inference instance, runs the specified test set through the model, saves the results to S3, and shuts the inference instance down. Batch transform is a good option when you need inference for large datasets and don't need sub-second response times.

This usually takes about 3 minutes.

(**Optional**) While you wait, you may check the batch transform progress through the SageMaker console by following these instructions:  
a.	Open the SageMaker console in AWS.  
b.	In the left panel, scroll until you see ‘Batch transform jobs’ under the ‘Inference’ section.  
c.	Click the job to examine further details; wait until the status changes to ‘Completed’.

In [16]:
#The test data set still contains the "Rings" column the model tries to predict. 
#We do not want to send this column to the model, though. We use the SageMaker
#input_filter to filter out that column before sending to the model. We then
#join the model output with the input so we can compare the actual Rings count
#to the predicted count.
transformer.transform(
    data=s3_test,
    content_type='text/csv',
    split_type='Line',
    input_filter='$[1:]',
    join_source='Input',
    wait=True
)

....................................!
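In the transform call, input_filter='$[1:]' is a JSONPath-style slice that drops the first CSV field (Rings) before the record reaches the model, and join_source='Input' appends the model's output to the original, unfiltered record. That is why the inference data below has the input columns followed by one extra prediction column. The plain-Python sketch below mimics that per-line flow. The stand-in "model" is hypothetical: it returns the number of features it received so the effect of the filter is visible.

```python
def count_features(features):
    """Hypothetical stand-in for the model: returns how many features it got."""
    return float(len(features))


def transform_line(line, predict):
    """Mimic input_filter='$[1:]' plus join_source='Input' for one CSV line."""
    fields = line.split(",")
    filtered = fields[1:]                         # input_filter: drop the Rings column
    prediction = predict([float(f) for f in filtered])
    return ",".join(fields + [str(prediction)])   # join_source: input + output


print(transform_line("7,0.505,0.38,0.12,0.2595,0,0,1", count_features))
# 7,0.505,0.38,0.12,0.2595,0,0,1,7.0
```

The appended 7.0 shows that only the seven feature columns reached the "model", while the joined output still carries the actual Rings value for comparison.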

The SageMaker batch transform job completed and saved the model inference results to S3. Now let's pull the results into Spark for analysis.

In [17]:
from pyspark.sql.types import FloatType
from copy import deepcopy

#Read the schema from the initial dataset so you can apply it to the inference data.
schema = deepcopy(abalone_data.schema)
schema.add("Estimated_rings", FloatType())

#Pull down the inference data from S3
inference_data = spark.read.load(s3_inference, format=data_format, schema=schema).repartition(num_workers)
inference_data.show(n=5)

+-----+------+--------+------+--------------+----+------+------+---------------+
|Rings|Length|Diameter|Height|Shucked_weight|Male|Female|Infant|Estimated_rings|
+-----+------+--------+------+--------------+----+------+------+---------------+
|    7| 0.505|    0.38|  0.12|        0.2595|   0|     0|     1|       8.981371|
|    9|  0.65|     0.5|  0.16|         0.702|   0|     1|     0|       9.416534|
|   10|  0.41|   0.315|  0.11|        0.1255|   0|     1|     0|       10.04625|
|    8| 0.435|    0.34|  0.11|        0.1495|   0|     0|     1|       8.155085|
|    8|  0.48|   0.355| 0.115|          0.25|   0|     0|     1|      7.1054344|
+-----+------+--------+------+--------------+----+------+------+---------------+
only showing top 5 rows

Now that we have our results, we need to quantify our model's performance. We will use the root-mean-square error (RMSE) to measure how close Estimated_rings is to the actual Rings value.

RMSE is a popular way to measure how closely a regression model's predictions match the observed values. It is expressed in the same units as the target (here, rings), and a lower RMSE indicates closer predictions.

Here is the equation for RMSE:

\begin{equation*}
RMSE = \sqrt{\frac{\sum_{i=1}^{N} (\hat{y}_i - y_i)^2}{N}}
\end{equation*}

where $\hat{y_i}$ is the number of predicted rings, $y_i$ is the observed number of rings, and N is the number of rows in the test data set.
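As a worked example of the formula, the plain-Python computation below uses only the five rows printed above. Five rows say little about the model; the full test set is evaluated with Spark SQL in the next cell, where SQRT(AVG(POWER(...))) computes the same quantity because AVG divides the sum of squared differences by N.

```python
from math import sqrt

# (Rings, Estimated_rings) pairs from the five sample rows printed above.
pairs = [(7, 8.981371), (9, 9.416534), (10, 10.04625), (8, 8.155085), (8, 7.1054344)]


def rmse(pairs):
    """Root-mean-square error: square root of the mean squared difference."""
    n = len(pairs)
    return sqrt(sum((predicted - actual) ** 2 for actual, predicted in pairs) / n)


print(round(rmse(pairs), 3))  # 0.993
```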

We'll use Spark SQL to run a SQL query on our data to calculate the RMSE.

In [18]:
rings = inference_data.schema.names[0]
predicted_rings = inference_data.schema.names[-1]
table_name = 'inference'

inference_data.createOrReplaceTempView(table_name)
sql_rmse = 'SELECT SQRT(AVG(POWER({}-{}, 2))) AS RMSE FROM {}'.format(rings, predicted_rings, table_name)

rmse_results = spark.sql(sql_rmse)
rmse_results.show()

+------------------+
|              RMSE|
+------------------+
|2.6602393658357797|
+------------------+

## Wrap-Up
Congratulations! You processed data with Apache Spark on EMR, trained a machine learning model in Amazon SageMaker, and used it for batch inference! Feel free to try different combinations of models and hyperparameters to see if you can reduce your model's RMSE.