In [None]:
%matplotlib inline

In [None]:
import torch
import os

import pandas as pd
import numpy as np

from matplotlib import pyplot as plt

from torchcast.state_space import Predictions
from torchcast.utils.datasets import load_air_quality_data
from torchcast.kalman_filter import KalmanFilter
from torchcast.utils.data import TimeSeriesDataset

from plotnine import facet_wrap

np.random.seed(2025-3-12)
torch.manual_seed(2025-3-12)

# Multivariate Forecasts: Beijing Multi-Site Air-Quality Data

We'll demonstrate several features of `torchcast` using a dataset from the [UCI Machine Learning Data Repository](https://archive.ics.uci.edu/dataset/501/beijing+multi+site+air+quality+data). It includes data on air pollutants and weather from 12 sites.

In [None]:
df_aq = load_air_quality_data('weekly')

SPLIT_DT = np.datetime64('2016-02-22')
df_aq['dataset'] = np.where(df_aq['week'] > SPLIT_DT, 'val', 'train')
df_aq

### Univariate Forecasts

Let's try to build a model to predict total particulate-matter (PM2.5 and PM10). 

First, we'll make our target the sum of these two types. We'll log-transform since this is strictly positive.

In [None]:
from torchcast.process import LocalTrend, Season

# dataset:
df_aq['PM'] = df_aq['PM10'] + df_aq['PM2p5'] 
df_aq['PM_log10'] = np.log10(df_aq['PM']) 
dataset_pm = TimeSeriesDataset.from_dataframe(
    dataframe=df_aq,
    dt_unit='W',
    measure_colnames=['PM_log10'],
    group_colname='station', 
    time_colname='week'
)
dataset_pm_train, _ = dataset_pm.train_val_split(dt=SPLIT_DT)

# model:
kf_pm = KalmanFilter(
    measures=['PM_log10'], 
    processes=[
        LocalTrend(id='trend'),
        Season(
            id='day_in_year', 
            period=365.25 / 7, 
            dt_unit='W', 
            K=5,
            fixed=True
        )
    ]
)

# fit:
kf_pm.fit(
    dataset_pm_train.tensors[0],
    start_offsets=dataset_pm_train.start_datetimes
)

Let's see how our forecasts look:

