In [42]:
import botocore.session as s
from botocore.exceptions import ClientError
import boto3.session
import json
import boto3
import sagemaker
import s3fs
import time
import os
import random
import datetime
import operator
from botocore.exceptions import WaiterError
from botocore.waiter import WaiterModel
from botocore.waiter import create_waiter_with_client

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
from ipywidgets import IntSlider, FloatSlider, Checkbox

In [43]:
# Create custom waiter for the Redshift Data API to wait for finish execution of current SQL statement
waiter_name = 'DataAPIExecution'

delay=2
max_attempts=3

#Configure the waiter settings
waiter_config = {
  'version': 2,
  'waiters': {
    'DataAPIExecution': {
      'operation': 'DescribeStatement',
      'delay': delay,
      'maxAttempts': max_attempts,
      'acceptors': [
        {
          "matcher": "path",
          "expected": "FINISHED",
          "argument": "Status",
          "state": "success"
        },
        {
          "matcher": "pathAny",
          "expected": ["PICKED","STARTED","SUBMITTED"],
          "argument": "Status",
          "state": "retry"
        },
        {
          "matcher": "pathAny",
          "expected": ["FAILED","ABORTED"],
          "argument": "Status",
          "state": "failure"
        }
      ],
    },
  },
}

In [44]:
# set random seeds for reproducibility
np.random.seed(42)
random.seed(42)

In [45]:
sagemaker_session = sagemaker.Session()

Before starting, we can override the default values for the following:
- The S3 bucket and prefix that you want to use for training and model data. This should be within the same region as the Notebook Instance, training, and hosting.
- The IAM role arn used to give training and hosting access to your data. See the documentation for how to create these.

## Retrieve DB details from AWS Secrets Manager
We need to retrieve from AWS Secrets Manager-
* Cluster Identifier
* Secrets ARN
* Database name

In [46]:
secret_name='SecretRedshiftMasterUser-jNyiUN1DweVn' ## replace the secret name with yours
session = boto3.session.Session()
region = session.region_name

client = session.client(
        service_name='secretsmanager',
        region_name=region
    )

try:
    get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    secret_arn=get_secret_value_response['ARN']

except ClientError as e:
    print("Error retrieving secret. Error: " + e.response['Error']['Message'])
    
else:
    # Depending on whether the secret is a string or binary, one of these fields will be populated.
    if 'SecretString' in get_secret_value_response:
        secret = get_secret_value_response['SecretString']
    else:
        secret = base64.b64decode(get_secret_value_response['SecretBinary'])
            
secret_json = json.loads(secret)

cluster_id=secret_json['dbClusterIdentifier']
db=secret_json['dbname']
print("Cluster_id: " + cluster_id + "\nDB: " + db + "\nSecret ARN: " + secret_arn)

Cluster_id: redshift-cluster-1
DB: dev
Secret ARN: arn:aws:secretsmanager:us-east-1:xxxxx:secret:SecretRedshiftMasterUser-xxxx-IZpZIl


We will create the Data API client. For the rest of the notebook we will use this Data API client `client_redshift`.

In [47]:
bc_session = s.get_session()

session = boto3.Session(
        botocore_session=bc_session,
        region_name=region,
    )

# Setup the client
client_redshift = session.client("redshift-data")
print("Data API client successfully loaded")

Data API client successfully loaded


## List Schema
We first list the schema in current database.

In [48]:
client_redshift.list_schemas(
    Database= db, 
    SecretArn= secret_arn, 
    ClusterIdentifier= cluster_id)["Schemas"]

['catalog_history',
 'information_schema',
 'marketing',
 'public',
 'public_raw',
 'pg_catalog',
 'pg_internal',
 'public',
 'sales']

In [49]:
client_redshift.list_tables(
    Database= db, 
    SecretArn= secret_arn, 
    SchemaPattern='public',
    TablePattern='%',
    ClusterIdentifier= cluster_id)["Tables"]

[{'name': 'categories', 'schema': 'public', 'type': 'TABLE'},
 {'name': 'holidays', 'schema': 'public', 'type': 'TABLE'},
 {'name': 'mv_tbl__sales_quantity_mv__0', 'schema': 'public', 'type': 'TABLE'},
 {'name': 'product_sales', 'schema': 'public', 'type': 'TABLE'},
 {'name': 'products', 'schema': 'public', 'type': 'TABLE'},
 {'name': 'sales', 'schema': 'public', 'type': 'TABLE'},
 {'name': 'sales_model_data', 'schema': 'public', 'type': 'TABLE'},
 {'name': 'sales_quantity_mv', 'schema': 'public', 'type': 'VIEW'}]

## Initiating Waiter
Initiating the custom waiter for subseqent `execute_statement` Data API call to return FINISHED signal.

In [50]:
waiter_model = WaiterModel(waiter_config)
custom_waiter = create_waiter_with_client(waiter_name, waiter_model, client_redshift)

Create the schema `taxischema` and the table `nyc_greentaxi`.

In [51]:
client_redshift.list_tables(
    Database= db, 
    SecretArn= secret_arn, 
    SchemaPattern='public',
    ClusterIdentifier= cluster_id)["Tables"]

