In [1]:
# !conda install -c anaconda pandas-datareader -y

In [2]:
import json
import boto3
import sagemaker
import pandas as pd
import pandas_datareader.data as web
%matplotlib inline

  from pandas.util.testing import assert_frame_equal


In [3]:
base_job_name = 'sparkSummitDemo'
start_date = '2014-01-01'
end_date = '2016-01-01'

start_date_full = start_date + ' 00:00:00'
end_date_full = end_date + ' 00:00:00'

In [4]:
sagemaker_session = sagemaker.Session()
s3_bucket = '2021-demos'
s3_prefix = 'sparkSummit'

# role = sagemaker.get_execution_role()
# role = "your-sagemaker-role-arn"
role = 'arn:aws:iam::921212210452:role/service-role/AmazonSageMaker-ExecutionRole-20191122T164449'

region = sagemaker_session.boto_region_name

s3_data_path = "s3://{}/{}/data".format(s3_bucket, s3_prefix)
s3_output_path = "s3://{}/{}/output".format(s3_bucket, s3_prefix)

image_name = sagemaker.amazon.amazon_estimator.get_image_uri(region, "forecasting-deepar", "latest")

In [5]:
symbol  = web.DataReader('SPY', data_source = 'yahoo', start=start_date, end=end_date)
symbol0 = web.DataReader('AMZN', data_source = 'yahoo', start=start_date, end=end_date)
symbol1 = web.DataReader('GOOG', data_source = 'yahoo', start=start_date, end=end_date)
symbol2 = web.DataReader('AAPL', data_source = 'yahoo', start=start_date, end=end_date)

In [6]:
training_rows = round(symbol.shape[0]*.8)
test_rows = symbol.shape[0] - training_rows

In [7]:
training_data = [
    {
        "start": start_date_full,
        "target": symbol[:training_rows].Close.tolist()  # We use -1, because pandas indexing includes the upper bound 
    }
    for ts in symbol
]
print(len(training_data))

6


In [8]:
test_data = [
    {
        "start": start_date_full,
        "target": symbol[:test_rows].Close.tolist()  # We use -1, because pandas indexing includes the upper bound 
    }
    for ts in symbol
]
print(len(test_data))

6


In [9]:
def write_dicts_to_file(path, data):
    with open(path, 'wb') as fp:
        for d in data:
            fp.write(json.dumps(d).encode("utf-8"))
            fp.write("\n".encode('utf-8'))

In [10]:
write_dicts_to_file("train.json", training_data)
write_dicts_to_file("test.json", test_data)

In [11]:
s3 = boto3.resource('s3')
bucket = s3.Bucket(s3_bucket)

for dataset in ['train', 'test']:
    filename = dataset + '.json'
    with open(filename, 'rb') as data:
        key = s3_prefix + '/data/' + dataset + '/' + filename
        result = bucket.put_object(Key=key, Body=data)
        print('Uploaded file to {}'.format(result))

Uploaded file to s3.Object(bucket_name='2021-demos', key='sparkSummit/data/train/train.json')
Uploaded file to s3.Object(bucket_name='2021-demos', key='sparkSummit/data/test/test.json')


In [12]:
estimator = sagemaker.estimator.Estimator(
    sagemaker_session=sagemaker_session,
    image_name=image_name,
    role=role,
    train_instance_count=1,
    train_instance_type='ml.c4.2xlarge',
    base_job_name=base_job_name,
    output_path=s3_output_path
)

In [13]:
# Predict for 14 days
prediction_length = 14

# Use 14 days as context length, 
# This is the number of state updates accomplished before making predictions
context_length = 14

In [14]:
hyperparameters = {
    "time_freq": 'D',
    "epochs": "400",
    "early_stopping_patience": "40",
    "mini_batch_size": "64",
    "learning_rate": "5E-4",
    "context_length": str(context_length),
    "prediction_length": str(prediction_length)
}
estimator.set_hyperparameters(**hyperparameters)

