Skip to content

Commit

Permalink
[ENH] parallel backend selection for forecasting tuners (#5430)
Browse files Browse the repository at this point in the history
This PR adds parallel backend selection for forecasting tuners, using
the `parallelize` utility module.

Towards #5380

Also makes the following changes:
* in tuners, deprecates some of the "backend parameters" for 0.26.0 in
order to make the backend parameter interface consistent across the
codebase.
* adds default fixtures for different parallel backend settings, also
exported by the `utils.parallel` module
* tests that run the tuners with those parallelization settings
  • Loading branch information
fkiraly committed Oct 26, 2023
1 parent bb470b0 commit f2820e1
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 16 deletions.
125 changes: 109 additions & 16 deletions sktime/forecasting/model_selection/_tune.py
Expand Up @@ -23,7 +23,9 @@
from sktime.forecasting.model_evaluation import evaluate
from sktime.performance_metrics.base import BaseMetric
from sktime.split.base import BaseSplitter
from sktime.utils.parallel import parallelize
from sktime.utils.validation.forecasting import check_scoring
from sktime.utils.warnings import warn


class BaseGridSearch(_DelegatedForecaster):
Expand All @@ -36,6 +38,7 @@ class BaseGridSearch(_DelegatedForecaster):
"capability:pred_int:insample": True,
}

# todo 0.26.0: remove n_jobs, pre_dispatch parameters and all related logic
def __init__(
self,
forecaster,
Expand All @@ -52,6 +55,7 @@ def __init__(
error_score=np.nan,
tune_by_instance=False,
tune_by_variable=False,
backend_params=None,
):
self.forecaster = forecaster
self.cv = cv
Expand All @@ -67,6 +71,7 @@ def __init__(
self.error_score = error_score
self.tune_by_instance = tune_by_instance
self.tune_by_variable = tune_by_variable
self.backend_params = backend_params

super().__init__()

Expand Down Expand Up @@ -181,11 +186,25 @@ def _fit(self, y, X, fh):
scoring = check_scoring(self.scoring, obj=self)
scoring_name = f"test_{scoring.name}"

parallel = Parallel(
n_jobs=self.n_jobs, pre_dispatch=self.pre_dispatch, backend=self.backend
)
# todo 0.26.0: remove this logic and only use backend_params
backend = self.backend
backend_params = self.backend_params if self.backend_params else {}
if backend in ["threading", "multiprocessing", "loky"]:
n_jobs = self.n_jobs
pre_dispatch = self.pre_dispatch
backend_params["n_jobs"] = n_jobs
backend_params["pre_dispatch"] = pre_dispatch
if n_jobs is not None or pre_dispatch is not None:
warn(
f"in {self.__class__.__name__}, n_jobs and pre_dispatch "
"parameters are deprecated and will be removed in 0.26.0. "
"Please use n_jobs and pre_dispatch directly in the backend_params "
"argument instead.",
obj=self,
stacklevel=2,
)

def _fit_and_score(params):
def _fit_and_score(params, meta):
# Clone forecaster.
forecaster = self.forecaster.clone()

Expand Down Expand Up @@ -228,8 +247,11 @@ def evaluate_candidates(candidate_params):
)
)