[{'name': 'categories', 'schema': 'public', 'type': 'TABLE'},
 {'name': 'holidays', 'schema': 'public', 'type': 'TABLE'},
 {'name': 'mv_tbl__sales_quantity_mv__0', 'schema': 'public', 'type': 'TABLE'},
 {'name': 'product_sales', 'schema': 'public', 'type': 'TABLE'},
 {'name': 'products', 'schema': 'public', 'type': 'TABLE'},
 {'name': 'sales', 'schema': 'public', 'type': 'TABLE'},
 {'name': 'sales_model_data', 'schema': 'public', 'type': 'TABLE'},
 {'name': 'sales_quantity_mv', 'schema': 'public', 'type': 'VIEW'}]

In [52]:
query_str = 'select sysdate;'

res = client_redshift.execute_statement(Database= db, SecretArn= secret_arn, Sql= query_str, ClusterIdentifier= cluster_id)
print("Redshift Data API execution  started ...")
id=res["Id"]

# Waiter in try block and wait for DATA API to return
try:
    custom_waiter.wait(Id=id)
    print("Done waiting to finish Data API.")
except WaiterError as e:
    print (e)
    
desc=client_redshift.describe_statement(Id=id)
print("Status: " + desc["Status"] + ". Excution time: %d miliseconds" %float(desc["Duration"]/pow(10,6)))

Redshift Data API execution  started ...
Done waiting to finish Data API.
Status: FINISHED. Excution time: 3 miliseconds


## In-place analysis

We can execute Redshfit data API to fetch the query result into the Panda data frame. This simplifies the in-place analysis of Amazon Redshift cluster data since this bypasses UNLOAD-ing the data first into Amazon S3 and then loading into the Panda data frame.

In [53]:
query_str = "select count(*) from public.categories;"

res = client_redshift.execute_statement(Database= db, SecretArn= secret_arn, Sql= query_str, ClusterIdentifier= cluster_id)
print("Redshift Data API execution  started ...")
id = res["Id"]

# Reset the 'delay' attribute of the waiter back to 2 seconds.
waiter_config["waiters"]["DataAPIExecution"]["delay"] = 2
waiter_model = WaiterModel(waiter_config)
custom_waiter = create_waiter_with_client(waiter_name, waiter_model, client_redshift)

# Waiter in try block and wait for DATA API to return
try:
    custom_waiter.wait(Id=id)
    print("Done waiting to finish Data API.")
except WaiterError as e:
    print (e)

output=client_redshift.get_statement_result(Id=id)
nrows=output["TotalNumRows"]
row_count=output["Records"][0][0]

print("\nTable rows count: %d" %row_count["longValue"])

Redshift Data API execution  started ...
Done waiting to finish Data API.

Table rows count: 1705


In [63]:
query_str = """SELECT abs(quantity) as qty,store_hk,
       category_hk,
       item_hk,
       item_iscorporate,
       item_isstocked,
       item_includeai,
       item_isdepartmentitem,
       item_isedi,
       item_ispriceunitonlabel,
       item_issendtomorgue,
       item_isinactive,
       item_isconverted,
       item_isdiscontinued,
       item_isincludeexo,
       item_isbasketactive,
       item_islocalitemreturnloss,
       category_iscosmetic,
       category_isotc,
       category_isrx,
       category_iscoupon,
       transaction_year,
       transaction_quarter,
       transaction_month,
       transaction_month_week,
       transaction_week_day,
       holiday
FROM public.product_sales WHERE sl <10000"""

res = client_redshift.execute_statement(Database= db, SecretArn= secret_arn, Sql= query_str, ClusterIdentifier= cluster_id)

id = res["Id"]
# Waiter in try block and wait for DATA API to return
try:
    custom_waiter.wait(Id=id)
    print("Done waiting to finish Data API.")
except WaiterError as e:
    print (e)

output=client_redshift.get_statement_result(Id=id)
nrows=output["TotalNumRows"]
ncols=len(output["ColumnMetadata"])
print("print rows " + str(nrows))
print("print cols " + str(ncols))



#print("Number of columns: %d" %ncols)
resultrows=output["Records"]

col_labels=[]
for i in range(ncols): col_labels.append(output["ColumnMetadata"][i]['label'])
                                              
records=[]
for i in range(nrows): records.append(resultrows[i])

df = pd.DataFrame(np.array(resultrows), columns=col_labels)

