In [None]:

# Import SageMaker Python SDK to get the Session and execution_role
import sagemaker
from sagemaker import get_execution_role
sess = sagemaker.Session()
role = get_execution_role()
print(role[role.rfind('/') + 1:])

In [None]:
!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv

In [None]:
import boto3
import botocore
from botocore.exceptions import ClientError

boto_session = sess.boto_session
s3 = boto_session.resource('s3')
account = boto_session.client('sts').get_caller_identity()['Account']
region = boto_session.region_name
default_bucket = 'aws-glue-{}-{}'.format(account, region)

try:
    if region == 'us-east-1':
        s3.create_bucket(Bucket=default_bucket)
    else:
        s3.create_bucket(Bucket=default_bucket, CreateBucketConfiguration={'LocationConstraint': region})
except ClientError as e:
    error_code = e.response['Error']['Code']
    message = e.response['Error']['Message']
    if error_code == 'BucketAlreadyOwnedByYou':
        print ('A bucket with the same name already exists in your account - using the same bucket.')
        pass        

# Uploading the training data to S3
sess.upload_data(path='abalone.csv', bucket=default_bucket, key_prefix='input/abalone')

In [None]:
script_location = sess.upload_data(path='abalone_processing.py', bucket=default_bucket, key_prefix='codes')

In [None]:

!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/python/python.zip
!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/jar/mleap_spark_assembly.jar

In [None]:
python_dep_location = sess.upload_data(path='python.zip', bucket=default_bucket, key_prefix='dependencies/python')
jar_dep_location = sess.upload_data(path='mleap_spark_assembly.jar', bucket=default_bucket, key_prefix='dependencies/jar')

In [None]:
from time import gmtime, strftime
import time

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# Input location of the data, We uploaded our train.csv file to input key previously
s3_input_bucket = default_bucket
s3_input_key_prefix = 'input/abalone'

# Output location of the data. The input data will be split, transformed, and 
# uploaded to output/train and output/validation
s3_output_bucket = default_bucket
s3_output_key_prefix = timestamp_prefix + '/abalone'

# the MLeap serialized SparkML model will be uploaded to output/mleap
s3_model_bucket = default_bucket
s3_model_key_prefix = s3_output_key_prefix + '/mleap'

In [None]:

glue_client = boto_session.client('glue')
job_name = 'sparkml-abalone-' + timestamp_prefix
response = glue_client.create_job(
    Name=job_name,
    Description='PySpark job to featurize the Abalone dataset',
    Role=role, # you can pass your existing AWS Glue role here if you have used Glue before
    ExecutionProperty={
        'MaxConcurrentRuns': 1
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': script_location
    },
    DefaultArguments={
        '--job-language': 'python',
        '--extra-jars' : jar_dep_location,
        '--extra-py-files': python_dep_location
    },
    AllocatedCapacity=5,
    Timeout=60,
)
glue_job_name = response['Name']
print(glue_job_name)

In [None]:

job_run_id = glue_client.start_job_run(JobName=job_name,
                                       Arguments = {
                                        '--S3_INPUT_BUCKET': s3_input_bucket,
                                        '--S3_INPUT_KEY_PREFIX': s3_input_key_prefix,
                                        '--S3_OUTPUT_BUCKET': s3_output_bucket,
                                        '--S3_OUTPUT_KEY_PREFIX': s3_output_key_prefix,
                                        '--S3_MODEL_BUCKET': s3_model_bucket,
                                        '--S3_MODEL_KEY_PREFIX': s3_model_key_prefix
                                       })['JobRunId']
print(job_run_id)

In [None]:
job_run_status = glue_client.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']
while job_run_status not in ('FAILED', 'SUCCEEDED', 'STOPPED'):
    job_run_status = glue_client.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']
    print (job_run_status)
    time.sleep(30)

In [None]:

from sagemaker.amazon.amazon_estimator import get_image_uri

training_image = get_image_uri(sess.boto_region_name, 'xgboost', repo_version="latest")
print (training_image)

In [None]:
s3_train_data = 's3://{}/{}/{}'.format(s3_output_bucket, s3_output_key_prefix, 'train')
s3_validation_data = 's3://{}/{}/{}'.format(s3_output_bucket, s3_output_key_prefix, 'validation')
s3_output_location = 's3://{}/{}/{}'.format(s3_output_bucket, s3_output_key_prefix, 'xgboost_model')

xgb_model = sagemaker.estimator.Estimator(training_image,
                                         role, 
                                         train_instance_count=1, 
                                         train_instance_type='ml.m5.xlarge',
                                         train_volume_size = 20,
                                         train_max_run = 3600,
                                         input_mode= 'File',
                                         output_path=s3_output_location,
                                         sagemaker_session=sess)

xgb_model.set_hyperparameters(objective = "reg:linear",
                              eta = .2,
                              gamma = 4,
                              max_depth = 5,
                              num_round = 10,
                              subsample = 0.7,
                              silent = 0,
                              min_child_weight = 6)

