# Deep Learning for Forecasting using GluonTS

## 1. Intro

GluonTS is an open source Python library for time series modeling, with a focus on deep learning architectures for forecasting.
It provides:
* Abstractions for prediction models
* Tools for model evaluation (backtesting)
* Model components, to help build custom models
* Pre-implemented, state-of-the-art forecasting models

We originally developed GluonTS on top of MXNet's Gluon API (hence the name), but the library is now compatible with PyTorch, too.

Useful links:
* GitHub: https://github.com/awslabs/gluon-ts (to file issues, trigger discussions, contribute changes, ...)
* Website: https://ts.gluon.ai (documentation, tutorials, ...)

Target users:
* **Researchers** who want to try out novel architectures for forecasting and compare them to the state of the art
* **Data scientist/solution architects** who want to experiment with several solutions to solve business problems
* **Machine learning engineer** who need to integrate forecasting models in production pipelines

### What you're going to learn

* Handle datasets in GluonTS (transform them and iterate them)
* Forecast and do backtest evaluations given a model (the "predictor" interface)
* Train a model based on pre-implemented architectures (the "estimator" interface)
* Implement a custom, neural-network-based architecture using PyTorch, and use it with the rest of GluonTS

## 2. Overview

In [None]:
from typing import List, Callable, Dict, Tuple, Any
from itertools import islice
from pprint import pprint
import numpy as np
import pandas as pd
import matplotlib
from matplotlib import pyplot as plt

GluonTS comes with a set of publicly available dataset, ready to use for training and testing models:

In [None]:
from gluonts.dataset.repository.datasets import get_dataset, dataset_names
from gluonts.dataset.util import to_pandas

In [None]:
dataset_names

For this tutorial, we're going to use the hourly dataset from the M4 competition:

In [None]:
dataset = get_dataset("traffic")

In [None]:
dataset.metadata

In [None]:
len(dataset.train)

In [None]:
len(dataset.test)

In [None]:
pprint(next(iter(dataset.train)))

In [None]:
to_pandas(next(iter(dataset.train)))[-24:]

In [None]:
plt.figure(figsize=(20, 15))

for idx, entry in enumerate(islice(dataset.train, 9)):
    ax = plt.subplot(3, 3, idx+1)
    to_pandas(entry)[-168:].plot()
    plt.grid()

plt.gcf().tight_layout()
plt.show()

In [None]:
from gluonts.model.seasonal_naive import SeasonalNaivePredictor

In [None]:
predictor_naive = SeasonalNaivePredictor(
    freq=dataset.metadata.freq,
    prediction_length=dataset.metadata.prediction_length,
    season_length=24 * 7,
)

In [None]:
forecasts = list(predictor_naive.predict(dataset.train))

In [None]:
forecasts[0]

In [None]:
plt.figure(figsize=(20, 15))

for idx, (entry, f) in islice(enumerate(zip(dataset.train, forecasts)), 9):
    ax = plt.subplot(3, 3, idx+1)
    to_pandas(entry)[-24 * 7:].plot()
    f.plot(color="r")
    plt.grid()
    plt.title(f"item_id: {f.item_id}")

plt.gcf().tight_layout()
plt.show()

In [None]:
from gluonts.evaluation import Evaluator
from gluonts.evaluation.backtest import backtest_metrics, make_evaluation_predictions

In [None]:
evaluator = Evaluator(quantiles=[0.1, 0.5, 0.9])

In [None]:
aggregate_metrics_naive, entrywise_metrics_naive = backtest_metrics(dataset.test, predictor_naive, evaluator=evaluator)

In [None]:
df_evaluations = pd.DataFrame.from_records(
    [aggregate_metrics_naive],
    index=["Naive seasonal"]
)

In [None]:
df_evaluations.transpose()

In [None]:
entrywise_metrics_naive

In [None]:
gb = entrywise_metrics_naive.groupby("item_id")

In [None]:
gb["MAPE"].agg(np.mean)

## 2. Using pre-implemented models: DeepAR

In [None]:
from gluonts.torch.model.deepar import DeepAREstimator

In [None]:
prediction_length = dataset.metadata.prediction_length