df[col_labels[0]]=df[col_labels[0]].apply(operator.itemgetter('doubleValue'))
df[col_labels[1]]=df[col_labels[1]].apply(operator.itemgetter('stringValue'))
df[col_labels[2]]=df[col_labels[2]].apply(operator.itemgetter('stringValue'))
df[col_labels[3]]=df[col_labels[3]].apply(operator.itemgetter('stringValue'))
df[col_labels[4]]=df[col_labels[4]].apply(operator.itemgetter('stringValue'))
df[col_labels[5]]=df[col_labels[5]].apply(operator.itemgetter('stringValue'))
df[col_labels[6]]=df[col_labels[6]].apply(operator.itemgetter('stringValue'))
df[col_labels[7]]=df[col_labels[7]].apply(operator.itemgetter('stringValue'))
df[col_labels[8]]=df[col_labels[8]].apply(operator.itemgetter('stringValue'))
df[col_labels[9]]=df[col_labels[9]].apply(operator.itemgetter('stringValue'))
df[col_labels[10]]=df[col_labels[10]].apply(operator.itemgetter('stringValue'))
df[col_labels[11]]=df[col_labels[11]].apply(operator.itemgetter('stringValue'))
df[col_labels[12]]=df[col_labels[12]].apply(operator.itemgetter('stringValue'))
df[col_labels[13]]=df[col_labels[13]].apply(operator.itemgetter('stringValue'))
df[col_labels[14]]=df[col_labels[14]].apply(operator.itemgetter('stringValue'))
df[col_labels[15]]=df[col_labels[15]].apply(operator.itemgetter('stringValue'))
df[col_labels[16]]=df[col_labels[16]].apply(operator.itemgetter('stringValue'))
df[col_labels[17]]=df[col_labels[17]].apply(operator.itemgetter('stringValue'))
df[col_labels[18]]=df[col_labels[18]].apply(operator.itemgetter('stringValue'))
df[col_labels[19]]=df[col_labels[19]].apply(operator.itemgetter('stringValue'))
df[col_labels[20]]=df[col_labels[20]].apply(operator.itemgetter('stringValue'))
df[col_labels[21]]=df[col_labels[21]].apply(operator.itemgetter('longValue'))
df[col_labels[22]]=df[col_labels[22]].apply(operator.itemgetter('longValue'))
df[col_labels[23]]=df[col_labels[23]].apply(operator.itemgetter('longValue'))
df[col_labels[24]]=df[col_labels[24]].apply(operator.itemgetter('longValue'))
df[col_labels[25]]=df[col_labels[25]].apply(operator.itemgetter('longValue'))
df[col_labels[26]]=df[col_labels[26]].apply(operator.itemgetter('longValue'))

df

Done waiting to finish Data API.
print rows 9999
print cols 27


Unnamed: 0,qty,store_hk,category_hk,item_hk,item_iscorporate,item_isstocked,item_includeai,item_isdepartmentitem,item_isedi,item_ispriceunitonlabel,...,category_iscosmetic,category_isotc,category_isrx,category_iscoupon,transaction_year,transaction_quarter,transaction_month,transaction_month_week,transaction_week_day,holiday
0,1.0,4B74DEFC64686F120696C43AEAFB228B,9A37909EC1DFACE5C7FDD46C66B604DD,5D524384B82482DF6BCA5E0C87C5E822,true,true,true,false,false,false,...,false,false,false,false,20,1,3,1,1,0
1,1.0,C11DCF9B2E22B2E6C6A424EFACB2528C,9A37909EC1DFACE5C7FDD46C66B604DD,D031649596FD078F28A87F353A13EDD1,true,true,true,false,false,false,...,false,false,false,false,19,4,12,4,7,0
2,1.0,52830DBF56F587767F810C8BC78A26E9,9A37909EC1DFACE5C7FDD46C66B604DD,B83A43C1AC9C8775B8CB2BB7C648F735,true,true,true,false,false,false,...,false,true,false,false,20,1,3,4,7,0
3,1.0,E4F6BF766CFF69E86A96B8E97F497E36,9A37909EC1DFACE5C7FDD46C66B604DD,8B4E982774C9418F780574F611EF897B,true,false,false,false,false,false,...,false,false,false,false,20,3,9,2,7,0
4,1.0,69184604752C65742866D1ABF7F68732,9A37909EC1DFACE5C7FDD46C66B604DD,96A78731F20599B99FF0461A36B1FD09,true,true,true,false,false,false,...,true,false,false,false,20,4,12,5,4,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9994,1.0,B68484EA6F64CD6BD5F637A2CA7129F3,9A37909EC1DFACE5C7FDD46C66B604DD,235074D2F4845F432329E55792DF4574,true,true,true,false,false,false,...,false,false,false,false,19,4,12,2,3,0
9995,1.0,DE192B3A48262A654E50849BBABA9203,9A37909EC1DFACE5C7FDD46C66B604DD,7A6120954AC417EC990372724482EB6E,true,true,true,false,false,false,...,false,true,false,false,19,4,12,5,2,1
9996,1.0,66CEB41858A514D81D2F47167D9CCFCD,9A37909EC1DFACE5C7FDD46C66B604DD,B12D1B51499A5A5FD4048D8A1C4E276F,true,true,true,false,false,false,...,false,false,false,false,20,4,12,1,6,0
9997,1.0,CC7B44B8053EB838F14898987F892010,9A37909EC1DFACE5C7FDD46C66B604DD,71D496C1B813093F1A6A21821CA31174,true,true,true,false,false,false,...,false,false,false,false,21,1,1,2,6,0


In [None]:
df

## ML Model Build