train_data = sagemaker.session.s3_input(s3_train_data, distribution='FullyReplicated', 
                        content_type='text/csv', s3_data_type='S3Prefix')
validation_data = sagemaker.session.s3_input(s3_validation_data, distribution='FullyReplicated', 
                             content_type='text/csv', s3_data_type='S3Prefix')

data_channels = {'train': train_data, 'validation': validation_data}

In [None]:
xgb_model.fit(inputs=data_channels, logs=True)

In [None]:
import json
schema = {
    "input": [
        {
            "name": "sex",
            "type": "string"
        }, 
        {
            "name": "length",
            "type": "double"
        }, 
        {
            "name": "diameter",
            "type": "double"
        }, 
        {
            "name": "height",
            "type": "double"
        }, 
        {
            "name": "whole_weight",
            "type": "double"
        }, 
        {
            "name": "shucked_weight",
            "type": "double"
        },
        {
            "name": "viscera_weight",
            "type": "double"
        }, 
        {
            "name": "shell_weight",
            "type": "double"
        }, 
    ],
    "output": 
        {
            "name": "features",
            "type": "double",
            "struct": "vector"
        }
}
schema_json = json.dumps(schema)
print(schema_json)

In [None]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
from sagemaker.sparkml.model import SparkMLModel

sparkml_data = 's3://{}/{}/{}'.format(s3_model_bucket, s3_model_key_prefix, 'model.tar.gz')
# passing the schema defined above by using an environment variable that sagemaker-sparkml-serving understands
sparkml_model = SparkMLModel(model_data=sparkml_data, env={'SAGEMAKER_SPARKML_SCHEMA' : schema_json})
xgb_model = Model(model_data=xgb_model.model_data, image=training_image)

model_name = 'inference-pipeline-' + timestamp_prefix
sm_model = PipelineModel(name=model_name, role=role, models=[sparkml_model, xgb_model])

In [None]:

endpoint_name = 'inference-pipeline-ep-' + timestamp_prefix
sm_model.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', endpoint_name=endpoint_name)

In [None]:
from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON
payload = "F,0.515,0.425,0.14,0.766,0.304,0.1725,0.255"
predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sess, serializer=csv_serializer,
                                content_type=CONTENT_TYPE_CSV, accept=CONTENT_TYPE_CSV)
print(predictor.predict(payload))

In [None]:

payload = {"data": ["F",0.515,0.425,0.14,0.766,0.304,0.1725,0.255]}
predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sess, serializer=json_serializer,
                                content_type=CONTENT_TYPE_JSON, accept=CONTENT_TYPE_CSV)

print(predictor.predict(payload))

In [None]:
payload = {
    "schema": {
        "input": [
        {
            "name": "length",
            "type": "double"
        }, 
        {
            "name": "sex",
            "type": "string"
        }, 
        {
            "name": "diameter",
            "type": "double"
        }, 
        {
            "name": "height",
            "type": "double"
        }, 
        {
            "name": "whole_weight",
            "type": "double"
        }, 
        {
            "name": "shucked_weight",
            "type": "double"
        },
        {
            "name": "viscera_weight",
            "type": "double"
        }, 
        {
            "name": "shell_weight",
            "type": "double"
        }, 
    ],
    "output": 
        {
            "name": "features",
            "type": "double",
            "struct": "vector"
        }
    },
    "data": [0.515,"F",0.425,0.14,0.766,0.304,0.1725,0.255]
}

predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sess, serializer=json_serializer,
                                content_type=CONTENT_TYPE_JSON, accept=CONTENT_TYPE_CSV)

print(predictor.predict(payload))

In [None]:
sm_client = boto_session.client('sagemaker')
sm_client.delete_endpoint(EndpointName=endpoint_name)

In [None]:
!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/batch_input_abalone.csv
!printf "\n\nShowing first five lines\n\n"    
!head -n 5 batch_input_abalone.csv 
!printf "\n\nAs we can see, it is identical to the training file apart from the label being absent here.\n\n"

In [None]:

batch_input_loc = sess.upload_data(path='batch_input_abalone.csv', bucket=default_bucket, key_prefix='batch')

In [None]:
input_data_path = 's3://{}/{}/{}'.format(default_bucket, 'batch', 'batch_input_abalone.csv')
output_data_path = 's3://{}/{}/{}'.format(default_bucket, 'batch_output/abalone', timestamp_prefix)
job_name = 'serial-inference-batch-' + timestamp_prefix
transformer = sagemaker.transformer.Transformer(
    # This was the model created using PipelineModel and it contains feature processing and XGBoost
    model_name = model_name,
    instance_count = 1,
    instance_type = 'ml.m5.xlarge',
    strategy = 'SingleRecord',
    assemble_with = 'Line',
    output_path = output_data_path,
    base_transform_job_name='serial-inference-batch',
    sagemaker_session=sess,
    accept = CONTENT_TYPE_CSV
)
transformer.transform(data = input_data_path,
                      job_name = job_name,
                      content_type = CONTENT_TYPE_CSV, 
                      split_type = 'Line')
transformer.wait()