使用协变量的TimesFM

这个教程笔记演示了在进行预测时如何利用外部协变量与TimesFM配合使用。在运行这个笔记本之前，请确保：

- 您已经阅读了TimesFM的README。
- 本地内核使用Python 3.10正在运行。

设置环境并安装 TimesFM。

In [None]:
import os
os.environ['XLA_PYTHON_CLIENT_PREALLOCATE'] = 'false'
os.environ['JAX_PMAP_USE_TENSORSTORE'] = 'false'

In [None]:
!pip install timesfm
import timesfm

加载检查点

注意：请根据您的设备配置后端（“cpu”、“gpu”或“tpu”）。此笔记本默认在CPU上运行。

我们从HuggingFace加载1.0-200m模型的检查点。

In [None]:
timesfm_backend = "cpu"  # @param

from jax._src import config
config.update(
    "jax_platforms", {"cpu": "cpu", "gpu": "cuda", "tpu": ""}[timesfm_backend]
)

model = timesfm.TimesFm(
    context_len=512,
    horizon_len=128,
    input_patch_len=32,
    output_patch_len=128,
    num_layers=20,
    model_dims=1280,
    backend=timesfm_backend,
)
model.load_from_checkpoint(repo_id="google/timesfm-1.0-200m")

# 协变量

让我们以一个杂货店销售预测的玩具例子来说明：

**任务：** 给定本周（7天）观察到的每日销售额，预测下周（7天）的每日销售额。

```
产品：冰淇淋
每日销售额：[30, 30, 4, 5, 7, 8, 10]
类别：食品
基准价格：1.99
星期几：[0, 1, 2, 3, 4, 5, 6, 0, 1, 2, 3, 4, 5, 6]
是否促销：[是，是，否，否，否，是，是，否，否，否，否，否，否，否]
每日温度：[31.0, 24.3, 19.4, 26.2, 24.6, 30.0, 31.1, 32.4, 30.9, 26.0, 25.0, 27.8, 29.5, 31.2]
```

```
产品：防晒霜
每日销售额：[5, 7, 12, 13, 5, 6, 10]
类别：皮肤产品
基准价格：29.99
星期几：[0, 1, 2, 3, 4, 5, 6, 0, 1, 2, 3, 4, 5, 6]
是否促销：[否，否，是，是，否，否，否，是，是，是，是，是，是，是]
每日温度：[31.0, 24.3, 19.4, 26.2, 24.6, 30.0, 31.1, 32.4, 30.9, 26.0, 25.0, 27.8, 29.5, 31.2]
```

在这个例子中，除了`每日销售额`之外，我们还有协变量`类别`，`基准价格`，`星期几`，`是否促销`，`每日温度`。让我们介绍一些概念：

**静态协变量**是每个时间序列的协变量。
- 在我们的例子中，`类别`是一个**静态分类协变量**，
- `基准价格`是一个**静态数值协变量**。

**动态协变量**是每个时间戳的协变量。
- 日期/时间相关特征通常可以视为动态协变量。
- 在我们的例子中，`星期几`和`是否促销`是**动态分类协变量**。
- `每日温度`是一个**动态数值协变量**。

**注意：** 这里我们强制要求动态协变量需要涵盖预测的上下文和时间跨度。例如，在这个例子中，所有动态协变量有14个值：前7个对应观察到的7天，后7个对应接下来的7天。

# 带协变量的TimesFM

我们在这里采取的策略是将协变量视为分组的上下文外生回归因子（XReg），并在TimesFM之外拟合线性模型。最终的预测将是TimesFM预测和线性模型预测之和。

简单来说，我们考虑了这两种选择。

**选项1：** 获取TimesFM预测，并拟合将残差回归到协变量上的线性模型（“timesfm + xreg”）。

**选项2：** 将时间序列本身的线性模型拟合到协变量上，然后使用TimesFM预测残差（“xreg + timesfm”）。