In [14]:
s3_bucket = sagemaker.Session().default_bucket()  # replace with an existing bucket if needed
s3_prefix = 'redshift-deepar-nyctaxi-demo-notebook'    # prefix used for all data stored within the bucket

In [15]:
s3_data_path = "s3://{}/{}/data".format(s3_bucket, s3_prefix)
s3_output_path = "s3://{}/{}/output".format(s3_bucket, s3_prefix)
print(s3_output_path)

s3://sagemaker-us-east-1-001876746742/redshift-deepar-nyctaxi-demo-notebook/output


Next, we configure the container image to be used for the region that we are running in.

In [16]:
image_name = sagemaker.amazon.amazon_estimator.get_image_uri(region, "forecasting-deepar", "latest")

The method get_image_uri has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.
Defaulting to the only supported framework/algorithm version: 1. Ignoring framework/algorithm version: latest.


In [17]:
redshift_unload_path = s3_output_path + '/unload/'

query_str = "unload('select coalesce(v1.pickup_timestamp_norm, v2.pickup_timestamp_norm) as pickup_timestamp_norm , coalesce(v1.vendor_1, 0) as vendor_1 , coalesce(v2.vendor_2, 0) as vendor_2 from (select case when extract(minute from lpep_dropoff_datetime) between 0 and 14 then dateadd(minute, 0, date_trunc(''hour'', lpep_dropoff_datetime)) when extract(minute from lpep_dropoff_datetime) between 15 and 29 then dateadd(minute, 15, date_trunc(''hour'', lpep_dropoff_datetime)) when extract(minute from lpep_dropoff_datetime) between 30 and 44 then dateadd(minute, 30, date_trunc(''hour'', lpep_dropoff_datetime)) when extract(minute from lpep_dropoff_datetime) between 45 and 59 then dateadd(minute, 45, date_trunc(''hour'', lpep_dropoff_datetime)) end as pickup_timestamp_norm , count(1) as vendor_1 from taxischema.nyc_greentaxi where vendorid = 1 group by 1) v1 full outer join (select case when extract(minute from lpep_dropoff_datetime) between 0 and 14 then dateadd(minute, 0, date_trunc(''hour'', lpep_dropoff_datetime)) when extract(minute from lpep_dropoff_datetime) between 15 and 29 then dateadd(minute, 15, date_trunc(''hour'', lpep_dropoff_datetime)) when extract(minute from lpep_dropoff_datetime) between 30 and 44 then dateadd(minute, 30, date_trunc(''hour'', lpep_dropoff_datetime)) when extract(minute from lpep_dropoff_datetime) between 45 and 59 then dateadd(minute, 45, date_trunc(''hour'', lpep_dropoff_datetime)) end as pickup_timestamp_norm , count(1)  as vendor_2 from taxischema.nyc_greentaxi where vendorid = 2 group by 1) v2 on v1.pickup_timestamp_norm = v2.pickup_timestamp_norm order by pickup_timestamp_norm ;') to '" + redshift_unload_path + "' iam_role '" + redshift_iam_role + "' format as CSV header ALLOWOVERWRITE GZIP"

res = client_redshift.execute_statement(Database= db, SecretArn= secret_arn, Sql= query_str, ClusterIdentifier= cluster_id)
print("Redshift Data API execution  started ...")
id = res["Id"]

# Reset the 'delay' attribute of the waiter to 20 seconds for the UNLOAD to finish.
waiter_config["waiters"]["DataAPIExecution"]["delay"] = 20
waiter_model = WaiterModel(waiter_config)
custom_waiter = create_waiter_with_client(waiter_name, waiter_model, client_redshift)

# Waiter in try block and wait for DATA API to return
try:
    custom_waiter.wait(Id=id)
    print("Done waiting to finish Data API.")
except WaiterError as e:
    print (e)
    
print("Query execution complete")

NameError: name 'redshift_iam_role' is not defined

In [None]:
s3 = boto3.client('s3')

def load_df_from_s3(s3_path):
    assert s3_path.startswith('s3://')
    split = s3_path.split('/')
    bucket = split[2]
    prefix = '/'.join(split[3:])
    print("S3 filepath is %s" %s3_path)

  
    datafiles = s3.list_objects_v2(Bucket=bucket, Prefix = prefix)['Contents']
    prefix_df = []
    fs = s3fs.S3FileSystem()

    for file in datafiles[0:]:
        key = file['Key']

        with fs.open('s3://'+ bucket + '/' + key) as f:
            df = pd.read_csv(f, compression='gzip', index_col=0, parse_dates=True, decimal=',', sep=',')
            #print("S3 file %s is reading" %f)
            
        prefix_df.append(df)
        #print("File retrieved %s" %key)
        
    return pd.concat(prefix_df)

pd_df = load_df_from_s3(redshift_unload_path)

In [None]:
num_timeseries = pd_df.shape[1]
data_trip = pd_df.resample('2H').sum() / 8
timeseries = []
for i in range(num_timeseries):
    timeseries.append(np.trim_zeros(data_trip.iloc[:,i], trim='f'))