In [None]:
# helper for transforming log back to original:
def inverse_transform(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    # bias-correction for log-transform (see https://otexts.com/fpp2/transformations.html#bias-adjustments)
    df['mean'] = df['mean'] + .5 * df['std'] ** 2
    df['lower'] = df['mean'] - 1.96 * df['std']
    df['upper'] = df['mean'] + 1.96 * df['std']
    # inverse the log10:
    df[['actual', 'mean', 'upper', 'lower']] = 10 ** df[['actual', 'mean', 'upper', 'lower']]
    df['measure'] = df['measure'].str.replace('_log10', '')
    return df

# generate forecasts:
forecast = kf_pm(
        dataset_pm_train.tensors[0],
        start_offsets=dataset_pm_train.start_datetimes,
        out_timesteps=dataset_pm.tensors[0].shape[1]
)

df_uv_pred = (forecast
             .set_metadata(group_colname='station', time_colname='week')
             .to_dataframe(dataset_pm, conf=None)
             .pipe(inverse_transform))
forecast.plot(
    df_uv_pred, 
    max_num_groups=2, 
    split_dt=SPLIT_DT, 
    figure_size=(6,5)
)

### Multivariate Forecasts

Can we improve our model by splitting the pollutant we are forecasting into its two types (2.5 and 10), and modeling them in a multivariate manner?

In [None]:
# create a dataset:
df_aq['PM10_log10'] = np.log10(df_aq['PM10'])
df_aq['PM2p5_log10'] = np.log10(df_aq['PM2p5'])
dataset_pm_multivariate = TimeSeriesDataset.from_dataframe(
    dataframe=df_aq,
    dt_unit='W',
    measure_colnames=['PM10_log10','PM2p5_log10'],
    group_colname='station', 
    time_colname='week'
)
dataset_pm_multivariate_train, _ = dataset_pm_multivariate.train_val_split(dt=SPLIT_DT)

# create a model:
_processes = []
for m in dataset_pm_multivariate.measures[0]:
    _processes.extend([
        LocalTrend(id=f'{m}_trend', measure=m),
        Season(id=f'{m}_day_in_year', period=365.25 / 7, dt_unit='W', K=5, measure=m, fixed=True)
    ])
kf_pm_multivariate = KalmanFilter(measures=dataset_pm_multivariate.measures[0], processes=_processes)

# fit:
kf_pm_multivariate.fit(
    dataset_pm_multivariate_train.tensors[0],
    start_offsets=dataset_pm_multivariate_train.start_datetimes
)

We can generate our one-month-ahead predictions for validation as we did before:

In [None]:
with torch.no_grad():
    forecast_mv = kf_pm_multivariate(
        dataset_pm_multivariate_train.tensors[0],
        start_offsets=dataset_pm_multivariate_train.start_datetimes,
        out_timesteps=dataset_pm_multivariate.num_timesteps # <--- how we ask it to forecast past original times
    )
forecast_mv.means.shape

At this point, though, we run into a problem: we we have forecasts for both PM2.5 and PM10, but we ultimately want a forecast for their *sum*. With untransformed data, we could take advantage of the fact that [sum of correlated normals is still normal](https://en.wikipedia.org/wiki/Sum_of_normally_distributed_random_variables#Correlated_random_variables); but we have log-transformed our measures. This seems like it was the right choice (i.e. our residuals look reasonably normal and i.i.d):

In [None]:
forecast_mv.plot(
    forecast_mv.to_dataframe(dataset_pm_multivariate, type='components').query("process=='residuals'"),
    time_colname='time', group_colname='group',
    figure_size=(6,5)
) + facet_wrap('measure', ncol=1)

In this case, we can't take the sum of our forecasts to get the forecast of the sum, and [there's no simple closed-form expression for the sum of lognormals](https://scholar.google.com/scholar?hl=en&as_sdt=0%2C14&q=SUMS+OF+LOGNORMALS&btnG=).

One option that is fairly easy in `torchcast` is to use a [Monte-Carlo](https://en.wikipedia.org/wiki/Monte_Carlo_method) approach: we'll just generate random-samples based on the means and covariances underlying our forecast. In that case, the sum of the PM2.5 + PM10 forecasted-samples *is* the forecasted PM sum we are looking for:

In [None]:
def mc_preds_to_dataframe(preds: Predictions,
                          dataset: TimeSeriesDataset,
                          inverse_transform_fun: callable,
                          num_draws: int = 1000,
                          **kwargs) -> pd.DataFrame:
    """
    Our predictions are on the transformed scale, and we'd like to sum across measures on the original scale;
    this function uses a monte-carlo approach to do this.
    """
    # generate draws from the forecast distribution, apply inverse-transform:
    mc_draws = inverse_transform_fun(torch.distributions.MultivariateNormal(*preds).rsample((num_draws,)))
    # sum across measures (e.g. 2.5 and 10), then mean across draws:
    mc_predictions = mc_draws.sum(-1, keepdim=True).mean(0)
    # convert to a dataframe
    return TimeSeriesDataset.tensor_to_dataframe(
        mc_predictions,
        times=dataset.times(),
        group_names=dataset.group_names,
        measures=['predicted'],
        **kwargs
    )

In [None]:
df_mv_pred = mc_preds_to_dataframe(
    forecast_mv,
    dataset_pm_multivariate,
    inverse_transform_fun=lambda x: 10 ** x,
    group_colname='station',
    time_colname='week'
)
df_mv_pred

### Evaluating Performance

In [None]:
def get_station_error(df: pd.DataFrame, pred_colname: str = 'mean', actual_colname: str = 'actual') -> pd.DataFrame:
    return (df
             .assign(sq_error=lambda df: (df[pred_colname] - df[actual_colname]) ** 2)
             .groupby(['station'])
             ['sq_error'].mean()
             .reset_index()
             .assign(rmse= lambda df: df['sq_error'] ** .5))

def agg_within_group_err(df: pd.DataFrame, error_col : str = 'rmse', group_col: str = 'station') -> pd.DataFrame:
    df = df.copy(deep=False)
    df['error_norm'] = df[error_col] - df.groupby(group_col)[error_col].transform('mean')
    return df.groupby('model').agg(**{error_col : (error_col, 'mean'), 'sem' : ('error_norm', 'sem')})

In [None]:
df_fct_err = pd.concat([
            df_uv_pred
             .loc[lambda df: df['week'] > SPLIT_DT,:]
             .pipe(get_station_error)
             .assign(model='univariate')
            ,
            df_mv_pred
             .loc[lambda df: df['week'] > SPLIT_DT,:]
             .merge(df_aq.loc[:, ['station', 'week', 'PM']])
             .pipe(get_station_error, pred_colname='predicted', actual_colname='PM')
             .assign(model='multivariate')
]).reset_index(drop=True)
df_fct_err.sort_values('station').tail(6)

We see that this approach has reduced our error substantially in the validation period (though at the cost of slightly increasing it in the training period). We can look at the per-site differences to reduce noise:

In [None]:
df_fct_err_agg = agg_within_group_err(df_fct_err)
# https://chris-said.io/2014/12/01/independent-t-tests-and-the-83-confidence-interval-a-useful-trick-for-eyeballing-your-data/
_multi = -stats.norm.ppf((1 - .83) / 2)

df_fct_err_agg['rmse'].plot(
    kind='bar', 
    yerr=df_fct_err_agg['sem'] * _multi, 
    ylabel="Root Sq. Err\n(lower is better)", 
    title="Forecasting Error",
    ylim=(77,83)
)
plt.show()

### Backtesting

In [None]:
with torch.no_grad():
    one_mo_uv = kf_pm(
        dataset_pm.tensors[0],
        start_offsets=dataset_pm.start_datetimes,
        n_step=4
    )
    one_mo_mv = kf_pm_multivariate(
        dataset_pm_multivariate.tensors[0],
        start_offsets=dataset_pm_multivariate.start_datetimes,
        n_step=4
    )
    
df_backtest_err1mo = pd.concat([

    one_mo_uv
    .to_dataframe(dataset_pm, time_colname='week', group_colname='station', conf=None)
    .pipe(inverse_transform)
    .loc[lambda df: df['week'] > SPLIT_DT, :]
    .pipe(get_station_error)
    .assign(model='univariate')
    ,

    mc_preds_to_dataframe(one_mo_mv,
                          dataset_pm_multivariate,
                          inverse_transform_fun=lambda x: 10 ** x,
                          group_colname='station',
                          time_colname='week')
    .loc[lambda df: df['week'] > SPLIT_DT,:]
    .merge(df_aq[['station', 'week', 'PM']])
    .pipe(get_station_error, pred_colname='predicted', actual_colname='PM')
    .assign(model='multivariate')
])

df_backtest1mo_err_agg = agg_within_group_err(df_backtest_err1mo)

df_backtest1mo_err_agg['rmse'].plot(
    kind='bar', 
    yerr=df_backtest1mo_err_agg['sem'], 
    ylabel="Root Sq. Err\n(lower is better)", 
    title="Backtesting (1-Month-Ahead) Error",
    ylim=(77,83)
)
plt.show()

### Adding Predictors

In many settings we have external (a.k.a. _exogenous_) predictors we'd like to incorporate. Here we'll use four predictors corresponding to weather conditions. Of course, in a forecasting context, we run into the problem of needing to fill in values for these predictors for future dates. For an arbitrary forecast horizon this can be a complex issue; for simplicity here we'll focus on the 4-week-ahead predictions we used above, and simply lag our weather predictors by 4.

In [None]:
from torchcast.process import LinearModel

# prepare external predictors:
predictors_raw = ['TEMP', 'PRES', 'DEWP']
predictors = [p.lower() + '_lag4' for p in predictors_raw]
# standardize:
predictor_means = df_aq.query("dataset=='train'")[predictors_raw].mean()
predictor_stds = df_aq.query("dataset=='train'")[predictors_raw].std()
df_aq[predictors] = (df_aq[predictors_raw] - predictor_means) / predictor_stds
# lag:
df_aq[predictors] = df_aq.groupby('station')[predictors].shift(4, fill_value=0)

# create dataset:
dataset_pm_lm = TimeSeriesDataset.from_dataframe(
    dataframe=df_aq,
    dt_unit='W',
    y_colnames=['PM10_log10','PM2p5_log10'],
    X_colnames=predictors,
    group_colname='station', 
    time_colname='week',
)
dataset_pm_lm_train, _ = dataset_pm_lm.train_val_split(dt=SPLIT_DT)
dataset_pm_lm_train

In [None]:
# create a model:
_processes = []
for m in dataset_pm_lm.measures[0]:
    _processes.extend([
        LocalTrend(id=f'{m}_trend', measure=m),
        Season(id=f'{m}_day_in_year', period=365.25 / 7, dt_unit='W', K=5, measure=m, fixed=True),
        LinearModel(id=f'{m}_lm', predictors=predictors, measure=m)
    ])
kf_pm_lm = KalmanFilter(measures=dataset_pm_lm.measures[0], processes=_processes)

# fit:
y, X = dataset_pm_lm_train.tensors
kf_pm_lm.fit(
    y,
    X=X, # if you want to supply different predictors to different processes, you can use `{process_name}__X`
    start_offsets=dataset_pm_lm_train.start_datetimes
)

Here we show how to inspect the influence of each predictor:

In [None]:
# inspect components:
with torch.no_grad():
    y, X = dataset_pm_lm.tensors
    one_mo_lm = kf_pm_lm(
        y,
        X=X,
        start_offsets=dataset_pm_lm.start_datetimes,
        n_step=4
    )
one_mo_lm.plot(
    one_mo_lm
    .to_dataframe(dataset_pm_lm, type='components')
    .query("process.str.contains('lm')")
    .query("time > '2014-01-01'"),
    split_dt=SPLIT_DT, 
    figure_size=(6,5)
) + facet_wrap('measure', ncol=1)

Now let's look at the change in error from our earlier multivariate model vs. one that includes predictors:

In [None]:
df_backtest1mo_err_agg2 = agg_within_group_err(
    pd.concat([
        # old ones:
        df_backtest_err1mo,
        # new one:
        mc_preds_to_dataframe(one_mo_lm,
                              dataset_pm_lm,
                              inverse_transform_fun=lambda x: 10 ** x,
                              group_colname='station',
                              time_colname='week')
        .loc[lambda df: df['week'] > SPLIT_DT,:]
        .merge(df_aq[['station', 'week', 'PM']])
        .pipe(get_station_error, pred_colname='predicted', actual_colname='PM')
        .assign(model='mv_with_preds')
    ])
)

df_backtest1mo_err_agg2['rmse'].plot(
    kind='bar', 
    yerr=df_backtest1mo_err_agg2['sem'], 
    ylabel="Root Sq. Err\n(lower is better)", 
    title="Backtesting (1-Month-Ahead) Error",
    ylim=(77,83)
)
plt.show()