# End to end

In [1]:
import sagemaker
from time import gmtime, strftime

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

prefix = 'sagemaker/spark-preprocess-demo/' + timestamp_prefix
input_prefix = prefix + '/input/raw/abalone'
input_preprocessed_prefix = prefix + '/input/preprocessed/abalone'
model_prefix = prefix + '/model'
#Jay Change
mleap_model_prefix = prefix + '/mleap-model'

In [2]:
# Fetch the dataset from the SageMaker bucket
!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv

# Uploading the training data to S3
sagemaker_session.upload_data(path='abalone.csv', bucket=bucket, key_prefix=input_prefix)

--2020-06-12 19:58:14--  https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv
Resolving s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)... 52.218.220.56
Connecting to s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)|52.218.220.56|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 191873 (187K) [binary/octet-stream]
Saving to: ‘abalone.csv.1’


2020-06-12 19:58:15 (772 KB/s) - ‘abalone.csv.1’ saved [191873/191873]



's3://sagemaker-us-east-1-885332847160/sagemaker/spark-preprocess-demo/2020-06-12-19-58-14/input/raw/abalone/abalone.csv'

In [3]:
%cd container
!docker build -t sagemaker-spark-example .
%cd ../

/home/ec2-user/SageMaker/airflow-sagemaker/end-to-end-pipeline/container
Sending build context to Docker daemon  17.36MB
Step 1/34 : FROM openjdk:8-jre-slim
 ---> 73c63778326a
Step 2/34 : RUN apt-get update
 ---> Using cache
 ---> 4e81f4104846
Step 3/34 : RUN apt-get install -y curl unzip python3 python3-setuptools python3-pip python-dev python3-dev python-psutil
 ---> Using cache
 ---> cc99c960ee50
Step 4/34 : RUN pip3 install py4j psutil==5.6.5 mleap==0.8.1 boto3
 ---> Using cache
 ---> a167dfbb669b
Step 5/34 : RUN apt-get clean
 ---> Using cache
 ---> 16778c248737
