# Universal Custom Estimators

## Patterns for Adding Custom Functionality

#### Basic `scikit-learn` pipeline

In [None]:
from sklearn import datasets
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV, train_test_split
import numpy as np

X_digits, y_digits = datasets.load_digits(return_X_y=True)

# Define a pipeline to search for the best combination of PCA truncation
# and classifier regularization.
pca = PCA()
# set the tolerance to a large value to make the example faster
logistic = LogisticRegression(max_iter=10000, tol=0.1)
pipe = Pipeline(steps=[('pca', pca), ('logistic', logistic)])
# Parameters of pipelines can be set using ‘__’ separated parameter names:
param_grid = {
    'pca__n_components': [5, 15, 30, 45, 64],
    'logistic__C': np.logspace(-4, 4, 4),
}

search = GridSearchCV(pipe, param_grid, n_jobs=-1)

X_train, X_test, y_train, y_test = train_test_split(X_digits, y_digits, random_state=123)

search.fit(X_train, y_train)

best = search.best_estimator_

print(f"Training set score: {best.score(X_train, y_train)}")
print(f"Test set score: {best.score(X_test, y_test)}")

#### Let's make a change to our data...

In [None]:
def mutate(X):
    """Mutates X"""
    # ... do something ...
    return X

pca = PCA(n_components=search.best_params_['pca__n_components'])
logistic = LogisticRegression(
    max_iter=10000, tol=0.1, C=search.best_params_['logistic__C'])

X_train, X_test, y_train, y_test = train_test_split(
    X_digits, y_digits, random_state=123)

X_train = pca.fit_transform(X_train, y_train)
X_train = mutate(X_train)
logistic = logistic.fit(X_train, y_train)

X_test = pca.transform(X_test) # <- Don't call fit again!
X_test = mutate(X_test) # <-Don’t forget to call mutate on X_test!

print(f"Training set score: {logistic.score(X_train, y_train)}")
print(f"Test set score: {logistic.score(X_test, y_test)}")

#### Hm, maybe there is a better way

In [None]:
class PcaLogRegBlock:
    def __init__(self, filepath, n_components, C):
        self.filepath = filepath
        self.n_components = n_components
        self.C = C
    def load_data(self):
        self.df = pd.read_csv(self.filepath)
    def mutate(self, X):
        """Fetches data from alternate source and merges into X.
        """
        # ... mutate X ...
        return X
    def build_model(self):
        X, y = self.df[self.features], self.df[self.target]
        pca = PCA(n_components=self.n_components)
        logistic = LogisticRegression(max_iter=10000, tol=0.1, C=self.C)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, random_state=123)
        X_train = pca.fit_transform(X_train, y_train)
        X_train = self.mutate(X_train)
        return logistic.fit(X_train, y_train)
    def run(self):
        self.load_data()
        model = self.build_model()
        self.save_model(model)

In [None]:
class PcaLogRegGridSearchBlock:
    def __init__(self, filepath):
        self.filepath = filepath
        self.df = pd.read_csv(self.filepath)
        self.best_params_ = {}
    def run(self):
        X, y = self.df[self.features], self.df[self.target]
        param_grid = {
            'n_components': [5, 15, 30, 45, 64],
        }
        search = GridSearchCV(PCA(), param_grid, n_jobs=-1)
        search.fit(X, y)
        self.best_params_.update(search.best_params_)
        param_grid = {
            'C': np.logspace(-4, 4, 4),
        }
        search = GridSearchCV(LogisticRegression(max_iter=10000, tol=0.1), param_grid, n_jobs=-1)
        search.fit(X, y)
        self.best_params_.update(search.best_params_)

In [None]:
def driver():
    gs = PcaLogRegGridSearchBlock('/path/to/file', ...)
    gs.run()
    # Use best results from grid search to retrain
    pca_logreg = PcaLogRegBlock('/path/to/file', gs.best_params_['n_components'], gs.best_params_['C'])
    pca_logreg.run()

#### Our Best Option for adding code

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
from abc import ABCMeta

