# Step 1: Basic setup

In [None]:
# conda update -n base -c conda-forge conda

In [None]:
conda install bokeh=2.4.3

In [None]:
conda install pandas_bokeh=0.5.5

In [None]:
import sys
import os
from datetime import datetime

# importing forecast notebook utility from notebooks/common directory
sys.path.insert(0, os.path.abspath("./common/"))
import util
import util.fcst_utils

%reload_ext autoreload
import boto3
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline 
plt.rcParams['figure.figsize'] = (15.0, 5.0)

import pandas_bokeh
pandas_bokeh.output_notebook()

In [None]:
# Setup variables
# Ownership/tagging values and the S3 locations used throughout the notebook.
owner = "martin.macecek@rearc.io"
# Deliberately not named `type`: a global called `type` would shadow the
# builtin `type()` in every cell executed after this one.
resource_type = "Internal"
usage = "Playground"
prefix = "mac-training"
role_name = f"{prefix}-forecasting"
bucket_name = f"{prefix}-bucket-275279264324-us-east-1"
data_key = "forecasting/input/RIVN.csv"
prepared_data_key_prefix = "forecasting/prepared/rivn"
item_id = "RIVN"
target_column_name = "close"

# Setup more variables
# Full S3 URIs of the prepared target / related time series CSV files.
s3_target_data_key = f"s3://{bucket_name}/{prepared_data_key_prefix}.csv"
s3_related_data_key = f"s3://{bucket_name}/{prepared_data_key_prefix}_rts.csv"
# strftime patterns: `date_format` is embedded in generated resource names,
# `ui_date_format` is used for human-readable output.
date_format = '%Y%m%d_%H%M%S'
ui_date_format = '%a, %d %b %Y %H:%M:%S %Z'

# Tags for resource tagging
tags = [
    {'Key': 'Owner', 'Value': owner},
    {'Key': 'Type', 'Value': resource_type},
    {'Key': 'Usage', 'Value': usage},
]

In [None]:
# Region comes from the default boto3 session, the account id from STS.
region = boto3.Session().region_name
caller_identity = boto3.client('sts').get_caller_identity()
account_id = caller_identity.get('Account')
print(f"Account: {account_id}, Region: {region}")

In [None]:
# Open one regional session and derive every service client from it.
session = boto3.Session(region_name=region)
s3, forecast, forecastquery = (
    session.client(service_name=service)
    for service in ('s3', 'forecast', 'forecastquery')
)

In [None]:
# Obtain the IAM role ARN that is handed to Amazon Forecast for S3 access.
role_arn = util.get_or_create_iam_role(role_name=role_name)

# Print only the ARN segment after the first "/" so the account id is not echoed.
role_label = role_arn.split('/')[1]
print(f"Success! Role '{role_label}' ready for use.")

In [None]:
# Ask the notebook's util helper to get or create the working bucket in this region.
util.get_or_create_bucket(bucket_name, region=region)
print(f"Success! Bucket '{bucket_name}' ready for use.")

# Step 2: Data preparation

In [None]:
# Read every column as a string first so each conversion below is explicit.
price_columns = ["Price", "Open", "High", "Low"]
stock_df = pd.read_csv(f"s3://{bucket_name}/{data_key}", dtype=object)
stock_df.drop(columns=["Vol.", "Change %"], inplace=True)

# Parse the US-style date strings and keep only the calendar date.
parsed_dates = pd.to_datetime(stock_df["Date"], format="%m/%d/%Y")
stock_df["Date"] = parsed_dates.dt.date
stock_df[price_columns] = stock_df[price_columns].astype(float)

# Switch to the column names the rest of the notebook uses.
stock_df.rename(
    columns={
        'Date': 'datetime',
        'Price': target_column_name,
        'Open': 'open',
        'High': 'high',
        'Low': 'low',
    },
    inplace=True,
)
stock_df["item_id"] = item_id
stock_df.head()

In [None]:
# Build a gap-free daily calendar covering the observed date range.
new_index = pd.Index(pd.date_range(stock_df.datetime.min(), stock_df.datetime.max()), name="datetime")
# Re-index onto that calendar once; dates without a row are filled from the
# previous row (forward-fill) after the index is restored as a column.
stock_df = stock_df.set_index("datetime").reindex(new_index).reset_index().ffill()
# Order rows newest-first.
stock_df = stock_df.sort_values(by=['datetime'], ascending=False)
stock_df.head()