Step 6/34 : RUN rm -rf /var/lib/apt/lists/*
 ---> Using cache
 ---> 3554ce1d7583
Step 7/34 : ENV PYTHONHASHSEED 0
 ---> Using cache
 ---> 3ff32b88f6ad
Step 8/34 : ENV PYTHONIOENCODING UTF-8
 ---> Using cache
 ---> c4ba24c4a1fe
Step 9/34 : ENV PIP_DISABLE_PIP_VERSION_CHECK 1
 ---> Using cache
 ---> cd54353b9089
Step 10/34 : ENV HADOOP_VERSION 3.0.0
 ---> Using cache
 ---> 83d8f51e985c
Step 11/34 : ENV HADOOP_HOME /usr/hadoo

In [4]:
import boto3
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name

ecr_repository = 'sagemaker-spark-example'
tag = ':latest'
uri_suffix = 'amazonaws.com'
if region in ['cn-north-1', 'cn-northwest-1']:
    uri_suffix = 'amazonaws.com.cn'
spark_repository_uri = '{}.dkr.ecr.{}.{}/{}'.format(account_id, region, uri_suffix, ecr_repository + tag)

# Create ECR repository and push docker image
!$(aws ecr get-login --region $region --registry-ids $account_id --no-include-email)
!aws ecr create-repository --repository-name $ecr_repository
!docker tag {ecr_repository + tag} $spark_repository_uri
!docker push $spark_repository_uri

https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded

An error occurred (RepositoryAlreadyExistsException) when calling the CreateRepository operation: The repository with name 'sagemaker-spark-example' already exists in the registry with id '885332847160'
The push refers to repository [885332847160.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark-example]

[1Be3313c4d: Preparing 
[1B64df61d9: Preparing 
[1B79f1db19: Preparing 
[1B6a639273: Preparing 
[1Beba1bef5: Preparing 
[1Bfa011e85: Preparing 
[1Bbc7e7a20: Preparing 
[1B87ebe267: Preparing 
[1B98511571: Preparing 
[1Bdddeb7ee: Preparing 
[1B81dc20c2: Preparing 
[1B760baedf: Preparing 
[1B3663cf66: Preparing 
[1B29cec5e1: Preparing 
[2B29cec5e1: Layer already exists [11A[1K[K[6A[1K[K[3A[1K[Klatest: digest: sha256:2edc2d586f244e0211d305b1fc01ca257e113dc2a4d3c9dd4dc58469aaf45799 size: 3474


In [5]:
%%writefile preprocess.py
from __future__ import print_function
from __future__ import unicode_literals

import time
import sys
import os
import shutil
import csv

import boto3
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.types import StructField, StructType, StringType, DoubleType
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import *
from mleap.pyspark.spark_support import SimpleSparkSerializer

def csv_line(data):
    r = ','.join(str(d) for d in data[1])
    return str(data[0]) + "," + r


def main():
    spark = SparkSession.builder.appName("PySparkAbalone").getOrCreate()
    
    # Convert command line args into a map of args
    args_iter = iter(sys.argv[1:])
    args = dict(zip(args_iter, args_iter))
    
    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    spark.sparkContext._jsc.hadoopConfiguration().set("mapred.output.committer.class",
                                                      "org.apache.hadoop.mapred.FileOutputCommitter")
    
    # Defining the schema corresponding to the input data. The input data does not contain the headers
    schema = StructType([StructField("sex", StringType(), True), 
                         StructField("length", DoubleType(), True),
                         StructField("diameter", DoubleType(), True),
                         StructField("height", DoubleType(), True),
                         StructField("whole_weight", DoubleType(), True),
                         StructField("shucked_weight", DoubleType(), True),
                         StructField("viscera_weight", DoubleType(), True), 
                         StructField("shell_weight", DoubleType(), True), 
                         StructField("rings", DoubleType(), True)])

    # Downloading the data from S3 into a Dataframe
    total_df = spark.read.csv(('s3a://' + os.path.join(args['s3_input_bucket'], args['s3_input_key_prefix'],
                                                   'abalone.csv')), header=False, schema=schema)

    #StringIndexer on the sex column which has categorical value
    sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")
    
    #one-hot-encoding is being performed on the string-indexed sex column (indexed_sex)
    sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")

    #vector-assembler will bring all the features to a 1D vector for us to save easily into CSV format
    assembler = VectorAssembler(inputCols=["sex_vec", 
                                           "length", 
                                           "diameter", 
                                           "height", 
                                           "whole_weight", 
                                           "shucked_weight", 
                                           "viscera_weight", 
                                           "shell_weight"], 
                                outputCol="features")
    
    # The pipeline comprises of the steps added above
    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])
    
    # This step trains the feature transformers
    model = pipeline.fit(total_df)
    
    # This step transforms the dataset with information obtained from the previous fit
    transformed_total_df = model.transform(total_df)
    
    # Split the overall dataset into 80-20 training and validation
    (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])
    
    # Convert the train dataframe to RDD to save in CSV format and upload to S3
    train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))
    train_lines = train_rdd.map(csv_line)
    train_lines.saveAsTextFile('s3a://' + os.path.join(args['s3_output_bucket'], args['s3_output_key_prefix'], 'train'))
    
    # Convert the validation dataframe to RDD to save in CSV format and upload to S3
    validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))
    validation_lines = validation_rdd.map(csv_line)
    validation_lines.saveAsTextFile('s3a://' + os.path.join(args['s3_output_bucket'], args['s3_output_key_prefix'], 'validation'))
    
    # Serialize and store the model via MLeap  
    SimpleSparkSerializer().serializeToBundle(model, "jar:file:/opt/ml/model.zip", validation_df)    
    # Unzip the model as SageMaker expects a .tar.gz file but MLeap produces a .zip file
    import zipfile
    with zipfile.ZipFile("/opt/ml/model.zip") as zf:
        zf.extractall("/opt/ml/model")

    # Writw back the content as a .tar.gz file
    import tarfile
    with tarfile.open("/opt/ml/model.tar.gz", "w:gz") as tar:
        tar.add("/opt/ml/model/bundle.json", arcname='bundle.json')
        tar.add("/opt/ml/model/root", arcname='root')
    
    # Upload the model in tar.gz format to S3 so that it can be used with SageMaker for inference later
    s3 = boto3.resource('s3') 
    file_name = os.path.join(args['s3_mleap_model_prefix'], 'model.tar.gz')
    s3.Bucket(args['s3_model_bucket']).upload_file('/opt/ml/model.tar.gz', file_name)    

if __name__ == "__main__":
    main()

Overwriting preprocess.py


In [6]:
from sagemaker.processing import ScriptProcessor, ProcessingInput
spark_processor = ScriptProcessor(base_job_name='spark-preprocessor',
                                  image_uri=spark_repository_uri,
                                  command=['/opt/program/submit'],
                                  role=role,
                                  instance_count=2,
                                  instance_type='ml.r5.xlarge',
                                  max_runtime_in_seconds=1200,
                                  env={'mode': 'python'})

spark_processor.run(code='preprocess.py',
                    arguments=['s3_input_bucket', bucket,
                               's3_input_key_prefix', input_prefix,
                               's3_output_bucket', bucket,
                               's3_output_key_prefix', input_preprocessed_prefix,
                               's3_model_bucket', bucket,
                               's3_mleap_model_prefix', mleap_model_prefix],
                    logs=True)


Job Name:  spark-preprocessor-2020-06-12-19-58-18-035
Inputs:  [{'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-885332847160/spark-preprocessor-2020-06-12-19-58-18-035/input/code/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  []
[34m2020-06-12 20:02:17,638 INFO namenode.NameNode: STARTUP_MSG: [0m
[34m/************************************************************[0m
[34mSTARTUP_MSG: Starting NameNode[0m
[34mSTARTUP_MSG:   host = algo-1/10.0.202.182[0m
[34mSTARTUP_MSG:   args = [-format, -force][0m
[34mSTARTUP_MSG:   version = 3.0.0[0m
[34mSTARTUP_MSG:   classpath = /usr/hadoop-3.0.0/etc/hadoop:/usr/hadoop-3.0.0/share/hadoop/common/lib/junit-4.11.jar:/usr/hadoop-3.0.0/share/hadoop/common/lib/curator-framework-2.12.0.jar:/usr/hadoop-3.0.0/share/hadoop/common/lib/jsch-0.1.54.jar:/usr/hadoop-3.0.0/share/hadoop

In [7]:
print('Top 5 rows from s3://{}/{}/train/'.format(bucket, input_preprocessed_prefix))
!aws s3 cp --quiet s3://$bucket/$input_preprocessed_prefix/train/part-00000 - | head -n5

Top 5 rows from s3://sagemaker-us-east-1-885332847160/sagemaker/spark-preprocess-demo/2020-06-12-19-58-14/input/preprocessed/abalone/train/
5.0,0.0,0.0,0.275,0.195,0.07,0.08,0.031,0.0215,0.025
6.0,0.0,0.0,0.29,0.21,0.075,0.275,0.113,0.0675,0.035
7.0,0.0,0.0,0.305,0.225,0.07,0.1485,0.0585,0.0335,0.045
7.0,0.0,0.0,0.305,0.23,0.08,0.156,0.0675,0.0345,0.048
7.0,0.0,0.0,0.325,0.26,0.09,0.1915,0.085,0.036,0.062


In [8]:
from sagemaker.amazon.amazon_estimator import get_image_uri

training_image = get_image_uri(sagemaker_session.boto_region_name, 'xgboost', repo_version="0.90-1")
print(training_image)

683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:0.90-1-cpu-py3


In [9]:
s3_train_data = 's3://{}/{}/{}'.format(bucket, input_preprocessed_prefix, 'train/part')
s3_validation_data = 's3://{}/{}/{}'.format(bucket, input_preprocessed_prefix, 'validation/part')
s3_output_location = 's3://{}/{}/{}'.format(bucket, prefix, 'xgboost_model')

xgb_model = sagemaker.estimator.Estimator(training_image,
                                          role, 
                                          train_instance_count=1, 
                                          train_instance_type='ml.m4.xlarge',
                                          train_volume_size = 20,
                                          train_max_run = 3600,
                                          input_mode= 'File',
                                          output_path=s3_output_location,
                                          sagemaker_session=sagemaker_session)

xgb_model.set_hyperparameters(objective = "reg:linear",
                              eta = .2,
                              gamma = 4,
                              max_depth = 5,
                              num_round = 10,
                              subsample = 0.7,
                              silent = 0,
                              min_child_weight = 6)

train_data = sagemaker.session.s3_input(s3_train_data, distribution='FullyReplicated', 
                        content_type='text/csv', s3_data_type='S3Prefix')
validation_data = sagemaker.session.s3_input(s3_validation_data, distribution='FullyReplicated', 
                             content_type='text/csv', s3_data_type='S3Prefix')

data_channels = {'train': train_data, 'validation': validation_data}

In [10]:
xgb_model.fit(inputs=data_channels, logs=True)

2020-06-12 20:05:06 Starting - Starting the training job...
2020-06-12 20:05:09 Starting - Launching requested ML instances.........
2020-06-12 20:06:53 Starting - Preparing the instances for training......
2020-06-12 20:08:04 Downloading - Downloading input data...
2020-06-12 20:08:29 Training - Downloading the training image...
2020-06-12 20:09:05 Uploading - Uploading generated training model
2020-06-12 20:09:05 Completed - Training job completed
[34mINFO:sagemaker-containers:Imported framework sagemaker_xgboost_container.training[0m
[34mINFO:sagemaker-containers:Failed to parse hyperparameter objective value reg:linear to Json.[0m
[34mReturning the value itself[0m
[34mINFO:sagemaker-containers:No GPUs detected (normal if no gpus installed)[0m
[34mINFO:sagemaker_xgboost_container.training:Running XGBoost Sagemaker in algorithm mode[0m
[34mINFO:root:Determined delimiter of CSV input is ','[0m
[34mINFO:root:Determined delimiter of CSV input is ','[0m
[34mINFO:root:Deter

In [11]:
import json
schema = {
    "input": [
        {
            "name": "sex",
            "type": "string"
        }, 
        {
            "name": "length",
            "type": "double"
        }, 
        {
            "name": "diameter",
            "type": "double"
        }, 
        {
            "name": "height",
            "type": "double"
        }, 
        {
            "name": "whole_weight",
            "type": "double"
        }, 
        {
            "name": "shucked_weight",
            "type": "double"
        },
        {
            "name": "viscera_weight",
            "type": "double"
        }, 
        {
            "name": "shell_weight",
            "type": "double"
        }, 
    ],
    "output": 
        {
            "name": "features",
            "type": "double",
            "struct": "vector"
        }
}
schema_json = json.dumps(schema)
print(schema_json)

{"input": [{"name": "sex", "type": "string"}, {"name": "length", "type": "double"}, {"name": "diameter", "type": "double"}, {"name": "height", "type": "double"}, {"name": "whole_weight", "type": "double"}, {"name": "shucked_weight", "type": "double"}, {"name": "viscera_weight", "type": "double"}, {"name": "shell_weight", "type": "double"}], "output": {"name": "features", "type": "double", "struct": "vector"}}


In [12]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
from sagemaker.sparkml.model import SparkMLModel

sparkml_data = 's3://{}/{}/{}'.format(bucket, mleap_model_prefix, 'model.tar.gz')
# passing the schema defined above by using an environment variable that sagemaker-sparkml-serving understands
sparkml_model = SparkMLModel(model_data=sparkml_data, env={'SAGEMAKER_SPARKML_SCHEMA' : schema_json})
xgb_model = Model(model_data=xgb_model.model_data, image=training_image)

model_name = 'inference-pipeline-' + timestamp_prefix
sm_model = PipelineModel(name=model_name, role=role, models=[sparkml_model, xgb_model])

In [13]:
endpoint_name = 'inference-pipeline-ep-' + timestamp_prefix
sm_model.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', endpoint_name=endpoint_name)

---------------!

In [14]:
from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON
payload = "F,0.515,0.425,0.14,0.766,0.304,0.1725,0.255"
predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sagemaker_session, serializer=csv_serializer,
                                content_type=CONTENT_TYPE_CSV, accept=CONTENT_TYPE_CSV)
print(predictor.predict(payload))

b'11.162177085876465'


In [15]:
payload = {"data": ["F",0.515,0.425,0.14,0.766,0.304,0.1725,0.255]}
predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sagemaker_session, serializer=json_serializer,
                                content_type=CONTENT_TYPE_JSON, accept=CONTENT_TYPE_CSV)

print(predictor.predict(payload))

b'11.162177085876465'


BATCH TRANSFORM

In [16]:
!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/batch_input_abalone.csv
!printf "\n\nShowing first five lines\n\n"    
!head -n 5 batch_input_abalone.csv 
!printf "\n\nAs we can see, it is identical to the training file apart from the label being absent here.\n\n"  

--2020-06-12 20:16:51--  https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/batch_input_abalone.csv
Resolving s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)... 52.218.248.8
Connecting to s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)|52.218.248.8|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 654 [text/csv]
Saving to: ‘batch_input_abalone.csv.1’


2020-06-12 20:16:51 (17.6 MB/s) - ‘batch_input_abalone.csv.1’ saved [654/654]



Showing first five lines

M,0.455,0.365,0.095,0.514,0.2245,0.101,0.15
M,0.35,0.265,0.09,0.2255,0.0995,0.0485,0.07
F,0.53,0.42,0.135,0.677,0.2565,0.1415,0.21
M,0.44,0.365,0.125,0.516,0.2155,0.114,0.155
I,0.33,0.255,0.08,0.205,0.0895,0.0395,0.055


As we can see, it is identical to the training file apart from the label being absent here.



In [17]:
batch_input_loc = sagemaker_session.upload_data(path='batch_input_abalone.csv', bucket=bucket, key_prefix=prefix+'/batch')

In [18]:
input_data_path = 's3://{}/{}/{}'.format(bucket, prefix + '/batch', 'batch_input_abalone.csv')
output_data_path = 's3://{}/{}'.format(bucket, prefix + '/batch_output/abalone')
job_name = 'serial-inference-batch-' + timestamp_prefix
transformer = sagemaker.transformer.Transformer(
    # This was the model created using PipelineModel and it contains feature processing and XGBoost
    model_name = model_name,
    instance_count = 1,
    instance_type = 'ml.m4.xlarge',
    strategy = 'SingleRecord',
    assemble_with = 'Line',
    output_path = output_data_path,
    base_transform_job_name='serial-inference-batch',
    sagemaker_session=sagemaker_session,
    accept = CONTENT_TYPE_CSV
)
transformer.transform(data = input_data_path,
                      job_name = job_name,
                      content_type = CONTENT_TYPE_CSV, 
                      split_type = 'Line')
transformer.wait()

..........................[34m  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \[0m
[35m  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \[0m
[34m( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 :: Spring Boot ::                  (v2.2)
[0m
[34m2020-06-12 20:20:58.238  INFO 7 --- [           main] com.amazonaws.sagemaker.App              : Starting App v2.2 on aa16cce3a6af with PID 7 (/usr/local/lib/sparkml-serving-2.2.jar started by root in /sagemaker-sparkml-model-server)[0m
[34m2020-06-12 20:20:58.254  INFO 7 --- [           main] com.amazonaws.sagemaker.App              : No active profile set, falling back to default profiles: default[0m
[35m( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 :: Spring Boot ::                  (v2.2)
[0m
[35m2020-06-12 20:2

In [20]:
s3 = boto3.resource('s3')

KEY = prefix + '/batch_output/abalone/batch_input_abalone.csv.out'
s3.Bucket(bucket).download_file(KEY, 'batch_output_abalone.csv')

!head batch_output_abalone.csv

8.722335815429688
7.243444442749023
9.691205978393555
9.056212425231934
5.777616500854492
6.850498199462891
12.855364799499512
10.699277877807617
9.3707914352417
11.894083976745605