class Mutate(TransformerMixin, BaseEstimator, metaclass=ABCMeta):
    def fit(self, X, y):
        return self
    
    def transform(self, X):
        """Mutates X"""
        # ... do something ...
        return X

pca = PCA(n_components=search.best_params_['pca__n_components'])
logistic = LogisticRegression(max_iter=10000, tol=0.1, C=search.best_params_['logistic__C'])

pipe = Pipeline(steps=[('pca', pca), ('mutate', Mutate()), ('logistic', logistic)])

X_train, X_test, y_train, y_test = train_test_split(X_digits, y_digits, random_state=123)

pipe.fit(X_train, y_train)
print(f"Training set score: {pipe.score(X_train, y_train)}")
print(f"Test set score: {pipe.score(X_test, y_test)}")

## Lessons Learned and Best Practices

- Follow the sci-kit learn API: https://scikit-learn.org/stable/developers/develop.html#rolling-your-own-estimator
- Favor array-like interface data structures internally in estimators. These work better across numpy, Dask, and Rapids that dataframe collections. If you need dataframe operations (like groupby, etc.) consider moving those to a transformer and converting to an array-like for processing.
- All attributes learned during .fit should be concrete, i.e. they should not be dask collections.
- To the extent possible, transformers should support
        numpy.ndarray
        pandas.DataFrame
        dask.Array
        dask.DataFrame
- If possible, transformers should accept a columns keyword to limit the transformation to just those columns, while passing through other columns untouched. inverse_transform should behave similarly (ignoring other columns) so that inverse_transform(transform(X)) equals X.
- Methods returning arrays (like .transform, .predict), should return the same type as the input. So if a dask.array is passed in, a dask.array with the same chunks should be returned.


last 4 from: https://ml.dask.org/contributing.html#conventions

## Using `dask-ml` to Train a Model

In [None]:
from sklearn.datasets import make_classification
import pandas as pd

X_array, y_array = make_classification(
    n_samples=10_000,
    n_features=50,
    random_state=123,
)

X = pd.DataFrame(X_array, columns = [f"var{i}" for i in range(0,50)])
y = pd.Series(y_array)

In [None]:
from dask_ml.datasets import make_classification_df

X_dask, y_dask = make_classification_df(
    n_samples=10_000,
    n_features=50,
    random_state=123,
    chunks=1000,
)

### Transformer: Add Feature

In [None]:
import numpy as np
new_feature = pd.Series(np.random.randn(10_000))

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
from abc import ABCMeta

class AddFeature(TransformerMixin, BaseEstimator, metaclass=ABCMeta):
    def fit(self, X, y):
        return self
    
    def transform(self, X):
        """Add Feature to X"""
        # your code here
        
        return X

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
from abc import ABCMeta
import dask.dataframe as dd

class AddFeature(TransformerMixin, BaseEstimator, metaclass=ABCMeta):
    def fit(self, X, y):
        return self
    
    def transform(self, X):
        """Add Feature to X"""
        if isinstance(X, pd.DataFrame):
            X['var50'] = new_feature
        elif isinstance(X, dd.DataFrame):
            X['var50'] = dd.from_pandas(new_feature, npartitions=X.npartitions)
        
        return X

In [None]:
AF = AddFeature()
X = AF.transform(X)
X

In [None]:
AF = AddFeature()
X_dask = AF.transform(X_dask)
X_dask

### Custom Estimator

In [None]:
from sklearn.base import BaseEstimator
import logging

logger = logging.getLogger('exp a')

class CustomEstimator(BaseEstimator):    
    def __init__(self, estimator, logger):
        self.estimator = estimator        
        self.logger = logger
    
    def fit(self, X, y=None, **fit_kws):
        self.estimator.fit(X, y)        
        self.logger.info("... log things ...")
        return self

In [None]:
import xgboost as xgb

clf = CustomEstimator(xgb.XGBClassifier(n_jobs=-1, eval_metric='error', use_label_encoder=False), logger)
clf.fit(X, y)

In [None]:
clf.estimator.feature_importances_

### Custom Estimator with Dask

In [None]:
from dask.distributed import Client, progress

client = Client()
client

In [None]:
# fit your custom estimator with Dask