In [None]:
# Plot the target price together with the open/high/low series over time.
plotted_columns = [target_column_name, 'open', 'high', 'low']
stock_df.plot(x='datetime', y=plotted_columns, figsize=(15, 8))
plt.xlabel('Date Time')
plt.ylabel('Stock Price')
plt.show()

# Step 2: Prepare and Save the Target Time Series

In this example, we are only using the supplemental fields `open`, `high` and `low` for forecasting.
Later we may experiment with supplemental values from other stocks.

In [None]:
# Forecast horizon, counted in DATASET_FREQUENCY units (days here, since the
# frequency below is "D"). Passed to the predictors as the forecast horizon and
# used as the number of rows held back from the target series.
FORECAST_LENGTH = 30

# What is your forecast time unit granularity?
# Choices are: ^Y|M|W|D|h|30min|15min|10min|5min|1min$ 
DATASET_FREQUENCY = "D"
# Timestamp pattern handed to the dataset import jobs as TimestampFormat.
TIMESTAMP_FORMAT = "yyyy-MM-dd"
# delimiter = ','

# What name do you want to give this project?  
# We will use this same name for your Forecast Dataset Group name.
PROJECT = 'rivn-forecast'
# Version number that becomes the suffix of the dataset group name.
DATA_VERSION = 1

In [None]:
# The target series must end FORECAST_LENGTH days before the related series so
# that the related series covers the forecast horizon. `stock_df` is sorted
# newest-first, so a positional `[:-FORECAST_LENGTH]` slice would drop the
# OLDEST rows; select by date instead so the most recent FORECAST_LENGTH days
# are held back regardless of row order.
last_target_date = stock_df['datetime'].max() - pd.Timedelta(days=FORECAST_LENGTH)
target_df = stock_df.loc[stock_df['datetime'] <= last_target_date,
                         ['item_id', 'datetime', target_column_name]]
target_df.head(5)

In [None]:
# Related time series: the open/high/low columns for every row of stock_df,
# in the column order the related-series schema declares.
rts_df = stock_df[['item_id', 'datetime', 'open', 'high', 'low']]
rts_df.head(5)

In [None]:
# Sanity check: the related series must be exactly FORECAST_LENGTH rows longer
# than the target series.
print(f"{len(target_df)} + {FORECAST_LENGTH} = {len(rts_df)}")
assert len(target_df) + FORECAST_LENGTH == len(rts_df), "length doesn't match"

In [None]:
# Upload both series as bare CSV files (no index column, no header row).
for frame, s3_uri in ((target_df, s3_target_data_key), (rts_df, s3_related_data_key)):
    frame.to_csv(s3_uri, index=False, header=False)

# Step 3: Create the Dataset Group and Dataset

## Dataset Group

In [None]:
# Join prefix, project and data version with underscores, then replace every
# dash in the result with an underscore as well.
raw_group_name = "_".join([prefix, PROJECT, str(DATA_VERSION)])
dataset_group_name = raw_group_name.replace("-", "_")
print(f"Dataset Group Name = {dataset_group_name}")

In [None]:
# Create the dataset group with an empty dataset list. If the group already
# exists, rebuild its ARN from region, account id and name instead.
dataset_arns = []
dataset_group_request = dict(
    Domain="RETAIL",
    DatasetGroupName=dataset_group_name,
    DatasetArns=dataset_arns,
    Tags=tags,
)
try:
    create_dataset_group_response = forecast.create_dataset_group(**dataset_group_request)
    dataset_group_arn = create_dataset_group_response['DatasetGroupArn']
    status = util.wait(lambda: forecast.describe_dataset_group(DatasetGroupArn=dataset_group_arn))
    assert status
except forecast.exceptions.ResourceAlreadyExistsException:
    dataset_group_arn = f"arn:aws:forecast:{region}:{account_id}:dataset-group/{dataset_group_name}"
    print(f"Dataset group {dataset_group_arn} already exists.")

## Target Schema

