Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] factor out column ensemble functionality from _ColumnEnsembleForecaster to new base mixin #4231

Merged
merged 5 commits into from Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
170 changes: 168 additions & 2 deletions sktime/base/_meta.py
Expand Up @@ -3,11 +3,14 @@
# copyright: sktime developers, BSD-3-Clause License (see LICENSE file)
"""Implements meta estimator for estimators composed of other estimators."""

__author__ = ["mloning, fkiraly"]
__all__ = ["_HeterogenousMetaEstimator"]
__author__ = ["mloning", "fkiraly"]
__all__ = ["_HeterogenousMetaEstimator", "_ColumnEstimator"]

from inspect import isclass

import numpy as np
import pandas as pd

from sktime.base import BaseEstimator


Expand Down Expand Up @@ -768,3 +771,166 @@ def unflat_len(obj):
def is_flat(obj):
    """Return True if no element of obj is a list or tuple, False otherwise.

    Parameters
    ----------
    obj : iterable (typically list or tuple)
        container whose direct elements are inspected

    Returns
    -------
    bool
        False as soon as a list or tuple element is found, True if none is found
        (including when obj is empty)
    """
    for element in obj:
        if isinstance(element, (list, tuple)):
            return False
    return True


class _ColumnEstimator:
    """Mixin class with utilities for by-column application of estimators.

    The methods read the following attributes from the host class:

    * ``self._y`` - pandas object with ``columns`` attribute, used to resolve
      integer column positions to column names
    * ``self._steps_fitted_attr`` - str, name of the attribute that holds the
      list of fitted (name, estimator, index) tuples; read by ``_by_column``
    """

    def _coerce_to_pd_index(self, obj):
        """Coerce obj to pandas Index.

        Parameters
        ----------
        obj : int, str, or list/tuple of these
            column identifier(s); integers are resolved against ``self._y``
            via ``_get_indices`` before coercion

        Returns
        -------
        pd.Index
            single-element index if obj resolves to a scalar int or str,
            otherwise ``pd.Index(obj)`` of the resolved identifiers
        """
        # replace ints by column names
        obj = self._get_indices(self._y, obj)

        # deal with numpy int by coercing to python int
        if np.issubdtype(type(obj), np.integer):
            obj = int(obj)

        # coerce to pd.Index, scalars have to be wrapped in a list first
        if isinstance(obj, (int, str)):
            return pd.Index([obj])
        return pd.Index(obj)

    def _get_indices(self, y, idx):
        """Convert integer indices to column names of y if necessary.

        Parameters
        ----------
        y : pandas object with `columns` attribute
        idx : column identifier, or (nested) list/tuple of column identifiers

        Returns
        -------
        same nesting as idx, with tuples replaced by lists;
        an integer that is not itself a column name and is smaller than the
        number of columns is replaced by ``y.columns[ix]``,
        every other identifier is passed through unchanged
        """

        def _get_index(y, ix):
            # deal with numpy int by coercing to python int
            if np.issubdtype(type(ix), np.integer):
                ix = int(ix)

            # an integer that is a column name takes precedence over position
            if isinstance(ix, int) and ix not in y.columns and ix < len(y.columns):
                return y.columns[ix]
            return ix

        if isinstance(idx, (list, tuple)):
            return [self._get_indices(y, ix) for ix in idx]
        return _get_index(y, idx)

    def _by_column(self, methodname, **kwargs):
        """Apply methodname of fitted estimators to kwargs, then column-concatenate.

        Parameters
        ----------
        methodname : str, name of a method of the fitted estimators
            assumed to take kwargs and return pd.DataFrame
        col_multiindex : bool, optional, default=False
            if True, will add an additional column multiindex at top, entries = index
        **kwargs : passed on unchanged to every call of `methodname`

        Returns
        -------
        y_pred : pd.DataFrame
            result of [getattr(est, methodname)(**kwargs) for _, est, _ in
            getattr(self, self._steps_fitted_attr)], column-concatenated;
            if col_multiindex, keys are the `index` entries of the fitted
            tuples, with integers resolved to column names of `self._y`
        """
        # get col_multiindex arg from kwargs, it must not reach the estimators
        col_multiindex = kwargs.pop("col_multiindex", False)

        y_preds = []
        keys = []
        for _, est, index in getattr(self, self._steps_fitted_attr):
            y_preds.append(getattr(est, methodname)(**kwargs))
            keys.append(index)

        keys = self._get_indices(self._y, keys)

        if col_multiindex:
            return pd.concat(y_preds, axis=1, keys=keys)
        return pd.concat(y_preds, axis=1)

    def _check_col_estimators(self, X, X_name="X", est_attr="estimators", cls=None):
        """Check getattr(self, est_attr) attribute, and coerce to (name, est, index).

        Checks:

        * `getattr(self, est_attr)` is single estimator, or
        * `getattr(self, est_attr)` is list of (name, estimator, index)
        * all `estimator` above inherit from `cls` (`None` means `BaseEstimator`)
        * `X.columns` is disjoint union of `index` appearing above

        Parameters
        ----------
        X : `pandas` object with `columns` attribute of `pd.Index` type
        X_name : str, optional, default = "X"
            name of `X` displayed in error messages
        est_attr : str, optional, default = "estimators"
            attribute name of the attribute this function checks
            also used in error message
        cls : type, optional, default = sktime BaseEstimator
            class to check inheritance from, for estimators (see above)

        Returns
        -------
        list of (name, estimator, index) such that union of index is `X.columns`;
        and estimator is estimator inheriting from `cls`

        Raises
        ------
        ValueError if checks fail, with informative error message
        """
        if cls is None:
            cls = BaseEstimator

        estimators = getattr(self, est_attr)

        # if a single estimator is passed, replicate across columns
        if isinstance(estimators, cls):
            ycols = [str(col) for col in X.columns]
            colrange = range(len(ycols))
            est_list = [estimators.clone() for _ in colrange]
            return list(zip(ycols, est_list, colrange))

        # type is checked before emptiness, so that invalid objects without
        # a length (e.g., an int) raise the informative ValueError below
        # instead of a TypeError from len
        if not isinstance(estimators, list) or len(estimators) == 0:
            raise ValueError(
                f"Invalid '{est_attr}' attribute, '{est_attr}' should be a list"
                " of (string, estimator, int) tuples."
            )
        names, ests, indices = zip(*estimators)

        # check names, via _HeterogenousMetaEstimator._check_names
        if hasattr(self, "_check_names"):
            self._check_names(names)

        # coerce integer positions to column names of X
        indices = self._get_indices(X, indices)

        for est in ests:
            if not isinstance(est, cls):
                raise ValueError(
                    f"The estimator {est.__class__.__name__} should be of type "
                    f"{cls}."
                )

        index_flat = flatten(indices)
        index_set = set(index_flat)
        not_in_y_idx = index_set.difference(X.columns)
        y_cols_not_found = set(X.columns).difference(index_set)

        if len(not_in_y_idx) > 0:
            raise ValueError(
                f"Column identifier must be indices in {X_name}.columns, or integers "
                f"within the range of the total number of columns, "
                f"but found column identifiers that are neither: {list(not_in_y_idx)}"
            )
        if len(y_cols_not_found) > 0:
            raise ValueError(
                f"All columns of {X_name} must be indexed by column identifiers, but "
                f"the following columns of {X_name} are not indexed: "
                f"{list(y_cols_not_found)}"
            )

        # a column appearing twice means two estimators claim the same column
        if len(index_set) != len(index_flat):
            raise ValueError(
                f"One estimator per column required. Found {len(index_set)} unique"
                f" column names in {est_attr} arg, required {len(index_flat)}"
            )

        return estimators