In [None]:
fig, axs = plt.subplots(1, 2, figsize=(15, 2), sharex=True)
axx = axs.ravel()
for i in range(0, 2):
    timeseries[i].loc["2018-10-01":"2020-12-31"].plot(ax=axx[i])
    axx[i].set_xlabel("Date")    
    axx[i].set_ylabel("Ride count: vendor- %d" %i)   
    axx[i].grid(which='minor', axis='x')

In [None]:
fig, axs = plt.subplots(1, 2, figsize=(15, 4), sharex=True)
axx = axs.ravel()
for i in range(0, 2):
    timeseries[i].loc["2019-01-01":"2019-12-31"].plot(ax=axx[i])
    axx[i].set_xlabel("date")    
    axx[i].set_ylabel("Ride count: vendor- %d" %i)   
    axx[i].grid(which='minor', axis='x')

### Train and Test splits

Often times one is interested in evaluating the model or tuning its hyperparameters by looking at error metrics on a hold-out test set. Here we split the available data into train and test sets for evaluating the trained model. For standard machine learning tasks such as classification and regression, one typically obtains this split by randomly separating examples into train and test sets. However, in forecasting it is important to do this train/test split based on time rather than by time series.

In this example, we will reserve the last section of each of the time series for evalutation purpose and use only the first part as training data. 

In [None]:
# we use 2 hour frequency for the time series
freq = '2H'

# we predict for 7 days
prediction_length = 7 * 12

# we also use 7 days as context length, this is the number of state updates accomplished before making predictions
context_length = 7 * 12

We specify here the portion of the data that is used for training: the model sees data from 2019-01-01 to 2019-04-01 for training.

In [None]:
start_dataset = pd.Timestamp("2019-01-01 00:00:00", freq=freq)
end_training = pd.Timestamp("2020-01-31 00:00:00", freq=freq)

The DeepAR JSON input format represents each time series as a JSON object. In the simplest case each time series just consists of a start time stamp (``start``) and a list of values (``target``). For more complex cases, DeepAR also supports the fields ``dynamic_feat`` for time-series features and ``cat`` for categorical features, which we will use  later.

In [None]:
training_data = [
    {
        "start": str(start_dataset),
        "target": ts[start_dataset:end_training][:-1].tolist()  # We use -1, because pandas indexing includes the upper bound 
    }
    for ts in timeseries
]

As test data, we will consider time series extending beyond the training range: these will be used for computing test scores, by using the trained model to forecast their trailing 7 days, and comparing predictions with actual values.
To evaluate our model performance on more than one week, we generate test data that extends to 1, 2, 3, 4 weeks beyond the training range. This way we perform *rolling evaluation* of our model.

In [None]:
num_test_windows = 4

idx =  []
print(len(pd.date_range(start_dataset, end_training )))
period_range = len(pd.date_range(start_dataset, end_training))
      
for i in range(1, num_test_windows + 1) :
    idx.append(pd.date_range(start_dataset, periods = period_range + i * prediction_length, freq=freq))

test_data = [
    {   
        "start": str(start_dataset),
        "target": ts[idx[k]].tolist()
    }
    for k in range(0, num_test_windows)
    for ts in timeseries
]

Let's now write the dictionary to the `jsonlines` file format that DeepAR understands (it also supports gzipped jsonlines and parquet).

In [None]:
def write_dicts_to_file(path, data):
    with open(path, 'wb') as fp:
        for d in data:
            fp.write(json.dumps(d).encode("utf-8"))
            fp.write("\n".encode('utf-8'))

In [None]:
%%time
write_dicts_to_file("train.json", training_data)
write_dicts_to_file("test.json", test_data)

Now that we have the data files locally, let us copy them to S3 where DeepAR can access them. Depending on your connection, this may take a couple of minutes

In [None]:
s3 = boto3.resource('s3')
def copy_to_s3(local_file, s3_path, override=False):
    assert s3_path.startswith('s3://')
    split = s3_path.split('/')
    bucket = split[2]
    path = '/'.join(split[3:])
    buk = s3.Bucket(bucket)
    
    if len(list(buk.objects.filter(Prefix=path))) > 0:
        if not override:
            print('File s3://{}/{} already exists.\nSet override to upload anyway.\n'.format(s3_bucket, s3_path))
            return
        else:
            print('Overwriting existing file')
    with open(local_file, 'rb') as data:
        print('Uploading file to {}'.format(s3_path))
        buk.put_object(Key=path, Body=data)

In [None]:
%%time
copy_to_s3("train.json", s3_data_path + "/train/train.json")
copy_to_s3("test.json", s3_data_path + "/test/test.json")

Let's have a look to what we just wrote to S3.

In [None]:
s3filesystem = s3fs.S3FileSystem()
with s3filesystem.open(s3_data_path + "/train/train.json", 'rb') as fp:
    print(fp.readline().decode("utf-8")[:100] + "...")

We are all set with our dataset processing, we can now call DeepAR to train a model and generate predictions.

### Train a model

Here we define the estimator that will launch the training job.

In [None]:
estimator = sagemaker.estimator.Estimator(
    sagemaker_session=sagemaker_session,
    image_name=image_name,
    role=redshift_iam_role,
    train_instance_count=1,
    train_instance_type='ml.c4.2xlarge',
    base_job_name='redshift-deepar-nyctaxi-demo',
    output_path=s3_output_path
)