In [None]:
# Target time series schema. The attribute order must match the column order
# of the uploaded CSV file.
tts_schema = {
    "Attributes": [
        {"AttributeName": attribute_name, "AttributeType": attribute_type}
        for attribute_name, attribute_type in (
            ("item_id", "string"),
            ("timestamp", "timestamp"),
            ("demand", "float"),
        )
    ]
}

## Target Dataset

In [None]:
# The target dataset is named after the dataset group with a "_tts" suffix.
tts_dataset_name = dataset_group_name + "_tts"
print(tts_dataset_name)

In [None]:
# Create the target time series dataset. If it already exists, rebuild its ARN
# from region, account id and name instead.
tts_dataset_request = dict(
    Domain="RETAIL",
    DatasetType='TARGET_TIME_SERIES',
    DatasetName=tts_dataset_name,
    DataFrequency=DATASET_FREQUENCY,
    Schema=tts_schema,
    Tags=tags,
)
try:
    create_dataset_tts_response = forecast.create_dataset(**tts_dataset_request)
    tts_dataset_arn = create_dataset_tts_response['DatasetArn']
    status = util.wait(lambda: forecast.describe_dataset(DatasetArn=tts_dataset_arn))
    assert status
except forecast.exceptions.ResourceAlreadyExistsException:
    tts_dataset_arn = f"arn:aws:forecast:{region}:{account_id}:dataset/{tts_dataset_name}"
    print(f"Target dataset {tts_dataset_arn} already exists.")

## Related schema

In [None]:
# Related time series schema. The attribute order must match the column order
# of the uploaded CSV file.
rts_schema = {
    "Attributes": [
        {"AttributeName": attribute_name, "AttributeType": attribute_type}
        for attribute_name, attribute_type in (
            ("item_id", "string"),
            ("timestamp", "timestamp"),
            ("open_price", "float"),
            ("high_price", "float"),
            ("low_price", "float"),
        )
    ]
}

## Related dataset

In [None]:
# The related dataset is named after the dataset group with an "_rts" suffix.
rts_dataset_name = dataset_group_name + "_rts"
print(rts_dataset_name)

In [None]:
# Create the related time series dataset. If it already exists, rebuild its
# ARN from region, account id and name instead.
rts_dataset_request = dict(
    Domain="RETAIL",
    DatasetType='RELATED_TIME_SERIES',
    DatasetName=rts_dataset_name,
    DataFrequency=DATASET_FREQUENCY,
    Schema=rts_schema,
    Tags=tags,
)
try:
    create_dataset_rts_response = forecast.create_dataset(**rts_dataset_request)
    rts_dataset_arn = create_dataset_rts_response['DatasetArn']
    status = util.wait(lambda: forecast.describe_dataset(DatasetArn=rts_dataset_arn))
    assert status
except forecast.exceptions.ResourceAlreadyExistsException:
    rts_dataset_arn = f"arn:aws:forecast:{region}:{account_id}:dataset/{rts_dataset_name}"
    print(f"Related dataset {rts_dataset_arn} already exists.")

In [None]:
# Attach both datasets to the dataset group and wait for the update to settle.
dataset_arns = [tts_dataset_arn, rts_dataset_arn]
update_dataset_response = forecast.update_dataset_group(
    DatasetGroupArn=dataset_group_arn,
    DatasetArns=dataset_arns,
)
status = util.wait(lambda: forecast.describe_dataset_group(DatasetGroupArn=dataset_group_arn))
assert status

# Step 4: Import data from S3 to Forecast

## Target data

**Note:** Depending on the data size, the import can take 10 mins or more to become **ACTIVE**.

In [None]:
# Import the target CSV unless import jobs already exist for this dataset and
# the user declines a re-import.
previous_tts_jobs = util.get_dataset_import_jobs(tts_dataset_arn, forecast)
if len(previous_tts_jobs) > 0:
    print("Target dataset has already imported data.")
    import_tts_dataset_response = input("Re-import (y/N)? ").lower() == "y"
else:
    import_tts_dataset_response = True

if not import_tts_dataset_response:
    print("Target data re-import skipped")
