diff --git a/CHANGELOG.md b/CHANGELOG.md
index ee7a69a94..aae6f3447 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [Unreleased]
+### Added
+- BinsegTrendTransform, ChangePointsTrendTransform ([#87](https://github.com/tinkoff-ai/etna-ts/pull/87))
+
+### Changed
+
+### Fixed
+
+
 ## [1.1.0] - 2021-09-23
 ### Added
 - MedianOutliersTransform, DensityOutliersTransform ([#30](https://github.com/tinkoff-ai/etna-ts/pull/30))
diff --git a/etna/transforms/__init__.py b/etna/transforms/__init__.py
index eb6639afe..c66f92475 100644
--- a/etna/transforms/__init__.py
+++ b/etna/transforms/__init__.py
@@ -1,5 +1,7 @@
 from etna.transforms.add_constant import AddConstTransform
 from etna.transforms.base import Transform
+from etna.transforms.binseg import BinsegTrendTransform
+from etna.transforms.change_points_trend import ChangePointsTrendTransform
 from etna.transforms.datetime_flags import DateFlagsTransform
 from etna.transforms.datetime_flags import TimeFlagsTransform
 from etna.transforms.detrend import LinearTrendTransform
diff --git a/etna/transforms/binseg.py b/etna/transforms/binseg.py
new file mode 100644
index 000000000..c9c285429
--- /dev/null
+++ b/etna/transforms/binseg.py
@@ -0,0 +1,105 @@
+from functools import lru_cache
+from typing import Any
+from typing import Optional
+
+from ruptures.base import BaseCost
+from ruptures.costs import cost_factory
+from ruptures.detection import Binseg
+from sklearn.linear_model import LinearRegression
+
+from etna.transforms.change_points_trend import ChangePointsTrendTransform
+from etna.transforms.change_points_trend import TDetrendModel
+
+
+class _Binseg(Binseg):
+    """Binary segmentation with lru_cache."""
+
+    def __init__(
+        self,
+        model: str = "l2",
+        custom_cost: Optional[BaseCost] = None,
+        min_size: int = 2,
+        jump: int = 5,
+        params: Any = None,
+    ):
+        """Initialize a Binseg instance.
+
+        Args:
+            model (str, optional): segment model, ["l1", "l2", "rbf",...]. Not used if ``'custom_cost'`` is not None.
+            custom_cost (BaseCost, optional): custom cost function. Defaults to None.
+            min_size (int, optional): minimum segment length. Defaults to 2 samples.
+            jump (int, optional): subsample (one every *jump* points). Defaults to 5 samples.
+            params (dict, optional): a dictionary of parameters for the cost instance.
+        """
+        if custom_cost is not None and isinstance(custom_cost, BaseCost):
+            self.cost = custom_cost
+        elif params is None:
+            self.cost = cost_factory(model=model)
+        else:
+            self.cost = cost_factory(model=model, **params)
+        self.min_size = max(min_size, self.cost.min_size)
+        self.jump = jump
+        self.n_samples = None
+        self.signal = None
+
+    @lru_cache(maxsize=None)
+    def single_bkp(self, start: int, end: int) -> Any:
+        """Run _single_bkp with lru_cache decorator."""
+        return self._single_bkp(start=start, end=end)
+
+
+class BinsegTrendTransform(ChangePointsTrendTransform):
+    """BinsegTrendTransform uses _Binseg model as a change point detection model in ChangePointsTrendTransform transform."""
+
+    def __init__(
+        self,
+        in_column: str,
+        detrend_model: TDetrendModel = LinearRegression(),
+        model: str = "ar",
+        custom_cost: Optional[BaseCost] = None,
+        min_size: int = 2,
+        jump: int = 1,
+        n_bkps: int = 5,
+        pen: Optional[float] = None,
+        epsilon: Optional[float] = None,
+    ):
+        """Init BinsegTrendTransform.
+
+        Parameters
+        ----------
+        in_column:
+            name of column to apply transform to
+        detrend_model:
+            model to get trend in data
+        model:
+            binseg segment model, ["l1", "l2", "rbf",...]. Not used if 'custom_cost' is not None.
+        custom_cost:
+            binseg custom cost function
+        min_size:
+            minimum segment length necessary to decide it is a stable trend segment
+        jump:
+            jump value can speed up computations: if jump==k, the algo will use every k-th value for change points search.
+        n_bkps:
+            number of change points to find
+        pen:
+            penalty value (>0)
+        epsilon:
+            reconstruction budget (>0)
+        """
+        self.model = model
+        self.custom_cost = custom_cost
+        self.min_size = min_size
+        self.jump = jump
+        self.n_bkps = n_bkps
+        self.pen = pen
+        self.epsilon = epsilon
+        super().__init__(
+            in_column=in_column,
+            change_point_model=_Binseg(
+                model=self.model, custom_cost=self.custom_cost, min_size=self.min_size, jump=self.jump
+            ),
+            detrend_model=detrend_model,
+            n_bkps=self.n_bkps,
+            pen=self.pen,
+            epsilon=self.epsilon,
+        )
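As a quick orientation for reviewers, here is a minimal usage sketch of the transform added above (illustrative only, not part of the patch; the toy dataset and parameter values are assumptions, and the TSDataset API is the one exercised by the tests below):

    import pandas as pd

    from etna.datasets import TSDataset
    from etna.transforms import BinsegTrendTransform

    # toy single-segment series with one obvious trend break in the middle
    df = pd.DataFrame({"timestamp": pd.date_range("2020-01-01", periods=200)})
    df["target"] = list(range(100)) + list(range(100, 0, -1))
    df["segment"] = "segment_1"
    ts = TSDataset(TSDataset.to_dataset(df), freq="D")

    # search for up to 5 change points and remove the piecewise linear trend
    transform = BinsegTrendTransform(in_column="target", model="l2", n_bkps=5)
    ts.fit_transform([transform])  # "target" is now detrended per stable-trend interval
    ts.inverse_transform()         # the removed trend is added back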
diff --git a/etna/transforms/change_points_trend.py b/etna/transforms/change_points_trend.py
new file mode 100644
index 000000000..02a2c0ea9
--- /dev/null
+++ b/etna/transforms/change_points_trend.py
@@ -0,0 +1,208 @@
+from copy import deepcopy
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import numpy as np
+import pandas as pd
+from ruptures.base import BaseEstimator
+from ruptures.costs import CostLinear
+from sklearn.base import RegressorMixin
+
+from etna.transforms.base import PerSegmentWrapper
+from etna.transforms.base import Transform
+
+TTimestampInterval = Tuple[pd.Timestamp, pd.Timestamp]
+TDetrendModel = RegressorMixin
+
+
+class _OneSegmentChangePointsTrendTransform(Transform):
+    """_OneSegmentChangePointsTrendTransform subtracts multiple linear trends from series."""
+
+    def __init__(
+        self,
+        in_column: str,
+        change_point_model: BaseEstimator,
+        detrend_model: TDetrendModel,
+        **change_point_model_predict_params,
+    ):
+        """Init _OneSegmentChangePointsTrendTransform.
+
+        Parameters
+        ----------
+        in_column:
+            name of column to apply transform to
+        change_point_model:
+            model to get trend change points
+        detrend_model:
+            model to get trend in data
+        change_point_model_predict_params:
+            params for change_point_model predict method
+        """
+        self.in_column = in_column
+        self.out_columns = in_column
+        self.change_point_model = change_point_model
+        self.detrend_model = detrend_model
+        self.per_interval_models: Optional[Dict[TTimestampInterval, TDetrendModel]] = None
+        self.intervals: Optional[List[TTimestampInterval]] = None
+        self.change_point_model_predict_params = change_point_model_predict_params
+
+    def _prepare_signal(self, series: pd.Series) -> np.ndarray:
+        """Prepare series for change point model."""
+        signal = series.to_numpy()
+        if isinstance(self.change_point_model.cost, CostLinear):
+            signal = signal.reshape((-1, 1))
+        return signal
+
+    def _get_change_points(self, series: pd.Series) -> List[pd.Timestamp]:
+        """Fit change point model with series data and predict trend change points."""
+        signal = self._prepare_signal(series=series)
+        timestamp = series.index
+        self.change_point_model.fit(signal=signal)
+        # last point in change points is the first index after the series
+        change_points_indices = self.change_point_model.predict(**self.change_point_model_predict_params)[:-1]
+        change_points = [timestamp[idx] for idx in change_points_indices]
+        return change_points
+
+    @staticmethod
+    def _build_trend_intervals(change_points: List[pd.Timestamp]) -> List[TTimestampInterval]:
+        """Create list of stable trend intervals from list of change points."""
+        change_points = sorted(change_points)
+        left_border = pd.Timestamp.min
+        intervals = []
+        for point in change_points:
+            right_border = point
+            intervals.append((left_border, right_border))
+            left_border = right_border
+        intervals.append((left_border, pd.Timestamp.max))
+        return intervals
+
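To make the interval construction above concrete, a small illustration (not part of the patch) of how change points are mapped to a full coverage of the timeline by stable-trend intervals:

    import pandas as pd

    from etna.transforms.change_points_trend import _OneSegmentChangePointsTrendTransform

    change_points = [pd.Timestamp("2020-03-01"), pd.Timestamp("2020-06-01")]
    intervals = _OneSegmentChangePointsTrendTransform._build_trend_intervals(change_points=change_points)
    # -> [(Timestamp.min, 2020-03-01), (2020-03-01, 2020-06-01), (2020-06-01, Timestamp.max)]
    # a separate copy of detrend_model is later fitted on the data inside each of these intervals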
+    def _init_detrend_models(
+        self, intervals: List[TTimestampInterval]
+    ) -> Dict[Tuple[pd.Timestamp, pd.Timestamp], TDetrendModel]:
+        """Create copy of detrend model for each timestamp interval."""
+        per_interval_models = {interval: deepcopy(self.detrend_model) for interval in intervals}
+        return per_interval_models
+
+    def _get_timestamps(self, series: pd.Series) -> np.ndarray:
+        """Convert ETNA timestamp index to a 2D array of POSIX timestamps to fit regression models."""
+        timestamps = series.index
+        timestamps = np.array([[ts.timestamp()] for ts in timestamps])
+        return timestamps
+
+    def _fit_per_interval_model(self, series: pd.Series):
+        """Fit per-interval models with corresponding data from series."""
+        for interval in self.intervals:
+            tmp_series = series[interval[0] : interval[1]]
+            x = self._get_timestamps(series=tmp_series)
+            y = tmp_series.values
+            self.per_interval_models[interval].fit(x, y)
+
+    def _predict_per_interval_model(self, series: pd.Series) -> pd.Series:
+        """Apply per-interval detrending to series."""
+        trend_series = pd.Series(index=series.index)
+        for interval in self.intervals:
+            tmp_series = series[interval[0] : interval[1]]
+            if tmp_series.empty:
+                continue
+            x = self._get_timestamps(series=tmp_series)
+            trend = self.per_interval_models[interval].predict(x)
+            trend_series[tmp_series.index] = trend
+        return trend_series
+
+    def fit(self, df: pd.DataFrame) -> "_OneSegmentChangePointsTrendTransform":
+        """Fit _OneSegmentChangePointsTrendTransform: find trend change points in df, fit detrend models with data from intervals of stable trend.
+
+        Parameters
+        ----------
+        df:
+            one segment dataframe indexed with timestamp
+
+        Returns
+        -------
+        self
+        """
+        # we need a copy here because Binseg with CostAR (model="ar") changes the given signal inplace
+        # @TODO: delete the copy once this is fixed upstream
+        series = df.loc[df[self.in_column].first_valid_index() :, self.in_column].copy(deep=True)
+        change_points = self._get_change_points(series=series)
+        self.intervals = self._build_trend_intervals(change_points=change_points)
+        self.per_interval_models = self._init_detrend_models(intervals=self.intervals)
+        self._fit_per_interval_model(series=series)
+        return self
+
+    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
+        """Split df to intervals of stable trend and subtract trend from each one.
+
+        Parameters
+        ----------
+        df:
+            one segment dataframe to subtract trend
+
+        Returns
+        -------
+        detrended df: pd.DataFrame
+            df with detrended in_column series
+        """
+        df._is_copy = False
+        series = df.loc[df[self.in_column].first_valid_index() :, self.in_column]
+        trend_series = self._predict_per_interval_model(series=series)
+        df.loc[:, self.in_column] -= trend_series
+        return df
+
+    def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
+        """Split df to intervals of stable trend according to previous change point detection and add trend to each one.
+
+        Parameters
+        ----------
+        df:
+            one segment dataframe to turn trend back
+
+        Returns
+        -------
+        df: pd.DataFrame
+            df with restored trend in in_column
+        """
+        df._is_copy = False
+        series = df.loc[df[self.in_column].first_valid_index() :, self.in_column]
+        trend_series = self._predict_per_interval_model(series=series)
+        df.loc[:, self.in_column] += trend_series
+        return df
+
+
+class ChangePointsTrendTransform(PerSegmentWrapper):
+    """ChangePointsTrendTransform subtracts multiple linear trends from series."""
+
+    def __init__(
+        self,
+        in_column: str,
+        change_point_model: BaseEstimator,
+        detrend_model: TDetrendModel,
+        **change_point_model_predict_params,
+    ):
+        """Init ChangePointsTrendTransform.
+
+        Parameters
+        ----------
+        in_column:
+            name of column to apply transform to
+        change_point_model:
+            model to get trend change points
+        detrend_model:
+            model to get trend in data
+        change_point_model_predict_params:
+            params for change_point_model predict method
+        """
+        self.in_column = in_column
+        self.change_point_model = change_point_model
+        self.detrend_model = detrend_model
+        self.change_point_model_predict_params = change_point_model_predict_params
+        super().__init__(
+            transform=_OneSegmentChangePointsTrendTransform(
+                in_column=self.in_column,
+                change_point_model=self.change_point_model,
+                detrend_model=self.detrend_model,
+                **self.change_point_model_predict_params,
+            )
+        )
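BinsegTrendTransform defined earlier is a thin convenience wrapper around this generic class. A sketch of using the generic transform directly with a ruptures detector (illustrative, not part of the patch; parameter values are assumptions):

    from ruptures.detection import Binseg
    from sklearn.linear_model import LinearRegression

    from etna.transforms import ChangePointsTrendTransform

    transform = ChangePointsTrendTransform(
        in_column="target",
        change_point_model=Binseg(model="l2"),
        detrend_model=LinearRegression(),
        n_bkps=5,  # extra kwargs are forwarded to change_point_model.predict(...)
    )
    # the transform can then be passed to TSDataset.fit_transform like any other etna transform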
diff --git a/tests/conftest.py b/tests/conftest.py
index 866909fca..96d2c4b9a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -225,3 +225,33 @@ def outliers_tsds():
     tsds = TSDataset(df, "1d", exog)
 
     return tsds
+
+
+@pytest.fixture
+def multitrend_df() -> pd.DataFrame:
+    """Generate one segment pd.DataFrame with multiple linear trend."""
+    df = pd.DataFrame({"timestamp": pd.date_range("2020-01-01", "2021-05-31")})
+    ns = [100, 150, 80, 187]
+    ks = [0.4, -0.3, 0.8, -0.6]
+    x = np.zeros(shape=(len(df)))
+    left = 0
+    right = 0
+    for i, (n, k) in enumerate(zip(ns, ks)):
+        right += n
+        x[left:right] = np.arange(0, n, 1) * k
+        for _n, _k in zip(ns[:i], ks[:i]):
+            x[left:right] += _n * _k
+        left = right
+    df["target"] = x
+    df["segment"] = "segment_1"
+    df = TSDataset.to_dataset(df=df)
+    return df
+
+
+@pytest.fixture
+def ts_with_different_series_length(example_df: pd.DataFrame) -> TSDataset:
+    """Generate TSDataset with series of different length."""
+    df = TSDataset.to_dataset(example_df)
+    df.loc[:4, pd.IndexSlice["segment_1", "target"]] = None
+    ts = TSDataset(df=df, freq="H")
+    return ts
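The multitrend_df fixture builds a piecewise linear series of 517 points: four segments of 100, 150, 80 and 187 days with slopes 0.4, -0.3, 0.8 and -0.6, each segment shifted by the total gain of all previous segments. An equivalent formulation of the same construction (illustrative only):

    import numpy as np

    ns = [100, 150, 80, 187]
    ks = [0.4, -0.3, 0.8, -0.6]
    # segment i is a fresh ramp arange(n_i) * k_i lifted by the cumulative gain of segments 0..i-1
    segments = [
        np.arange(n) * k + sum(n_prev * k_prev for n_prev, k_prev in zip(ns[:i], ks[:i]))
        for i, (n, k) in enumerate(zip(ns, ks))
    ]
    target = np.concatenate(segments)  # same values as the fixture's loop produces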
diff --git a/tests/test_transforms/test_binseg_trend_transform.py b/tests/test_transforms/test_binseg_trend_transform.py
new file mode 100644
index 000000000..aadc4fd14
--- /dev/null
+++ b/tests/test_transforms/test_binseg_trend_transform.py
@@ -0,0 +1,54 @@
+from copy import deepcopy
+from typing import Any
+
+import numpy as np
+import pytest
+from ruptures.costs import CostAR
+from ruptures.costs import CostL1
+from ruptures.costs import CostL2
+from ruptures.costs import CostLinear
+from ruptures.costs import CostMl
+from ruptures.costs import CostNormal
+from ruptures.costs import CostRank
+from ruptures.costs import CostRbf
+
+from etna.datasets import TSDataset
+from etna.transforms.binseg import BinsegTrendTransform
+
+
+def test_binseg_in_pipeline(example_tsds: TSDataset):
+    bs = BinsegTrendTransform(in_column="target")
+    example_tsds.fit_transform([bs])
+    for segment in example_tsds.segments:
+        assert abs(example_tsds[:, segment, "target"].mean()) < 1
+
+
+@pytest.mark.parametrize(
+    "custom_cost_class", (CostMl, CostAR, CostLinear, CostRbf, CostL2, CostL1, CostNormal, CostRank)
+)
+def test_binseg_run_with_custom_costs(example_tsds: TSDataset, custom_cost_class: Any):
+    """Check that binseg trend works with different custom costs."""
+    bs = BinsegTrendTransform(in_column="target", custom_cost=custom_cost_class())
+    ts = deepcopy(example_tsds)
+    ts.fit_transform([bs])
+    ts.inverse_transform()
+    assert (ts.df == example_tsds.df).all().all()
+
+
+@pytest.mark.parametrize("model", ("l1", "l2", "normal", "rbf", "linear", "ar", "mahalanobis", "rank"))
+def test_binseg_run_with_model(example_tsds: TSDataset, model: Any):
+    """Check that binseg trend works with different models."""
+    bs = BinsegTrendTransform(in_column="target", model=model)
+    ts = deepcopy(example_tsds)
+    ts.fit_transform([bs])
+    ts.inverse_transform()
+    assert (ts.df == example_tsds.df).all().all()
+
+
+def test_binseg_runs_with_different_series_length(ts_with_different_series_length: TSDataset):
+    """Check that binseg works with datasets with series of different length."""
+    bs = BinsegTrendTransform(in_column="target")
+    ts = deepcopy(ts_with_different_series_length)
+    ts.fit_transform([bs])
+    ts.inverse_transform()
+    assert np.allclose(ts.df.values, ts_with_different_series_length.df.values, equal_nan=True)
diff --git a/tests/test_transforms/test_change_points_trend_transform.py b/tests/test_transforms/test_change_points_trend_transform.py
new file mode 100644
index 000000000..6ffe4e494
--- /dev/null
+++ b/tests/test_transforms/test_change_points_trend_transform.py
@@ -0,0 +1,173 @@
+import numpy as np
+import pandas as pd
+import pytest
+from sklearn.linear_model import LinearRegression
+
+from etna.datasets import TSDataset
+from etna.transforms.binseg import _Binseg
+from etna.transforms.change_points_trend import _OneSegmentChangePointsTrendTransform
+
+
+@pytest.fixture
+def post_multitrend_df() -> pd.DataFrame:
+    """Generate pd.DataFrame with timestamp after multitrend_df."""
+    df = pd.DataFrame({"timestamp": pd.date_range("2021-07-01", "2021-07-31")})
+    df["target"] = 0
+    df["segment"] = "segment_1"
+    df = TSDataset.to_dataset(df=df)
+    return df
+
+
+@pytest.fixture
+def pre_multitrend_df() -> pd.DataFrame:
+    """Generate pd.DataFrame with timestamp before multitrend_df."""
+    df = pd.DataFrame({"timestamp": pd.date_range("2019-12-01", "2019-12-31")})
+    df["target"] = 0
+    df["segment"] = "segment_1"
+    df = TSDataset.to_dataset(df=df)
+    return df
+
+
+@pytest.mark.parametrize("n_bkps", (5, 10, 12, 27))
+def test_get_change_points(multitrend_df: pd.DataFrame, n_bkps: int):
+    """Check that _get_change_points method returns the correct number of points in the correct format."""
+    bs = _OneSegmentChangePointsTrendTransform(
+        in_column="target", change_point_model=_Binseg(), detrend_model=LinearRegression(), n_bkps=n_bkps
+    )
+    change_points = bs._get_change_points(multitrend_df["segment_1"]["target"])
+    assert isinstance(change_points, list)
+    assert len(change_points) == n_bkps
+    for point in change_points:
+        assert isinstance(point, pd.Timestamp)
+
+
+def test_build_trend_intervals():
+    """Check correctness of intervals generation with list of change points."""
+    change_points = [pd.Timestamp("2020-01-01"), pd.Timestamp("2020-01-18"), pd.Timestamp("2020-02-24")]
+    expected_intervals = [
+        (pd.Timestamp.min, pd.Timestamp("2020-01-01")),
+        (pd.Timestamp("2020-01-01"), pd.Timestamp("2020-01-18")),
+        (pd.Timestamp("2020-01-18"), pd.Timestamp("2020-02-24")),
+        (pd.Timestamp("2020-02-24"), pd.Timestamp.max),
+    ]
+    intervals = _OneSegmentChangePointsTrendTransform._build_trend_intervals(change_points=change_points)
+    assert isinstance(intervals, list)
+    assert len(intervals) == 4
+    for (exp_left, exp_right), (real_left, real_right) in zip(expected_intervals, intervals):
+        assert exp_left == real_left
+        assert exp_right == real_right
+
+
+def test_models_after_fit(multitrend_df: pd.DataFrame):
+    """Check that fit method generates the correct number of detrend model copies."""
+    bs = _OneSegmentChangePointsTrendTransform(
+        in_column="target", change_point_model=_Binseg(), detrend_model=LinearRegression(), n_bkps=5
+    )
+    bs.fit(df=multitrend_df["segment_1"])
+    assert isinstance(bs.per_interval_models, dict)
+    assert len(bs.per_interval_models) == 6
+    models = bs.per_interval_models.values()
+    models_ids = [id(model) for model in models]
+    assert len(set(models_ids)) == 6
+
+
+def test_transform_detrend(multitrend_df: pd.DataFrame):
+    """Check that transform method detrends given series."""
+    bs = _OneSegmentChangePointsTrendTransform(
+        in_column="target", change_point_model=_Binseg(), detrend_model=LinearRegression(), n_bkps=5
+    )
+    bs.fit(df=multitrend_df["segment_1"])
+    transformed = bs.transform(df=multitrend_df["segment_1"])
+    assert transformed.columns == ["target"]
+    assert abs(transformed["target"].mean()) < 0.1
+
+
+def test_transform(multitrend_df: pd.DataFrame):
+    """Check that detrend models get series trends."""
+    bs = _OneSegmentChangePointsTrendTransform(
+        in_column="target", change_point_model=_Binseg(), detrend_model=LinearRegression(), n_bkps=50
+    )
+    bs.fit(df=multitrend_df["segment_1"])
+    transformed = bs.transform(df=multitrend_df["segment_1"])
+    assert transformed.columns == ["target"]
+    assert abs(transformed["target"].std()) < 1
+
+
+def test_inverse_transform(multitrend_df: pd.DataFrame):
+    """Check that inverse_transform turns transformed series back to the original one."""
+    bs = _OneSegmentChangePointsTrendTransform(
+        in_column="target", change_point_model=_Binseg(), detrend_model=LinearRegression(), n_bkps=5
+    )
+    bs.fit(df=multitrend_df["segment_1"])
+
+    transformed = bs.transform(df=multitrend_df["segment_1"].copy(deep=True))
+    transformed_df_old = transformed.reset_index()
+    transformed_df_old["segment"] = "segment_1"
+    transformed_df = TSDataset.to_dataset(df=transformed_df_old)
+
+    inversed = bs.inverse_transform(df=transformed_df["segment_1"].copy(deep=True))
+
+    np.testing.assert_array_almost_equal(inversed["target"], multitrend_df["segment_1"]["target"], decimal=10)
+
+
+def test_inverse_transform_hard(multitrend_df: pd.DataFrame):
+    """Check the logic of out-of-sample inverse transformation: for past and future dates unseen by transform."""
+    bs = _OneSegmentChangePointsTrendTransform(
+        in_column="target", change_point_model=_Binseg(), detrend_model=LinearRegression(), n_bkps=5
+    )
+    bs.fit(df=multitrend_df["segment_1"]["2020-02-01":"2021-05-01"])
+
+    transformed = bs.transform(df=multitrend_df["segment_1"].copy(deep=True))
+    transformed_df_old = transformed.reset_index()
+    transformed_df_old["segment"] = "segment_1"
+    transformed_df = TSDataset.to_dataset(df=transformed_df_old)
+
+    inversed = bs.inverse_transform(df=transformed_df["segment_1"].copy(deep=True))
+
+    np.testing.assert_array_almost_equal(inversed["target"], multitrend_df["segment_1"]["target"], decimal=10)
+
+
+def test_transform_pre_history(multitrend_df: pd.DataFrame, pre_multitrend_df: pd.DataFrame):
+    """Check that transform works correctly in case of fully unseen pre-history data."""
+    bs = _OneSegmentChangePointsTrendTransform(
+        in_column="target", change_point_model=_Binseg(), detrend_model=LinearRegression(), n_bkps=20
+    )
+    bs.fit(df=multitrend_df["segment_1"])
+    transformed = bs.transform(pre_multitrend_df["segment_1"])
+    expected = [x * 0.4 for x in list(range(31, 0, -1))]
+    np.testing.assert_array_almost_equal(transformed["target"], expected, decimal=10)
+
+
+def test_inverse_transform_pre_history(multitrend_df: pd.DataFrame, pre_multitrend_df: pd.DataFrame):
+    """Check that inverse_transform works correctly in case of fully unseen pre-history data."""
+    bs = _OneSegmentChangePointsTrendTransform(
+        in_column="target", change_point_model=_Binseg(), detrend_model=LinearRegression(), n_bkps=20
+    )
+    bs.fit(df=multitrend_df["segment_1"])
+    inversed = bs.inverse_transform(pre_multitrend_df["segment_1"])
+    expected = [x * (-0.4) for x in list(range(31, 0, -1))]
+    np.testing.assert_array_almost_equal(inversed["target"], expected, decimal=10)
+
+
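A short sanity check of the pre-history expectations above (derived from the fixture, not from the implementation): the first stable-trend interval of multitrend_df has slope 0.4 per day and its fitted trend is approximately 0 at 2020-01-01, so for a date x days earlier the extrapolated trend is about -0.4 * x while the raw pre-history target is 0:

    # December 2019 dates are 31, 30, ..., 1 days before the first seen point
    days_before = list(range(31, 0, -1))
    trend = [-0.4 * x for x in days_before]
    expected_transform = [0 - t for t in trend]  # transform subtracts the extrapolated trend
    expected_inverse = [0 + t for t in trend]    # inverse_transform adds it back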
+def test_transform_post_history(multitrend_df: pd.DataFrame, post_multitrend_df: pd.DataFrame):
+    """Check that transform works correctly in case of fully unseen post-history data with offset."""
+    bs = _OneSegmentChangePointsTrendTransform(
+        in_column="target", change_point_model=_Binseg(), detrend_model=LinearRegression(), n_bkps=20
+    )
+    bs.fit(df=multitrend_df["segment_1"])
+    transformed = bs.transform(post_multitrend_df["segment_1"])
+    # trend + last point of seen data + trend for offset interval
+    expected = [abs(x * (-0.6) - 52.6 - 0.6 * 30) for x in list(range(1, 32))]
+    np.testing.assert_array_almost_equal(transformed["target"], expected, decimal=10)
+
+
+def test_inverse_transform_post_history(multitrend_df: pd.DataFrame, post_multitrend_df: pd.DataFrame):
+    """Check that inverse_transform works correctly in case of fully unseen post-history data with offset."""
+    bs = _OneSegmentChangePointsTrendTransform(
+        in_column="target", change_point_model=_Binseg(), detrend_model=LinearRegression(), n_bkps=20
+    )
+    bs.fit(df=multitrend_df["segment_1"])
+    transformed = bs.inverse_transform(post_multitrend_df["segment_1"])
+    # trend + last point of seen data + trend for offset interval
+    expected = [x * (-0.6) - 52.6 - 0.6 * 30 for x in list(range(1, 32))]
+    np.testing.assert_array_almost_equal(transformed["target"], expected, decimal=10)
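Similarly, a sanity check of the post-history expectations (again derived from the fixture): the last stable-trend interval has slope -0.6 per day, and the trend value at the last seen date (2021-05-31) is about 186 * (-0.6) + (100 * 0.4 - 150 * 0.3 + 80 * 0.8) = -52.6. June 2021 is absent from post_multitrend_df, so the x-th of July 2021 lies 30 + x days past the last seen point:

    offset = 30  # the unseen June 2021 gap
    trend = [-0.6 * x - 52.6 - 0.6 * offset for x in range(1, 32)]
    expected_transform = [abs(t) for t in trend]  # target is 0 and the extrapolated trend is negative here
    expected_inverse = trend                      # inverse_transform simply adds the trend to the zero target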