# Time-Series Forecasting with Amazon SageMaker Autopilot using SageMaker Python SDK
## Contents

1. Introduction
1. Setup
1. Model Training
1. Real-Time Predictions (Inference)
1. Batch Predictions (Inference)
1. MLOps Pipeline

## 1. Introduction

This notebook uses Amazon SageMaker Autopilot to train a time-series model and produce predictions from the trained model. At a high level, you fetch tabular historical data from Amazon Redshift, convert it into two CSV files (one for training and one for inference), and make an API call to train a model. Once the model has been trained, you can produce predictions either as a batch or through a real-time endpoint. During training, SageMaker Autopilot manages and runs multiple time-series models concurrently. These models are combined into a single ensemble model that blends the candidate models in a ratio that minimizes forecast error. You receive metadata and models for the ensemble and for all underlying candidate models. SageMaker Autopilot orchestrates this entire process and produces several artifacts as a result.

These artifacts include:

1. backtest (holdout) forecasts per base model over multiple time windows,
2. accuracy metrics per base model,
3. backtest results and accuracy metrics for the ensembled model,
4. a scaled explainability report displaying the importance of each covariate and static metadata feature,
5. all model artifacts on S3, which can be registered or used for batch or real-time inference.
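
To make the idea of "blending candidate models in a ratio that minimizes forecast error" concrete, here is a minimal sketch with two hypothetical candidate forecasts and a simple grid search over the blend weight. It is only an illustration of the concept: the function names and sample numbers are made up for this example, and it is not a description of how Autopilot builds its ensemble internally.

```python
# Illustrative only: blend two candidate forecasts with a weight chosen by
# grid search on backtest error. This is NOT Autopilot's internal algorithm.

def mean_absolute_error(actuals, forecasts):
    """Average absolute difference between actual and forecast values."""
    return sum(abs(a - f) for a, f in zip(actuals, forecasts)) / len(actuals)


def blend(forecasts_a, forecasts_b, weight_a):
    """Weighted average of two forecasts; weight_a applies to the first one."""
    return [weight_a * a + (1 - weight_a) * b
            for a, b in zip(forecasts_a, forecasts_b)]


def best_blend_weight(actuals, forecasts_a, forecasts_b, steps=10):
    """Try weights 0.0, 0.1, ..., 1.0 and keep the one with the lowest error."""
    candidates = [i / steps for i in range(steps + 1)]
    return min(
        candidates,
        key=lambda w: mean_absolute_error(actuals, blend(forecasts_a, forecasts_b, w)),
    )


# Hypothetical backtest window: one model over-forecasts, the other under-forecasts.
actuals = [100.0, 120.0, 90.0, 110.0]
model_a = [110.0, 130.0, 100.0, 120.0]
model_b = [90.0, 110.0, 80.0, 100.0]

weight = best_blend_weight(actuals, model_a, model_b)
ensemble = blend(model_a, model_b, weight)
print("weight for model_a:", weight)
print("ensemble error:", mean_absolute_error(actuals, ensemble))
```

In this toy case the two candidates err in opposite directions, so an even blend has a lower backtest error than either model alone.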

## 2. Setup

In [None]:
# Update boto3 using this method, or your preferred method
!pip install --upgrade boto3 botocore --quiet
!pip install --upgrade sagemaker --quiet

In [None]:
import sagemaker
import boto3
from sagemaker import get_execution_role
from time import gmtime, strftime, sleep
import datetime
from io import StringIO
import pandas as pd


region = boto3.Session().region_name
session = sagemaker.Session()

# Modify the following default_bucket to use a bucket of your choosing
bucket = session.default_bucket()
#bucket = 'my-bucket'
prefix = 'sales-automl'

role = get_execution_role()

# This is the client we will use to interact with SageMaker Autopilot
sm = boto3.Session().client(service_name="sagemaker", region_name=region)

In [None]:
%%sql project.redshift
-- Fetch data from Amazon Redshift using below SQL Query
select  sm.store_id, sm.store_name, dm.date_time as sales_date, ss.total_sales, sp.promo,sp.school_holiday
from    store_dim sm,
        date_dim dm,
        store_sales ss,
        store_promotions sp
where   sm.store_id = ss.store_id
and     dm.date_key = ss.date_key
and     sm.store_id = sp.store_id
and     dm.date_key = sp.date_key
order by sm.store_id

In [None]:
# Access the dataframe using the output of the previous cell.
df = _.df
df.head()

We provide a sample set of data to accompany this notebook. You may use our synthetic dataset or alter this notebook to accommodate your own data. Note that the next cell copies a file to the S3 bucket and prefix defined in the last cell. Alternatively, we provide a method to copy the file to your local disk.

IMPORTANT: When training a model, your input data can contain a mixture of covariates and static item metadata. Take care to create future-dated rows that extend to the end of your prediction horizon. In the future-dated rows, carry your static item metadata and expected covariate values. The target value (y) in future-dated rows should be empty. Please download the example synthetic file using the S3 copy command in the next cell. You can inspect the data programmatically or in a text editor as an example.

The structure of the CSV file provided is as follows:

- **`store_id`**: unique ID for the store
- **`store_name`**: name of the store
- **`sales_date`**: datetime `YYYY-MM-DD HH:mm:ss`
- **`total_sales`**: units of products sold that day
- **`promo`**: was it a promotion day?
- **`school_holiday`**: was it a school holiday?
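
The note about future-dated rows can be sketched as follows. This is a minimal example that assumes daily data and the column names returned by the SQL query above; the helper `future_rows`, the store values, and the promotion plan are hypothetical and only illustrate the shape of the rows.

```python
from datetime import date, timedelta


def future_rows(store_id, store_name, last_date, horizon, promo_plan, holiday_plan):
    """Build one row per future day, with an empty target and known covariates."""
    rows = []
    for step in range(1, horizon + 1):
        rows.append({
            "store_id": store_id,          # static item metadata is carried forward
            "store_name": store_name,
            "sales_date": (last_date + timedelta(days=step)).isoformat(),
            "total_sales": "",             # target (y) stays empty for future dates
            "promo": promo_plan[step - 1],            # expected covariate values
            "school_holiday": holiday_plan[step - 1],
        })
    return rows


# Hypothetical store whose history ends on 2024-01-31, with a 5-day horizon.
rows = future_rows(
    store_id=1,
    store_name="Store 1",
    last_date=date(2024, 1, 31),
    horizon=5,
    promo_plan=[1, 0, 0, 1, 0],
    holiday_plan=[0, 0, 0, 0, 0],
)
for row in rows:
    print(row)
```

If you work with pandas, the resulting list of dictionaries can be turned into a dataframe with `pd.DataFrame(rows)` and appended to the historical rows of the same store.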

In [None]:
# Pull some data out for inference
stores = df.store_id.unique()

train_dfs = []
inference_dfs = []
for store in stores:
    # Select this store's rows in chronological order
    store_df = df[df.store_id == store].sort_values(by="sales_date")
    # Skip stores with fewer than 25 entries in the dataframe
    if len(store_df) < 25:
        print(f"Store {store} has fewer than 25 entries")
        continue
    # Create the inference dataset by taking the last 5 datapoints of each store
    train_dfs.append(store_df[:-5])
    inference_dfs.append(store_df[-5:])
train_df = pd.concat(train_dfs)
inference_df = pd.concat(inference_dfs)

In [None]:
# Save the two dataframes from the previous cell as CSV files for training and inference
train_df.to_csv("sales_train.csv", index=False)
inference_df.to_csv("sales_inference.csv", index=False)

In [None]:
# Display the first few lines of the training CSV file
!head sales_train.csv

## 3. Model Training

Establish the AutoML training job name and the roles of the dataset columns.

In [None]:
filename = "sales_train.csv"
item_identifier_attribute_name="store_id"
target_attribute_name="total_sales"
timestamp_attribute_name="sales_date"
columns_to_be_filled_with_zeros_if_missing = ["promo", "school_holiday"]


base_job_name = "sales-automl"

Define training job specifications. More information about [create_auto_ml_job_v2](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker/client/create_auto_ml_job_v2.html) can be found in our SageMaker documentation.

This configuration leverages the built-in sample data schema. Please consult the documentation to understand how to alter the parameters for your unique schema.

In [None]:
from sagemaker.automl.automlv2 import AutoMLTimeSeriesForecastingConfig, AutoMLV2, LocalAutoMLDataChannel

input_data = LocalAutoMLDataChannel(
    data_type="S3Prefix",
    channel_type="training",
    path=filename,
    content_type="text/csv;header=present"
)

ts_config = AutoMLTimeSeriesForecastingConfig(
    forecast_frequency='D',  # The frequency of predictions in a forecast.
    forecast_horizon=5,  # The number of time-steps that the model predicts.
    forecast_quantiles=['p50','p60','p70','p80','p90'], # The quantiles used to train the model for forecasts at a specified quantile. 
    filling = {x: {'middlefill': 'zero', 'backfill' : 'zero', 'futurefill' : 'zero'} for x in columns_to_be_filled_with_zeros_if_missing},
    item_identifier_attribute_name=item_identifier_attribute_name,
    target_attribute_name=target_attribute_name,
    timestamp_attribute_name=timestamp_attribute_name,
    # grouping_attribute_names=['location_code']
)

automl_job = AutoMLV2(
    problem_config=ts_config,
    role=role,
    base_job_name=base_job_name,
    output_path=f's3://{bucket}/{prefix}/output'
)
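
The `filling` argument above is built with a dictionary comprehension, which can be hard to read at a glance. The sketch below expands it for the two covariate columns used in this notebook so you can see the structure that is passed to the configuration. The comments on what each fill type covers reflect our reading of the API reference and should be verified there; the variable names `columns_to_fill` and `zero_fill` are introduced only for this example.

```python
# Expand the filling configuration used above into a plain dictionary.
columns_to_fill = ["promo", "school_holiday"]

zero_fill = {
    "middlefill": "zero",  # gaps between the first and last data point of an item
    "backfill": "zero",    # gaps between an item's last data point and the dataset end date
    "futurefill": "zero",  # gaps between the dataset end date and the end of the forecast horizon
}

# One independent copy of the strategy per column
filling = {column: dict(zero_fill) for column in columns_to_fill}

for column, strategy in filling.items():
    print(column, strategy)
```

Filling missing promotion and holiday flags with zero treats an absent value as "no promotion" or "no holiday". If that assumption does not hold for your data, choose a different strategy per column.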

With parameters now defined, invoke the [training job](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html) using the SageMaker Python SDK and monitor it until completion. You can expect the training to take about 1 hour.

In [None]:
%%time
automl_job.fit(input_data, wait=True, logs=True)

Retrieve the best candidate. The following cell shows how to obtain it for use in the subsequent inference phase.

In [None]:
best_candidate= automl_job.best_candidate()
best_candidate_name = best_candidate['CandidateName']

## 4. Deploy the best model to a SageMaker Real-time endpoint

If you want to perform real-time inference, review this section. If you want to perform batch processing, you may skip the real-time inference section and move to Batch Predictions (Inference).


In [None]:
endpoint_name = f"ep-{best_candidate_name}-automl-ts"

automl_sm_model = automl_job.create_model(name=best_candidate_name, candidate=best_candidate)

predictor = automl_job.deploy(initial_instance_count=1, endpoint_name=endpoint_name, instance_type='ml.m5.xlarge')

#### Test the Real-time Endpoint

The next cells demonstrate opening a local CSV file for inference. Alternatively, this data could come from S3, a database query, or a live application. In this example, the data is loaded into an in-memory Python object.


In [None]:
# Extract a small sample from sales_inference.csv to test the real-time endpoint

sales_inference_realtime = pd.read_csv("sales_inference.csv")

realtime_inference_test = sales_inference_realtime.sample(n=10)

realtime_inference_test.to_csv("sales_realtime.csv", index=False)

In [None]:
# A small sample file that corresponds to the sample training dataset and trained model schema

input_file = './sales_realtime.csv'
# Read the whole file into a string; the context manager closes the file for us
with open(input_file, 'r') as f:
    inference_data = f.read()
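
If the rows come from a live application rather than a file, an equivalent `text/csv` payload with a header line can be built in memory. The following is a minimal sketch using only the standard library; the helper name `rows_to_csv_payload` and the sample row are hypothetical, and the column order is assumed to match the training file.

```python
import csv
import io


def rows_to_csv_payload(rows, fieldnames):
    """Serialize a list of dictionaries into a CSV string with a header line."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


# Column order assumed to match the training CSV produced earlier
fieldnames = ["store_id", "store_name", "sales_date", "total_sales", "promo", "school_holiday"]

# Hypothetical record as it might arrive from an application
sample_rows = [
    {"store_id": 1, "store_name": "Store 1", "sales_date": "2024-01-27",
     "total_sales": 250, "promo": 1, "school_holiday": 0},
]

payload = rows_to_csv_payload(sample_rows, fieldnames)
print(payload)
```

The resulting string can be sent to the endpoint in place of the file contents read above.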

In [None]:
from sagemaker.predictor import Predictor

realtime_predictor = Predictor(
    endpoint_name=endpoint_name,
    # Predictor expects the session under the keyword `sagemaker_session`
    sagemaker_session=sagemaker.Session()
)

initial_args = {
    "EndpointName": endpoint_name,
    "Body": inference_data,
    "ContentType": "text/csv"
    }

In [None]:
response = realtime_predictor.predict(
    data=inference_data,
    initial_args=initial_args
)

In [None]:
# Decoding the byte data to a string, assuming UTF-8 encoding
decoded_data = response.decode('utf-8')

output_file = 'real-time-prediction-output.csv'
# Writing the decoded data to a CSV file
with open(output_file, 'w', newline='') as file:
    file.write(decoded_data)

In [None]:
df = pd.read_csv(StringIO(decoded_data), sep=',')
df.head(10)

## 5. Batch Inference with SageMaker Batch Transform

Amazon SageMaker Batch Transform is a scalable service for running batch predictions on large datasets. It lets you generate predictions from a trained model without managing any infrastructure. It is particularly useful when you need to process a large amount of data at once, for example on a schedule or in response to specific events. Batch Transform provisions the required compute resources, scales them to match the volume of data, and processes the data in batches, making it a cost-effective option for batch inference.

In [None]:
from sagemaker.session import Session

inference_file_name = "sales_inference.csv"

# Create the model object
model = automl_job.create_model("sales-automl")

# Upload inference data
inference_data = Session().upload_data(
    path=inference_file_name, 
    bucket=bucket, key_prefix=prefix+'/inference'
)

# Create the Transformer
transformer = model.transformer(
    instance_count=1,
    instance_type='ml.m5.12xlarge',
    output_path=f's3://{bucket}/{prefix}/batch_transform/output/',
    max_payload=0,  # in MB; 0 allows arbitrarily large payloads
    strategy='SingleRecord',
    assemble_with='Line',
)

# Start the transform job
transformer.transform(
    data=inference_data,
    content_type='text/csv;header=present',
    split_type='None',
)

# Wait for the transform job to finish
transformer.wait()

In [None]:
output_path = transformer.output_path
output_file = output_path+inference_file_name+'.out'
!aws s3 cp $output_file .

In [None]:
import pandas as pd
df = pd.read_csv(inference_file_name+'.out')
df

#### Cleanup Real-time Endpoint Resources

When you no longer need the endpoint, delete it as follows to stop the related billing. When you need the endpoint again, repeat the deployment steps. Ideally, by that time a newer model will have been trained and be ready to deploy.

In [None]:
realtime_predictor.delete_endpoint()

## 6. MLOps Pipeline

To automate retraining, we would normally use SageMaker Pipelines. However, AutoMLV2 currently has known issues when used with SageMaker Pipelines, so we suggest using AWS Step Functions to automate model retraining and deployment.
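
Whichever orchestrator you choose, an automated retraining workflow needs a step that waits for the AutoML job to reach a terminal state before deploying. The sketch below shows that polling logic in plain Python, using the boto3 `describe_auto_ml_job_v2` call. The function name `wait_for_automl_job`, the polling interval, and the poll limit are assumptions made for this example; this is not a Step Functions definition.

```python
import time

# Statuses after which the job will not change any further
TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_automl_job(sagemaker_client, job_name, poll_seconds=60, max_polls=240):
    """Poll an AutoML V2 job until it reaches a terminal status and return that status."""
    for _ in range(max_polls):
        response = sagemaker_client.describe_auto_ml_job_v2(AutoMLJobName=job_name)
        status = response["AutoMLJobStatus"]
        if status in TERMINAL_STATUSES:
            return status
        # Job is still running (for example "InProgress" or "Stopping"); wait and retry
        time.sleep(poll_seconds)
    raise TimeoutError(f"AutoML job {job_name} did not finish after {max_polls} polls")
```

With the `sm` client created in the Setup section, a call such as `wait_for_automl_job(sm, job_name)` returns the final status, where `job_name` is the name of your AutoML job. A workflow would continue to the deployment step only when that status is `"Completed"`.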