Next we need to set the hyperparameters for the training job. For example frequency of the time series used, number of data points the model will look at in the past, number of predicted data points. The other hyperparameters concern the model to train (number of layers, number of cells per layer, likelihood function) and the training options (number of epochs, batch size, learning rate...). We use default parameters for every optional parameter in this case (you can always use Sagemaker Automated Model Tuning to tune them).

In [None]:
hyperparameters = {
    "time_freq": freq,
    "epochs": "400",
    "early_stopping_patience": "40",
    "mini_batch_size": "64",
    "learning_rate": "5E-4",
    "context_length": str(context_length),
    "prediction_length": str(prediction_length)
}

In [None]:
estimator.set_hyperparameters(**hyperparameters)

We are ready to launch the training job. SageMaker will start an EC2 instance, download the data from S3, start training the model and save the trained model.

If you provide the test data channel as we do in this example, DeepAR will also calculate accuracy metrics for the trained model on this test. This is done by predicting the last prediction_length points of each time-series in the test set and comparing this to the actual value of the time-series.

Note: the next cell may take a few minutes to complete, depending on data size, model complexity, training options.

In [None]:
%%time
data_channels = {
    "train": "{}/train/".format(s3_data_path),
    "test": "{}/test/".format(s3_data_path)
}

estimator.fit(inputs=data_channels, wait=True)

Since you pass a test set in this example, accuracy metrics for the forecast are computed and logged (see bottom of the log). You can find the definition of these metrics from our documentation. You can use these to optimize the parameters and tune your model or use SageMaker's Automated Model Tuning service to tune the model for you.

   ### Create endpoint and predictor

Now that we have a trained model, we can use it to perform predictions by deploying it to an endpoint.

**Note: Remember to delete the endpoint after running this experiment. A cell at the very bottom of this notebook will do that: make sure you run it at the end.**

To query the endpoint and perform predictions, we can define the following utility class: this allows making requests using `pandas.Series` objects rather than raw JSON strings.

Create endpoint and predictor
Now that we have a trained model, we can use it to perform predictions by deploying it to an endpoint.

Note: Remember to delete the endpoint after running this experiment. A cell at the very bottom of this notebook will do that: make sure you run it at the end.

To query the endpoint and perform predictions, we can define the following utility class: this allows making requests using pandas.Series objects rather than raw JSON strings.

In [None]:
class DeepARPredictor(sagemaker.predictor.RealTimePredictor):
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, content_type=sagemaker.content_types.CONTENT_TYPE_JSON, **kwargs)
        
    def predict(self, ts, cat=None, dynamic_feat=None, 
                num_samples=100, return_samples=False, quantiles=["0.1", "0.5", "0.9"]):
        """Requests the prediction of for the time series listed in `ts`, each with the (optional)
        corresponding category listed in `cat`.
        
        ts -- `pandas.Series` object, the time series to predict
        cat -- integer, the group associated to the time series (default: None)
        num_samples -- integer, number of samples to compute at prediction time (default: 100)
        return_samples -- boolean indicating whether to include samples in the response (default: False)
        quantiles -- list of strings specifying the quantiles to compute (default: ["0.1", "0.5", "0.9"])
        
        Return value: list of `pandas.DataFrame` objects, each containing the predictions
        """
        prediction_time = ts.index[-1] + pd.Timedelta(freq)
        quantiles = [str(q) for q in quantiles]
        req = self.__encode_request(ts, cat, dynamic_feat, num_samples, return_samples, quantiles)
        res = super(DeepARPredictor, self).predict(req)
        return self.__decode_response(res, ts.index.freq, prediction_time, return_samples)
    
    def __encode_request(self, ts, cat, dynamic_feat, num_samples, return_samples, quantiles):
        instance = series_to_dict(ts, cat if cat is not None else None, dynamic_feat if dynamic_feat else None)

        configuration = {
            "num_samples": num_samples,
            "output_types": ["quantiles", "samples"] if return_samples else ["quantiles"],
            "quantiles": quantiles
        }
        
        http_request_data = {
            "instances": [instance],
            "configuration": configuration
        }
        
        return json.dumps(http_request_data).encode('utf-8')
    
    def __decode_response(self, response, freq, prediction_time, return_samples):
        # we only sent one time series so we only receive one in return
        # however, if possible one will pass multiple time series as predictions will then be faster
        predictions = json.loads(response.decode('utf-8'))['predictions'][0]
        prediction_length = len(next(iter(predictions['quantiles'].values())))
        #prediction_index = pd.DatetimeIndex(start=prediction_time, freq=freq, periods=prediction_length)
        prediction_index = pd.date_range(prediction_time, periods = prediction_length, freq=freq)
        if return_samples:
            dict_of_samples = {'sample_' + str(i): s for i, s in enumerate(predictions['samples'])}
        else:
            dict_of_samples = {}
        return pd.DataFrame(data={**predictions['quantiles'], **dict_of_samples}, index=prediction_index)

    def set_frequency(self, freq):
        self.freq = freq
        