out = parallel(
delayed(_fit_and_score)(params) for params in candidate_params
out = parallelize(
fun=_fit_and_score,
iter=candidate_params,
backend=backend,
backend_params=backend_params,
)

if len(out) < 1:
Expand Down Expand Up @@ -436,9 +458,19 @@ class ForecastingGridSearchCV(BaseGridSearch):
error_score : numeric value or the str 'raise', optional (default=np.nan)
The test score returned when a forecaster fails to be fitted.
return_train_score : bool, optional (default=False)
backend : str, optional (default="loky")
Specify the parallelisation backend implementation in joblib, where
"loky" is used by default.
backend : {"dask", "loky", "multiprocessing", "threading"}, by default "loky".
Runs parallel evaluate if specified and `strategy` is set as "refit".
- "None": executes loop sequentially, simple list comprehension
- "loky", "multiprocessing" and "threading": uses ``joblib.Parallel`` loops
- "dask": uses ``dask``, requires ``dask`` package in environment
Recommendation: Use "dask" or "loky" for parallel evaluate.
"threading" is unlikely to see speed ups due to the GIL and the serialization
backend (``cloudpickle``) for "dask" and "loky" is generally more robust
than the standard ``pickle`` library used in "multiprocessing".
error_score : "raise" or numeric, default=np.nan
Value to assign to the score if an exception occurs in estimator fitting. If set
to "raise", the exception is raised. If a numeric value is given,
Expand All @@ -460,6 +492,18 @@ class ForecastingGridSearchCV(BaseGridSearch):
Has the same effect as applying ColumnEnsembleForecaster wrapper to self.
If False, the same best parameter is selected for all variables.
backend_params : dict, optional
additional parameters passed to the backend as config.
Directly passed to ``utils.parallel.parallelize``.
Valid keys depend on the value of ``backend``:
- "None": no additional parameters, ``backend_params`` is ignored
- "loky", "multiprocessing" and "threading":
any valid keys for ``joblib.Parallel`` can be passed here,
e.g., ``n_jobs``, with the exception of ``backend``
which is directly controlled by ``backend``
- "dask": any valid keys for ``dask.compute`` can be passed, e.g., ``scheduler``
Attributes
----------
best_index_ : int
Expand Down Expand Up @@ -569,6 +613,7 @@ def __init__(
error_score=np.nan,
tune_by_instance=False,
tune_by_variable=False,
backend_params=None,
):
super().__init__(
forecaster=forecaster,
Expand All @@ -585,6 +630,7 @@ def __init__(
error_score=error_score,
tune_by_instance=tune_by_instance,
tune_by_variable=tune_by_variable,
backend_params=backend_params,
)
self.param_grid = param_grid

Expand Down Expand Up @@ -738,16 +784,25 @@ class ForecastingRandomizedSearchCV(BaseGridSearch):
return_n_best_forecasters: int, default=1
In case the n best forecaster should be returned, this value can be set
and the n best forecasters will be assigned to n_best_forecasters_
pre_dispatch : str, optional (default='2*n_jobs')
random_state : int, RandomState instance or None, default=None
Pseudo random number generator state used for random uniform sampling
from lists of possible values instead of scipy.stats distributions.
Pass an int for reproducible output across multiple
function calls.
pre_dispatch : str, optional (default='2*n_jobs')
backend : str, optional (default="loky")
Specify the parallelisation backend implementation in joblib, where
"loky" is used by default.
backend : {"dask", "loky", "multiprocessing", "threading"}, by default "loky".
Runs parallel evaluate if specified and `strategy` is set as "refit".
- "None": executes loop sequentially, simple list comprehension
- "loky", "multiprocessing" and "threading": uses ``joblib.Parallel`` loops
- "dask": uses ``dask``, requires ``dask`` package in environment
Recommendation: Use "dask" or "loky" for parallel evaluate.
"threading" is unlikely to see speed ups due to the GIL and the serialization
backend (``cloudpickle``) for "dask" and "loky" is generally more robust
than the standard ``pickle`` library used in "multiprocessing".
error_score : "raise" or numeric, default=np.nan
Value to assign to the score if an exception occurs in estimator fitting. If set
to "raise", the exception is raised. If a numeric value is given,
Expand All @@ -769,6 +824,18 @@ class ForecastingRandomizedSearchCV(BaseGridSearch):
Has the same effect as applying ColumnEnsembleForecaster wrapper to self.
If False, the same best parameter is selected for all variables.
backend_params : dict, optional
additional parameters passed to the backend as config.
Directly passed to ``utils.parallel.parallelize``.
Valid keys depend on the value of ``backend``:
- "None": no additional parameters, ``backend_params`` is ignored
- "loky", "multiprocessing" and "threading":
any valid keys for ``joblib.Parallel`` can be passed here,
e.g., ``n_jobs``, with the exception of ``backend``
which is directly controlled by ``backend``
- "dask": any valid keys for ``dask.compute`` can be passed, e.g., ``scheduler``
Attributes
----------
best_index_ : int
Expand Down Expand Up @@ -812,6 +879,7 @@ def __init__(
error_score=np.nan,
tune_by_instance=False,
tune_by_variable=False,
backend_params=None,
):
super().__init__(
forecaster=forecaster,
Expand All @@ -828,6 +896,7 @@ def __init__(
error_score=error_score,
tune_by_instance=tune_by_instance,
tune_by_variable=tune_by_variable,
backend_params=backend_params,
)
self.param_distributions = param_distributions
self.n_iter = n_iter
Expand Down Expand Up @@ -980,9 +1049,19 @@ class ForecastingSkoptSearchCV(BaseGridSearch):
Number of jobs to run in parallel.
None means 1 unless in a joblib.parallel_backend context.
-1 means using all processors.
backend : str, optional (default="loky")
Specify the parallelisation backend implementation in joblib, where
"loky" is used by default.
backend : {"dask", "loky", "multiprocessing", "threading"}, by default "loky".
Runs parallel evaluate if specified and `strategy` is set as "refit".
- "None": executes loop sequentially, simple list comprehension
- "loky", "multiprocessing" and "threading": uses ``joblib.Parallel`` loops
- "dask": uses ``dask``, requires ``dask`` package in environment
Recommendation: Use "dask" or "loky" for parallel evaluate.
"threading" is unlikely to see speed ups due to the GIL and the serialization
backend (``cloudpickle``) for "dask" and "loky" is generally more robust
than the standard ``pickle`` library used in "multiprocessing".
tune_by_instance : bool, optional (default=False)
Whether to tune parameter by each time series instance separately,
in case of Panel or Hierarchical data passed to the tuning estimator.
Expand All @@ -1000,6 +1079,18 @@ class ForecastingSkoptSearchCV(BaseGridSearch):
Has the same effect as applying ColumnEnsembleForecaster wrapper to self.
If False, the same best parameter is selected for all variables.
backend_params : dict, optional
additional parameters passed to the backend as config.
Directly passed to ``utils.parallel.parallelize``.
Valid keys depend on the value of ``backend``:
- "None": no additional parameters, ``backend_params`` is ignored
- "loky", "multiprocessing" and "threading":
any valid keys for ``joblib.Parallel`` can be passed here,
e.g., ``n_jobs``, with the exception of ``backend``
which is directly controlled by ``backend``
- "dask": any valid keys for ``dask.compute`` can be passed, e.g., ``scheduler``
Attributes
----------
best_index_ : int
Expand Down Expand Up @@ -1082,6 +1173,7 @@ def __init__(
error_score=np.nan,
tune_by_instance=False,
tune_by_variable=False,
backend_params=None,
):
self.param_distributions = param_distributions
self.n_iter = n_iter
Expand All @@ -1103,6 +1195,7 @@ def __init__(
error_score=error_score,
tune_by_instance=tune_by_instance,
tune_by_variable=tune_by_variable,
backend_params=backend_params,
)

def _fit(self, y, X=None, fh=None):
Expand Down
28 changes: 28 additions & 0 deletions sktime/forecasting/model_selection/tests/test_tune.py
Expand Up @@ -37,6 +37,7 @@
from sktime.transformations.series.detrend import Detrender
from sktime.transformations.series.impute import Imputer
from sktime.utils._testing.hierarchical import _make_hierarchical
from sktime.utils.parallel import _get_parallel_test_fixtures

TEST_METRICS = [MeanAbsolutePercentageError(symmetric=True), MeanSquaredError()]
TEST_METRICS_PROBA = [CRPS(), PinballLoss()]
Expand Down Expand Up @@ -322,3 +323,30 @@ def test_skoptcv_multiple_forecaster():
)
sscv.fit(y, X)
assert len(sscv.cv_results_) == 5


# list of fixture dicts with keys "backend" and "backend_params",
# covering sequential, joblib, and (if installed) dask parallelization settings
BACKEND_TEST = _get_parallel_test_fixtures()


@pytest.mark.skipif(
    not run_test_for_class(ForecastingGridSearchCV),
    reason="run test only if softdeps are present and incrementally (if requested)",
)
@pytest.mark.parametrize("backend_set", BACKEND_TEST)
def test_gscv_backends(backend_set):
    """Check ForecastingGridSearchCV fits under each parallelization backend.

    Parametrized over the backend fixtures from ``_get_parallel_test_fixtures``;
    each fixture supplies a ``backend`` string and a ``backend_params`` dict.
    """
    y, X = load_longley()

    # pass the backend configuration straight through to the tuner
    gscv = ForecastingGridSearchCV(
        PIPE,
        param_grid=PIPE_GRID,
        cv=CVs[0],
        scoring=TEST_METRICS[0],
        error_score=ERROR_SCORES[0],
        backend=backend_set["backend"],
        backend_params=backend_set["backend_params"],
    )
    gscv.fit(y, X)
27 changes: 27 additions & 0 deletions sktime/utils/parallel.py
Expand Up @@ -116,3 +116,30 @@ def _parallelize_dask(fun, iter, meta, backend, backend_params):


para_dict["dask"] = _parallelize_dask


def _get_parallel_test_fixtures():
    """Return fixtures for parallelization tests.

    Returns a list of parameter fixtures, where each fixture
    is a dict with keys "backend" and "backend_params".
    """
    from sktime.utils.validation._dependencies import _check_soft_dependencies

    # sequential baseline: no parallelization at all
    fixtures = [{"backend": "None", "backend_params": {}}]

    # joblib backends: default params, fixed job count, and all processors
    for joblib_backend in ("loky", "multiprocessing", "threading"):
        for params in ({}, {"n_jobs": 2}, {"n_jobs": -1}):
            fixtures.append({"backend": joblib_backend, "backend_params": params})

    # dask backend, exercised only if the dask soft dependency is installed
    if _check_soft_dependencies("dask", severity="none"):
        fixtures.append({"backend": "dask", "backend_params": {}})
        fixtures.append({"backend": "dask", "backend_params": {"scheduler": "sync"}})

    return fixtures

0 comments on commit f2820e1

Please sign in to comment.