In [None]:
import xgboost.dask as dxgb

clf = CustomEstimator(dxgb.DaskXGBClassifier(n_jobs=-1, eval_metric='error', use_label_encoder=False), logger)
clf.fit(X_dask, y_dask)

In [None]:
clf.estimator.feature_importances_

### Pipeline

In [None]:
# Remove the column we added from the previous transformation
X = X.drop(columns=['var50'])
X_dask = X_dask.drop(columns=['var50'])

In [None]:
# confirm we've removed that column
X_dask

In [None]:
# confirm we've removed that column
X

In [None]:
from sklearn.pipeline import Pipeline

custom_estimator = CustomEstimator(xgb.XGBClassifier(n_jobs=-1, eval_metric='error', use_label_encoder=False), logger)
pipe = Pipeline(steps=[('add_feature', AddFeature()), ('custom_estimator', custom_estimator)])

pipe.fit(X, y)

In [None]:
# confirm we added the feature
X

In [None]:
# confirm we have a trained model
custom_estimator.estimator.feature_importances_

In [None]:
from sklearn.pipeline import Pipeline

# your dask pipeline here

In [None]:
from sklearn.pipeline import Pipeline
import xgboost.dask as dxgb

custom_estimator_dask = CustomEstimator(dxgb.DaskXGBClassifier(n_jobs=-1, eval_metric='error', use_label_encoder=False), logger)
pipe_dask = Pipeline(steps=[('add_feature', AddFeature()), ('custom_estimator_dask', custom_estimator_dask)])

pipe_dask.fit(X_dask, y_dask)

In [None]:
# confirm we added the feature
X_dask

In [None]:
# confirm we have a trained model
custom_estimator_dask.estimator.feature_importances_

### Cross Validator

This is an adaptation of the code in this article: https://towardsdatascience.com/time-based-cross-validation-d259b13d42b8

In [None]:
import pandas as pd
import datetime
from datetime import datetime as dt
from dateutil.relativedelta import *
from sklearn.model_selection import BaseCrossValidator

class TimeBasedCV(BaseCrossValidator):
    '''
    Parameters 
    ----------
    train_period: int
        number of time units to include in each train set
        default is 30
    test_period: int
        number of time units to include in each test set
        default is 7
    freq: string
        frequency of input parameters. possible values are: days, months, years, weeks, hours, minutes, seconds
        possible values designed to be used by dateutil.relativedelta class
        deafault is days
    '''
    
    def __init__(self, train_period=30, test_period=7, freq='days'):
        self.train_period = train_period
        self.test_period = test_period
        self.freq = freq
        
        
    def split(self, data, validation_split_date=None, date_column='date', gap=0):
        '''
        Generate indices to split data into training and test set
        
        Parameters 
        ----------
        data: pandas DataFrame
            your data, contain one column for the record date 
        validation_split_date: datetime.date()
            first date to perform the splitting on.
            if not provided will set to be the minimum date in the data after the first training set
        date_column: string, deafult='record_date'
            date of each record
        gap: int, default=0
            for cases the test set does not come right after the train set,
            *gap* days are left between train and test sets
        
        Returns 
        -------
        train_index ,test_index: 
            list of tuples (train index, test index) similar to sklearn model selection
        '''
        
        # check that date_column exist in the data:
        try:
            data[date_column]
        except:
            raise KeyError(date_column)
            
        train_indices_list = []
        test_indices_list = []

        if validation_split_date==None:
            validation_split_date = data[date_column].min().date() + relativedelta(**{self.freq: self.train_period})
        
        start_train = validation_split_date - relativedelta(**{self.freq: self.train_period})
        end_train = start_train + relativedelta(**{self.freq: self.train_period})
        start_test = end_train + relativedelta(**{self.freq: gap})
        end_test = start_test + relativedelta(**{self.freq: self.test_period})

        while end_test < data[date_column].max().date():
            # train indices:
            cur_train_indices = list(data[(data[date_column].dt.date>=start_train) & 
                                     (data[date_column].dt.date<end_train)].index)

            # test indices:
            cur_test_indices = list(data[(data[date_column].dt.date>=start_test) &
                                    (data[date_column].dt.date<end_test)].index)
            
            print("Train period:",start_train,"-" , end_train, ", Test period", start_test, "-", end_test,
                  "# train records", len(cur_train_indices), ", # test records", len(cur_test_indices))

            train_indices_list.append(cur_train_indices)
            test_indices_list.append(cur_test_indices)

            # update dates:
            
            start_train = start_train + relativedelta(**{self.freq: self.test_period})
            end_train = start_train + relativedelta(**{self.freq: self.train_period})
            start_test = end_train + relativedelta(**{self.freq: gap})
            end_test = start_test + relativedelta(**{self.freq: self.test_period})

        # mimic sklearn output  
        index_output = [(train,test) for train,test in zip(train_indices_list,test_indices_list)]

        self.n_splits = len(index_output)
        
        return index_output
    
    
    def get_n_splits(self):
        """Returns the number of splitting iterations in the cross-validator
        Returns
        -------
        n_splits : int
            Returns the number of splitting iterations in the cross-validator.
        """
        return self.n_splits