def encode_target(ts):
    return [x if np.isfinite(x) else "NaN" for x in ts]        

def series_to_dict(ts, cat=None, dynamic_feat=None):
    """Given a pandas.Series object, returns a dictionary encoding the time series.

    ts -- a pands.Series object with the target time series
    cat -- an integer indicating the time series category

    Return value: a dictionary
    """
    obj = {"start": str(ts.index[0]), "target": encode_target(ts)}
    if cat is not None:
        obj["cat"] = cat
    if dynamic_feat is not None:
        obj["dynamic_feat"] = dynamic_feat        
    return obj

Now we can deploy the model and create and endpoint that can be queried using our custom DeepARPredictor class.

In [None]:
predictor = estimator.deploy(
    initial_instance_count=1,
    instance_type='ml.m4.xlarge',
    predictor_cls=DeepARPredictor)

### Make predictions and plot results

Now we can use the `predictor` object to generate predictions.

In [None]:
predictor.predict(ts=timeseries[1], quantiles=[0.10, 0.5, 0.90]).head()

Below we define a plotting function that queries the model and displays the forecast.

In [None]:
def plot(
    predictor, 
    target_ts, 
    cat=None, 
    dynamic_feat=None, 
    forecast_date=end_training, 
    show_samples=False, 
    plot_history=7 * 12,
    confidence=80
):
    print("calling served model to generate predictions starting from {}".format(str(forecast_date)))
    assert(confidence > 50 and confidence < 100)
    low_quantile = 0.5 - confidence * 0.005
    up_quantile = confidence * 0.005 + 0.5
        
    # we first construct the argument to call our model
    args = {
        "ts": target_ts[:forecast_date],
        "return_samples": show_samples,
        "quantiles": [low_quantile, 0.5, up_quantile],
        "num_samples": 100
    }


    if dynamic_feat is not None:
        args["dynamic_feat"] = dynamic_feat
        fig = plt.figure(figsize=(20, 6))
        ax = plt.subplot(2, 1, 1)
    else:
        fig = plt.figure(figsize=(20, 3))
        ax = plt.subplot(1,1,1)
    
    if cat is not None:
        args["cat"] = cat
        ax.text(0.9, 0.9, 'cat = {}'.format(cat), transform=ax.transAxes)

    # call the end point to get the prediction
    prediction = predictor.predict(**args)

    # plot the samples
    if show_samples: 
        for key in prediction.keys():
            if "sample" in key:
                prediction[key].plot(color='lightskyblue', alpha=0.2, label='_nolegend_')
                
                
    # plot the target
    #target_section = target_ts[forecast_date-plot_history:forecast_date+prediction_length]
    target_section = target_ts[forecast_date-pd.Timedelta(hours=plot_history):forecast_date+pd.Timedelta(hours=prediction_length)]
    target_section.plot(color="black", label='target')
    
    # plot the confidence interval and the median predicted
    ax.fill_between(
        prediction[str(low_quantile)].index, 
        prediction[str(low_quantile)].values, 
        prediction[str(up_quantile)].values, 
        color="b", alpha=0.3, label='{}% confidence interval'.format(confidence)
    )
    prediction["0.5"].plot(color="b", label='P50')
    ax.legend(loc=2)    
    
    # fix the scale as the samples may change it
    ax.set_ylim(target_section.min() * 0.5, target_section.max() * 1.5)
    
    if dynamic_feat is not None:
        for i, f in enumerate(dynamic_feat, start=1):
            ax = plt.subplot(len(dynamic_feat) * 2, 1, len(dynamic_feat) + i, sharex=ax)
            feat_ts = pd.Series(
                #index=pd.DatetimeIndex(start=target_ts.index[0], freq=target_ts.index.freq, periods=len(f)),
                index = pd.date_range(target_ts.index[0], periods = len(f), freq=target_ts.index.freq),
                data=f
            )
            #feat_ts[forecast_date-plot_history:forecast_date+prediction_length].plot(ax=ax, color='g')
            feat_ts[forecast_date-pd.Timedelta(hours=plot_history):forecast_date+pd.Timedelta(hours=prediction_length)].plot(ax=ax, color='g')

We can interact with the function previously defined, to look at the forecast of any customer at any point in (future) time. 

For each request, the predictions are obtained by calling our served model on the fly.

Here we forecast the consumption of an office after week-end (note the lower week-end consumption). 
You can select any time series and any forecast date, just click on `Run Interact` to generate the predictions from our served endpoint and see the plot.

In [None]:
style = {'description_width': 'initial'}

In [None]:
@interact_manual(
    vendor_id=IntSlider(min=0, max=1, value=1, style=style), 
    forecast_day=IntSlider(min=0, max=100, value=51, style=style),
    confidence=IntSlider(min=60, max=95, value=80, step=5, style=style),
    history_weeks_plot=IntSlider(min=1, max=20, value=1, style=style),
    show_samples=Checkbox(value=False),
    continuous_update=False
)
def plot_interact(vendor_id, forecast_day, confidence, history_weeks_plot, show_samples):
    plot(
        predictor,
        target_ts=timeseries[vendor_id],
        forecast_date=end_training + datetime.timedelta(days=forecast_day),
        show_samples=show_samples,
        plot_history=history_weeks_plot * 12 * 7,
        confidence=confidence
    )