147 changes: 24 additions & 123 deletions sktime/forecasting/compose/_column_ensemble.py
Expand Up @@ -6,10 +6,7 @@
__author__ = ["GuzalBulatova", "mloning", "fkiraly"]
__all__ = ["ColumnEnsembleForecaster"]

import numpy as np
import pandas as pd

from sktime.base._meta import flatten
from sktime.base._meta import _ColumnEstimator
from sktime.forecasting.base._base import BaseForecaster
from sktime.forecasting.base._meta import _HeterogenousEnsembleForecaster

Expand All @@ -18,7 +15,7 @@
PANDAS_MTYPES = ["pd.DataFrame", "pd-multiindex", "pd_multiindex_hier"]


class ColumnEnsembleForecaster(_HeterogenousEnsembleForecaster):
class ColumnEnsembleForecaster(_HeterogenousEnsembleForecaster, _ColumnEstimator):
"""Forecast each series with separate forecaster.

Applies different forecasters by columns.
Expand All @@ -44,6 +41,7 @@ class ColumnEnsembleForecaster(_HeterogenousEnsembleForecaster):

Examples
--------
>>> import pandas as pd
>>> from sktime.forecasting.compose import ColumnEnsembleForecaster
>>> from sktime.forecasting.naive import NaiveForecaster
>>> from sktime.forecasting.trend import PolynomialTrendForecaster
Expand Down Expand Up @@ -149,21 +147,6 @@ def _forecasters(self, value):
)
]

def _coerce_to_pd_index(self, obj):
    """Turn a column identifier, or a collection of them, into a pandas Index.

    Integer positions are first resolved against ``self._y`` through
    ``self._get_indices``. A resolved scalar int or str yields a one-element
    index, anything else is handed to ``pd.Index`` directly.
    """
    resolved = self._get_indices(self._y, obj)

    # numpy integers are not python ints, normalize before the scalar check
    if np.issubdtype(type(resolved), np.integer):
        resolved = int(resolved)

    is_scalar = isinstance(resolved, (int, str))
    return pd.Index([resolved] if is_scalar else resolved)

def _fit(self, y, X=None, fh=None):
"""Fit to training data.

