In [1]:
import numpy as np

In [2]:
import copy

In [3]:
from sklearn.model_selection import cross_val_predict

In [4]:
from sklearn.base import BaseEstimator

In [5]:
from sklearn.model_selection import StratifiedKFold

In [6]:
from sklearn.base import ClassifierMixin, RegressorMixin

In [7]:
class StackingClassifier(BaseEstimator, ClassifierMixin):
    '''Multi-layer stacking of scikit-learn classifiers.

       Every layer is a list of classifiers. The per-class outputs of all
       classifiers of one layer are concatenated column-wise and become the
       input features of the next layer. The output of the last layer is the
       prediction of the whole stack.'''

    # estimator methods whose output can be used as per-class features
    _SUPPORTED_METHODS = ('decision_function', 'predict_proba', 'predict_log_proba')

    def __init__(self, models, est_methods):
        '''initialization:
           models --- list of lists, contains classifiers (one inner list per layer);
           est_methods --- list of lists, contains string names of prediction methods for each classifier
                           ('decision_function', 'predict_proba' or 'predict_log_proba'),
                           do not use simple predict'''

        # private deep copies: later changes to the caller's objects do not affect the stack
        self._models = copy.deepcopy(models)
        self._est_methods = copy.deepcopy(est_methods)

        self._sizes = [len(models_layer) for models_layer in self._models]

    def _check_methods(self):
        '''raise ValueError if any configured prediction method is not supported'''

        for est_methods_layer in self._est_methods:
            for method in est_methods_layer:
                if method not in self._SUPPORTED_METHODS:
                    raise ValueError(
                        'unsupported prediction method %r, expected one of %r'
                        % (method, self._SUPPORTED_METHODS))

    def _as_class_block(self, output):
        '''convert estimator output to one column per class:
           output --- result of one of the supported prediction methods'''

        output = np.asarray(output)
        # In the binary case decision_function returns a single 1-D score for
        # the second class; expand it to (-score, score) so that it fills the
        # two-column slot reserved for the model like any other output.
        if output.ndim == 1 and self._classes_num == 2:
            return np.column_stack((-output, output))
        return output

    def fit(self, X, y=None, cv=5, err=0.01, random_state=np.array([None]), n_jobs=1):
        '''fit classifiers in the stack:
           X --- training data (numpy ndarray);
           y --- training target variable (numpy array);
           cv --- number of folds in StratifiedKFold;
           err --- scale in regularization noise;
           random_state --- array of seeds for pseudo random numbers generator
                            (a single seed or None is also accepted), out-of-fold
                            predictions are averaged over one shuffled split per seed;
           n_jobs --- number of jobs in training;
           returns self'''

        self._check_methods()

        # a scalar seed (or None) is treated as a one-element sequence
        seeds = [random_state] if np.ndim(random_state) == 0 else random_state
        if seeds[0] is not None:
            # the global numpy generator is seeded on purpose: it drives the
            # regularization noise below
            np.random.seed(seeds[0])

        self._classes_num = np.unique(y).shape[0]
        self._n = len(seeds)
        width = self._classes_num

        # features[0] is the raw input, features[k + 1] is the output of layer k;
        # kept local so that nothing is left on the estimator if fitting fails
        features = [X.copy()]
        features.extend(np.zeros((X.shape[0], width * size)) for size in self._sizes)

        for layer_num, (models_layer, est_methods_layer) in enumerate(zip(self._models, self._est_methods)):
            layer_in = features[layer_num]
            layer_out = features[layer_num + 1]
            for seed in seeds:
                skf = StratifiedKFold(n_splits=cv, shuffle=True, random_state=seed)
                for idx, (model, method) in enumerate(zip(models_layer, est_methods_layer)):
                    out_of_fold = cross_val_predict(estimator=model, X=layer_in, y=y,
                                                    cv=skf, n_jobs=n_jobs, method=method)
                    layer_out[:, idx * width: (idx + 1) * width] += self._as_class_block(out_of_fold)
            # average over the seeds, then add gaussian noise as regularization
            layer_out /= self._n
            layer_out += err * np.random.randn(*layer_out.shape)
            # the models used at prediction time are trained on the whole layer input
            for model in models_layer:
                model.fit(X=layer_in, y=y)
        return self

    def predict_proba(self, X, y=None):
        '''Stacking prediction:
           X --- test data (numpy ndarray);
           y --- for compatibility, always ignored;
           returns the output of the last layer, one block of per-class columns
           for every model of that layer'''

        self._check_methods()

        width = self._classes_num
        features = [X.copy()]
        features.extend(np.zeros((X.shape[0], width * size)) for size in self._sizes)

        for layer_num, (models_layer, est_methods_layer) in enumerate(zip(self._models, self._est_methods)):
            layer_in = features[layer_num]
            layer_out = features[layer_num + 1]
            for idx, (model, method) in enumerate(zip(models_layer, est_methods_layer)):
                # method was validated above, so this resolves to one of the
                # supported prediction methods of the fitted model
                output = getattr(model, method)(layer_in)
                layer_out[:, idx * width: (idx + 1) * width] += self._as_class_block(output)
        return features[-1]

    def get_models(self):
        '''get list of lists of models (deep copy)'''

        return copy.deepcopy(self._models)

    def get_methods(self):
        '''get list of lists of estimation methods (deep copy)'''

        return copy.deepcopy(self._est_methods)

    def get_shape(self):
        '''get shape of the stack: number of models in every layer'''

        return copy.deepcopy(self._sizes)

    def get_classes_num(self):
        '''get number of estimated classes (available after fit)'''

        return self._classes_num

    def get_num_of_splits(self):
        '''get number of splits in averaging (available after fit)'''

        return self._n

