backtest as Pipeline's method (#161)
* Backtest start

* Backtest fix tests

* Add backtest utils

* Add deprecated decorator

* Add backtest to pipeline

* Revert "Add deprecated decorator"

This reverts commit e507ae7.

* Add docstring

* Upd CHANGELOG

* Make backtest work with pipelines

* Apply suggestions from code review

Co-authored-by: Martin Gabdushev <33594071+martins0n@users.noreply.github.com>

* Fix `metric.__class__.__name__` -> `metric.__repr__` in backtest metrics computation

* Add conftests for pipelines, ensembles

* Upd CHANGELOG

Co-authored-by: Martin Gabdushev <33594071+martins0n@users.noreply.github.com>
julia-shenshina and martins0n committed Oct 12, 2021
1 parent f82f5e5 commit 9c37a03
Showing 12 changed files with 654 additions and 128 deletions.
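Taken together, the diff below turns cross-validation into a method of Pipeline itself. A minimal usage sketch of the new entry point (not part of the diff; ProphetModel is an illustrative model choice, the Pipeline constructor arguments are inferred from the attributes used in pipeline.py below, and `df` stands for a user-supplied long-format frame):

from etna.datasets import TSDataset
from etna.metrics import MAE
from etna.metrics import SMAPE
from etna.models import ProphetModel  # illustrative; any etna model should work
from etna.pipeline import Pipeline

# `df` is a placeholder long-format frame with timestamp/segment/target columns;
# TSDataset.to_dataset reshapes it to the wide format TSDataset expects
ts = TSDataset(TSDataset.to_dataset(df), freq="D")

pipeline = Pipeline(model=ProphetModel(), transforms=[], horizon=7)
metrics_df, forecast_df, fold_info_df = pipeline.backtest(
    ts=ts,
    metrics=[MAE(), SMAPE()],  # must be per-segment metrics, see _validate_backtest_metrics
    n_folds=3,
    mode="expand",
)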
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -25,6 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- TrendTransform ([#139](https://github.com/tinkoff-ai/etna-ts/pull/139))
- Running notebooks in ci ([#134](https://github.com/tinkoff-ai/etna-ts/issues/134))
- Cluster plotter to EDA ([#169](https://github.com/tinkoff-ai/etna-ts/pull/169))
- Pipeline.backtest method ([#161](https://github.com/tinkoff-ai/etna-ts/pull/161))
- STLTransform class ([#158](https://github.com/tinkoff-ai/etna-ts/pull/158))
- NN_examples notebook ([#159](https://github.com/tinkoff-ai/etna-ts/pull/159))

@@ -36,6 +37,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add Correlation heatmap in EDA notebook ([#144](https://github.com/tinkoff-ai/etna-ts/pull/144))
- Add `__repr__` for Pipeline ([#151](https://github.com/tinkoff-ai/etna-ts/pull/151))
- Defined random state for every test case ([#155](https://github.com/tinkoff-ai/etna-ts/pull/155))
- TimeSeriesCrossValidation returns `Metric.__repr__` as a key in `backtest`'s return values ([#161](https://github.com/tinkoff-ai/etna-ts/pull/161))
- Add confidence intervals to Prophet ([#153](https://github.com/tinkoff-ai/etna-ts/pull/153))
- Add confidence intervals to SARIMA ([#172](https://github.com/tinkoff-ai/etna-ts/pull/172))

1 change: 1 addition & 0 deletions etna/metrics/__init__.py
@@ -15,3 +15,4 @@
from etna.metrics.metrics import R2
from etna.metrics.metrics import SMAPE
from etna.metrics.metrics import MedAE
from etna.metrics.utils import compute_metrics
27 changes: 27 additions & 0 deletions etna/metrics/utils.py
@@ -0,0 +1,27 @@
from typing import Dict
from typing import List

from etna.datasets import TSDataset
from etna.metrics import Metric


def compute_metrics(metrics: List[Metric], y_true: TSDataset, y_pred: TSDataset) -> Dict[str, float]:
"""
Compute metrics for given y_true, y_pred.
Parameters
----------
metrics:
list of metrics to compute
y_true:
dataset of true values of time series
y_pred:
dataset of time series forecast
Returns
-------
dict of metrics in format {"metric_name": metric_value}
"""
metrics_values = {}
for metric in metrics:
metrics_values[metric.__repr__()] = metric(y_true=y_true, y_pred=y_pred)
return metrics_values
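A short usage sketch of the helper above (not part of the diff; `test_ts` and `forecast_ts` stand for TSDatasets produced elsewhere). Keying the result by `metric.__repr__()` rather than by class name means two differently parametrized instances of the same metric class no longer overwrite each other:

from etna.metrics import MAE
from etna.metrics import SMAPE
from etna.metrics.utils import compute_metrics

# both arguments are TSDatasets covering the same timestamps
scores = compute_metrics(metrics=[MAE(), SMAPE()], y_true=test_ts, y_pred=forecast_ts)
# keys are repr(metric) strings, values are the computed metric outputs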
11 changes: 10 additions & 1 deletion etna/model_selection/backtest.py
@@ -1,3 +1,4 @@
import warnings
from copy import deepcopy
from enum import Enum
from typing import Any
@@ -182,7 +183,7 @@ def _compute_metrics(self, y_true: TSDataset, y_pred: TSDataset) -> Dict[str, float]:
"""
metrics = {}
for metric in self.metrics:
-            metrics[metric.__class__.__name__] = metric(y_true=y_true, y_pred=y_pred)
+            metrics[metric.__repr__()] = metric(y_true=y_true, y_pred=y_pred)
return metrics

def get_forecasts(self) -> pd.DataFrame:
@@ -304,3 +305,11 @@ def backtest(
tslogger.finish_experiment()

return metrics_df, forecast_df, fold_info_df


warnings.warn(
"TimeSeriesCrossValidation is deprecated in etna==1.2.0, will be deleted in etna==1.4.0. "
"Use Pipeline.backtest method instead.",
DeprecationWarning,
stacklevel=3,
)
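Note that the warning above is emitted at module level, so it fires when etna.model_selection.backtest is first imported, not when TimeSeriesCrossValidation is instantiated. A small sketch of how that surfaces to a caller (assuming the module has not been imported earlier in the process -- Python caches modules, so the warning appears only once):

import warnings

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    import etna.model_selection.backtest  # noqa: F401 -- the import itself triggers the warning
assert any(issubclass(w.category, DeprecationWarning) for w in caught)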
212 changes: 212 additions & 0 deletions etna/pipeline/pipeline.py
@@ -1,11 +1,33 @@
from copy import deepcopy
from enum import Enum
from typing import Any
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple

import pandas as pd
from joblib import Parallel
from joblib import delayed

from etna.core import BaseMixin
from etna.datasets import TSDataset
from etna.loggers import tslogger
from etna.metrics import Metric
from etna.metrics import MetricAggregationMode
from etna.metrics.utils import compute_metrics
from etna.models.base import Model
from etna.transforms.base import Transform


class CrossValidationMode(Enum):
"""Enum for different cross-validation modes."""

expand = "expand"
constant = "constant"


class Pipeline(BaseMixin):
"""Pipeline of transforms with a final estimator."""

@@ -56,3 +78,193 @@ def forecast(self) -> TSDataset:
future = self.ts.make_future(self.horizon)
predictions = self.model.forecast(future)
return predictions

def _init_backtest(self):
self._folds: Optional[Dict[int, Any]] = None
self._fold_column = "fold_number"

@staticmethod
def _validate_backtest_n_folds(n_folds: int):
"""Check that given n_folds value is valid."""
if n_folds < 1:
raise ValueError(f"Folds number should be a positive number, {n_folds} given")

@staticmethod
def _validate_backtest_dataset(ts: TSDataset, n_folds: int, horizon: int):
"""Check that all the given timestamps have enough timestamp points to validate forecaster with given number of splits."""
min_required_length = horizon * n_folds
segments = set(ts.df.columns.get_level_values("segment"))
for segment in segments:
segment_target = ts[:, segment, "target"]
if len(segment_target) < min_required_length:
raise ValueError(
f"All the series from feature dataframe should contain at least "
f"{horizon} * {n_folds} = {min_required_length} timestamps; "
f"series {segment} does not."
)

@staticmethod
def _validate_backtest_metrics(metrics: List[Metric]):
"""Check that given metrics are valid for backtest."""
if not metrics:
raise ValueError("At least one metric required")
for metric in metrics:
            if metric.mode != MetricAggregationMode.per_segment:
raise ValueError(
f"All the metrics should be in {MetricAggregationMode.per_segment}, "
f"{metric.__class__.__name__} metric is in {metric.mode} mode"
)

@staticmethod
def _generate_folds_datasets(
ts: TSDataset, n_folds: int, horizon: int, mode: str = "expand"
    ) -> Iterable[Tuple[TSDataset, TSDataset]]:
"""Generate a sequence of train-test pairs according to timestamp."""
mode = CrossValidationMode[mode.lower()]
if mode == CrossValidationMode.expand:
constant_history_length = 0
elif mode == CrossValidationMode.constant:
constant_history_length = 1
else:
raise NotImplementedError(
f"Only '{CrossValidationMode.expand}' and '{CrossValidationMode.constant}' modes allowed"
)

timestamps = ts.index
min_timestamp_idx, max_timestamp_idx = 0, len(timestamps)
for offset in range(n_folds, 0, -1):
            # in "expand" mode, the left border of the train part always stays at the minimal timestamp,
            # i.e. all the available history is used on every fold;
            # in "constant" mode, the left border moves forward by one horizon on each fold,
            # so the train part keeps a constant length
min_train_idx = min_timestamp_idx + (n_folds - offset) * horizon * constant_history_length
max_train_idx = max_timestamp_idx - horizon * offset - 1
min_test_idx = max_train_idx + 1
max_test_idx = max_train_idx + horizon

min_train, max_train = timestamps[min_train_idx], timestamps[max_train_idx]
min_test, max_test = timestamps[min_test_idx], timestamps[max_test_idx]

train, test = ts.train_test_split(
train_start=min_train, train_end=max_train, test_start=min_test, test_end=max_test
)

yield train, test

def _run_fold(
self,
train: TSDataset,
test: TSDataset,
fold_number: int,
metrics: Optional[List[Metric]] = None,
) -> Dict[str, Any]:
"""Run fit-forecast pipeline of model for one fold."""
tslogger.start_experiment(job_type="crossval", group=str(fold_number))

pipeline = deepcopy(self)
pipeline.fit(ts=train)
forecast = pipeline.forecast()

fold = {}
for stage_name, stage_df in zip(("train", "test"), (train, test)):
fold[f"{stage_name}_timerange"] = {}
fold[f"{stage_name}_timerange"]["start"] = stage_df.index.min()
fold[f"{stage_name}_timerange"]["end"] = stage_df.index.max()
fold["forecast"] = forecast
fold["metrics"] = deepcopy(compute_metrics(metrics=metrics, y_true=test, y_pred=forecast))

tslogger.log_backtest_run(pd.DataFrame(fold["metrics"]), forecast.to_pandas(), test.to_pandas())
tslogger.finish_experiment()

return fold

def _get_backtest_metrics(self, aggregate_metrics: bool = False) -> pd.DataFrame:
"""Get dataframe with metrics."""
metrics_df = pd.DataFrame()

for i, fold in self._folds.items():
fold_metrics = pd.DataFrame(fold["metrics"]).reset_index().rename({"index": "segment"}, axis=1)
fold_metrics[self._fold_column] = i
metrics_df = metrics_df.append(fold_metrics)

metrics_df.sort_values(["segment", self._fold_column], inplace=True)

if aggregate_metrics:
metrics_df = metrics_df.groupby("segment").mean().reset_index().drop(self._fold_column, axis=1)

return metrics_df

def _get_fold_info(self) -> pd.DataFrame:
"""Get information about folds."""
timerange_df = pd.DataFrame()
for fold_number, fold_info in self._folds.items():
tmp_df = pd.DataFrame()
for stage_name in ("train", "test"):
for border in ("start", "end"):
tmp_df[f"{stage_name}_{border}_time"] = [fold_info[f"{stage_name}_timerange"][border]]
tmp_df[self._fold_column] = fold_number
timerange_df = timerange_df.append(tmp_df)
return timerange_df

def _get_backtest_forecasts(self) -> pd.DataFrame:
"""Get forecasts from different folds."""
stacked_forecast = pd.DataFrame()
for fold_number, fold_info in self._folds.items():
forecast = fold_info["forecast"]
for segment in forecast.segments:
forecast.loc[:, pd.IndexSlice[segment, self._fold_column]] = fold_number
stacked_forecast = stacked_forecast.append(forecast.df)
return stacked_forecast

def backtest(
self,
ts: TSDataset,
metrics: List[Metric],
n_folds: int = 5,
mode: str = "expand",
aggregate_metrics: bool = False,
n_jobs: int = 1,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""Run backtest with the pipeline.
Parameters
----------
ts:
dataset to fit models in backtest
metrics:
list of metrics to compute for each fold
n_folds:
number of folds
mode:
one of 'expand', 'constant' -- train generation policy
aggregate_metrics:
if True aggregate metrics above folds, return raw metrics otherwise
n_jobs:
number of jobs to run in parallel
Returns
-------
pd.DataFrame, pd.DataFrame, pd.Dataframe:
metrics dataframe, forecast dataframe and dataframe with information about folds
"""
self._init_backtest()
self._validate_backtest_n_folds(n_folds=n_folds)
self._validate_backtest_dataset(ts=ts, n_folds=n_folds, horizon=self.horizon)
self._validate_backtest_metrics(metrics=metrics)
folds = Parallel(n_jobs=n_jobs, verbose=11, backend="multiprocessing")(
delayed(self._run_fold)(train=train, test=test, fold_number=fold_number, metrics=metrics)
for fold_number, (train, test) in enumerate(
self._generate_folds_datasets(ts=ts, n_folds=n_folds, horizon=self.horizon, mode=mode)
)
)

self._folds = {i: fold for i, fold in enumerate(folds)}

metrics_df = self._get_backtest_metrics(aggregate_metrics=aggregate_metrics)
forecast_df = self._get_backtest_forecasts()
fold_info_df = self._get_fold_info()

tslogger.start_experiment(job_type="crossval_results", group="all")
tslogger.log_backtest_metrics(ts, metrics_df, forecast_df, fold_info_df)
tslogger.finish_experiment()

return metrics_df, forecast_df, fold_info_df
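The border arithmetic inside `_generate_folds_datasets` is easy to sanity-check in isolation. The snippet below re-implements just the index computation (a standalone sketch, not etna code) and prints the fold borders for 20 timestamps with horizon=5 and n_folds=2:

def fold_borders(n_timestamps: int, n_folds: int, horizon: int, constant_history_length: int):
    """Replicate the train/test index borders computed in _generate_folds_datasets."""
    for offset in range(n_folds, 0, -1):
        min_train_idx = (n_folds - offset) * horizon * constant_history_length
        max_train_idx = n_timestamps - horizon * offset - 1
        yield min_train_idx, max_train_idx, max_train_idx + 1, max_train_idx + horizon


for borders in fold_borders(n_timestamps=20, n_folds=2, horizon=5, constant_history_length=0):
    print(borders)
# expand mode (constant_history_length=0): (0, 9, 10, 14), then (0, 14, 15, 19) -- train grows
# constant mode (constant_history_length=1): (0, 9, 10, 14), then (5, 14, 15, 19) -- train slides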
75 changes: 75 additions & 0 deletions tests/conftest.py
@@ -268,3 +268,78 @@ def ts_with_different_series_length(example_df: pd.DataFrame) -> TSDataset:
df.loc[:4, pd.IndexSlice["segment_1", "target"]] = None
ts = TSDataset(df=df, freq="H")
return ts


@pytest.fixture
def imbalanced_tsdf(random_seed) -> TSDataset:
"""Generate two series with big time range difference"""
df1 = pd.DataFrame({"timestamp": pd.date_range("2021-01-25", "2021-02-01", freq="D")})
df1["segment"] = "segment_1"
df1["target"] = np.random.uniform(0, 5, len(df1))

df2 = pd.DataFrame({"timestamp": pd.date_range("2020-01-01", "2021-02-01", freq="D")})
df2["segment"] = "segment_2"
df2["target"] = np.random.uniform(0, 5, len(df2))

df = df1.append(df2)
df = df.pivot(index="timestamp", columns="segment").reorder_levels([1, 0], axis=1).sort_index(axis=1)
df.columns.names = ["segment", "feature"]
ts = TSDataset(df, freq="D")
return ts


@pytest.fixture
def example_tsdf(random_seed) -> TSDataset:
df1 = pd.DataFrame()
df1["timestamp"] = pd.date_range(start="2020-01-01", end="2020-02-01", freq="H")
df1["segment"] = "segment_1"
df1["target"] = np.arange(len(df1)) + 2 * np.random.normal(size=len(df1))

df2 = pd.DataFrame()
df2["timestamp"] = pd.date_range(start="2020-01-01", end="2020-02-01", freq="H")
df2["segment"] = "segment_2"
df2["target"] = np.sqrt(np.arange(len(df2)) + 2 * np.cos(np.arange(len(df2))))

df = pd.concat([df1, df2], ignore_index=True)
df = df.pivot(index="timestamp", columns="segment").reorder_levels([1, 0], axis=1).sort_index(axis=1)
df.columns.names = ["segment", "feature"]
df = TSDataset(df, freq="H")
return df


@pytest.fixture
def big_daily_example_tsdf(random_seed) -> TSDataset:
df1 = pd.DataFrame()
df1["timestamp"] = pd.date_range(start="2019-01-01", end="2020-04-01", freq="D")
df1["segment"] = "segment_1"
df1["target"] = np.arange(len(df1)) + 2 * np.random.normal(size=len(df1))

df2 = pd.DataFrame()
df2["timestamp"] = pd.date_range(start="2019-06-01", end="2020-04-01", freq="D")
df2["segment"] = "segment_2"
df2["target"] = np.sqrt(np.arange(len(df2)) + 2 * np.cos(np.arange(len(df2))))

df = pd.concat([df1, df2], ignore_index=True)
df = df.pivot(index="timestamp", columns="segment").reorder_levels([1, 0], axis=1).sort_index(axis=1)
df.columns.names = ["segment", "feature"]
df = TSDataset(df, freq="D")
return df


@pytest.fixture
def big_example_tsdf(random_seed) -> TSDataset:
df1 = pd.DataFrame()
df1["timestamp"] = pd.date_range(start="2020-01-01", end="2021-02-01", freq="D")
df1["segment"] = "segment_1"
df1["target"] = np.arange(len(df1)) + 2 * np.random.normal(size=len(df1))

df2 = pd.DataFrame()
df2["timestamp"] = pd.date_range(start="2020-01-01", end="2021-02-01", freq="D")
df2["segment"] = "segment_2"
df2["target"] = np.sqrt(np.arange(len(df2)) + 2 * np.cos(np.arange(len(df2))))

df = pd.concat([df1, df2], ignore_index=True)
df = df.pivot(index="timestamp", columns="segment").reorder_levels([1, 0], axis=1).sort_index(axis=1)
df.columns.names = ["segment", "feature"]
df = TSDataset(df, freq="D")
return df
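All four fixtures above rely on the same long-to-wide reshaping before constructing a TSDataset; a minimal standalone sketch of just that step (toy data, not one of the fixtures), showing the ('segment', 'feature') column MultiIndex:

import numpy as np
import pandas as pd

long_df = pd.DataFrame(
    {
        "timestamp": list(pd.date_range("2020-01-01", periods=3, freq="D")) * 2,
        "segment": ["segment_1"] * 3 + ["segment_2"] * 3,
        "target": np.arange(6, dtype=float),
    }
)
# pivot yields ("feature", "segment") columns; swap and sort to get ("segment", "feature")
wide_df = long_df.pivot(index="timestamp", columns="segment").reorder_levels([1, 0], axis=1).sort_index(axis=1)
wide_df.columns.names = ["segment", "feature"]
# wide_df.columns -> MultiIndex([("segment_1", "target"), ("segment_2", "target")])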