### Grid Search

In [None]:
from sklearn.datasets import make_classification
from dask_ml.datasets import random_date
from datetime import date

import numpy as np

X, y = make_classification(
    n_samples=1_000,
    n_features=5,
    random_state=123,
)
dates = (date(2020, 1, 1), date(2021, 1, 1))
columns = ["var" + str(i) for i in range(np.shape(X)[1])]
y_series = pd.Series(y, name='target')
X_df = pd.DataFrame(X, columns=columns)
X_df['date'] = [random_date(*dates) for _ in range(len(X_df))]
X_df['date'] = X_df['date'].astype('datetime64')
X_df

In [None]:
tscv = TimeBasedCV(train_period=90, test_period=30)
index_output = tscv.split(X_df, validation_split_date=date(2020,6,1))

In [None]:
from sklearn.model_selection import GridSearchCV

params = {
    'custom_estimator__estimator__min_child_weight': [1, 5, 10],
    # Leaving these out to run faster for testing
    # 'gamma': [0.5, 1, 1.5, 2, 5],
    # 'subsample': [0.6, 0.8, 1.0],
    # 'colsample_bytree': [0.6, 0.8, 1.0],
    # 'max_depth': [3, 4, 5]
}

grid_search = GridSearchCV(
    estimator = pipe,
    n_jobs = -1,
    param_grid = params,
    cv = index_output,
    scoring='roc_auc',
    verbose=3
)

In [None]:
%%time
grid_search.fit(X_df.drop('date', axis=1), y_series)

In [None]:
grid_search.best_params_

### Dask Grid Search

In [None]:
from dask_ml.datasets import random_date
from datetime import date

import dask.dataframe as dd
import pandas as pd
from dask_ml.datasets import make_classification_df

import numpy as np

X, y = make_classification_df(
    n_samples=1_00,
    n_features=5,
    random_state=123,
    chunks=1000
)

columns = ["var" + str(i) for i in range(np.shape(X)[1])]
dates_col = pd.Series([random_date(*(date(2020, 1, 1), date(2021, 1, 1))) for _ in range(len(X))])
X['date'] = dates_col
X['date'] = X['date'].astype('datetime64')
X

In [None]:
tscv = TimeBasedCV(train_period=90, test_period=30)
index_output = tscv.split(X, validation_split_date=date(2020,6,1))

In [None]:
from dask_ml.model_selection import GridSearchCV

params = {
    'custom_estimator_dask__estimator__min_child_weight': [1, 5, 10],
    # Leaving these out to run faster for testing
    # 'gamma': [0.5, 1, 1.5, 2, 5],
    # 'subsample': [0.6, 0.8, 1.0],
    # 'colsample_bytree': [0.6, 0.8, 1.0],
    # 'max_depth': [3, 4, 5]
}

grid_search = GridSearchCV(
    estimator = pipe_dask,
    n_jobs = -1,
    param_grid = params,
    cv = index_output,
    scoring='roc_auc',
    verbose=3
)

In [None]:
%%time
grid_search.fit(X.drop('date', axis=1), y_series)