In [None]:
deepar_estimator = DeepAREstimator(
    freq=dataset.metadata.freq,
    prediction_length=prediction_length,
    num_feat_static_cat=len(dataset.metadata.feat_static_cat),
    cardinality=[int(f.cardinality) for f in dataset.metadata.feat_static_cat],
)

In [None]:
deepar_predictor = deepar_estimator.train(dataset.train, cache_data=True)

In [None]:
deepar_predictor

We can now do backtesting on the test dataset: in what follows, `make_evaluation_predictions` will slice out the trailing `prediction_length` observations from the test time series, and use the given predictor to obtain forecasts for the same time range.

In [None]:
forecast_it, ts_it = make_evaluation_predictions(
    dataset=dataset.test,
    predictor=deepar_predictor,
)

forecasts_deepar = list(forecast_it)
tss_deepar = list(ts_it)

In [None]:
plt.figure(figsize=(20, 15))
date_formatter = matplotlib.dates.DateFormatter('%b, %d')
plt.rcParams.update({'font.size': 15})

for idx, (forecast, ts) in islice(enumerate(zip(forecasts_deepar, tss_deepar)), 9):
    ax = plt.subplot(3, 3, idx+1)
    
    plt.plot(ts[-5 * prediction_length:], label="target")
    forecast.plot(color="g")
    plt.xticks(rotation=60)
    ax.xaxis.set_major_formatter(date_formatter)
    
plt.gcf().tight_layout()
plt.legend()
plt.show()

In [None]:
aggregate_metrics_deepar, entrywise_metrics_deepar = evaluator(tss_deepar, forecasts_deepar)

In [None]:
pd.DataFrame.from_records(
    [aggregate_metrics_naive, aggregate_metrics_deepar],
    index=["Naive seasonal", "DeepAR"]
).transpose()

## 3. Implementing a PyTorch model: a probabilistic feed-forward network

In [None]:
import torch
import torch.nn as nn
import pytorch_lightning as pl

We will use a pretty simple model, based on a feed-forward network whose output layer produces the parameters of a Student's t-distribution at each time step in the prediction range.

<img src="figures/feedforward.png" style="width: 300px; margin-left: auto; margin-right: auto;">

In [None]:
from gluonts.torch.modules.distribution_output import StudentTOutput

In [None]:
def mean_abs_scaling(seq: torch.Tensor, min_scale=1e-5):
    return seq.abs().mean(1).clamp(min_scale, None).unsqueeze(1)

In [None]:
class FeedForwardModel(nn.Module):
    def __init__(
        self,
        freq: str,
        prediction_length: int,
        context_length: int,
        hidden_dimensions: List[int],
        distr_output=StudentTOutput(),
        batch_norm: bool = False,
        scaling: Callable = mean_abs_scaling,
    ) -> None:
        super().__init__()

        assert prediction_length > 0
        assert context_length > 0
        assert len(hidden_dimensions) > 0

        self.freq = freq
        self.prediction_length = prediction_length
        self.context_length = context_length
        self.hidden_dimensions = hidden_dimensions
        self.distr_output = distr_output
        self.batch_norm = batch_norm
        self.scaling = scaling

        dimensions = [2 * context_length] + hidden_dimensions[:-1]

        modules = []
        for in_size, out_size in zip(dimensions[:-1], dimensions[1:]):
            modules += [self.__make_lin(in_size, out_size), nn.ReLU()]
            if batch_norm:
                modules.append(nn.BatchNorm1d(out_size))
        modules.append(
            self.__make_lin(
                dimensions[-1], prediction_length * hidden_dimensions[-1]
            )
        )

        self.nn = nn.Sequential(*modules)
        self.args_proj = self.distr_output.get_args_proj(hidden_dimensions[-1])

    @staticmethod
    def __make_lin(dim_in, dim_out):
        lin = nn.Linear(dim_in, dim_out)
        torch.nn.init.uniform_(lin.weight, -0.07, 0.07)
        torch.nn.init.zeros_(lin.bias)
        return lin

    def forward(self, context: torch.Tensor, context_observed: torch.Tensor) -> Tuple[Tuple[torch.Tensor, ...], torch.Tensor, torch.Tensor]:
        assert context.shape == context_observed.shape
        
        scale = self.scaling(context)
        scaled_context = context / scale
        nn_in = torch.cat((scaled_context, context_observed), dim=-1)
        nn_out = self.nn(nn_in)
        nn_out_reshaped = nn_out.reshape(
            -1, self.prediction_length, self.hidden_dimensions[-1]
        )
        distr_args = self.args_proj(nn_out_reshaped)

        return distr_args, torch.zeros_like(scale), scale

