Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] simple tabular prediction reduction for forecasting #4564

Merged
merged 12 commits into from May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/api_reference/forecasting.rst
Expand Up @@ -77,6 +77,7 @@ Use ``make_reduction`` for easy specification.
RecursiveTimeSeriesRegressionForecaster
DirRecTabularRegressionForecaster
DirRecTimeSeriesRegressionForecaster
YfromX

Naive forecasters
-----------------
Expand Down
2 changes: 2 additions & 0 deletions sktime/forecasting/compose/__init__.py
Expand Up @@ -27,6 +27,7 @@
"BaggingForecaster",
"ForecastByLevel",
"Permute",
"YfromX",
]

from sktime.forecasting.compose._bagging import BaggingForecaster
Expand All @@ -53,6 +54,7 @@
MultioutputTimeSeriesRegressionForecaster,
RecursiveTabularRegressionForecaster,
RecursiveTimeSeriesRegressionForecaster,
YfromX,
make_reduction,
)
from sktime.forecasting.compose._stack import StackingForecaster
206 changes: 203 additions & 3 deletions sktime/forecasting/compose/_reduce.py
Expand Up @@ -24,6 +24,7 @@
"DirRecTabularRegressionForecaster",
"DirRecTimeSeriesRegressionForecaster",
"DirectReductionForecaster",
"YfromX",
]

from warnings import warn
Expand Down Expand Up @@ -1622,6 +1623,7 @@ def _create_fcst_df(target_date, origin_df, fill=None):

def _coerce_col_str(X):
"""Coerce columns to string, to satisfy sklearn convention."""
X = X.copy()
X.columns = [str(x) for x in X.columns]
return X

Expand Down Expand Up @@ -1684,15 +1686,18 @@ def _get_expected_pred_idx(self, fh):
if isinstance(fh, ForecastingHorizon):
fh_idx = pd.Index(fh.to_absolute_index(self.cutoff))
else:
fh_idx = pd.Index(fh.to_pandas())
fh_idx = pd.Index(fh)
y_index = self._y.index

if isinstance(y_index, pd.MultiIndex):
y_inst_idx = y_index.droplevel(-1).unique()
if isinstance(y_inst_idx, pd.MultiIndex):
fh_idx = pd.Index([x + (y,) for y in fh_idx for x in y_inst_idx])
fh_idx = pd.Index([x + (y,) for x in y_inst_idx for y in fh_idx])
else:
fh_idx = pd.Index([(x, y) for y in fh_idx for x in y_inst_idx])
fh_idx = pd.Index([(x, y) for x in y_inst_idx for y in fh_idx])

if hasattr(y_index, "names") and y_index.names is not None:
fh_idx.names = y_index.names

return fh_idx

Expand Down Expand Up @@ -2388,3 +2393,198 @@ def get_test_params(cls, parameter_set="default"):
}

return params1


