In [1]:
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
import category_encoders as ce
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder



#Scaling data
from sklearn.preprocessing import RobustScaler, Normalizer

#Regressor
from xgboost import XGBRegressor

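Note: scikit-learn's `get_params()` inspects the arguments of `__init__` to discover an estimator's parameters, and assumes each parameter is stored on the instance under an attribute with exactly the same name. Custom transformers should therefore store every constructor argument unchanged under its own name. See https://stackoverflow.com/questions/61394346/why-is-my-sklearn-custom-transformer-not-saving-an-attribute-when-used-in-a-colu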

In [2]:
class CatBoostCustom(BaseEstimator, TransformerMixin):
    '''
    Applies category_encoders' CatBoostEncoder to a list of categorical
    columns so that it can be used as a step in a pipeline.

    Parameters
    ----------
    cat : list
        Names of the columns of X to encode.

    Attributes
    ----------
    cbe : ce.CatBoostEncoder
        The wrapped encoder. It is created by ``fit`` from the current value
        of ``cat``, so it only exists once ``fit`` has been called.
    '''

    def __init__(self, cat):
        # Only store the constructor argument. The encoder itself is built in
        # fit() so that a later set_params(cat=...) is actually honoured
        # instead of leaving an encoder configured with the old column list.
        self.cat = cat

    def fit(self, X, y):
        '''
        Fit a fresh CatBoostEncoder on the ``cat`` columns of X against y.

        Returns
        -------
        self
        '''
        self.cbe = ce.CatBoostEncoder(cols=self.cat, handle_unknown='value')
        self.cbe.fit(X[self.cat], y)
        return self

    def transform(self, X, y=None):
        '''
        Return a copy of X in which the ``cat`` columns are replaced by their
        encoded values. ``y`` is accepted for API compatibility and ignored.
        '''
        # NOTE(review): the encoder is always called without y here, also when
        # reached through fit_transform on training data. category_encoders
        # documents that supervised encoders may behave differently when y is
        # supplied - confirm this is the intended training-time behaviour.
        # Work on a copy: assigning into X directly would silently overwrite
        # the caller's DataFrame.
        X = X.copy()
        X[self.cat] = self.cbe.transform(X[self.cat])
        return X

In [3]:
class OneHotCustom(BaseEstimator, TransformerMixin):
    '''
    Applies OneHotEncoder to a list of columns so that it can be used as a
    step in a pipeline that passes DataFrames around.

    Parameters
    ----------
    cat : list
        Names of the columns of X to one-hot encode.

    Attributes
    ----------
    ohe : OneHotEncoder
        The wrapped encoder (dense output, unknown categories ignored).
        It is created by ``fit``.
    '''

    def __init__(self, cat):
        # Only store the constructor argument; the encoder is built in fit().
        self.cat = cat

    def fit(self, X, y=None):
        '''
        Fit a fresh OneHotEncoder on the ``cat`` columns of X.

        Returns
        -------
        self
        '''
        try:
            # Keyword used by recent scikit-learn releases.
            self.ohe = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
        except TypeError:
            # Older scikit-learn releases only know the `sparse` keyword.
            self.ohe = OneHotEncoder(handle_unknown='ignore', sparse=False)
        self.ohe.fit(X[self.cat], y)
        return self

    def transform(self, X, y=None):
        '''
        Return X without the ``cat`` columns, followed by the one-hot columns.

        The one-hot columns keep pandas' default integer labels (0, 1, ...).
        ``y`` is accepted for API compatibility and ignored.
        '''
        # Reuse X's index for the encoded frame. With a default RangeIndex,
        # pd.concat(axis=1) would align on labels and pair the encoded rows
        # with the wrong rows (or with NaN) whenever X is not indexed 0..n-1,
        # e.g. after a train/test split.
        encoded = pd.DataFrame(self.ohe.transform(X[self.cat]), index=X.index)
        remaining = X.drop(columns=self.cat)
        return pd.concat([remaining, encoded], axis=1)


In [4]:
class CustomImputer(BaseEstimator, TransformerMixin):
    '''
    SimpleImputer wrapper that returns a DataFrame with the input's column
    names and row index, so that missing values can be imputed before
    encoding and creating bivariates.

    Parameters
    ----------
    strategy : str, default 'mean'
        Forwarded to SimpleImputer.
    fill_value : default -1
        Forwarded to SimpleImputer.

    Attributes
    ----------
    imp : SimpleImputer
        The wrapped imputer. It is created by ``fit`` from the current
        parameter values.
    '''

    def __init__(self, strategy='mean', fill_value=-1):
        # Only store the constructor arguments. Building the imputer here
        # would freeze these values, so set_params(strategy=...) (e.g. from a
        # parameter search) would have no effect on the imputer actually used.
        self.strategy = strategy
        self.fill_value = fill_value

    def fit(self, X, y=None):
        '''
        Fit a fresh SimpleImputer on X.

        Returns
        -------
        self
        '''
        self.imp = SimpleImputer(strategy=self.strategy, fill_value=self.fill_value)
        self.imp.fit(X, y)
        return self

    def transform(self, X, y=None):
        '''
        Return the imputed values as a DataFrame labelled like X.
        ``y`` is accepted for API compatibility and ignored.
        '''
        # Keep the row index as well as the column names, so later steps that
        # align on the index (pd.concat, joins, matching against y) still line up.
        return pd.DataFrame(self.imp.transform(X), columns=X.columns, index=X.index)

In [5]:
class CustomScaler(BaseEstimator, TransformerMixin):
    '''
    RobustScaler wrapper that returns a DataFrame with the input's column
    names and row index, so that features can be scaled before encoding and
    creating bivariates.

    Parameters
    ----------
    rng :
        Forwarded to RobustScaler as ``quantile_range``.

    Attributes
    ----------
    scaler : RobustScaler
        The wrapped scaler. It is created by ``fit`` from the current value
        of ``rng``.
    '''

    def __init__(self, rng):
        # Only store the constructor argument. The scaler is built in fit()
        # so that set_params(rng=...) is reflected in the scaler actually used.
        self.rng = rng

    def fit(self, X, y=None):
        '''
        Fit a fresh RobustScaler on X.

        Returns
        -------
        self
        '''
        self.scaler = RobustScaler(quantile_range=self.rng)
        self.scaler.fit(X, y)
        return self

    def transform(self, X, y=None):
        '''
        Return the scaled values as a DataFrame labelled like X.
        ``y`` is accepted for API compatibility and ignored.
        '''
        # Keep the row index as well as the column names, so later steps that
        # align on the index (pd.concat, joins, matching against y) still line up.
        return pd.DataFrame(self.scaler.transform(X), columns=X.columns, index=X.index)
    

In [6]:
class CatBivariates(BaseEstimator, TransformerMixin):
    '''
    Creates categorical bivariate features by concatenating two columns as
    strings, joined with '-'.

    Parameters
    ----------
    features : dict
        Maps the name of each new feature to a pair of existing column names.
    drop : bool, default True
        If True and ``dismiss`` is empty, every column used to build a
        bivariate is dropped from the result.
    dismiss : list
        If not empty, exactly these columns are dropped (regardless of
        ``drop``).
    '''
    def __init__(self, features={}, drop=True, dismiss=[]):
        # The default dict/list objects are shared between instances, so they
        # are only ever read by this class, never modified.
        self.features = features
        self.drop = drop
        self.dismiss = dismiss

    def _columns_to_drop(self):
        '''Return the list of columns transform() removes, without side effects.'''
        if len(self.dismiss) > 0 or not self.drop:
            return list(self.dismiss)
        sources = []
        for fts in self.features.values():
            sources.extend([fts[0], fts[1]])
        # A column used in several bivariates is listed once, in first-use order.
        return list(dict.fromkeys(sources))

    def fit(self, X, y=None):
        '''Nothing to learn; returns self.'''
        return self

    def transform(self, X, y=None):
        '''
        Return a copy of X with the bivariate columns added and the dismissed
        columns removed. ``y`` is accepted for API compatibility and ignored.
        '''
        # Work on a copy so the caller's DataFrame is left untouched.
        X = X.copy()
        for name, fts in self.features.items():
            X[name] = X[fts[0]].astype(str) + '-' + X[fts[1]].astype(str)
        # The drop list is computed locally. Appending to self.dismiss would
        # alter a constructor parameter (and the shared default list), leaking
        # column names into other instances and into later calls.
        return X.drop(columns=self._columns_to_drop())

    def getDismissed(self):
        '''Return the columns that transform() drops.'''
        return self._columns_to_drop()

    def getVars(self):
        '''Return the ``features`` dictionary.'''
        return self.features

In [7]:
class NumBivariates(BaseEstimator, TransformerMixin):
    '''
    Creates bivariates named as the feature dictionary keys, according to method:
    mulBiv:
    dictionary values should be two features of the dataFrame, returns the result of multiplying them

    sumBiv:
    dictionary values should be a list of features, returns the result of adding them

    difBiv:
    dictionary values should be two features of the dataFrame, returns the difference.

    boolBiv:
    dictionary values should be a feature and a threshold, 1 if feature value is strictly higher than threshold, 0 otherwise.

    binsBiv:
    dictionary values should be a feature, a list of bin edges, a list of labels, applies pd.cut with those
    parameters and casts the result to int.

    By default it does not drop features. If drop is True and dismiss is empty, drops the features each
    bivariate was built from: both features for mulBiv / difBiv, every listed feature for sumBiv, and the
    single source feature for boolBiv / binsBiv.

    If dismiss contains a list, drops exactly the features in dismiss.

    If asDummies is True, returns dummies of the created bivariates.
    '''
    # Names accepted for `method`; each one is a method of this class.
    _METHODS = ('mulBiv', 'sumBiv', 'difBiv', 'boolBiv', 'binsBiv')

    def __init__(self, features={}, drop=False, dismiss=[], method='mulBiv', asDummies=False):
        # The default dict/list objects are shared between instances, so they
        # are only ever read by this class, never modified.
        self.features = features
        self.drop = drop
        self.dismiss = dismiss
        self.method = method
        self.asDummies = asDummies

    def _source_columns(self, fts):
        '''Return the DataFrame columns that a single feature spec reads.'''
        if self.method == 'sumBiv':
            return list(fts)
        if self.method in ('boolBiv', 'binsBiv'):
            # fts[1:] holds a threshold or bin edges/labels, not column names.
            return [fts[0]]
        return [fts[0], fts[1]]

    def _columns_to_drop(self):
        '''Return the list of columns transform() removes, without side effects.'''
        if len(self.dismiss) > 0 or not self.drop:
            return list(self.dismiss)
        sources = []
        for fts in self.features.values():
            sources.extend(self._source_columns(fts))
        # A column used in several bivariates is listed once, in first-use order.
        return list(dict.fromkeys(sources))

    def fit(self, X, y=None):
        '''Nothing to learn; returns self.'''
        return self

    def transform(self, X, y=None):
        '''
        Return a copy of X with the bivariate columns added (optionally as
        dummies) and the dismissed columns removed.
        ``y`` is accepted for API compatibility and ignored.

        Raises
        ------
        ValueError
            If ``method`` is not one of the supported method names.
        '''
        if self.method not in self._METHODS:
            raise ValueError(
                "method must be one of %s, got %r" % (', '.join(self._METHODS), self.method)
            )
        build = getattr(self, self.method)

        # Work on a copy so the caller's DataFrame is left untouched.
        X = X.copy()
        for name, fts in self.features.items():
            X[name] = build(X, fts)
            if self.asDummies:
                X = pd.get_dummies(X, columns=[name], drop_first=False)
        # The drop list is computed locally. Appending to self.dismiss would
        # alter a constructor parameter (and the shared default list), leaking
        # column names into other instances and into later calls.
        return X.drop(columns=self._columns_to_drop())

    def getDismissed(self):
        '''Return the columns that transform() drops.'''
        return self._columns_to_drop()

    def getVars(self):
        '''Return the ``features`` dictionary.'''
        return self.features

    def mulBiv(self, X, fts):
        '''Product of columns fts[0] and fts[1].'''
        return X[fts[0]] * X[fts[1]]

    def boolBiv(self, X, fts):
        '''1 where column fts[0] is strictly greater than fts[1], else 0.'''
        return (X[fts[0]] > fts[1]).astype(int)

    def sumBiv(self, X, fts):
        '''Row-wise sum of the columns listed in fts.'''
        return X[fts].sum(axis=1)

    def difBiv(self, X, fts):
        '''Column fts[0] minus column fts[1].'''
        return X[fts[0]] - X[fts[1]]

    def binsBiv(self, X, fts):
        '''pd.cut of column fts[0] with bins fts[1] and labels fts[2], cast to int.'''
        return pd.cut(X[fts[0]], bins=fts[1], labels=fts[2]).astype(int)

    

In [8]:
class NumBivariatesBool(BaseEstimator, TransformerMixin):
    '''
    This was included as a method in NumBivariates, I keep this for older versions.

    Given a dictionary of bivariate feature names as keys, and tuples of one
    feature and a lower limit as values, creates an indicator column (cast to
    int) that flags the values above the limit.
    Ex: ('Num', 15) returns 1 for every value in Num strictly higher than 15, 0 otherwise.

    Parameters
    ----------
    features : dict
        Maps the name of each new feature to a (column name, limit) pair.
    drop : bool, default False
        If True and ``dismiss`` is empty, every source column is dropped from
        the result.
    dismiss : list
        If not empty, exactly these columns are dropped (regardless of
        ``drop``).
    '''
    def __init__(self, features={}, drop=False, dismiss=[]):
        # The default dict/list objects are shared between instances, so they
        # are only ever read by this class, never modified.
        self.features = features
        self.drop = drop
        self.dismiss = dismiss

    def _columns_to_drop(self):
        '''Return the list of columns transform() removes, without side effects.'''
        if len(self.dismiss) > 0 or not self.drop:
            return list(self.dismiss)
        # A column used for several indicators is listed once, in first-use order.
        return list(dict.fromkeys(fts[0] for fts in self.features.values()))

    def fit(self, X, y=None):
        '''Nothing to learn; returns self.'''
        return self

    def transform(self, X, y=None):
        '''
        Return a copy of X with the indicator columns added and the dismissed
        columns removed. ``y`` is accepted for API compatibility and ignored.
        '''
        # Work on a copy so the caller's DataFrame is left untouched.
        X = X.copy()
        for name, fts in self.features.items():
            X[name] = (X[fts[0]] > fts[1]).astype(int)
        # The drop list is computed locally. Appending to self.dismiss would
        # alter a constructor parameter (and the shared default list), leaking
        # column names into other instances and into later calls.
        return X.drop(columns=self._columns_to_drop())

    def getDismissed(self):
        '''Return the columns that transform() drops.'''
        return self._columns_to_drop()

    def getVars(self):
        '''Return the ``features`` dictionary.'''
        return self.features

In [9]:
class LogRtTransformer(BaseEstimator, TransformerMixin):
    '''
    Applies element-wise transforms to selected columns:
    natural log (np.log) to the features in the ``log`` list,
    cube root (np.cbrt) to the features in the ``cbrt`` list,
    and square root (np.sqrt) to the features in the ``sqrt`` list.

    The transforms are applied in that order, so a column listed more than
    once is transformed more than once.
    '''
    def __init__(self, log=[], cbrt=[], sqrt=[]):
        # The default lists are shared between instances, so they are only
        # ever read by this class, never modified.
        self.log = log
        self.cbrt = cbrt
        self.sqrt = sqrt

    def fit(self, X, y=None):
        '''Nothing to learn; returns self.'''
        return self

    def transform(self, X, y=None):
        '''
        Return a copy of X with the configured columns transformed.
        ``y`` is accepted for API compatibility and ignored.
        '''
        # Work on a copy: overwriting the caller's columns would make a second
        # call on the same DataFrame (e.g. fit on it, then predict on it)
        # apply the transform twice.
        X = X.copy()
        for col in self.log:
            X[col] = np.log(X[col])
        for col in self.cbrt:
            X[col] = np.cbrt(X[col])
        for col in self.sqrt:
            X[col] = np.sqrt(X[col])
        return X

    def getVars(self):
        '''Return the ``log`` and ``cbrt`` lists as a 2-tuple.'''
        # NOTE(review): ``sqrt`` is not part of the returned tuple. Kept as-is
        # so existing callers that unpack two values keep working.
        return self.log, self.cbrt