In [15]:
%%time
data_channels = {
    "train": "{}/train/".format(s3_data_path),
    "test": "{}/test/".format(s3_data_path)
}

estimator.fit(inputs=data_channels, wait=True)

2020-05-15 18:04:39 Starting - Starting the training job...
2020-05-15 18:04:43 Starting - Launching requested ML instances......
2020-05-15 18:05:43 Starting - Preparing the instances for training...
2020-05-15 18:06:36 Downloading - Downloading input data
2020-05-15 18:06:36 Training - Downloading the training image...
2020-05-15 18:07:13 Training - Training image download completed. Training in progress..[34mArguments: train[0m
[34m[05/15/2020 18:07:16 INFO 140645276194624] Reading default configuration from /opt/amazon/lib/python2.7/site-packages/algorithm/resources/default-input.json: {u'num_dynamic_feat': u'auto', u'dropout_rate': u'0.10', u'mini_batch_size': u'128', u'test_quantiles': u'[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]', u'_tuning_objective_metric': u'', u'_num_gpus': u'auto', u'num_eval_samples': u'100', u'learning_rate': u'0.001', u'num_cells': u'40', u'num_layers': u'2', u'embedding_dimension': u'10', u'_kvstore': u'auto', u'_num_kv_servers': u'auto', u'cardin

Prior to deployment we need to create a class that handles I/O with the following functions: 

* set_prediction_parameters() - set frequency of incoming data and length to predict
* __encode_request() 

In [16]:
def series_to_obj(ts, cat=None, dynamic_feat=None):
    obj = {"start": str(ts.index[0]), "target": list(ts)}
    if cat is not None:
        obj["cat"] = cat
    if dynamic_feat is not None:
        obj["dynamic_feat"] = dynamic_feat 
    return obj

class DeepARPredictor(sagemaker.predictor.RealTimePredictor):

    def set_prediction_parameters(self, freq, prediction_length):
        self.freq = freq
        self.prediction_length = prediction_length
        
    def predict(self, ts, cat=None, encoding="utf-8", num_samples=test_rows, quantiles=["0.1", "0.5", "0.9"]):
        prediction_times = [x.index[-1]+1 for x in ts]
        req = self.__encode_request(ts, cat, encoding, num_samples, quantiles)
        res = super(DeepARPredictor, self).predict(req)
        return self.__decode_response(res, prediction_times, encoding)
    
    def __encode_request(self, ts, cat, encoding, num_samples, quantiles):
        instances = [series_to_obj(ts[k], cat[k] if cat else None) for k in range(len(ts))]
        configuration = {"num_samples": num_samples, "output_types": ["quantiles"], "quantiles": quantiles}
        http_request_data = {"instances": instances, "configuration": configuration}
        return json.dumps(http_request_data).encode(encoding)
    
    def __decode_response(self, response, prediction_times, encoding):
        response_data = json.loads(response.decode(encoding))
        list_of_df = []
        for k in range(len(prediction_times)):
            prediction_index = pd.DatetimeIndex(start=prediction_times[k], freq=self.freq, periods=self.prediction_length)
            list_of_df.append(pd.DataFrame(data=response_data['predictions'][k]['quantiles'], index=prediction_index))
        return list_of_df
    


In [None]:
predictor = estimator.deploy(
    initial_instance_count=1,
    instance_type='ml.m4.xlarge',
    predictor_cls=DeepARPredictor)

--------------------------------------------

In [None]:
test_data = symbol.Close
test_data1 = symbol1.Close
test_data2 = symbol2.Close
test_data3 = symbol3.Close

In [None]:
type(test_data)

In [None]:
test_portfolio = [test_data, test_data1, test_data2, test_data3 ]

In [None]:
type(test_portfolio)

In [None]:
predictor.predict(ts=test_data_array, quantiles=[0.10, 0.5, 0.90])