else:
    tts_import_request = dict(
        DatasetImportJobName=f"tts_job_{datetime.now().strftime(date_format)}",
        DatasetArn=tts_dataset_arn,
        DataSource={"S3Config": {"Path": s3_target_data_key, "RoleArn": role_arn}},
        TimestampFormat=TIMESTAMP_FORMAT,
        Tags=tags,
    )
    tts_dataset_import_job_response = forecast.create_dataset_import_job(**tts_import_request)
    tts_dataset_import_job_arn = tts_dataset_import_job_response['DatasetImportJobArn']
    status = util.wait(lambda: forecast.describe_dataset_import_job(DatasetImportJobArn=tts_dataset_import_job_arn))
    assert status
    print("Target data imported.")

## Related data

**Note:** Depending on the data size, the import can take 10 mins or more to become **ACTIVE**.

In [None]:
# Import the related CSV unless import jobs already exist for this dataset and
# the user declines a re-import.
previous_rts_jobs = util.get_dataset_import_jobs(rts_dataset_arn, forecast)
if len(previous_rts_jobs) > 0:
    print("Related dataset has already imported data.")
    import_rts_dataset_response = input("Re-import (y/N)? ").lower() == "y"
else:
    import_rts_dataset_response = True

if not import_rts_dataset_response:
    print("Related data re-import skipped")
else:
    rts_import_request = dict(
        DatasetImportJobName=f"rts_job_{datetime.now().strftime(date_format)}",
        DatasetArn=rts_dataset_arn,
        DataSource={"S3Config": {"Path": s3_related_data_key, "RoleArn": role_arn}},
        TimestampFormat=TIMESTAMP_FORMAT,
        Tags=tags,
    )
    rts_dataset_import_job_response = forecast.create_dataset_import_job(**rts_import_request)
    rts_dataset_import_job_arn = rts_dataset_import_job_response["DatasetImportJobArn"]
    status = util.wait(lambda: forecast.describe_dataset_import_job(DatasetImportJobArn=rts_dataset_import_job_arn))
    assert status
    print("Related data imported.")

# Step 5: Training and Evaluation

## DeepAR+

In [None]:
# Algorithm ARN and predictor name for DeepAR+.
algorithm_arn = 'arn:aws:forecast:::algorithm/'
algorithm = 'Deep_AR_Plus'
algorithm_arn_deep_ar_plus = f"{algorithm_arn}{algorithm}"
predictor_name_deep_ar = "_".join((dataset_group_name, algorithm.lower()))
print(f"Predictor Name = {predictor_name_deep_ar}")

**NOTE:** Training a forecast model can take several hours to become **ACTIVE**.

In [None]:
# Train the DeepAR+ predictor, or keep / retrain one that already exists.
retrain_predictor_deep_ar = False
existing_predictor_deep_ar = util.get_predictor(predictor_name_deep_ar, forecast)

if existing_predictor_deep_ar:
    predictor_arn_deep_ar = existing_predictor_deep_ar['PredictorArn']
    print(f"DeepAR+ Predictor {predictor_arn_deep_ar} already exists.")
    retrain_predictor_deep_ar = input("Retrain model (y/N)? ").lower() == "y"

    if retrain_predictor_deep_ar:
        # Forecasts built from the predictor are removed before the predictor itself.
        util.delete_forecasts_by_predictor(predictor_arn_deep_ar, forecast)
        print(f"Deleting DeepAR+ Predictor {predictor_arn_deep_ar}...")
        util.wait_till_delete(lambda: forecast.delete_predictor(PredictorArn=predictor_arn_deep_ar))
    else:
        print(f"Keeping existing DeepAR+ Predictor {predictor_arn_deep_ar}.")

if not existing_predictor_deep_ar or retrain_predictor_deep_ar:
    # Announce by name: on a first run no ARN exists until create_predictor
    # returns, so printing the ARN variable here would raise NameError.
    print(f"Creating DeepAR+ Predictor {predictor_name_deep_ar}...")
    create_predictor_deep_ar_response = \
        forecast.create_predictor(PredictorName=predictor_name_deep_ar,
                                  AlgorithmArn=algorithm_arn_deep_ar_plus,
                                  ForecastHorizon=FORECAST_LENGTH,
                                  PerformAutoML=False,
                                  PerformHPO=False,
                                  InputDataConfig={
                                      "DatasetGroupArn": dataset_group_arn,
                                      "SupplementaryFeatures": [
                                          {"Name": "holiday",
                                           "Value": "US"}],
                                  },
                                  FeaturizationConfig={"ForecastFrequency": DATASET_FREQUENCY},
                                  Tags=tags
                                 )
    predictor_arn_deep_ar = create_predictor_deep_ar_response['PredictorArn']
    status = util.wait(lambda: forecast.describe_predictor(PredictorArn=predictor_arn_deep_ar))
    assert status