### Delete endpoints

In [None]:
predictor.delete_endpoint()

### added by Roop

In [41]:
query_str = """SELECT quantity,store_hk,
       category_hk,
       item_hk,
       item_iscorporate,
       item_isstocked,
       item_includeai,
       item_isdepartmentitem,
       item_isedi,
       item_ispriceunitonlabel,
       item_issendtomorgue,
       item_isinactive,
       item_isconverted,
       item_isdiscontinued,
       item_isincludeexo,
       item_isbasketactive,
       item_islocalitemreturnloss,
       category_iscosmetic,
       category_isotc,
       category_isrx,
       category_iscoupon,
       transaction_year,
       transaction_quarter,
       transaction_month,
       transaction_month_week,
       transaction_week_day,
       holiday
FROM public.product_sales WHERE sl <10"""

res = client_redshift.execute_statement(Database= db, SecretArn= secret_arn, Sql= query_str, ClusterIdentifier= cluster_id)

id = res["Id"]
# Waiter in try block and wait for DATA API to return
try:
    custom_waiter.wait(Id=id)
    print("Done waiting to finish Data API.")
except WaiterError as e:
    print (e)

output=client_redshift.get_statement_result(Id=id)
nrows=output["TotalNumRows"]
ncols=len(output["ColumnMetadata"])
#print("Number of columns: %d" %ncols)
resultrows=output["Records"]

Done waiting to finish Data API.


In [None]:
unload ('select * from venue')
to 's3://mybucket/tickit/unload/venue_' 
iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole'
parallel off;

In [39]:
df.count()

quantity                      9
store_hk                      9
category_hk                   9
item_hk                       9
item_iscorporate              9
item_isstocked                9
item_includeai                9
item_isdepartmentitem         9
item_isedi                    9
item_ispriceunitonlabel       9
item_issendtomorgue           9
item_isinactive               9
item_isconverted              9
item_isdiscontinued           9
item_isincludeexo             9
item_isbasketactive           9
item_islocalitemreturnloss    9
category_iscosmetic           9
category_isotc                9
category_isrx                 9
category_iscoupon             9
transaction_year              9
transaction_quarter           9
transaction_month             9
transaction_month_week        9
transaction_week_day          9
holiday                       9
dtype: int64

In [20]:
train_data = df.sample(frac=0.8,random_state=200)

In [22]:
test_data = df.drop(train_data.index)

In [23]:
test_data_no_target = test_data.drop(columns=['quantity'])

In [25]:

region = boto3.Session().region_name

session = sagemaker.Session()

In [26]:
bucket = session.default_bucket()
prefix = 'sagemaker/public'

In [27]:
train_file = 'train_data.csv';
train_data.to_csv(train_file, index=False, header=True)
train_data_s3_path = session.upload_data(path=train_file, key_prefix=prefix + "/train")
print('Train data uploaded to: ' + train_data_s3_path)

test_file = 'test_data.csv';
test_data_no_target.to_csv(test_file, index=False, header=False)
test_data_s3_path = session.upload_data(path=test_file, key_prefix=prefix + "/test")
print('Test data uploaded to: ' + test_data_s3_path)

Train data uploaded to: s3://sagemaker-us-east-1-001876746742/sagemaker/public/train/train_data.csv
Test data uploaded to: s3://sagemaker-us-east-1-001876746742/sagemaker/public/test/test_data.csv


In [35]:
input_data_config = [{
      'DataSource': {
        'S3DataSource': {
          'S3DataType': 'S3Prefix',
          'S3Uri': 's3://{}/{}/train'.format(bucket,prefix)
        }
      },
      'TargetAttributeName': 'quantity'
    }
  ]

output_data_config = {
    'S3OutputPath': 's3://{}/{}/output'.format(bucket,prefix)
  }

In [33]:
from sagemaker import get_execution_role
role = get_execution_role()
# This is the client we will use to interact with SageMaker AutoPilot
sm = boto3.Session().client(service_name='sagemaker',region_name=region)


In [36]:
from time import gmtime, strftime, sleep
timestamp_suffix = strftime('%d-%H-%M-%S', gmtime())

auto_ml_job_name = 'automl-public-' + timestamp_suffix
print('AutoMLJobName: ' + auto_ml_job_name)

sm.create_auto_ml_job(AutoMLJobName=auto_ml_job_name,
                      InputDataConfig=input_data_config,
                      OutputDataConfig=output_data_config,
                      AutoMLJobConfig={'CompletionCriteria':
                                       {'MaxCandidates': 20}
                                      },
                      RoleArn=role)

AutoMLJobName: automl-public-29-00-48-43


ClientError: An error occurred (ValidationException) when calling the CreateAutoMLJob operation: Dataset is not large enough: expected minimum number of rows is 500 but only 7 were found.

## end of roop code