In [2]:
%load_ext sagemaker_studio_analytics_extension.magics
%sm_analytics emr connect --cluster-id j-FH2OVSN51AM0 --auth-type None  

Successfully read emr cluster(j-FH2OVSN51AM0) details
Initiating EMR connection..
Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3,application_1647475657060_0008,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.
{"namespace": "sagemaker-analytics", "cluster_id": "j-FH2OVSN51AM0", "error_message": null, "success": true, "service": "emr", "operation": "connect"}


### Import SparkML dependencies 

In [3]:
from __future__ import print_function

import os
import shutil
import boto3

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.types import StructField, StructType, StringType, DoubleType
from pyspark.ml.feature import (
    StringIndexer,
    VectorIndexer,
    OneHotEncoderEstimator,
    VectorAssembler,
    IndexToString,
)
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import *
from mleap.pyspark.spark_support import SimpleSparkSerializer

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Read input dataset from S3

In [4]:
# Explicit schema for the abalone CSV: one string column ("sex") followed by
# double-typed columns. Every field is declared nullable.
numeric_columns = [
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
    "rings",
]

schema = StructType(
    [StructField("sex", StringType(), True)]
    + [StructField(column, DoubleType(), True) for column in numeric_columns]
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Read the abalone records from S3. The file is read without a header row,
# so column names and types come from the explicit schema.
input_path = "s3://vasveena-test-demo/emr-sagemaker-ml/smstudio/inputdata/abalone.csv"
total_df = spark.read.csv(input_path, schema=schema, header=False)

# Preview the first few rows to confirm the schema was applied as expected.
total_df.show(n=5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+------+--------+------+------------+--------------+--------------+------------+-----+
|sex|length|diameter|height|whole_weight|shucked_weight|viscera_weight|shell_weight|rings|
+---+------+--------+------+------------+--------------+--------------+------------+-----+
|  M| 0.455|   0.365| 0.095|       0.514|        0.2245|         0.101|        0.15| 15.0|
|  M|  0.35|   0.265|  0.09|      0.2255|        0.0995|        0.0485|        0.07|  7.0|
|  F|  0.53|    0.42| 0.135|       0.677|        0.2565|        0.1415|        0.21|  9.0|
|  M|  0.44|   0.365| 0.125|       0.516|        0.2155|         0.114|       0.155| 10.0|
|  I|  0.33|   0.255|  0.08|       0.205|        0.0895|        0.0395|       0.055|  7.0|
+---+------+--------+------+------------+--------------+--------------+------------+-----+
only showing top 5 rows

### Split input data into train and validation sets (70:30)

In [6]:
# Pin the seed so the 70/30 train/validation split is the same on every run.
# Without an explicit seed, randomSplit picks a new random one each time, which
# makes the RMSE figures reported later in the notebook impossible to reproduce.
SPLIT_SEED = 42
(train_df, validation_df) = total_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The Abalone dataset has one categorical column, "sex", which needs to be converted to a numeric format before it can be passed to the Random Forest algorithm.

For that, we use StringIndexer and OneHotEncoderEstimator from Spark to transform the categorical column, and then use a VectorAssembler to produce a flat, one-dimensional vector for each data point so that it can be used with the Random Forest algorithm.

In [7]:
# Categorical handling: index the "sex" strings, then one-hot encode the index.
sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")
sex_encoder = OneHotEncoderEstimator(inputCols=["indexed_sex"], outputCols=["sex_vec"])

# Columns packed into the single "features" vector, in this exact order:
# the encoded category first, then the numeric measurements.
measurement_columns = [
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
feature_columns = ["sex_vec"] + measurement_columns

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Define Random Forest model and perform training
After the data is preprocessed, we define a RandomForestRegressor, build a Pipeline comprising both the feature transformation and training stages, and train the Pipeline by calling .fit().

In [8]:
# Random forest training is randomized (row bootstrapping and per-split feature
# subsets), so pin the seed to get the same forest on repeated runs.
RF_SEED = 42

rf = RandomForestRegressor(
    labelCol="rings",
    featuresCol="features",
    maxDepth=6,
    numTrees=18,
    seed=RF_SEED,
)

# A single pipeline holds both the feature transformers and the regressor, so
# the fitted model applies the same preprocessing at training and scoring time.
pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler, rf])

model = pipeline.fit(train_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Use the trained Model to transform train and validation dataset

Next, we will use this trained Model to transform our training and validation datasets, so we can inspect some sample output and measure the performance scores. The Model applies the feature transformers to the data before passing it to the Random Forest.

In [9]:
# Run both splits through the fitted pipeline (feature stages, then the forest).
transformed_validation_df = model.transform(validation_df)
transformed_train_df = model.transform(train_df)

# Show a handful of validation predictions as a quick sanity check.
transformed_validation_df.select("prediction").show(n=5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------+
|       prediction|
+-----------------+
|7.111950248271179|
|6.278434008717797|
|7.684682253679232|
|7.571505070167983|
|7.477313150976063|
+-----------------+
only showing top 5 rows

### Evaluating the model on train and validation dataset

Using Spark’s RegressionEvaluator, we can calculate the rmse (Root-Mean-Squared-Error) on our train and validation dataset to evaluate its performance. If the performance numbers are not satisfactory, we can train the model again and again by changing parameters of Random Forest or add/remove feature transformers.

In [10]:
# Root-mean-squared error between the "prediction" column and the "rings" label.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rings",
    predictionCol="prediction",
)

train_rmse = evaluator.evaluate(transformed_train_df)
print("Train RMSE = %g" % train_rmse)

validation_rmse = evaluator.evaluate(transformed_validation_df)
print("Validation RMSE = %g" % validation_rmse)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Train RMSE = 2.06362
Validation RMSE = 2.31196

### Using MLeap to serialize the model

By calling the serializeToBundle method from the MLeap library, we can store the Model in a specific serialization format that can be later used for inference by sagemaker-sparkml-serving.

In [11]:
# MLeap fails with "file already exists" if the target bundle is already on
# disk, so remove any archive left behind by an earlier run. This keeps the
# cell safe to re-run without first executing the optional cleanup cell.
bundle_path = "/tmp/model.zip"
if os.path.exists(bundle_path):
    os.remove(bundle_path)

# The transformed DataFrame is passed alongside the bundle URI.
model.serializeToBundle("jar:file:" + bundle_path, transformed_validation_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Convert the model to tar.gz format
SageMaker expects model artifacts to be packaged in tar.gz format, but MLeap produces the model as a zip file. In the next cell, we unzip the model artifacts and repackage them in tar.gz format.

In [12]:
import os
import shutil
import tarfile
import zipfile

# Start from an empty extraction directory. extractall() only overwrites files
# that exist in the new archive, so anything left under /tmp/model/root by an
# earlier model would otherwise be swept into the new tarball by tar.add().
if os.path.isdir("/tmp/model"):
    shutil.rmtree("/tmp/model")

with zipfile.ZipFile("/tmp/model.zip") as zf:
    zf.extractall("/tmp/model")

# Re-package the extracted bundle as a gzipped tarball. The arcname values put
# bundle.json and root/ at the top level of the archive rather than under
# tmp/model/.
with tarfile.open("/tmp/model.tar.gz", "w:gz") as tar:
    tar.add("/tmp/model/bundle.json", arcname="bundle.json")
    tar.add("/tmp/model/root", arcname="root")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Upload the trained model artifacts to S3
At the end, we need to upload the trained and serialized model artifacts to S3 so that it can be used for inference in SageMaker.

Please note the S3 location to which you upload your model; you will need it when hosting the model.

In [13]:
# S3 object keys always use "/" as the separator, so spell the key out as a
# plain string. os.path.join would insert the host OS's path separator, which
# is only correct by coincidence on POSIX hosts.
model_bucket = "sagemaker-us-east-2-620614497509"
file_name = "emr/abalone/mleap/model.tar.gz"

# Upload the packaged model; the hosting cells later read it back from this
# same bucket and key.
s3 = boto3.resource("s3")
s3.Bucket(model_bucket).upload_file("/tmp/model.tar.gz", file_name)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Delete model artifacts from local disk (optional)
If you are training multiple ML models on the same host and saving the MLeap-serialized model to the same location, you need to delete the model from the local disk first; otherwise the MLeap library fails with a "file already exists" error.

In [14]:
# Remove the local model artifacts. Each path is checked first so the cell can
# be re-run (or run after a partially failed earlier step) without raising
# because an artifact is already gone.
for leftover_file in ("/tmp/model.zip", "/tmp/model.tar.gz"):
    if os.path.exists(leftover_file):
        os.remove(leftover_file)

if os.path.isdir("/tmp/model"):
    shutil.rmtree("/tmp/model")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Hosting the model in SageMaker
Now the second phase of this Notebook begins, where we will host this model in SageMaker and perform predictions against it.

For this, we are going to switch to Python3 kernel.

### Hosting a model in SageMaker requires two components

A Docker image residing in ECR.
A trained Model residing in S3.

For SparkML, a Docker image for MLeap-based SparkML serving has already been prepared and uploaded to ECR by the SageMaker team, and anyone can use it for hosting. For more information on this, please see SageMaker SparkML Serving.

MLeap serialized model was uploaded to S3 as part of the Spark job we executed in EMR in the previous steps.

### Creating the endpoint for prediction
Next we’ll create the SageMaker endpoint which will be used for performing online prediction.

For this, we have to create an instance of SparkMLModel from sagemaker-python-sdk which will take the location of the model artifacts that we uploaded to S3 as part of the EMR job.

### Passing the schema of the payload via environment variable

The SparkML server also needs to know the schema of the payload that will be passed to it when calling the predict method. To spare you the pain of having to pass the schema with every request, sagemaker-sparkml-serving lets you pass it via an environment variable when creating the model definition.

We will see later that you can overwrite this schema on a per request basis by passing it as part of the individual request payload as well.

This schema definition should also be passed while creating the instance of SparkMLModel.

In [2]:
import json

# (column name, type) pairs, in the order the request payload lists its values.
input_columns = [
    ("sex", "string"),
    ("length", "double"),
    ("diameter", "double"),
    ("height", "double"),
    ("whole_weight", "double"),
    ("shucked_weight", "double"),
    ("viscera_weight", "double"),
    ("shell_weight", "double"),
]

# Payload schema: the input columns plus the single output column, serialized
# to a JSON string for later use.
schema = {
    "input": [{"name": name, "type": dtype} for name, dtype in input_columns],
    "output": {"name": "prediction", "type": "double"},
}
schema_json = json.dumps(schema, indent=2)
print(schema_json)

{
  "input": [
    {
      "name": "sex",
      "type": "string"
    },
    {
      "name": "length",
      "type": "double"
    },
    {
      "name": "diameter",
      "type": "double"
    },
    {
      "name": "height",
      "type": "double"
    },
    {
      "name": "whole_weight",
      "type": "double"
    },
    {
      "name": "shucked_weight",
      "type": "double"
    },
    {
      "name": "viscera_weight",
      "type": "double"
    },
    {
      "name": "shell_weight",
      "type": "double"
    }
  ],
  "output": {
    "name": "prediction",
    "type": "double"
  }
}


In [None]:
from time import gmtime, strftime
import time

import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sparkml.model import SparkMLModel

# UTC timestamp suffix so each run gets its own model and endpoint names.
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# Create both service clients from the same boto3 session that is handed to
# sagemaker.Session. Using the module-level boto3.client() would go through
# boto3's implicit default session instead, so the session and its clients
# could disagree on region or credentials if boto3_session is ever customized.
boto3_session = boto3.session.Session()
sagemaker_client = boto3_session.client("sagemaker")
sagemaker_runtime_client = boto3_session.client("sagemaker-runtime")

# Initialize sagemaker session
session = sagemaker.Session(
    boto_session=boto3_session,
    sagemaker_client=sagemaker_client,
    sagemaker_runtime_client=sagemaker_runtime_client,
)

role = get_execution_role()

# S3 location of where you uploaded your trained and serialized SparkML model.
# This must match the bucket and key used by the upload step in the EMR phase.
model_bucket = "sagemaker-us-east-2-620614497509"
model_prefix = "emr/abalone/mleap"
sparkml_data = "s3://{}/{}/{}".format(model_bucket, model_prefix, "model.tar.gz")

model_name = "sparkml-abalone-" + timestamp_prefix
sparkml_model = SparkMLModel(
    model_data=sparkml_data,
    role=role,
    sagemaker_session=session,
    name=model_name,
    # passing the schema defined above by using an environment
    # variable that sagemaker-sparkml-serving understands
    env={"SAGEMAKER_SPARKML_SCHEMA": schema_json},
)

# Create the endpoint under the timestamped name; the later cells reuse
# endpoint_name to invoke and delete it.
endpoint_name = "sparkml-abalone-ep-" + timestamp_prefix
sparkml_model.deploy(
    initial_instance_count=1, instance_type="ml.c4.xlarge", endpoint_name=endpoint_name
)

Parameter image will be renamed to image_uri in SageMaker Python SDK v2.


---------------------------------------------------

### Invoking the newly created inference endpoint with a payload to transform the data
Now we will invoke the endpoint with a valid payload that sagemaker-sparkml-serving can recognize. There are three ways in which input payload can be passed to the request:

Pass it as a valid CSV string. In this case, the schema passed via the environment variable will be used to determine the schema. For CSV format, every column in the input has to be a basic datatype (e.g. int, double, string) and it can not be a Spark Array or Vector.

Pass it as a valid JSON string. In this case as well, the schema passed via the environment variable will be used to infer the schema. With JSON format, every column in the input can be a basic datatype or a Spark Vector or Array provided that the corresponding entry in the schema mentions the correct value.

Pass the request in JSON format along with the schema and the data. In this case, the schema passed in the payload will take precedence over the one passed via the environment variable (if any).

### Passing the payload in CSV format
We will first see how the payload can be passed to the endpoint in CSV format.

In [None]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer, JSONSerializer
from sagemaker.deserializers import JSONDeserializer


# One record as a CSV row, with values in the same order as the schema's input columns.
payload = "F,0.515,0.425,0.14,0.766,0.304,0.1725,0.255"

# Attach to the already-deployed endpoint and send the row as CSV.
predictor = Predictor(
    serializer=CSVSerializer(),
    sagemaker_session=session,
    endpoint_name=endpoint_name,
)
print(predictor.predict(payload))

### Passing the payload in JSON format
We will now pass a different payload in JSON format.

In [None]:
# JSON request body: the record's values go under the "data" key, in schema order.
payload = {
    "data": ["F", 0.515, 0.425, 0.14, 0.766, 0.304, 0.1725, 0.255],
}

# Same endpoint as before, but the request is serialized as JSON this time.
predictor = Predictor(
    serializer=JSONSerializer(),
    sagemaker_session=session,
    endpoint_name=endpoint_name,
)
print(predictor.predict(payload))

### Passing the payload with both schema and the data
Next, we will pass an input payload comprising both the schema and the data. If you look carefully, this schema is slightly different from the one we passed via the environment variable: the positions of the length and sex columns have been swapped, and so have the corresponding data values. The server now parses the payload with this schema and works properly.

In [None]:
# Per-request schema: identical to the environment-variable schema except that
# "length" and "sex" trade places, and the data row is reordered to match.
request_columns = [
    ("length", "double"),
    ("sex", "string"),
    ("diameter", "double"),
    ("height", "double"),
    ("whole_weight", "double"),
    ("shucked_weight", "double"),
    ("viscera_weight", "double"),
    ("shell_weight", "double"),
]

payload = {
    "schema": {
        "input": [{"name": name, "type": dtype} for name, dtype in request_columns],
        "output": {"name": "prediction", "type": "double"},
    },
    "data": [0.515, "F", 0.425, 0.14, 0.766, 0.304, 0.1725, 0.255],
}

# Send schema and data together as one JSON request.
predictor = Predictor(
    serializer=JSONSerializer(),
    sagemaker_session=session,
    endpoint_name=endpoint_name,
)
print(predictor.predict(payload))

### Deleting the Endpoint (Optional)
Next we will delete the endpoint so that you do not incur the cost of keeping it running.

In [None]:
# Delete the hosted endpoint by name. `session` and `endpoint_name` are the
# objects created in the deployment cell.
# NOTE(review): this call names only the endpoint; whether the model and
# endpoint configuration need separate cleanup should be confirmed.
session.delete_endpoint(endpoint_name)