Note: all inputs and outputs of the `forward` method will have shape `batch_size x context_length`

We can now instantiate the training network, and explore its set of parameters.

In [None]:
freq = dataset.metadata.freq
prediction_length = dataset.metadata.prediction_length
context_length = 2 * 7 * 24
hidden_dimensions = [96, 48]

In [None]:
model = FeedForwardModel(
    freq=freq,
    prediction_length=prediction_length,
    context_length=context_length,
    hidden_dimensions=hidden_dimensions,
)

In [None]:
sum(np.prod(p.shape) for p in model.parameters())

In [None]:
for p in model.parameters():
    print(p.shape)

Now we must specify how to *train* the model, i.e. how the loss will be computed to perform training via SGD-like algorithms. One way to do this, in the PyTorch ecosystem, is to wrap the model within a `LightningModule` from the PyTorch Lightning library:

In [None]:
class FeedForwardLightningModule(pl.LightningModule):
    def __init__(self, model: FeedForwardModel):
        super().__init__()
        self.save_hyperparameters()
        self.model = model

    def training_step(self, batch: Dict[str, Any], batch_idx: int):
        context = batch["past_target"]
        context_observed = batch["past_observed_values"]
        target = batch["future_target"]

        assert context.shape[-1] == self.model.context_length
        assert target.shape[-1] == self.model.prediction_length

        distr_args, loc, scale = self.model(context, context_observed)
        distr = self.model.distr_output.distribution(distr_args, loc, scale)
        loss = -distr.log_prob(target)

        return loss.mean()

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer

In [None]:
training_module = FeedForwardLightningModule(model)

### Defining the training data loader

We now set up the data loader which will yield batches of data to train on. Starting from the original dataset, the data loader is configured to apply the following transformation, which does essentially two things:
* Replaces `nan`s in the target field with a dummy value (zero), and adds a field indicating which values were actually observed vs imputed this way.
* Slices out training instances of a fixed length randomly from the given dataset; these will be stacked into batches by the data loader itself.

In [None]:
from gluonts.transform import Chain, AddObservedValuesIndicator, InstanceSplitter, ExpectedNumInstanceSampler

In [None]:
transformation = AddObservedValuesIndicator(
    target_field="target",
    output_field="observed_values",
)

In [None]:
transformed_data = transformation.apply(dataset.train)

In [None]:
pprint(next(iter(transformed_data)))

In [None]:
from gluonts.transform import InstanceSplitter, SelectFields

<img src="figures/training_data_iter.png" style="width: 800px; margin-left: auto; margin-right: auto;">

In [None]:
splitter = InstanceSplitter(
    start_field="start",
    target_field="target",
    is_pad_field="is_pad",
    forecast_start_field="forecast_start",
    instance_sampler=ExpectedNumInstanceSampler(
        num_instances=1,
        min_future=prediction_length,
    ),
    past_length=context_length,
    future_length=prediction_length,
    time_series_fields=["observed_values"],
)

In [None]:
select_fields = SelectFields(["past_target", "past_observed_values", "future_target", "future_observed_values", "item_id"])

In [None]:
from gluonts.itertools import Cyclic

In [None]:
training_instances = (splitter + select_fields).apply(Cyclic(transformed_data))

In [None]:
for instance in training_instances:
    pprint(instance)
    break

In [None]:
from gluonts.itertools import IterableSlice
from gluonts.torch.util import IterableDataset
from torch.utils.data import DataLoader

In [None]:
training_batches = IterableSlice(
    iter(DataLoader(IterableDataset(training_instances), batch_size=32)),
    50,
)

In [None]:
batch = next(iter(training_batches))

In [None]:
batch.keys()

In [None]:
batch["past_target"]

In [None]:
batch["past_target"].shape

