## Demo on how to use AWS Marpetplace Machine Learning Products with AWS Data Exchange products

@Copyright Rick Cao

This notebook serves a demo on using the below mentioned algorithm product and data products to create a model endpoint. 

 - [Scikit Decision Trees](https://aws.amazon.com/marketplace/pp/prodview-ha4f3kqugba3u?qid=1591728221699&sr=0-1&ref_=srh_res_product_title)
 
 - [COVID-19 - World Confirmed Cases, Deaths, and Testing](https://aws.amazon.com/marketplace/pp/prodview-3b32sjummof5s?qid=1591728365105&sr=0-1&ref_=srh_res_product_title)



In [1]:
import boto3
import os
import re 
import time
import click
import uuid
import sagemaker as sage
from sagemaker import get_execution_role
import pandas as pd
import numpy as np

In [2]:
dx = boto3.client('dataexchange', region_name='us-east-1')
s3 = boto3.client('s3')

In [3]:
def get_all_revisions(data_set_id):

    revisions = []
    res = dx.list_data_set_revisions(DataSetId=data_set_id)
    next_token = res.get('NextToken')
    
    revisions += res.get('Revisions')
    while next_token:
        res = dx.list_data_set_revisions(DataSetId=data_set_id,
                                         NextToken=next_token)
        revisions += res.get('Revisions')
        next_token = res.get('NextToken')
        
    return revisions


def get_all_assets(data_set_id, revision_id):
    assets = []
    res = dx.list_revision_assets(DataSetId=data_set_id,
                                  RevisionId=revision_id)
    next_token = res.get('NextToken')
    
    assets += res.get('Assets')
    while next_token:
        res = dx.list_revision_assets(DataSetId=data_set_id,
                                      RevisionId=revision_id,
                                      NextToken=next_token)
        assets += res.get('Assets')
        next_token = res.get('NextToken')
        
    return assets


def get_entitled_data_sets():
    data_sets = []
    res = dx.list_data_sets(Origin='ENTITLED')
    next_token = res.get('NextToken')
    
    data_sets += res.get('DataSets')
    while next_token:
        res = dx.list_data_sets(Origin='ENTITLED',
                                NextToken=next_token)
        data_sets += res.get('DataSets')
        next_token = res.get('NextToken')
        
    return data_sets


def export_assets(assets, bucket):
    
    asset_destinations = []

    for asset in assets:
        asset_destinations.append({
            "AssetId": asset.get('Id'),
            "Bucket": bucket,
            "Key": asset.get('Name')
        })

    job = dx.create_job(Type='EXPORT_ASSETS_TO_S3', Details={
        "ExportAssetsToS3": {
            "RevisionId": asset.get("RevisionId"), "DataSetId": asset.get("DataSetId"),
            "AssetDestinations": asset_destinations
        }
    })

    job_id = job.get('Id')
    dx.start_job(JobId=job_id)

    while True:
        job = dx.get_job(JobId=job_id)

        if job.get('State') == 'COMPLETED':
            break
        elif job.get('State') == 'ERROR':
            raise Exception("Job {} failed to complete - {}".format(
                job_id, job.get('Errors')[0].get('Message'))
            )

        time.sleep(1)


def to_url(s):
    s = re.sub(r"[^\w\s]", '', s)
    s = re.sub(r"\s+", '-', s)

    return s


def download_assets(assets, bucket, asset_dir):
    for asset in assets:
        asset_name = asset.get('Name')
        sub_dir = os.path.dirname(asset_name)
        full_dir = os.path.join(asset_dir, sub_dir)

        if not os.path.exists(full_dir):
            os.makedirs(full_dir)

        asset_file = os.path.join(full_dir, os.path.basename(asset_name))

        s3.download_file(bucket, asset_name, asset_file)

        print("Downloaded file {}".format(asset_file))


def make_s3_staging_bucket():
    bucket_name = str(uuid.uuid4())
    s3.create_bucket(Bucket=bucket_name)
    return bucket_name


def remove_s3_bucket(bucket_name):
    s3_resource = boto3.resource('s3')
    bucket = s3_resource.Bucket(bucket_name)
    bucket.objects.all().delete()
    bucket.delete()

## Upload the data to S3 bucket

In [4]:
role = get_execution_role()

# S3 prefixes
common_prefix = "DEMO-scikit"
training_input_prefix = common_prefix + "/training-input-data"
batch_inference_input_prefix = common_prefix + "/batch-inference-input-data"

In [5]:
sagemaker_session = sage.Session()
staging_bucket = sagemaker_session.default_bucket()

In [6]:
data_sets = get_entitled_data_sets()
len(data_sets)

5

In [7]:
ds_id = "5d6ec0f777f3e82430693c096426adf1" # rearc COVID-19 - World Confirmed Cases, Deaths, and Testing

last_rev = get_all_revisions(ds_id)[0]
assets = get_all_assets(ds_id, last_rev.get('Id'))

destination_dir = os.path.join('covid19', last_rev.get('Id'))

export_assets(assets, staging_bucket)
download_assets(assets, staging_bucket, destination_dir)

Downloaded file covid19/f340dca7434c4817a900c813fe1e5adc/covid-19-world-cases-deaths-testing/dataset/covid-19-world-cases-deaths-testing.csv
Downloaded file covid19/f340dca7434c4817a900c813fe1e5adc/covid-19-world-cases-deaths-testing/dataset/covid-19-world-cases-deaths-testing.xlsx


## Retrieve data from S3 bucket and preprocess the data

In [8]:
data_key = 'covid-19-world-cases-deaths-testing.csv'
data_location = 's3://{}/{}/{}'.format(staging_bucket, "covid-19-world-cases-deaths-testing/dataset", data_key)

In [9]:
df = pd.read_csv(data_location)

In [10]:
df=df[df['date']=="2020-06-08"]

In [11]:
df=df[['iso_code', 'continent', 'location','date','total_cases','new_cases',
      'total_deaths','new_deaths','total_cases_per_million','new_cases_per_million',
      'total_deaths_per_million','new_deaths_per_million', 'stringency_index', 'population', 
      'population_density', 'median_age','aged_65_older', 'aged_70_older', 'gdp_per_capita',
     'cvd_death_rate', 'diabetes_prevalence', 'handwashing_facilities', 'hospital_beds_per_thousand']]

In [12]:
df_X = df[['total_cases','new_cases',
      'total_deaths','new_deaths','total_cases_per_million','new_cases_per_million',
           'new_deaths_per_million', 'stringency_index', 'population', 
      'population_density', 'median_age','aged_65_older', 'aged_70_older', 'gdp_per_capita',
     'cvd_death_rate', 'diabetes_prevalence', 'handwashing_facilities', 'hospital_beds_per_thousand']]

In [13]:
df = df.fillna(0)

In [14]:
df.dropna(inplace=True)

In [15]:
df['total_cases_per_million'].median()

314.30049999999994

In [16]:
len(df)

210

In [17]:
df['total_cases_per_million'] = df['total_cases_per_million'] > 300

In [18]:
from sklearn.model_selection import train_test_split
X_train, X_val, y_train, y_val = train_test_split(df_X.values , df['total_cases_per_million'].values, test_size=0.1, shuffle=True)

In [19]:
X_train = np.nan_to_num(X_train)
X_val = np.nan_to_num(X_val)

In [21]:
input_data = np.hstack((y_train.reshape(y_train.shape[0],1), X_train))
# input_data = np.hstack((X_train, y_train.reshape(188,1)))

In [22]:
pd.DataFrame(input_data).to_csv("input_data.csv", index=False)
training_input_prefix = "training_input"

In [23]:
training_input = sagemaker_session.upload_data("input_data.csv", key_prefix=training_input_prefix)
print ("Training Data Location " + training_input)

Training Data Location s3://sagemaker-us-east-1-833255123925/training_input/input_data.csv


## Fetch the algorithm product and submit a training job

In [24]:
algorithm_arn = "arn:aws:sagemaker:us-east-1:865070037744:algorithm/scikit-decision-trees-15423055-57b73412d2e93e9239e4e16f83298b8f"

In [25]:
import json
import time
from sagemaker.algorithm import AlgorithmEstimator

algo = AlgorithmEstimator(
            algorithm_arn=algorithm_arn,
            role=role,
            train_instance_count=1,
            train_instance_type='ml.m4.xlarge',
            base_job_name='intel-from-aws-marketplace')

In [26]:
algo.fit({'training': training_input})

2020-06-09 22:33:49 Starting - Starting the training job...
2020-06-09 22:33:51 Starting - Launching requested ML instances......
2020-06-09 22:35:08 Starting - Preparing the instances for training......
2020-06-09 22:36:17 Downloading - Downloading input data
2020-06-09 22:36:17 Training - Downloading the training image...
2020-06-09 22:36:48 Training - Training image download completed. Training in progress..[34mStarting the training.[0m
[34mvalidation-accuracy: 1.0[0m
[34mTraining complete.[0m

2020-06-09 22:37:00 Uploading - Uploading generated training model
2020-06-09 22:37:00 Completed - Training job completed
Training seconds: 61
Billable seconds: 61


### Automated Model Tuning (optional)

In [27]:
from sagemaker.tuner import HyperparameterTuner, IntegerParameter

## This demo algorithm supports max_leaf_nodes as the only tunable hyperparameter.
hyperparameter_ranges = {'max_leaf_nodes': IntegerParameter(1, 10)}

tuner = HyperparameterTuner(estimator=algo, base_tuning_job_name='some-name',
                                objective_metric_name='validation:accuracy',
                                hyperparameter_ranges=hyperparameter_ranges,
                                max_jobs=2, max_parallel_jobs=2)

tuner.fit({'training': training_input}, include_cls_metadata=False)
tuner.wait()

................................................!


### Launch the endpoint and test your prediction

In [28]:
from sagemaker.predictor import csv_serializer
predictor = algo.deploy(1, 'ml.m4.xlarge', serializer=csv_serializer)

..........
-------------!

In [None]:
# from sagemaker.predictor import RealTimePredictor
# predictor1 = RealTimePredictor(endpoint="intel-from-aws-marketplace-2020-06-09-02-08-55-620",
#                               sagemaker_session=sagemaker_session,
#                               serializer=csv_serializer)

In [None]:
# predictor1.predict(X_val)

In [None]:
# TRANSFORM_WORKDIR = "data/transform"
# X_val.to_csv(TRANSFORM_WORKDIR + "/batchtransform_test.csv", index=False, header=False)

# transform_input = sagemaker_session.upload_data(TRANSFORM_WORKDIR, key_prefix=batch_inference_input_prefix) + "/batchtransform_test.csv"
# print("Transform input uploaded to " + transform_input)

In [None]:
# transformer = algo.transformer(1, 'ml.m4.xlarge')
# transformer.transform(transform_input, content_type='text/csv')
# transformer.wait()

# print("Batch Transform output saved to " + transformer.output_path)

In [29]:
prediction = predictor.predict(X_val).decode('utf-8')

In [30]:
prediction=np.array(prediction.split("\n")[:-1], dtype='float')

In [31]:
prediction

array([0., 1., 0., 1., 1., 0., 0., 0., 0., 1., 1., 1., 1., 1., 1., 1., 0.,
       0., 1., 1., 1.])

In [None]:
# from sklearn.metrics import accuracy_score

# print(accuracy_score(y_val, prediction))

In [32]:
y_val-prediction

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0.])

### We could do more powerful by using streaming data

In [None]:
algo.delete_endpoint()