## Prophet

In [None]:
# Algorithm ARN and predictor name for Prophet.
algorithm_arn = 'arn:aws:forecast:::algorithm/'
algorithm = 'Prophet'
algorithm_arn_prophet = f"{algorithm_arn}{algorithm}"
predictor_name_prophet = "_".join((dataset_group_name, algorithm.lower()))
print(f"Predictor Name = {predictor_name_prophet}")

**NOTE:** Training a forecast model can take several hours to become **ACTIVE**.

In [None]:
# Train the Prophet predictor, or keep / retrain one that already exists.
retrain_predictor_prophet = False
existing_predictor_prophet = util.get_predictor(predictor_name_prophet, forecast)

if existing_predictor_prophet:
    predictor_arn_prophet = existing_predictor_prophet['PredictorArn']
    print(f"Prophet Predictor {predictor_arn_prophet} already exists.")
    retrain_predictor_prophet = input("Retrain model (y/N)? ").lower() == "y"

    if retrain_predictor_prophet:
        # Same order as the DeepAR+ cell: forecasts built from the predictor
        # are removed before the predictor itself.
        util.delete_forecasts_by_predictor(predictor_arn_prophet, forecast)
        print(f"Deleting Prophet Predictor {predictor_arn_prophet}...")
        util.wait_till_delete(lambda: forecast.delete_predictor(PredictorArn=predictor_arn_prophet))
    else:
        print(f"Keeping existing Prophet Predictor {predictor_arn_prophet}.")

if not existing_predictor_prophet or retrain_predictor_prophet:
    # Announce by name: on a first run no ARN exists until create_predictor
    # returns, so printing the ARN variable here would raise NameError.
    print(f"Creating Prophet Predictor {predictor_name_prophet}...")
    create_predictor_response = \
        forecast.create_predictor(PredictorName=predictor_name_prophet,
                                  AlgorithmArn=algorithm_arn_prophet,
                                  ForecastHorizon=FORECAST_LENGTH,
                                  PerformAutoML=False,
                                  PerformHPO=False,
                                  InputDataConfig={
                                      "DatasetGroupArn": dataset_group_arn,
                                      "SupplementaryFeatures": [
                                          {"Name": "holiday",
                                           "Value": "US"}],
                                  },
                                  FeaturizationConfig={"ForecastFrequency": DATASET_FREQUENCY},
                                  Tags=tags
                                 )
    predictor_arn_prophet = create_predictor_response['PredictorArn']
    status = util.wait(lambda: forecast.describe_predictor(PredictorArn=predictor_arn_prophet))
    assert status

## Auto

In [None]:
# The auto predictor is named after the dataset group with an "_auto" suffix.
predictor_name_auto = dataset_group_name + "_auto"
print(f"Predictor Name = {predictor_name_auto}")

**NOTE:** Training a forecast model can take several hours to become **ACTIVE**.

In [None]:
# Decide which create_auto_predictor request to send, if any:
#   - no predictor yet          -> full creation request
#   - predictor exists, "y"     -> retrain request referencing the existing one
#   - predictor exists, not "y" -> empty request, nothing is created
existing_predictor_auto = util.get_predictor(predictor_name_auto, forecast)
args = {}
if not existing_predictor_auto:
    holiday_dataset = {"Name": "holiday", "Configuration": {"CountryCode": ["US"]}}
    args = {
        "PredictorName": predictor_name_auto,
        "ForecastHorizon": FORECAST_LENGTH,
        "ForecastFrequency": DATASET_FREQUENCY,
        "DataConfig": {
            "DatasetGroupArn": dataset_group_arn,
            "AdditionalDatasets": [holiday_dataset],
        },
        "Tags": tags,
    }
else:
    predictor_arn_auto = existing_predictor_auto['PredictorArn']
    print(f"Auto Predictor {predictor_arn_auto} already exists.")
    if input("Retrain model (y/N)? ").lower() == "y":
        args = {
            "PredictorName": f"{predictor_name_auto}_retrain_{datetime.now().strftime(date_format)}",
            "ReferencePredictorArn": predictor_arn_auto,
        }

