In [50]:
from datetime import date, timedelta

In [51]:
start_date = date(2020, 2, 29)
end_date = date(2021, 8, 25)
delta = timedelta(days=1)
start_date_use = []
while start_date <= end_date:
    start_date_use.append(start_date.strftime("%Y-%m-%d"))
    start_date += delta

In [52]:
start_date_use[0]

'2020-02-29'

In [58]:
from __future__ import print_function

%matplotlib inline

import sys
import zipfile
from dateutil.parser import parse
import json
from random import shuffle
import random
import datetime
import os

import boto3
import s3fs
import sagemaker
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import timedelta
from sagemaker import get_execution_role

from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
from ipywidgets import IntSlider, FloatSlider, Checkbox

from sagemaker.serializers import IdentitySerializer

# set random seeds for reproducibility
np.random.seed(42)
random.seed(42)

sagemaker_session = sagemaker.Session()

s3_bucket = sagemaker.Session().default_bucket()  # replace with an existing bucket if needed
s3_prefix = "deepar-covid-demo-notebook"  # prefix used for all data stored within the bucket

region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role() 

s3_data_path = "s3://{}/{}/data".format(s3_bucket, s3_prefix)
s3_output_path = "s3://{}/{}/output".format(s3_bucket, s3_prefix)
image_name = sagemaker.amazon.amazon_estimator.get_image_uri(region, "forecasting-deepar", "latest")

#make the file direction
bucket = 'bingnan-covid-test/covid'
data_key = 'after228.csv'

data_location = 's3://{}/{}'.format(bucket, data_key)

#re-format the data
data = pd.read_csv(data_location, index_col=0, parse_dates=True , decimal=",")
data_new = data.iloc[:, :]

#reset the time index
data_new.reset_index(drop=True, inplace=True)

#set the ts.timeframe date time
data_new['date'] = data_new['date'].apply(lambda x: pd.Timestamp(x).strftime('%Y-%m-%d'))
data_new = data_new.set_index(pd.to_datetime(data_new['date']))

data_new_use = data_new.iloc[: , :]

data_new_use = data_new_use.resample("D").sum()

num_timeseries = data_new_use.shape[1]

# we use 24 hour frequency for the time series
freq = "D"

# we predict for 14 days
prediction_length = 7 * 2

# we also use 14 days as context length, this is the number of state updates accomplished before making predictions
context_length = 7 * 2 


def write_dicts_to_file(path, data):
    with open(path, "wb") as fp:
        for d in data:
            fp.write(json.dumps(d).encode("utf-8"))
            fp.write("\n".encode("utf-8"))

s3 = boto3.resource("s3")


def copy_to_s3(local_file, s3_path, override=True):
    assert s3_path.startswith("s3://")
    split = s3_path.split("/")
    bucket = split[2]
    path = "/".join(split[3:])
    buk = s3.Bucket(bucket)

    if len(list(buk.objects.filter(Prefix=path))) > 0:
        if not override:
            print(
                "File s3://{}/{} already exists.\nSet override to upload anyway.\n".format(
                    s3_bucket, s3_path
                )
            )
            return
        else:
            print("Overwriting existing file")
    with open(local_file, "rb") as data:
        print("Uploading file to {}".format(s3_path))
        buk.put_object(Key=path, Body=data)

estimator = sagemaker.estimator.Estimator(
    image_uri=image_name,
    sagemaker_session=sagemaker_session,
    role=role,
    instance_count=1,
    instance_type="ml.c4.2xlarge",
    base_job_name="deepar-covid-demo",
    output_path=s3_output_path,
)

#default number of layers is 2
hyperparameters = {
    "time_freq": freq,
    "epochs": "200",
    "early_stopping_patience": "40",
    "mini_batch_size": "64",
    "learning_rate": "5E-4",
    "context_length": str(context_length),
    "prediction_length": str(prediction_length),
}





