# Custom Estimators on Dask

The code examples are taken from this [blog post](https://www.capitalone.com/tech/machine-learning/custom-machine-learning-estimators-on-dask-and-rapids/).

## Scikit-learn example

This example is adapted from the scikit-learn [documentation](https://scikit-learn.org/stable/tutorial/statistical_inference/putting_together.html#pipelining) and is included for illustration.

In [None]:
from sklearn import datasets
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV, train_test_split
import numpy as np

X_digits, y_digits = datasets.load_digits(return_X_y=True)

# Grid-search the PCA truncation level jointly with the strength of the
# classifier's regularization.
pca = PCA()
# A loose tolerance (tol=0.1) keeps this example fast.
logistic = LogisticRegression(max_iter=10000, tol=0.1)
pipe = Pipeline(steps=[
    ('pca', pca),
    ('logistic', logistic),
])
# Pipeline parameters are addressed as '<step name>__<parameter name>':
param_grid = dict(
    pca__n_components=[5, 15, 30, 45, 64],
    logistic__C=np.logspace(-4, 4, 4),
)
search = GridSearchCV(pipe, param_grid, n_jobs=-1)

X_train, X_test, y_train, y_test = train_test_split(
    X_digits, y_digits, random_state=123,
)
search.fit(X_train, y_train)

# The pipeline built with the best-scoring parameter combination.
best = search.best_estimator_
print(f"Training set score: {best.score(X_train, y_train)}")
print(f"Test set score: {best.score(X_test, y_test)}")

## Option 1 for Customization

In [None]:
def mutate(X):
    """Apply the custom mutation to ``X`` and return the result.

    Placeholder: no mutation is implemented yet, so the very same object
    that was passed in is returned.
    """
    mutated = X  # ... do something ...
    return mutated

# Rebuild both steps by hand, using the parameters the grid search picked.
pca = PCA(
    n_components=search.best_params_['pca__n_components'],
)
logistic = LogisticRegression(
    max_iter=10000,
    tol=0.1,
    C=search.best_params_['logistic__C'],
)

X_train, X_test, y_train, y_test = train_test_split(
    X_digits, y_digits, random_state=123,
)

# Training data: fit PCA, apply the custom mutation, then fit the classifier.
X_train = mutate(pca.fit_transform(X_train, y_train))
logistic = logistic.fit(X_train, y_train)

# Test data: only *apply* the already-fitted PCA (don't call fit again!)
# and don't forget the same mutation.
X_test = mutate(pca.transform(X_test))

print(f"Training set score: {logistic.score(X_train, y_train)}")
print(f"Test set score: {logistic.score(X_test, y_test)}")

## Option 2 for Customization

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
from abc import ABCMeta

class Mutate(TransformerMixin, BaseEstimator, metaclass=ABCMeta):
    """Pipeline step that applies a custom mutation to ``X``.

    The step is stateless: ``fit`` learns nothing, and ``transform`` is
    currently a placeholder that returns ``X`` unchanged.
    """

    def fit(self, X, y=None):
        """Return ``self``; nothing is learned from the data.

        ``y`` defaults to ``None`` (the scikit-learn convention for
        transformers) so that ``fit_transform(X)`` also works without a
        target, because ``TransformerMixin`` then calls ``self.fit(X)``.
        """
        return self

    def transform(self, X):
        """Return the mutated ``X`` (currently a pass-through placeholder)."""
        # ... do something ...
        return X

pca = PCA(n_components=search.best_params_['pca__n_components'])
logistic = LogisticRegression(
    max_iter=10000, tol=0.1, C=search.best_params_['logistic__C'],
)

# The custom Mutate step sits between PCA and the classifier, so the
# pipeline applies it identically to training and test data.
pipe = Pipeline(steps=[
    ('pca', pca),
    ('mutate', Mutate()),
    ('logistic', logistic),
])

X_train, X_test, y_train, y_test = train_test_split(
    X_digits, y_digits, random_state=123,
)

pipe.fit(X_train, y_train)
print(f"Training set score: {pipe.score(X_train, y_train)}")
print(f"Test set score: {pipe.score(X_test, y_test)}")

# Another example

In [None]:
from sklearn.base import BaseEstimator, clone
from sklearn.model_selection import train_test_split
import copy
import pandas as pd
import numpy as np
import logging

logger = logging.getLogger(name='exp a')  # experiment logger shared by the search

class CustomSearchCV(BaseEstimator):
    """Fit one clone of ``estimator`` per CV split and keep the best one.

    A fixed holdout set is first carved off the input with
    ``train_test_split(..., random_state=123)``. The remaining data are
    split by ``cv``. For every split, a fresh clone of ``estimator`` is
    trained on the training fold, then scored on both the test fold and the
    shared holdout set.

    Parameters
    ----------
    estimator : estimator object
        Template estimator. It is cloned for every split and is never fitted
        itself.
    cv : cross-validation splitter
        Object whose ``split(X, y)`` yields ``(train_idx, test_idx)`` pairs.
    logger : logging.Logger
        Receives an ``info`` message for every fitted split.

    Attributes
    ----------
    estimators_ : list
        One fitted clone per CV split, in ``cv.split`` order.
    split_scores_ : list
        ``score`` of each clone on its CV test fold.
    holdout_scores_ : list
        ``score`` of each clone on the shared holdout set.
    best_estimator_ : estimator object
        The clone with the highest holdout score (the first one on ties).
    """

    def __init__(self, estimator, cv, logger):
        self.estimator = estimator
        self.cv = cv
        self.logger = logger

    def fit(self, X, y=None, **fit_kws):
        """Fit one clone of ``estimator`` per CV split of the non-holdout data.

        Parameters
        ----------
        X : array-like or pandas.DataFrame
            Features. A DataFrame is converted to a NumPy array. After the
            holdout split, ``X`` must support integer-array indexing (for
            example, a NumPy array).
        y : array-like or pandas.Series
            Target. It is required; the ``None`` default exists only for
            scikit-learn API compatibility.
        **fit_kws
            Forwarded to every clone's ``fit``.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``y`` is None, or if ``cv.split`` yields no splits.
        """
        if y is None:
            raise ValueError("CustomSearchCV.fit requires a target array y.")
        # pandas objects index by *label*, but cv.split yields *positions*.
        # After train_test_split shuffles the rows, label lookups would pick
        # the wrong rows (or raise KeyError), so convert both X and y to NumPy.
        if isinstance(X, pd.DataFrame):
            X = X.values
        if isinstance(y, (pd.Series, pd.DataFrame)):
            y = y.values
        # Insert more guards here!

        X_base, X_holdout, y_base, y_holdout = train_test_split(
            X, y, random_state=123)

        self.split_scores_ = []
        self.holdout_scores_ = []
        self.estimators_ = []

        for train_idx, test_idx in self.cv.split(X_base, y_base):
            X_test, y_test = X_base[test_idx], y_base[test_idx]
            X_train, y_train = X_base[train_idx], y_base[train_idx]

            # Fresh, unfitted copy so the splits never share fitted state.
            estimator_ = clone(self.estimator)
            estimator_.fit(X_train, y_train, **fit_kws)

            self.logger.info("... log things ...")
            self.estimators_.append(estimator_)
            self.split_scores_.append(estimator_.score(X_test, y_test))
            self.holdout_scores_.append(
                estimator_.score(X_holdout, y_holdout))

        if not self.estimators_:
            # np.argmax would otherwise fail with an opaque message.
            raise ValueError("cv.split produced no splits; nothing to select.")

        self.best_estimator_ = \
            self.estimators_[np.argmax(self.holdout_scores_)]
        return self

In [None]:
from sklearn.model_selection import RepeatedKFold
import xgboost as xgb

from sklearn.datasets import make_classification

# A large, imbalanced (75/25) and heavily label-noised binary problem.
X, y = make_classification(
    n_samples=100_000,
    n_features=100,
    weights=[0.75, 0.25],
    flip_y=0.75,
    random_state=123,
)

# Two folds repeated twice, so CustomSearchCV fits four models.
cv = RepeatedKFold(n_splits=2, n_repeats=2, random_state=2652124)
clf = CustomSearchCV(
    xgb.XGBClassifier(
        n_jobs=-1,
        eval_metric='error',
        use_label_encoder=False,
    ),
    cv,
    logger,
)

clf.fit(X, y)
clf.best_estimator_

# Scale with Dask on Coiled

In [None]:
# This creates a Dask cluster with Coiled. To reproduce it with Coiled:
# - visit https://coiled.io to sign up for an account;
# - complete the initial setup (log in, create a software environment);
# - change `software="custom-estimators-env"` below to the name of a
#   software environment you have created.
# Alternatively, you can deploy Dask yourself using one of the methods at
# https://docs.dask.org/en/latest/setup.html.

import coiled
cluster = coiled.Cluster(n_workers=20, software="custom-estimators-env")

from dask.distributed import Client
client = Client(cluster)
client

In [None]:
from sklearn.base import BaseEstimator, clone
from sklearn.model_selection import train_test_split
import copy
from dask.base import is_dask_collection
import dask.dataframe as dd
import pandas as pd
import numpy as np
import logging

logger = logging.getLogger(name='exp a')  # experiment logger shared by the search

class CustomSearchCV(BaseEstimator):
    """Fit one clone of ``estimator`` per CV split and keep the best one.

    Works on NumPy/pandas input and on Dask collections. A fixed holdout set
    is first carved off the input with ``train_test_split(...,
    random_state=123)``. ``dask_ml``'s version is used when ``X`` is a Dask
    collection, and scikit-learn's otherwise. The remaining data are split
    by ``cv``. For every split, a fresh clone of ``estimator`` is trained on
    the training fold, then scored on both the test fold and the shared
    holdout set.

    Parameters
    ----------
    estimator : estimator object
        Template estimator. It is cloned for every split and is never fitted
        itself.
    cv : cross-validation splitter
        Object whose ``split(X, y)`` yields ``(train_idx, test_idx)`` pairs.
    logger : logging.Logger
        Receives an ``info`` message for every fitted split.

    Attributes
    ----------
    estimators_ : list
        One fitted clone per CV split, in ``cv.split`` order.
    split_scores_ : list
        ``score`` of each clone on its CV test fold.
    holdout_scores_ : list
        ``score`` of each clone on the shared holdout set.
    best_estimator_ : estimator object
        The clone with the highest holdout score (the first one on ties).
    """

    def __init__(self, estimator, cv, logger):
        self.estimator = estimator
        self.cv = cv
        self.logger = logger

    def fit(self, X, y=None, **fit_kws):
        """Fit one clone of ``estimator`` per CV split of the non-holdout data.

        Parameters
        ----------
        X : array-like, pandas.DataFrame, dask array or dask DataFrame
            Features. DataFrames are converted to (Dask) arrays so that the
            positional indices from ``cv.split`` can be applied.
        y : array-like, pandas/dask Series or DataFrame
            Target. It is converted the same way as ``X``. It is required;
            the ``None`` default exists only for scikit-learn API
            compatibility.
        **fit_kws
            Forwarded to every clone's ``fit``.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``y`` is None, or if ``cv.split`` yields no splits.
        """
        if y is None:
            raise ValueError("CustomSearchCV.fit requires a target array y.")
        # lengths=True makes Dask compute the partition sizes, so the
        # resulting array has known chunks and can be positionally indexed.
        if isinstance(X, dd.DataFrame):
            X = X.to_dask_array(lengths=True)
        elif isinstance(X, pd.DataFrame):
            X = X.values
        # Convert y the same way: pandas/Dask objects index by label, but
        # cv.split yields positions.
        if isinstance(y, (dd.Series, dd.DataFrame)):
            y = y.to_dask_array(lengths=True)
        elif isinstance(y, (pd.Series, pd.DataFrame)):
            y = y.values
        if is_dask_collection(X):
            self.logger.debug(
                "Dask input detected; using dask_ml train_test_split")
            from dask_ml.model_selection import train_test_split
        else:
            from sklearn.model_selection import train_test_split
        # Insert more guards here!

        X_base, X_holdout, y_base, y_holdout = train_test_split(
            X, y, random_state=123)

        self.split_scores_ = []
        self.holdout_scores_ = []
        self.estimators_ = []

        for train_idx, test_idx in self.cv.split(X_base, y_base):
            X_test, y_test = X_base[test_idx], y_base[test_idx]
            X_train, y_train = X_base[train_idx], y_base[train_idx]

            # Fresh, unfitted copy so the splits never share fitted state.
            estimator_ = clone(self.estimator)
            estimator_.fit(X_train, y_train, **fit_kws)

            self.logger.info("... log things ...")
            self.estimators_.append(estimator_)
            self.split_scores_.append(estimator_.score(X_test, y_test))
            self.holdout_scores_.append(
                estimator_.score(X_holdout, y_holdout))

        if not self.estimators_:
            # np.argmax would otherwise fail with an opaque message.
            raise ValueError("cv.split produced no splits; nothing to select.")

        self.best_estimator_ = \
            self.estimators_[np.argmax(self.holdout_scores_)]
        return self

In [None]:
from dask_ml.model_selection import KFold
import xgboost as xgb

from dask_ml.datasets import make_classification

# Same kind of noisy, imbalanced problem as before, ten times larger and
# generated directly as Dask arrays (chunks=100_000 sets the chunk size).
X, y = make_classification(
    n_samples=1_000_000,
    n_features=100,
    weights=[0.75, 0.25],
    flip_y=0.75,
    random_state=123,
    chunks=100_000,
)

# NOTE(review): without shuffle=True this random_state probably has no
# effect on the folds; confirm against the dask_ml KFold docs.
cv = KFold(n_splits=2, random_state=2652124)
est = xgb.dask.DaskXGBClassifier(eval_metric='error', use_label_encoder=False)
# Run training on the cluster behind the Dask `client` created earlier.
est.client = client

clf = CustomSearchCV(est, cv, logger)
clf.fit(X, y)
clf.best_estimator_