if not args:
    print(f"Keeping existing Auto Predictor {predictor_arn_auto}.")
else:
    create_predictor_auto_response = forecast.create_auto_predictor(**args)
    predictor_arn_auto = create_predictor_auto_response['PredictorArn']
    print(f"{'Retraining existing' if existing_predictor_auto else 'Creating new'} Auto Predictor {predictor_arn_auto}...")
    status = util.wait(lambda: forecast.describe_auto_predictor(PredictorArn=predictor_arn_auto))
    assert status

# Step 6: Predictor Error Metrics

In [None]:
# Fetch the accuracy metrics Amazon Forecast reports for the DeepAR+ predictor.
error_metrics_deep_ar_plus = forecast.get_accuracy_metrics(PredictorArn=predictor_arn_deep_ar)

In [None]:
# Fetch the accuracy metrics Amazon Forecast reports for the Prophet predictor.
error_metrics_prophet = forecast.get_accuracy_metrics(PredictorArn=predictor_arn_prophet)

In [None]:
# Fetch the accuracy metrics Amazon Forecast reports for the auto predictor.
error_metrics_auto = forecast.get_accuracy_metrics(PredictorArn=predictor_arn_auto)

In [None]:
def extract_summary_metrics(metric_response, predictor_name):
    """Return the weighted quantile losses of a predictor as a DataFrame.

    Only the first entry of ``PredictorEvaluationResults`` and the first of
    its ``TestWindows`` are read.

    Args:
        metric_response: Mapping shaped like a ``get_accuracy_metrics`` response.
        predictor_name: Label stored in the added ``Predictor`` column.

    Returns:
        A DataFrame with one row per ``WeightedQuantileLosses`` entry plus a
        ``Predictor`` column holding ``predictor_name``.
    """
    first_result = metric_response['PredictorEvaluationResults'][0]
    first_window = first_result['TestWindows'][0]
    quantile_losses = first_window['Metrics']['WeightedQuantileLosses']
    return pd.DataFrame(quantile_losses).assign(Predictor=predictor_name)

In [None]:
# Build one labelled loss table per predictor.
deep_ar_metrics, prophet_metrics, auto_metrics = (
    extract_summary_metrics(response, label)
    for response, label in (
        (error_metrics_deep_ar_plus, "DeepAR"),
        (error_metrics_prophet, "Prophet"),
        (error_metrics_auto, "Auto"),
    )
)

In [None]:
# Stack the three loss tables, pivot to one column per predictor and one row
# per quantile, then draw them as grouped bars.
all_metrics = pd.concat([deep_ar_metrics, prophet_metrics, auto_metrics])
loss_by_quantile = all_metrics.pivot(index='Quantile', columns='Predictor', values='LossValue')
loss_by_quantile.plot.bar();

# Step 7: Forecasting

## DeepAR+

In [None]:
# Forecast names share a fixed prefix and end with the current timestamp.
forecast_name_prefix_deep_ar = dataset_group_name + "_deeparp"
name_timestamp = datetime.now().strftime(date_format)
forecast_name_deep_ar = f"{forecast_name_prefix_deep_ar}_{name_timestamp}"
print(f"Forecast Name = {forecast_name_deep_ar}")

In [None]:
# Reuse the first forecast returned for this name prefix unless the user asks
# for a new one; create a forecast when none exists.
existing_forecasts_deep_ar = util.get_forecasts(forecast_name_prefix_deep_ar, forecast, False)
create_new_forecast = True
if len(existing_forecasts_deep_ar) > 0:
    first_forecast = existing_forecasts_deep_ar[0]
    created_label = first_forecast['CreationTime'].strftime(ui_date_format)
    print(f"Forecasts exist with the latest one from {created_label}.")
    if input("Create new forecast (y/N)? ").lower() != "y":
        forecast_arn_deep_ar = existing_forecasts_deep_ar[0]["ForecastArn"]
        create_new_forecast = False

if create_new_forecast:
    create_forecast_response_deep_ar = forecast.create_forecast(
        ForecastName=forecast_name_deep_ar,
        PredictorArn=predictor_arn_deep_ar,
        Tags=tags,
    )
    forecast_arn_deep_ar = create_forecast_response_deep_ar['ForecastArn']
    status = util.wait(lambda: forecast.describe_forecast(ForecastArn=forecast_arn_deep_ar))
    assert status