class DeepARPredictor(sagemaker.predictor.Predictor):
    def __init__(self, *args, **kwargs):
        super().__init__(
            *args,
            # serializer=JSONSerializer(),
            serializer=IdentitySerializer(content_type="application/json"),
            **kwargs,
        )

    def predict(
        self,
        ts,
        cat=None,
        dynamic_feat=None,
        num_samples=10,
        return_samples=False,
        quantiles=["0.1", "0.5", "0.9"],
    ):
        """Requests the prediction of for the time series listed in `ts`, each with the (optional)
        corresponding category listed in `cat`.

        ts -- `pandas.Series` object, the time series to predict
        cat -- integer, the group associated to the time series (default: None)
        num_samples -- integer, number of samples to compute at prediction time (default: 100)
        return_samples -- boolean indicating whether to include samples in the response (default: False)
        quantiles -- list of strings specifying the quantiles to compute (default: ["0.1", "0.5", "0.9"])

        Return value: list of `pandas.DataFrame` objects, each containing the predictions
        """
        prediction_time = ts.index[-1] + ts.index.freq
        quantiles = [str(q) for q in quantiles]
        req = self.__encode_request(ts, cat, dynamic_feat, num_samples, return_samples, quantiles)
        res = super(DeepARPredictor, self).predict(req)
        return self.__decode_response(res, ts.index.freq, prediction_time, return_samples)

    def __encode_request(self, ts, cat, dynamic_feat, num_samples, return_samples, quantiles):
        instance = series_to_dict(
            ts, cat if cat is not None else None, dynamic_feat if dynamic_feat else None
        )

        configuration = {
            "num_samples": num_samples,
            "output_types": ["quantiles", "samples"] if return_samples else ["quantiles"],
            "quantiles": quantiles,
        }

        http_request_data = {"instances": [instance], "configuration": configuration}

        return json.dumps(http_request_data).encode("utf-8")

    def __decode_response(self, response, freq, prediction_time, return_samples):
        # we only sent one time series so we only receive one in return
        # however, if possible one will pass multiple time series as predictions will then be faster
        predictions = json.loads(response.decode("utf-8"))["predictions"][0]
        prediction_length = len(next(iter(predictions["quantiles"].values())))
        prediction_index = pd.date_range(
            start=prediction_time, freq=freq, periods=prediction_length
        )
        if return_samples:
            dict_of_samples = {"sample_" + str(i): s for i, s in enumerate(predictions["samples"])}
        else:
            dict_of_samples = {}
        return pd.DataFrame(
            data={**predictions["quantiles"], **dict_of_samples}, index=prediction_index
        )

    def set_frequency(self, freq):
        self.freq = freq


def encode_target(ts):
    return [x if np.isfinite(x) else "NaN" for x in ts]


def series_to_dict(ts, cat=None, dynamic_feat=None):
    """Given a pandas.Series object, returns a dictionary encoding the time series.

    ts -- a pands.Series object with the target time series
    cat -- an integer indicating the time series category

    Return value: a dictionary
    """
    obj = {"start": str(ts.index[0]), "target": encode_target(ts)}
    if cat is not None:
        obj["cat"] = cat
    if dynamic_feat is not None:
        obj["dynamic_feat"] = dynamic_feat
    return obj

The method get_image_uri has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.
Defaulting to the only supported framework/algorithm version: 1. Ignoring framework/algorithm version: latest.


In [63]:
#k should start from 45, end at 198
forecast_outcome = []
for k in range(300,310):
#create an empty time series
    timeseries = []
    for i in range(num_timeseries):
        timeseries.append(np.trim_zeros(data_new_use.iloc[k:k+222, i], trim="f"))
        training_data = [
            {
            "start" : str(start_date_use[k]),
            "target" : ts[:].tolist()
            }
            for ts in timeseries  
        ]
            
    write_dicts_to_file("train.json", training_data)
    copy_to_s3("train.json", s3_data_path + "/train/train.json")
    
    s3filesystem = s3fs.S3FileSystem()
    estimator.set_hyperparameters(**hyperparameters)
    
    data_channels = {"train": "{}/train/".format(s3_data_path)}
    estimator.fit(inputs=data_channels, wait=True)
    
    predictor = estimator.deploy(
    initial_instance_count=1, instance_type="ml.m5.large", predictor_cls=DeepARPredictor
    )
    
    forecast_outcome.append(predictor.predict(ts=timeseries[3], quantiles=[0.5]))
    
    predictor.delete_endpoint()
    
print(forecast_outcome)    
    

Overwriting existing file
Uploading file to s3://sagemaker-us-east-2-222195595958/deepar-covid-demo-notebook/data/train/train.json
2021-09-12 14:35:56 Starting - Starting the training job...
2021-09-12 14:36:19 Starting - Launching requested ML instancesProfilerReport-1631457356: InProgress
...
2021-09-12 14:36:46 Starting - Preparing the instances for training......
2021-09-12 14:37:56 Downloading - Downloading input data
2021-09-12 14:37:56 Training - Downloading the training image........[34mArguments: train[0m
[34m[09/12/2021 14:39:06 INFO 140186635052416] Reading default configuration from /opt/amazon/lib/python3.6/site-packages/algorithm/resources/default-input.json: {'_kvstore': 'auto', '_num_gpus': 'auto', '_num_kv_servers': 'auto', '_tuning_objective_metric': '', 'cardinality': 'auto', 'dropout_rate': '0.10', 'early_stopping_patience': '', 'embedding_dimension': '10', 'learning_rate': '0.001', 'likelihood': 'student-t', 'mini_batch_size': '128', 'num_cells': '40', 'num_dyna