Fixes #55, explore caching for scikit-learn pipeline #56

Merged
merged 4 commits into from May 22, 2019
2 changes: 2 additions & 0 deletions .gitignore
@@ -283,3 +283,5 @@ mlinsights/mlmodel/direct*.c
mlinsights/mlmodel/_*.c
_unittests/unittests.out
_doc/notebooks/explore/simages/*
_unittests/ut_mlbatch/cache__2/

104 changes: 104 additions & 0 deletions _unittests/ut_mlbatch/test_pipeline_cache.py
@@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
"""
@brief test log(time=2s)
"""
import unittest
from sklearn.datasets import make_classification
from sklearn.decomposition import PCA, TruncatedSVD as SVD
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from pyquickhelper.pycode import ExtTestCase
from mlinsights.mlbatch.pipeline_cache import PipelineCache
from mlinsights.mlbatch.cache_model import MLCache


class TestPipelineCache(ExtTestCase):

def test_make_classification(self):
X, y = make_classification(random_state=42)
pipe = PipelineCache([('pca', PCA(2)),
('lr', LogisticRegression())],
'cache__')
pipe.fit(X, y)
cache = MLCache.get_cache('cache__')
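        # a single cached entry: only the PCA step is cached, the final
        # estimator (lr) is fitted outside _fit and never enters the cache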
self.assertEqual(len(cache), 1)
key = list(cache.keys())[0]
self.assertIn("[('X',", key)
self.assertIn("('copy', 'True')", key)
MLCache.remove_cache('cache__')

def test_grid_search(self):
X, y = make_classification(random_state=42)
param_grid = {'pca__n_components': [2, 3],
'pca__whiten': [True, False],
'lr__fit_intercept': [True, False]}
pipe = Pipeline([('pca', PCA(2)),
('lr', LogisticRegression())])
grid0 = GridSearchCV(pipe, param_grid, error_score='raise')
grid0.fit(X, y)

pipe = PipelineCache([('pca', PCA(2)),
('lr', LogisticRegression())],
'cache__2')
grid = GridSearchCV(pipe, param_grid, error_score='raise')

grid.fit(X, y)
cache = MLCache.get_cache('cache__2')
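        # 13 entries expected: presumably 4 PCA settings (n_components x
        # whiten) x 3 CV folds + 1 refit on the full data, 3-fold CV being
        # the GridSearchCV default in scikit-learn 0.21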
self.assertEqual(len(cache), 13)
key = list(cache.keys())[0]
self.assertIn("[('X',", key)
self.assertIn("('copy', 'True')", key)
MLCache.remove_cache('cache__2')
self.assertEqual(grid0.best_params_, grid.best_params_)

def test_grid_search_1(self):
X, y = make_classification(random_state=42)
param_grid = {'pca__n_components': [2, 3],
'pca__whiten': [True, False],
'lr__fit_intercept': [True, False]}
pipe = Pipeline([('pca', PCA(2)),
('lr', LogisticRegression())])
grid0 = GridSearchCV(pipe, param_grid, error_score='raise', n_jobs=1)
grid0.fit(X, y)

pipe = PipelineCache([('pca', PCA(2)),
('lr', LogisticRegression())],
'cache__1')
grid = GridSearchCV(pipe, param_grid, error_score='raise', n_jobs=1)

grid.fit(X, y)
cache = MLCache.get_cache('cache__1')
self.assertEqual(len(cache), 13)
key = list(cache.keys())[0]
self.assertIn("[('X',", key)
self.assertIn("('copy', 'True')", key)
MLCache.remove_cache('cache__1')
self.assertEqual(grid0.best_params_, grid.best_params_)

def test_grid_search_model(self):
X, y = make_classification(random_state=42)
param_grid = [{'pca': [PCA(2)], 'lr__fit_intercept': [False, True]},
{'pca': [SVD(2)], 'lr__fit_intercept': [False, True]}]
pipe = Pipeline([('pca', 'passthrough'),
('lr', LogisticRegression())])
grid0 = GridSearchCV(pipe, param_grid, error_score='raise')
grid0.fit(X, y)

pipe = PipelineCache([('pca', 'passthrough'),
('lr', LogisticRegression())],
'cache__3')
grid = GridSearchCV(pipe, param_grid, error_score='raise')

grid.fit(X, y)
cache = MLCache.get_cache('cache__3')
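        # 7 entries expected: presumably 2 transformer choices (PCA, SVD)
        # x 3 CV folds + 1 refit on the full data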
self.assertEqual(len(cache), 7)
key = list(cache.keys())[0]
self.assertIn("[('X',", key)
self.assertIn("('copy', 'True')", key)
MLCache.remove_cache('cache__3')
self.assertEqual(grid0.best_params_, grid.best_params_)


if __name__ == "__main__":
unittest.main()
7 changes: 7 additions & 0 deletions mlinsights/mlbatch/__init__.py
@@ -0,0 +1,7 @@
"""
@file
@brief Shortcuts to *mlbatch*.
"""

from .cache_model import MLCache
from .pipeline_cache import PipelineCache
159 changes: 159 additions & 0 deletions mlinsights/mlbatch/cache_model.py
@@ -0,0 +1,159 @@
"""
@file
@brief Caches to store training results.
"""
import numpy

_caches = {}


class MLCache:
"""
Implements a cache to reduce the number of trainings
a grid search has to do.
"""

def __init__(self, name):
"""
@param name name of the cache
"""
self.name = name
self.cached = {}
self.count_ = {}

def cache(self, params, value):
"""
Caches one object.

@param params dictionary of parameters
@param value value to cache
"""
key = MLCache.as_key(params)
if key in self.cached:
raise KeyError("Key {0} already exists".format(params))
self.cached[key] = value
self.count_[key] = 0

def get(self, params, default=None):
"""
Retrieves an element from the cache.

@param params dictionary of parameters
@param default if not found
        @return     value or *default* if the key does not exist
"""
key = MLCache.as_key(params)
res = self.cached.get(key, default)
        if res is not default:
self.count_[key] += 1
return res

def count(self, params):
"""
        Retrieves the number of times
        an element was retrieved from the cache.

@param params dictionary of parameters
@return int
"""
key = MLCache.as_key(params)
return self.count_.get(key, 0)

@staticmethod
def as_key(params):
"""
Converts a list of parameters into a key.

@param params dictionary
@return key as a string
"""
if isinstance(params, str):
return params
els = []
for k, v in sorted(params.items()):
if isinstance(v, (int, float, str)):
sv = str(v)
elif isinstance(v, tuple):
if not all(map(lambda e: isinstance(e, (int, float, str)), v)):
raise TypeError(
"Unable to create a key with value '{0}':{1}".format(k, v))
                sv = str(v)
elif isinstance(v, numpy.ndarray):
# id(v) may have been better but
# it does not play well with joblib.
                sv = hash(v.tobytes())
elif v is None:
sv = ""
else:
raise TypeError(
"Unable to create a key with value '{0}':{1}".format(k, v))
els.append((k, sv))
return str(els)

def __len__(self):
"""
Returns the number of cached items.
"""
return len(self.cached)

def items(self):
"""
Enumerates all cached items.
"""
for item in self.cached.items():
yield item

def keys(self):
"""
Enumerates all cached keys.
"""
for k in self.cached.keys(): # pylint: disable=C0201
yield k

@staticmethod
def create_cache(name):
"""
Creates a new cache.

@param name name
@return created cache
"""
global _caches # pylint: disable=W0603
if name in _caches:
raise RuntimeError("cache '{0}' already exists.".format(name))

cache = MLCache(name)
_caches[name] = cache
return cache

@staticmethod
def remove_cache(name):
"""
Removes a cache with a given name.

@param name name
"""
global _caches # pylint: disable=W0603
del _caches[name]

@staticmethod
def get_cache(name):
"""
Gets a cache with a given name.

@param name name
@return created cache
"""
global _caches # pylint: disable=W0603
return _caches[name]

@staticmethod
def has_cache(name):
"""
Tells if cache *name* is present.

@param name name
@return boolean
"""
global _caches # pylint: disable=W0603
return name in _caches
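
MLCache can also be used on its own. A minimal sketch of the API above (the cache name 'demo__' and the stored value are placeholders for illustration, not part of this PR):

import numpy
from mlinsights.mlbatch.cache_model import MLCache

cache = MLCache.create_cache('demo__')
params = {'n_components': 2, 'X': numpy.ones((3, 2))}
cache.cache(params, 'fitted-object')   # stored under the key built by as_key
print(cache.get(params))               # 'fitted-object'
print(cache.count(params))             # 1, incremented by the get above
print(len(cache), list(cache.keys()))  # 1 entry
MLCache.remove_cache('demo__')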
94 changes: 94 additions & 0 deletions mlinsights/mlbatch/pipeline_cache.py
@@ -0,0 +1,94 @@
"""
@file
@brief Caches training.
"""
from sklearn.base import clone
from sklearn.pipeline import Pipeline, _fit_transform_one
from sklearn.utils import _print_elapsed_time
from .cache_model import MLCache


class PipelineCache(Pipeline):
"""
    Same as :epkg:`sklearn:pipeline:Pipeline` but it can
    skip training if it detects that a step was already
    trained, even in a different pipeline.

Parameters
----------

steps : list
List of (name, transform) tuples (implementing fit/transform) that are
chained, in the order in which they are chained, with the last object
an estimator.
    cache_name : str
        name of the cache; if None, a new name is created
verbose : boolean, optional
If True, the time elapsed while fitting each step will be printed as it
is completed.

Attributes
----------
named_steps : bunch object, a dictionary with attribute access
Read-only attribute to access any step parameter by user given name.
Keys are step names and values are steps parameters.
"""

def __init__(self, steps, cache_name=None, verbose=False):
Pipeline.__init__(self, steps, memory=None, verbose=verbose)
if cache_name is None:
cache_name = "Pipeline%d" % id(self)
self.cache_name = cache_name

def _get_fit_params_steps(self, fit_params):
fit_params_steps = {name: {} for name, step in self.steps
if step is not None}

for pname, pval in fit_params.items():
if '__' not in pname:
raise ValueError(
"Pipeline.fit does not accept the {} parameter. "
"You can pass parameters to specific steps of your "
"pipeline using the stepname__parameter format, e.g. "
"`Pipeline.fit(X, y, logisticregression__sample_weight"
"=sample_weight)`.".format(pname))
step, param = pname.split('__', 1)
fit_params_steps[step][param] = pval
return fit_params_steps

def _fit(self, X, y=None, **fit_params):

self.steps = list(self.steps)
self._validate_steps()
fit_params_steps = self._get_fit_params_steps(fit_params)
if not MLCache.has_cache(self.cache_name):
self.cache_ = MLCache.create_cache(self.cache_name)
else:
self.cache_ = MLCache.get_cache(self.cache_name)
Xt = X
for (step_idx, name, transformer) in self._iter(with_final=False, filter_passthrough=False):
if (transformer is None or transformer == 'passthrough'):
with _print_elapsed_time('Pipeline', self._log_message(step_idx)):
continue

params = transformer.get_params()
params['__class__'] = transformer.__class__.__name__
params['X'] = Xt
params['y'] = y
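            # the key combines the transformer class, its parameters and the
            # training data, so an identical fit is reused across pipelines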
cached = self.cache_.get(params)
if cached is None:
cloned_transformer = clone(transformer)
Xt, fitted_transformer = _fit_transform_one(
cloned_transformer, Xt, y, None,
message_clsname='PipelineCache',
message=self._log_message(step_idx),
**fit_params_steps[name])
self.cache_.cache(params, fitted_transformer)
else:
fitted_transformer = cached
Xt = fitted_transformer.transform(Xt)

self.steps[step_idx] = (name, fitted_transformer)
if self._final_estimator == 'passthrough':
return Xt, {}
return Xt, fit_params_steps[self.steps[-1][0]]
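
Putting the two pieces together, a typical use mirrors the tests above: substitute PipelineCache for Pipeline so that a grid search reuses identical transformer fits. A minimal sketch (the cache name 'demo__' is a placeholder; the expected count assumes the 3-fold CV default of scikit-learn 0.21):

from sklearn.datasets import make_classification
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from mlinsights.mlbatch import PipelineCache, MLCache

X, y = make_classification(random_state=42)
pipe = PipelineCache([('pca', PCA()), ('lr', LogisticRegression())], 'demo__')
param_grid = {'pca__n_components': [2, 3], 'lr__fit_intercept': [True, False]}
grid = GridSearchCV(pipe, param_grid).fit(X, y)
# the two values of lr__fit_intercept reuse the cached PCA fit for a given
# fold and n_components: 2 settings x 3 folds + 1 refit = 7 entries instead
# of the 13 PCA fits run without the cache
print(len(MLCache.get_cache('demo__')))
MLCache.remove_cache('demo__')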