# Step 8: Querying

## DeepAR+

In [None]:
# Query the DeepAR+ forecast for the single item of interest.
item_filter = {"item_id": item_id}
forecast_response_deep = forecastquery.query_forecast(ForecastArn=forecast_arn_deep_ar,
                                                      Filters=item_filter)

In [None]:
forecast_response_deep

In [None]:
# Load the actual values for item_id from the uploaded target CSV through the
# util helper. The return shape is not visible here; later cells read its
# 'timestamp' column and the target column.
exact = util.load_exact_sol(s3_target_data_key, item_id, target_col_name=target_column_name)

In [None]:
def plot_forecasts(fcsts, exact, freq = '1D', forecastHorizon=30, time_back = 30, target_col_name='target', reverse=False):
    """Plot recent actual values next to the p10/p50/p90 forecast band.

    Args:
        fcsts: Query response holding ``['Forecast']['Predictions']`` with
            ``p10``, ``p50`` and ``p90`` lists of ``Timestamp``/``Value`` records.
        exact: Frame of actual values with a ``timestamp`` column and a
            ``target_col_name`` column.
        freq: Accepted for interface compatibility; not used.
        forecastHorizon: Accepted for interface compatibility; not used.
        time_back: Number of actual rows to draw.
        target_col_name: Name of the column in ``exact`` holding actual values.
        reverse: When true, draw the first ``time_back`` rows of ``exact``
            instead of the last ``time_back`` rows.
    """
    predictions = fcsts['Forecast']['Predictions']
    p10, p50, p90 = (pd.DataFrame(predictions[quantile]) for quantile in ('p10', 'p50', 'p90'))

    forecast_times = p50['Timestamp'].apply(pd.Timestamp)
    window_start, window_end = forecast_times.iloc[0], forecast_times.iloc[-1]
    actual_times = exact['timestamp'].apply(pd.Timestamp)

    # Choose which slice of the actual series is drawn (red line).
    if reverse:
        shown_times = actual_times.head(time_back)
        shown_values = exact[target_col_name].head(time_back).values
    else:
        shown_times = actual_times[-time_back:]
        shown_values = exact[target_col_name].values[-time_back:]
    plt.plot(shown_times, shown_values, color='r')

    # Median forecast (black) with the p10-p90 band shaded behind it.
    plt.plot(forecast_times, p50['Value'].values, color='k')
    plt.fill_between(forecast_times, p10['Value'].values, p90['Value'].values,
                     color='b', alpha=0.3)

    # Dashed green lines mark the first and last forecast timestamps.
    for boundary in (window_start, window_end):
        plt.axvline(x=pd.Timestamp(boundary), linewidth=1, color='g', ls='dashed')
    plt.xticks(rotation=30)
    plt.legend(['Target', 'Forecast'], loc='lower left')


In [None]:
# Draw the DeepAR+ forecast next to the actual series. reverse=True makes
# plot_forecasts take the first 30 rows of `exact` rather than the last 30.
plot_forecasts(forecast_response_deep,
                    exact,
                    freq=f'1{DATASET_FREQUENCY}',
                    forecastHorizon=FORECAST_LENGTH,
                    time_back=30,
                    target_col_name=target_column_name,
                    reverse=True
                   )
plt.title("DeepAR Forecast");

In [None]:
exact

In [None]:
# Import the bokeh.io module itself: a `from bokeh.io` line with no `import`
# clause is not valid Python and stops the cell from running.
import bokeh.io
from bokeh.plotting import figure

# p=figure(min_width=500, height=500)

# Interactive line chart of the actual target values over time.
exact.plot_bokeh.line(x='timestamp', y=[target_column_name], width=500)

# Cleanup

## Datasets and Dataset Group

In [None]:
# Delete the dataset group this notebook created. The ARN variable is used
# instead of a pasted literal so cleanup always targets the account, region and
# group name of the current run.
# NOTE(review): presumably the util helper also removes the attached datasets
# (per the section heading) - confirm against util.delete_dataset_group.
util.delete_dataset_group(dataset_group_arn, forecast)