diff --git a/.gitignore b/.gitignore
index ea506615..c07e3ac1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -283,3 +283,5 @@ mlinsights/mlmodel/direct*.c
 mlinsights/mlmodel/_*.c
 _unittests/unittests.out
 _doc/notebooks/explore/simages/*
+_unittests/ut_mlbatch/cache__2/
+
diff --git a/_unittests/ut_mlbatch/test_pipeline_cache.py b/_unittests/ut_mlbatch/test_pipeline_cache.py
new file mode 100644
index 00000000..98735e20
--- /dev/null
+++ b/_unittests/ut_mlbatch/test_pipeline_cache.py
@@ -0,0 +1,104 @@
+# -*- coding: utf-8 -*-
+"""
+@brief test log(time=2s)
+"""
+import unittest
+from sklearn.datasets import make_classification
+from sklearn.decomposition import PCA, TruncatedSVD as SVD
+from sklearn.linear_model import LogisticRegression
+from sklearn.model_selection import GridSearchCV
+from sklearn.pipeline import Pipeline
+from pyquickhelper.pycode import ExtTestCase
+from mlinsights.mlbatch.pipeline_cache import PipelineCache
+from mlinsights.mlbatch.cache_model import MLCache
+
+
+class TestPipelineCache(ExtTestCase):
+
+    def test_make_classification(self):
+        X, y = make_classification(random_state=42)
+        pipe = PipelineCache([('pca', PCA(2)),
+                              ('lr', LogisticRegression())],
+                             'cache__')
+        pipe.fit(X, y)
+        cache = MLCache.get_cache('cache__')
+        self.assertEqual(len(cache), 1)
+        key = list(cache.keys())[0]
+        self.assertIn("[('X',", key)
+        self.assertIn("('copy', 'True')", key)
+        MLCache.remove_cache('cache__')
+
+    def test_grid_search(self):
+        X, y = make_classification(random_state=42)
+        param_grid = {'pca__n_components': [2, 3],
+                      'pca__whiten': [True, False],
+                      'lr__fit_intercept': [True, False]}
+        pipe = Pipeline([('pca', PCA(2)),
+                         ('lr', LogisticRegression())])
+        grid0 = GridSearchCV(pipe, param_grid, error_score='raise')
+        grid0.fit(X, y)
+
+        pipe = PipelineCache([('pca', PCA(2)),
+                              ('lr', LogisticRegression())],
+                             'cache__2')
+        grid = GridSearchCV(pipe, param_grid, error_score='raise')
+
+        grid.fit(X, y)
+        cache = MLCache.get_cache('cache__2')
+        self.assertEqual(len(cache), 13)
+        key = list(cache.keys())[0]
+        self.assertIn("[('X',", key)
+        self.assertIn("('copy', 'True')", key)
+        MLCache.remove_cache('cache__2')
+        self.assertEqual(grid0.best_params_, grid.best_params_)
+
+    def test_grid_search_1(self):
+        X, y = make_classification(random_state=42)
+        param_grid = {'pca__n_components': [2, 3],
+                      'pca__whiten': [True, False],
+                      'lr__fit_intercept': [True, False]}
+        pipe = Pipeline([('pca', PCA(2)),
+                         ('lr', LogisticRegression())])
+        grid0 = GridSearchCV(pipe, param_grid, error_score='raise', n_jobs=1)
+        grid0.fit(X, y)
+
+        pipe = PipelineCache([('pca', PCA(2)),
+                              ('lr', LogisticRegression())],
+                             'cache__1')
+        grid = GridSearchCV(pipe, param_grid, error_score='raise', n_jobs=1)
+
+        grid.fit(X, y)
+        cache = MLCache.get_cache('cache__1')
+        self.assertEqual(len(cache), 13)
+        key = list(cache.keys())[0]
+        self.assertIn("[('X',", key)
+        self.assertIn("('copy', 'True')", key)
+        MLCache.remove_cache('cache__1')
+        self.assertEqual(grid0.best_params_, grid.best_params_)
+
+    def test_grid_search_model(self):
+        X, y = make_classification(random_state=42)
+        param_grid = [{'pca': [PCA(2)], 'lr__fit_intercept': [False, True]},
+                      {'pca': [SVD(2)], 'lr__fit_intercept': [False, True]}]
+        pipe = Pipeline([('pca', 'passthrough'),
+                         ('lr', LogisticRegression())])
+        grid0 = GridSearchCV(pipe, param_grid, error_score='raise')
+        grid0.fit(X, y)
+
+        pipe = PipelineCache([('pca', 'passthrough'),
+                              ('lr', LogisticRegression())],
+                             'cache__3')
+        grid = GridSearchCV(pipe, param_grid, error_score='raise')
+
+        grid.fit(X, y)
+        cache = MLCache.get_cache('cache__3')
+        self.assertEqual(len(cache), 7)
+        key = list(cache.keys())[0]
+        self.assertIn("[('X',", key)
+        self.assertIn("('copy', 'True')", key)
+        MLCache.remove_cache('cache__3')
+        self.assertEqual(grid0.best_params_, grid.best_params_)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/mlinsights/mlbatch/__init__.py b/mlinsights/mlbatch/__init__.py
new file mode 100644
index 00000000..e99f73e1
--- /dev/null
+++ b/mlinsights/mlbatch/__init__.py
@@ -0,0 +1,7 @@
+"""
+@file
+@brief Shortcuts to *mlbatch*.
+"""
+
+from .cache_model import MLCache
+from .pipeline_cache import PipelineCache
diff --git a/mlinsights/mlbatch/cache_model.py b/mlinsights/mlbatch/cache_model.py
new file mode 100644
index 00000000..8c015f4d
--- /dev/null
+++ b/mlinsights/mlbatch/cache_model.py
@@ -0,0 +1,159 @@
+"""
+@file
+@brief Caches to avoid repeated trainings.
+"""
+import numpy
+
+_caches = {}
+
+
+class MLCache:
+    """
+    Implements a cache to reduce the number of trainings
+    a grid search has to do.
+    """
+
+    def __init__(self, name):
+        """
+        @param name name of the cache
+        """
+        self.name = name
+        self.cached = {}
+        self.count_ = {}
+
+    def cache(self, params, value):
+        """
+        Caches one object.
+
+        @param params dictionary of parameters
+        @param value value to cache
+        """
+        key = MLCache.as_key(params)
+        if key in self.cached:
+            raise KeyError("Key {0} already exists".format(params))
+        self.cached[key] = value
+        self.count_[key] = 0
+
+    def get(self, params, default=None):
+        """
+        Retrieves an element from the cache.
+
+        @param params dictionary of parameters
+        @param default value returned if the key is not found
+        @return value or *default* if the key does not exist
+        """
+        key = MLCache.as_key(params)
+        res = self.cached.get(key, default)
+        if res is not default:
+            self.count_[key] += 1
+        return res
+
+    def count(self, params):
+        """
+        Retrieves the number of times
+        an element was retrieved from the cache.
+
+        @param params dictionary of parameters
+        @return int
+        """
+        key = MLCache.as_key(params)
+        return self.count_.get(key, 0)
+
+    @staticmethod
+    def as_key(params):
+        """
+        Converts a dictionary of parameters into a key.
+
+        @param params dictionary
+        @return key as a string
+        """
+        if isinstance(params, str):
+            return params
+        els = []
+        for k, v in sorted(params.items()):
+            if isinstance(v, (int, float, str)):
+                sv = str(v)
+            elif isinstance(v, tuple):
+                if not all(map(lambda e: isinstance(e, (int, float, str)), v)):
+                    raise TypeError(
+                        "Unable to create a key for parameter '{0}' with value {1}".format(k, v))
+                sv = str(v)
+            elif isinstance(v, numpy.ndarray):
+                # id(v) may have been better but
+                # it does not play well with joblib.
+                sv = hash(v.tobytes())
+            elif v is None:
+                sv = ""
+            else:
+                raise TypeError(
+                    "Unable to create a key for parameter '{0}' with value {1}".format(k, v))
+            els.append((k, sv))
+        return str(els)
+
+    def __len__(self):
+        """
+        Returns the number of cached items.
+        """
+        return len(self.cached)
+
+    def items(self):
+        """
+        Enumerates all cached items.
+        """
+        for item in self.cached.items():
+            yield item
+
+    def keys(self):
+        """
+        Enumerates all cached keys.
+        """
+        for k in self.cached.keys():  # pylint: disable=C0201
+            yield k
+
+    @staticmethod
+    def create_cache(name):
+        """
+        Creates a new cache.
+
+        @param name name
+        @return created cache
+        """
+        global _caches  # pylint: disable=W0603
+        if name in _caches:
+            raise RuntimeError("cache '{0}' already exists.".format(name))
+
+        cache = MLCache(name)
+        _caches[name] = cache
+        return cache
+
+    @staticmethod
+    def remove_cache(name):
+        """
+        Removes a cache with a given name.
+
+        @param name name
+        """
+        global _caches  # pylint: disable=W0603
+        del _caches[name]
+
+    @staticmethod
+    def get_cache(name):
+        """
+        Gets a cache with a given name.
+
+        @param name name
+        @return created cache
+        """
+        global _caches  # pylint: disable=W0603
+        return _caches[name]
+
+    @staticmethod
+    def has_cache(name):
+        """
+        Tells if cache *name* is present.
+
+        @param name name
+        @return boolean
+        """
+        global _caches  # pylint: disable=W0603
+        return name in _caches
diff --git a/mlinsights/mlbatch/pipeline_cache.py b/mlinsights/mlbatch/pipeline_cache.py
new file mode 100644
index 00000000..fbaa8518
--- /dev/null
+++ b/mlinsights/mlbatch/pipeline_cache.py
@@ -0,0 +1,99 @@
+"""
+@file
+@brief Caches fitted transformers to speed up training.
+"""
+from sklearn.base import clone
+from sklearn.pipeline import Pipeline, _fit_transform_one
+from sklearn.utils import _print_elapsed_time
+from .cache_model import MLCache
+
+
+class PipelineCache(Pipeline):
+    """
+    Same as :epkg:`sklearn:pipeline:Pipeline` but it can
+    skip training if it detects that a step was already trained,
+    even in a different pipeline.
+
+    Parameters
+    ----------
+
+    steps : list
+        List of (name, transform) tuples (implementing fit/transform) that are
+        chained, in the order in which they are chained, with the last object
+        an estimator.
+    cache_name : str
+        Name of the cache; if None, a new name is created.
+    verbose : boolean, optional
+        If True, the time elapsed while fitting each step will be printed as it
+        is completed.
+
+    Attributes
+    ----------
+    named_steps : bunch object, a dictionary with attribute access
+        Read-only attribute to access any step parameter by user given name.
+        Keys are step names and values are steps parameters.
+    """
+
+    def __init__(self, steps, cache_name=None, verbose=False):
+        Pipeline.__init__(self, steps, memory=None, verbose=verbose)
+        if cache_name is None:
+            cache_name = "Pipeline%d" % id(self)
+        self.cache_name = cache_name
+
+    def _get_fit_params_steps(self, fit_params):
+        fit_params_steps = {name: {} for name, step in self.steps
+                            if step is not None}
+
+        for pname, pval in fit_params.items():
+            if '__' not in pname:
+                raise ValueError(
+                    "Pipeline.fit does not accept the {} parameter. "
+                    "You can pass parameters to specific steps of your "
+                    "pipeline using the stepname__parameter format, e.g. "
" + "`Pipeline.fit(X, y, logisticregression__sample_weight" + "=sample_weight)`.".format(pname)) + step, param = pname.split('__', 1) + fit_params_steps[step][param] = pval + return fit_params_steps + + def _fit(self, X, y=None, **fit_params): + + self.steps = list(self.steps) + self._validate_steps() + fit_params_steps = self._get_fit_params_steps(fit_params) + if not MLCache.has_cache(self.cache_name): + self.cache_ = MLCache.create_cache(self.cache_name) + else: + self.cache_ = MLCache.get_cache(self.cache_name) + Xt = X + for (step_idx, name, transformer) in self._iter(with_final=False, filter_passthrough=False): + if (transformer is None or transformer == 'passthrough'): + with _print_elapsed_time('Pipeline', self._log_message(step_idx)): + continue + + params = transformer.get_params() + params['__class__'] = transformer.__class__.__name__ + params['X'] = Xt + params['y'] = y + cached = self.cache_.get(params) + if cached is None: + cloned_transformer = clone(transformer) + Xt, fitted_transformer = _fit_transform_one( + cloned_transformer, Xt, y, None, + message_clsname='PipelineCache', + message=self._log_message(step_idx), + **fit_params_steps[name]) + self.cache_.cache(params, fitted_transformer) + else: + fitted_transformer = cached + Xt = fitted_transformer.transform(Xt) + + self.steps[step_idx] = (name, fitted_transformer) + if self._final_estimator == 'passthrough': + return Xt, {} + return Xt, fit_params_steps[self.steps[-1][0]]