In [8]:
class StackingRegressor(BaseEstimator, RegressorMixin):
    '''Multi-layer stacking of scikit-learn regressors.

       Every layer is a list of regressors. The predictions of all regressors
       of one layer are concatenated column-wise and become the input features
       of the next layer. The output of the last layer is the prediction of
       the whole stack.'''

    def __init__(self, models):
        '''initialization:
           models --- list of lists, contains regressors (one inner list per layer)'''

        # private deep copy: later changes to the caller's objects do not affect the stack
        self._models = copy.deepcopy(models)

        self._sizes = [len(models_layer) for models_layer in self._models]

    def _model_target(self, y, layer_num, idx, mode):
        '''select the training target of one model:
           y --- target passed to fit;
           layer_num --- index of the layer of the model;
           idx --- index of the model inside its layer;
           mode --- stacking mode passed to fit'''

        if mode == 0:
            # classical stacking: every model is trained on the same target
            return y
        layer_target = np.asarray(y[layer_num])
        if layer_target.ndim == 1:
            # a 1-D target of a layer is a single column
            layer_target = layer_target[:, np.newaxis]
        block = layer_target[:, idx * self._dim: (idx + 1) * self._dim]
        return block if self._dim > 1 else block.ravel()

    def fit(self, X, y=None, cv=5, err=0.01, random_state=np.array([None]), n_jobs=1, mode=0):
        '''fit regressors in the stack:
           X --- training data (numpy ndarray);
           y --- training target variable: numpy ndarray in mode == 0, sequence with one
                 ndarray per layer in mode == 1 (columns of the ndarray of a layer are
                 split into equal blocks, one block per model of the layer);
           cv --- number of folds in KFold;
           err --- scale in regularization noise;
           random_state --- array of seeds for pseudo random numbers generator
                            (a single seed or None is also accepted), out-of-fold
                            predictions are averaged over one shuffled split per seed;
           n_jobs --- number of jobs in training;
           mode --- stacking mode: 0 --- classical stacking, 1 --- own target for each model;
           returns self'''

        # StratifiedKFold only accepts class labels and rejects continuous
        # targets, so plain KFold is the splitter to use for regression.
        from sklearn.model_selection import KFold

        # a scalar seed (or None) is treated as a one-element sequence
        seeds = [random_state] if np.ndim(random_state) == 0 else random_state
        if seeds[0] is not None:
            # the global numpy generator is seeded on purpose: it drives the
            # regularization noise below
            np.random.seed(seeds[0])

        # number of target columns predicted by a single model
        if mode == 0:
            target_shape = np.shape(y)
            self._dim = 1 if len(target_shape) == 1 else target_shape[1]
        else:
            first_shape = np.shape(y[0])
            self._dim = 1 if len(first_shape) == 1 else first_shape[1] // self._sizes[0]
        self._n = len(seeds)

        n_samples = X.shape[0]
        dim = self._dim

        # features[0] is the raw input, features[k + 1] is the output of layer k;
        # kept local so that nothing is left on the estimator if fitting fails
        features = [X.copy()]
        features.extend(np.zeros((n_samples, dim * size)) for size in self._sizes)

        for layer_num, models_layer in enumerate(self._models):
            layer_in = features[layer_num]
            layer_out = features[layer_num + 1]
            for seed in seeds:
                folds = KFold(n_splits=cv, shuffle=True, random_state=seed)
                for idx, model in enumerate(models_layer):
                    out_of_fold = cross_val_predict(estimator=model, X=layer_in,
                                                    y=self._model_target(y, layer_num, idx, mode),
                                                    cv=folds, n_jobs=n_jobs, method='predict')
                    # reshape handles both 1-D and (n_samples, dim) predictions
                    layer_out[:, idx * dim: (idx + 1) * dim] += \
                        np.asarray(out_of_fold).reshape(n_samples, dim)
            # average over the seeds, then add gaussian noise as regularization
            layer_out /= self._n
            layer_out += err * np.random.randn(*layer_out.shape)
            # the models used at prediction time are trained on the whole layer input
            for idx, model in enumerate(models_layer):
                model.fit(X=layer_in, y=self._model_target(y, layer_num, idx, mode))
        return self

    def predict(self, X, y=None):
        '''Stacking prediction:
           X --- test data (numpy ndarray);
           y --- for compatibility, always ignored;
           returns the output of the last layer, flattened to 1-D when every
           model predicts a single target column'''

        n_samples = X.shape[0]
        dim = self._dim

        features = [X.copy()]
        features.extend(np.zeros((n_samples, dim * size)) for size in self._sizes)

        for layer_num, models_layer in enumerate(self._models):
            layer_in = features[layer_num]
            layer_out = features[layer_num + 1]
            for idx, model in enumerate(models_layer):
                prediction = np.asarray(model.predict(layer_in))
                layer_out[:, idx * dim: (idx + 1) * dim] += prediction.reshape(n_samples, dim)

        answer = features[-1]
        if dim == 1:
            answer = answer.ravel()
        return answer

    def get_models(self):
        '''get list of lists of models (deep copy)'''

        return copy.deepcopy(self._models)

    def get_shape(self):
        '''get shape of the stack: number of models in every layer'''

        return copy.deepcopy(self._sizes)

    def get_dimension(self):
        '''get dimension of target variable of a single model (available after fit)'''

        return self._dim

    def get_num_of_splits(self):
        '''get number of splits in averaging (available after fit)'''

        return self._n