class YfromX(BaseForecaster, _ReducerMixin):
    """Simple reduction predicting endogeneous from concurrent exogeneous variables.

    Tabulates all seen `X` and `y` by time index and applies
    tabular supervised regression.

    In `fit`, given endogeneous time series `y` and exogeneous `X`:
    fits `estimator` to feature-label pairs as defined as follows.

    features = :math:`X(t)`, labels: :math:`y(t)`
    ranging over all :math:`t` where the above have been observed (are in the index)

    In `predict`, at a time :math:`t` in the forecasting horizon, uses `estimator`
    to predict :math:`y(t)`, from features: :math:`X(t)`

    If no exogeneous data is provided, will predict the mean of `y` seen in `fit`.

    In order to use a fit not on the entire historical data
    and update periodically, combine this with `UpdateRefitsEvery`.

    In order to deal with missing data, combine this with `Imputer`.

    To construct a custom direct reducer,
    combine with `YtoX`, `Lag`, or `ReducerTransform`.

    Parameters
    ----------
    estimator : sklearn regressor, must be compatible with sklearn interface
        tabular regression algorithm used in reduction algorithm
    pooling : str, one of ["local", "global", "panel"], optional, default="local"
        level on which data are pooled to fit the supervised regression model
        "local" = unit/instance level, one reduced model per lowest hierarchy level
        "global" = top level, one reduced model overall, on pooled data ignoring levels
        "panel" = second lowest level, one reduced model per panel level (-2)
        if there are 2 or fewer levels, "global" and "panel" result in the same
        if there is only 1 level (single time series), all three settings agree

    Raises
    ------
    ValueError
        if `pooling` is not one of "local", "global", "panel"

    Examples
    --------
    >>> from sktime.datasets import load_longley
    >>> from sktime.forecasting.model_selection import temporal_train_test_split
    >>> from sktime.forecasting.compose import YfromX
    >>> from sklearn.linear_model import LinearRegression
    >>>
    >>> y, X = load_longley()
    >>> y_train, y_test, X_train, X_test = temporal_train_test_split(y, X)
    >>> fh = y_test.index
    >>>
    >>> f = YfromX(LinearRegression())
    >>> f.fit(y=y_train, X=X_train, fh=fh)
    YfromX(...)
    >>> y_pred = f.predict(X=X_test)
    """

    _tags = {
        "requires-fh-in-fit": False,  # is the forecasting horizon required in fit?
        "ignores-exogeneous-X": False,
        "X_inner_mtype": ["pd.DataFrame", "pd-multiindex", "pd_multiindex_hier"],
        "y_inner_mtype": ["pd.DataFrame", "pd-multiindex", "pd_multiindex_hier"],
    }

    def __init__(self, estimator, pooling="local"):

        self.estimator = estimator
        self.pooling = pooling
        super().__init__()

        # the pooling level decides which inner data formats are requested
        if pooling == "local":
            mtypes = "pd.DataFrame"
        elif pooling == "global":
            mtypes = ["pd.DataFrame", "pd-multiindex", "pd_multiindex_hier"]
        elif pooling == "panel":
            mtypes = ["pd.DataFrame", "pd-multiindex"]
        else:
            raise ValueError(
                "pooling in YfromX must be one of"
                ' "local", "global", "panel", '
                f"but found {pooling}"
            )
        # X and y always share the same inner mtype setting
        self.set_tags(**{"X_inner_mtype": mtypes, "y_inner_mtype": mtypes})

    def _fit(self, y, X=None, fh=None):
        """Fit forecaster to training data.

        private _fit containing the core logic, called from fit

        Parameters
        ----------
        y : pd.DataFrame
            mtype is pd.DataFrame, pd-multiindex, or pd_multiindex_hier
            Time series to which to fit the forecaster.
        fh : guaranteed to be ForecastingHorizon or None, optional (default=None)
            The forecasting horizon with the steps ahead to predict.
            Required (non-optional) here if self.get_tag("requires-fh-in-fit")==True
            Otherwise, if not passed in _fit, guaranteed to be passed in _predict
        X : pd.DataFrame optional (default=None)
            mtype is pd.DataFrame, pd-multiindex, or pd_multiindex_hier
            Exogeneous time series to fit to.

        Returns
        -------
        self : reference to self
        """
        if X is None:
            from sklearn.dummy import DummyRegressor

            # no exogeneous data: fall back to a dummy regressor,
            # with y itself passed as placeholder feature table
            X = _coerce_col_str(y)
            estimator = DummyRegressor()
        else:
            X = _coerce_col_str(X)
            estimator = clone(self.estimator)

        # labels are handed to the tabular estimator as a flat array
        y = y.values.flatten()

        estimator.fit(X, y)
        self.estimator_ = estimator

        return self

    def _predict(self, X=None, fh=None):
        """Forecast time series at future horizon.

        private _predict containing the core logic, called from predict

        Parameters
        ----------
        fh : guaranteed to be ForecastingHorizon or None, optional (default=None)
            The forecasting horizon with the steps ahead to predict.
            If not passed in _fit, guaranteed to be passed here
        X : pd.DataFrame, optional (default=None)
            mtype is pd.DataFrame, pd-multiindex, or pd_multiindex_hier
            Exogeneous time series for the forecast

        Returns
        -------
        y_pred : pd.DataFrame, same type as y in _fit
            Point predictions
        """
        fh_idx = self._get_expected_pred_idx(fh=fh)
        y_cols = self._y.columns

        # pool of features to look up: X passed here takes precedence,
        # gaps are filled from the X remembered in self._X
        if X is not None and self._X is not None:
            X_pool = X.combine_first(self._X)
        elif X is None and self._X is not None:
            X_pool = self._X
        elif X is not None:
            X_pool = X
        else:
            # no exogeneous data at all: all-zero placeholder features
            X_pool = pd.DataFrame(0, index=fh_idx, columns=y_cols)

        X_pool = _coerce_col_str(X_pool)

        # feature rows at exactly the index values to be predicted
        X_idx = X_pool.loc[fh_idx]

        y_pred = self.estimator_.predict(X_idx)
        y_pred = pd.DataFrame(y_pred, index=fh_idx, columns=y_cols)

        return y_pred

    @classmethod
    def get_test_params(cls, parameter_set="default"):
        """Return testing parameter settings for the estimator.

        Parameters
        ----------
        parameter_set : str, default="default"
            Name of the set of test parameters to return, for use in tests. If no
            special parameters are defined for a value, will return `"default"` set.

        Returns
        -------
        params : dict or list of dict, default = {}
            Parameters to create testing instances of the class
            Each dict are parameters to construct an "interesting" test instance, i.e.,
            `MyClass(**params)` or `MyClass(**params[i])` creates a valid test instance.
            `create_test_instance` uses the first (or only) dictionary in `params`
        """
        from sklearn.ensemble import RandomForestRegressor
        from sklearn.linear_model import LinearRegression

        params1 = {
            "estimator": LinearRegression(),
            "pooling": "local",
        }

        params2 = {
            "estimator": RandomForestRegressor(),
            "pooling": "global",  # all internal mtypes are tested across scenarios
        }

        return [params1, params2]