让我们以电价预测（EPF）为例。

In [None]:
import pandas as pd
import numpy as np
from collections import defaultdict

In [None]:
df = pd.read_csv('https://datasets-nixtla.s3.amazonaws.com/EPF_FR_BE.csv')
df['ds'] = pd.to_datetime(df['ds'])
df

这个数据集除了小时目标`y`之外还有一些协变量：

- `unique_id`：一个静态的分类变量，表示国家。
- `gen_forecast`：一个动态的数值协变量，表示预计要生成的电力量。
- `system_load`：观察到的系统负载。请注意，这**不能**被视为动态数值协变量，因为我们无法提前知道其在预测时间范围内的值。
- `weekday`：一个动态的分类协变量。

现在让我们基于这个数据集为TimesFM进行一些预测任务。为简单起见，我们创建预测上下文为120个时间点（小时）和预测视野为24个时间点。

In [None]:
# Data pipelining
def get_batched_data_fn(
    batch_size: int = 128, 
    context_len: int = 120, 
    horizon_len: int = 24,
):
  examples = defaultdict(list)

  num_examples = 0
  for country in ("FR", "BE"):
    sub_df = df[df["unique_id"] == country]
    for start in range(0, len(sub_df) - (context_len + horizon_len), horizon_len):
      num_examples += 1
      examples["country"].append(country)
      examples["inputs"].append(sub_df["y"][start:(context_end := start + context_len)].tolist())
      examples["gen_forecast"].append(sub_df["gen_forecast"][start:context_end + horizon_len].tolist())
      examples["week_day"].append(sub_df["week_day"][start:context_end + horizon_len].tolist())
      examples["outputs"].append(sub_df["y"][context_end:(context_end + horizon_len)].tolist())
  
  def data_fn():
    for i in range(1 + (num_examples - 1) // batch_size):
      yield {k: v[(i * batch_size) : ((i + 1) * batch_size)] for k, v in examples.items()}
  
  return data_fn


In [None]:
# Define metrics
def mse(y_pred, y_true):
  y_pred = np.array(y_pred)
  y_true = np.array(y_true)
  return np.mean(np.square(y_pred - y_true), axis=1, keepdims=True)

def mae(y_pred, y_true):
  y_pred = np.array(y_pred)
  y_true = np.array(y_true)
  return np.mean(np.abs(y_pred - y_true), axis=1, keepdims=True)


现在让我们尝试`model.forecast_with_covariates`。

具体来说，输出是一个元组，第一个元素是新的预测结果。

In [None]:
# Benchmark
batch_size = 128
context_len = 120
horizon_len = 24
input_data = get_batched_data_fn(batch_size = 128)
metrics = defaultdict(list)
import time

for i, example in enumerate(input_data()):
  raw_forecast, _ = model.forecast(
      inputs=example["inputs"], freq=[0] * len(example["inputs"])
  )
  start_time = time.time()
  # Forecast with covariates
  # Output: new forecast, forecast by the xreg
  cov_forecast, ols_forecast = model.forecast_with_covariates(  
      inputs=example["inputs"],
      dynamic_numerical_covariates={
          "gen_forecast": example["gen_forecast"],
      },
      dynamic_categorical_covariates={
          "week_day": example["week_day"],
      },
      static_numerical_covariates={},
      static_categorical_covariates={
          "country": example["country"]
      },
      freq=[0] * len(example["inputs"]),
      xreg_mode="xreg + timesfm",              # default
      ridge=0.0,
      force_on_cpu=False,
      normalize_xreg_target_per_input=True,    # default
  )
  print(
      f"\rFinished batch {i} linear in {time.time() - start_time} seconds",
      end="",
  )
  metrics["eval_mae_timesfm"].extend(
      mae(raw_forecast[:, :horizon_len], example["outputs"])
  )
  metrics["eval_mae_xreg_timesfm"].extend(mae(cov_forecast, example["outputs"]))
  metrics["eval_mae_xreg"].extend(mae(ols_forecast, example["outputs"]))
  metrics["eval_mse_timesfm"].extend(
      mse(raw_forecast[:, :horizon_len], example["outputs"])
  )
  metrics["eval_mse_xreg_timesfm"].extend(mse(cov_forecast, example["outputs"]))
  metrics["eval_mse_xreg"].extend(mse(ols_forecast, example["outputs"]))

print()

for k, v in metrics.items():
  print(f"{k}: {np.mean(v)}")

# My output:
# eval_mae_timesfm: 6.762283045916956
# eval_mae_xreg_timesfm: 5.39219617611074
# eval_mae_xreg: 37.15275842572484
# eval_mse_timesfm: 166.7771466306823
# eval_mse_xreg_timesfm: 120.64757721021306
# eval_mse_xreg: 1672.2116821201796

你应该看到接近以下结果：
```
eval_mae_timesfm: 6.762283045916956
eval_mae_xreg_timesfm: 5.39219617611074
eval_mae_xreg: 37.15275842572484
eval_mse_timesfm: 166.7771466306823
eval_mse_xreg_timesfm: 120.64757721021306
eval_mse_xreg: 1672.2116821201796
```

在加入协变量的情况下，TimesFM 预测的平均绝对误差从6.76降至5.39，均方误差从166.78降至120.65。提供纯拟合线性模型的结果供参考。

## 格式化您的请求

将协变量正确格式化非常关键，这样我们才能调用`model.forecast_with_covariates`。请查看其文档字符串获取更多细节。在这里，让我们还从一个玩具数据输入管道中获取一个批次，以便快速解释。

In [None]:
toy_input_pipeline = get_batched_data_fn(batch_size=2, context_len=5, horizon_len=2)
print(next(toy_input_pipeline()))


你应该看到类似于以下内容的东西
```
{
    'country': ['FR', 'FR'], 
    'inputs': [[53.48, 51.93, 48.76, 42.27, 38.41], [48.76, 42.27, 38.41, 35.72, 32.66]], 
    'gen_forecast': [[76905.0, 75492.0, 74394.0, 72639.0, 69347.0, 67960.0, 67564.0], [74394.0, 72639.0, 69347.0, 67960.0, 67564.0, 67277.0, 67019.0]], 
    'week_day': [[3, 3, 3, 3, 3, 3, 3], [3, 3, 3, 3, 3, 3, 3]], 
    'outputs': [[35.72, 32.66], [32.83, 30.06]],
}
```

注意:
- 在这个批次中我们有两个示例。
- 对于每个示例，我们支持不同的上下文长度和预测长度，就像`model.forecast`一样。尽管在这个数据集中没有展示出来。
- 如果存在动态协变量，预测长度将从中推断出来，例如提供了多少个值作为输入之外的附加值。请确保你的每个示例的动态协变量长度相同。
- 静态协变量每个示例一个。

更多应用

过去的动态协变量是仅在上下文中可用的协变量。例如，在我们的示例中，`system_load` 是一个过去的动态协变量。时间序列模型通常可以处理这种情况，但是批处理上下文回归不能解决这个问题，因为这些回归变量在未来是不可用的。如果您确实拥有这些协变量并认为它们非常有意义，那么有两个简单粗暴的尝试选项：

1. 将这些过去的动态协变量进行位移和重复，以使用它们的延迟版本。例如，如果您认为本周的`system_load`对于预测下周是有意义的，您可以通过将时间戳位移 7 个单位来创建一个`delay_7_system_load`，并将其用作 TimesFM 的一个动态数值协变量。
2. Bootstrap，即运行一次 TimesFM 来预测这些过去的动态协变量到未来，然后再次调用 TimesFM 使用这些预测值作为这些动态协变量的未来部分。

多变量时间序列

对于多变量时间序列，如果我们需要单变量预测，我们可以尝试将主时间序列视为目标，将其余部分作为动态协变量。