Expand Down Expand Up @@ -213,39 +196,6 @@ def _update(self, y, X=None, update_params=True):
forecaster.update(y.loc[:, pd_index], X, update_params=update_params)
return self

def _by_column(self, methodname, **kwargs):
    """Call methodname on every fitted forecaster and column-concatenate results.

    Parameters
    ----------
    methodname : str, name of a method of the fitted forecasters
        assumed to take kwargs and return pd.DataFrame
    col_multiindex : bool, optional, default=False
        if True, will add an additional column multiindex at top, entries = index

    Returns
    -------
    y_pred : pd.DataFrame
        result of [f.methodname(**kwargs) for _, f, _ in self.forecasters_]
        column-concatenated with keys being the variable names last seen in y
    """
    # col_multiindex is consumed here and is not passed on to the forecasters
    col_multiindex = kwargs.pop("col_multiindex", False)

    results, col_ids = [], []
    for _name, fitted, col_id in self.forecasters_:
        results.append(getattr(fitted, methodname)(**kwargs))
        col_ids.append(col_id)

    # integer positions become column names of the last seen y
    col_ids = self._get_indices(self._y, col_ids)

    concat_kwargs = {"keys": col_ids} if col_multiindex else {}
    return pd.concat(results, axis=1, **concat_kwargs)

def _predict(self, fh=None, X=None):
"""Forecast time series at future horizon.

Expand Down Expand Up @@ -391,81 +341,32 @@ def _predict_var(self, fh, X=None, cov=False):
"""
return self._by_column("predict_var", fh=fh, X=X, cov=cov, col_multiindex=True)

def _get_indices(self, y, idx):
"""Convert integer indices if necessary."""

def _get_index(y, ix):
# deal with numpy int by coercing to python int
if np.issubdtype(type(ix), np.integer):
ix = int(ix)
def _check_forecasters(self, y):
"""Check self.forecasters parameter and coerce to (name, est, index).

if isinstance(ix, int) and ix not in y.columns and ix < len(y.columns):
return y.columns[ix]
else:
return ix
Checks:

if isinstance(idx, (list, tuple)):
return [self._get_indices(y, ix) for ix in idx]
else:
return _get_index(y, idx)
* `self.forecasters` is single forecaster, or
* `self.forecasters` is list of (name, forecaster, index)
* all `forecaster` above inherit from `BaseForecaster`
* `y.columns` is disjoint union of `index` appearing above

def _check_forecasters(self, y):
Parameters
----------
y : `pandas` object with `columns` attribute of `pd.Index` type

# if a single estimator is passed, replicate across columns
if isinstance(self.forecasters, BaseForecaster):
ycols = [str(col) for col in y.columns]
colrange = range(len(ycols))
forecaster_list = [self.forecasters.clone() for _ in colrange]
return list(zip(ycols, forecaster_list, colrange))

if (
self.forecasters is None
or len(self.forecasters) == 0
or not isinstance(self.forecasters, list)
):
raise ValueError(
"Invalid 'forecasters' attribute, 'forecasters' should be a list"
" of (string, estimator, int) tuples."
)
names, forecasters, indices = zip(*self.forecasters)

# check names, defined by _HeterogenousEnsembleForecaster
self._check_names(names)

# coerce column names to indices in columns
indices = self._get_indices(y, indices)

for forecaster in forecasters:
if not isinstance(forecaster, BaseForecaster):
raise ValueError(
f"The estimator {forecaster.__class__.__name__} should be a "
f"Forecaster."
)
Returns
-------
list of (name, estimator, index) such that union of index is `y.columns`;
and estimator is estimator inheriting from `BaseForecaster`

index_flat = flatten(indices)
index_set = set(index_flat)
not_in_y_idx = index_set.difference(y.columns)
y_cols_not_found = set(y.columns).difference(index_set)

if len(not_in_y_idx) > 0:
raise ValueError(
f"Column identifier must be indices in y.columns, or integers within "
f"the range of the total number of columns, "
f"but found column identifiers that are neither: {list(not_in_y_idx)}"
)
if len(y_cols_not_found) > 0:
raise ValueError(
f"All columns of y must be indexed by column identifiers, but "
f"the following columns of y are not indexed: {list(y_cols_not_found)}"
)

if len(index_set) != len(index_flat):
raise ValueError(
f"One estimator per column required. Found {len(index_set)} unique"
f" column names in forecasters arg, required {len(index_flat)}"
)

return self.forecasters
Raises
------
ValueError if checks fail, with informative error message
"""
return self._check_col_estimators(
X=y, X_name="y", est_attr="forecasters", cls=BaseForecaster
)

@classmethod
def get_test_params(cls, parameter_set="default"):
Expand Down