In [None]:
batch["past_observed_values"]

In [None]:
batch["past_observed_values"].shape

In [None]:
batch["future_target"].shape

In [None]:
batch["future_observed_values"].shape

Let's see what the batches in the stream are composed of:

In [None]:
batch_ids = []
for epoch_no in range(10):
    for batch in training_batches:
        batch_ids.append(batch["item_id"])

In [None]:
len(batch_ids)

In [None]:
count_per_batch = np.zeros(shape=(len(dataset.train), len(batch_ids)))
for k, ids in enumerate(batch_ids):
    for id in ids:
        count_per_batch[id, k] = 1

In [None]:
import seaborn as sns

In [None]:
plt.figure(figsize=(20, 10))
sns.heatmap(count_per_batch)
plt.title("Training batches")
plt.xlabel("Batch no.")
plt.ylabel("Item ID")
plt.show()

### Train the model

We can now train the model using any of the available optimizers from PyTorch:

In [None]:
trainer = pl.Trainer(max_epochs=20)

In [None]:
trainer.fit(training_module, train_dataloader=training_batches)

### Create predictor out of the trained model, and test it

In [None]:
from gluonts.torch.model.predictor import PyTorchPredictor
from gluonts.model.forecast_generator import DistributionForecastGenerator
from gluonts.transform import TestSplitSampler

In [None]:
def get_predictor(input_transform, model, batch_size=32):
    splitter = InstanceSplitter(
        start_field="start",
        target_field="target",
        is_pad_field="is_pad",
        forecast_start_field="forecast_start",
        instance_sampler=TestSplitSampler(),
        past_length=context_length,
        future_length=prediction_length,
        time_series_fields=["observed_values"],
    )
    return PyTorchPredictor(
        freq=model.freq,
        prediction_length=model.prediction_length,
        input_names=["past_target", "past_observed_values"],
        prediction_net=model,
        batch_size=batch_size,
        input_transform=input_transform + splitter,
        forecast_generator=DistributionForecastGenerator(model.distr_output),
    )

In [None]:
predictor = get_predictor(input_transform=transformation, model=model)

<img src="figures/prediction_data_iter.png" style="width: 800px; margin-left: auto; margin-right: auto;">

In [None]:
forecast_it, ts_it = make_evaluation_predictions(
    dataset=dataset.test,
    predictor=predictor,
    num_samples=100,
)

forecasts_feedforward = list(f.to_sample_forecast() for f in forecast_it)
tss_feedforward = list(ts_it)

Once we have the forecasts, we can plot them:

In [None]:
plt.figure(figsize=(20, 15))
date_formatter = matplotlib.dates.DateFormatter('%b, %d')
plt.rcParams.update({'font.size': 15})

for idx, (forecast, ts) in islice(enumerate(zip(forecasts_feedforward, tss_feedforward)), 9):
    ax = plt.subplot(3, 3, idx+1)
    plt.plot(ts[-5 * prediction_length:], label="target")
    forecast.plot(color="g")
    plt.xticks(rotation=60)
    ax.xaxis.set_major_formatter(date_formatter)
    
plt.gcf().tight_layout()
plt.legend()
plt.show()

And we can compute evaluation metrics, that summarize the performance of the model on our test data.

In [None]:
aggregate_metrics_feedforward, entrywise_metrics_feedforward = evaluator(tss_feedforward, forecasts_feedforward)

In [None]:
pd.DataFrame.from_records(
    [aggregate_metrics_naive, aggregate_metrics_deepar, aggregate_metrics_feedforward],
    index=["Naive seasonal", "DeepAR", "Feed-forward"]
).transpose()

## 5. Additional features

Several additional features in GluonTS we did not cover here:

* Models (mainly implemented in MXNet):
    * Wavenet [van den Oord et al., 2016]
    * Deep State Space Models [Rangapuram et al., 2018]
    * Deep Factors [Wang et al. 2019]
    * MQ-RNN/CNN [Wen et al., 2017]
    * Attention-based models (transformers)
    * Gaussian processes, Temporal point processes, ...
    * **Multivariate** models
* Serialization/deserialization of models
* Helpers to train & deploy models in the cloud, using Amazon SageMaker