In [None]:
import pandas as pd
import numpy as np
from sklearn.tree import DecisionTreeClassifier
from sklearn import tree
import re
from sklearn.cluster import KMeans

In [None]:
class TreeClassifierBinner:
    """Bin one numeric column with a single-feature decision tree and score it with WoE/IV.

    A ``DecisionTreeClassifier`` is fitted on ``data[[col_to_bin]]`` against
    ``data[target_col]``. The split thresholds of the fitted tree become the
    bin edges, and the Weight of Evidence (WoE) / Information Value (IV)
    table of those bins is computed on the same data.

    The target is summed to count "defaults", so it is presumably a 0/1
    indicator -- confirm against the caller's data.

    Args:
        data: Frame holding both the feature and the target column.
        col_to_bin: Name of the numeric column to bin.
        target_col: Name of the target column.
        min_samples_split: Forwarded to ``DecisionTreeClassifier``.
        max_leaf_nodes: Forwarded to ``DecisionTreeClassifier``; upper bound
            on the number of bins.
    """

    def __init__(self,
                 data: pd.DataFrame,
                 col_to_bin: str,
                 target_col: str,
                 min_samples_split: float = 0.3,
                 max_leaf_nodes: int = 10):
        self.col_to_bin = col_to_bin
        self.target_col = target_col
        self.max_leaf_nodes = max_leaf_nodes
        self.min_samples_split = min_samples_split
        # min_samples_split used to be stored but never handed to the estimator.
        self.clf = DecisionTreeClassifier(
            max_leaf_nodes=self.max_leaf_nodes,
            min_samples_split=self.min_samples_split,
            random_state=42,
        )
        self.data = data
        self.x = data[[self.col_to_bin]]
        self.y = data[self.target_col]
        self.clf.fit(self.x, self.y)
        self.univariate_thresholds = self.uniVariableDecisionTreeThresholds()
        # Populate ``self.woe`` right away, as categoryClassifierBinner does,
        # so callers can read the IV immediately after construction.
        self.calculateWoe()

    @property
    def text_tree(self):
        """Text rendering of the fitted tree from ``sklearn.tree.export_text``."""
        return tree.export_text(self.clf, feature_names=[self.col_to_bin])

    def plot(self):
        """Draw the fitted tree with ``sklearn.tree.plot_tree``."""
        tree.plot_tree(self.clf)

    def uniVariableDecisionTreeThresholds(self):
        """Return the sorted bin edges implied by the fitted tree.

        The thresholds are read from the fitted tree structure rather than
        scraped from its text rendering, so negative values and full
        precision are preserved.

        Returns:
            A list ``[-inf, t1, ..., tk, +inf]`` with the distinct split
            thresholds in ascending order (just the two infinities when the
            tree has no split).
        """
        fitted = self.clf.tree_
        # A node is a split node exactly when its two children differ;
        # leaves carry placeholder thresholds that must be ignored.
        is_split = fitted.children_left != fitted.children_right
        cuts = {float(value) for value in fitted.threshold[is_split]}
        return [float("-inf"), *sorted(cuts), float("+inf")]

    def withBands(self, df):
        """Return a copy of ``df`` with a ``<col>_bands`` integer bin column.

        Missing values of the binned column are replaced by the mean of that
        column in ``df`` itself before cutting on the tree thresholds.
        """
        result = df.copy()
        column = self.col_to_bin
        result[column] = result[column].fillna(result[column].mean())
        result[f'{column}_bands'] = pd.cut(result[column], self.univariate_thresholds, labels=False)
        return result

    def defaultRatesByGroups(self, df):
        """Aggregate the target per band.

        Returns:
            A frame with one row per band and the columns ``count``, ``sum``,
            ``total_count`` (sum of all counts) and ``m_rate`` (sum / count).
        """
        result = df.groupby(f'{self.col_to_bin}_bands', observed=False)[self.target_col].agg(['count', 'sum'])
        result['total_count'] = result['count'].sum()
        result['m_rate'] = result['sum'] / result['count']
        return result.reset_index()

    def calculateWoe(self):
        """Compute the per-band WoE and the overall IV on ``self.data``.

        The result is stored in ``self.woe`` with the columns
        ``<col>_bands``, ``woe`` and ``iv`` (the IV is a single number
        repeated on every row).
        """
        grouped_data = self.data\
            .pipe(self.withBands)\
            .pipe(self.defaultRatesByGroups)
        grouped_data['non_defaults'] = grouped_data['count'] - grouped_data['sum']
        total_defaults = grouped_data['sum'].sum()
        total_non_defaults = grouped_data['non_defaults'].sum()
        # Shares are clipped away from 0 and 1 so the logarithm stays finite.
        grouped_data['perc_defaults'] = np.clip(grouped_data['sum'] / total_defaults, 0.00001, 0.99999)
        grouped_data['perc_non_defaults'] = np.clip(grouped_data['non_defaults'] / total_non_defaults, 0.00001, 0.99999)
        grouped_data['woe'] = np.log(grouped_data['perc_defaults'] / grouped_data['perc_non_defaults'])
        grouped_data['iv'] = np.sum(-grouped_data['woe'] * (grouped_data['perc_non_defaults'] - grouped_data['perc_defaults']))
        self.woe = grouped_data[[f'{self.col_to_bin}_bands', 'woe', 'iv']]

    def addWoeColumn(self, df, na_values=None):
        """Return ``df`` with the band column and a ``<col>_woe`` column added.

        Args:
            df: Frame containing ``col_to_bin``.
            na_values: Optional fill value for rows whose band has no WoE in
                ``self.woe``. ``None`` leaves them as NaN; any other value,
                including ``0``, is used as the fill.
        """
        woe_col = f'{self.col_to_bin}_woe'
        result = df.pipe(self.withBands)
        result = result.merge(
            self.woe,
            how='left',
            on=f'{self.col_to_bin}_bands'
        )
        result = result.drop(columns=['iv'])
        result = result.rename(columns={'woe': woe_col})
        # ``is not None`` rather than truthiness, so a fill value of 0 is honoured.
        if na_values is not None:
            result[woe_col] = result[woe_col].fillna(na_values)
        return result
    
    
    
class categoryClassifierBinner:
    """Group the levels of a categorical column with KMeans and score the groups with WoE/IV.

    Every category is resampled to the same size and its target rate is
    measured on ``n_groups`` slices of that resample. The resulting rate
    profiles are clustered into ``n_splits`` groups, and the Weight of
    Evidence (WoE) / Information Value (IV) table of those groups is
    computed on ``data``.

    The target is summed to count "defaults", so it is presumably a 0/1
    indicator -- confirm against the caller's data.

    Args:
        data: Frame holding both the feature and the target column.
        col_to_bin: Name of the categorical column to group.
        target_col: Name of the target column.
        n_splits: Number of KMeans clusters, i.e. of resulting bands.
        n_samples: Rows drawn (with replacement) per category.
        n_groups: Number of slices each category's resample is cut into.
    """

    def __init__(self,
                    data: pd.DataFrame,
                    col_to_bin: str,
                    target_col: str,
                    n_splits:  int = 10,
                    n_samples: int = 10000,
                    n_groups:  int = 50):
        self.n_splits = n_splits
        self.col_to_bin = col_to_bin
        self.target_col = target_col
        self.data = data
        self.n_samples = n_samples
        self.n_groups = n_groups
        self.applyKMeans()
        self.calculateWoe()

    def applyKMeans(self):
        """Cluster the categories by their resampled target-rate profiles.

        Sets ``self.groups_dict`` (``{'k_fold_groups': {category: cluster}}``)
        and ``self.clusters`` (``{cluster: [categories]}``).
        """
        sample = self.data\
            .pipe(self.withDFResampled, self.n_samples, self.n_groups)\
            .pipe(self.defaultRatesByGroups, [self.col_to_bin, 'groups'])

        # One row per category, one column per slice, values = target rate.
        k_fold_sample = pd.pivot_table(sample, values='m_rate', index=self.col_to_bin, columns='groups')
        kmeans = KMeans(n_clusters=self.n_splits, random_state=42).fit(k_fold_sample)
        k_fold_sample['k_fold_groups'] = kmeans.predict(k_fold_sample)
        k_fold_sample = k_fold_sample.reset_index()
        self.groups_dict = k_fold_sample[[self.col_to_bin, 'k_fold_groups']].set_index(self.col_to_bin).to_dict()
        self.clusters = {
            group_i: k_fold_sample[k_fold_sample['k_fold_groups'] == group_i][self.col_to_bin].to_list()
            for group_i in k_fold_sample['k_fold_groups'].drop_duplicates()
        }

    def withDFResampled(self, df, n_samples, n_groups):
        """Resample every category to ``n_samples`` rows and slice each resample.

        Missing categories are replaced by the string ``"None"`` first.

        Returns:
            A frame with the feature and target columns plus ``count`` (the
            1-based position of the row inside its category) and ``groups``
            (``count`` cut into ``n_groups`` equal-width slices).
        """
        sample = df[[self.col_to_bin, self.target_col]].copy()
        sample[self.col_to_bin] = sample[self.col_to_bin].fillna("None")
        sample = sample.groupby(self.col_to_bin).sample(n_samples, replace=True, random_state=42).reset_index(drop=True)
        # cumcount is index-aligned, so no positional re-alignment is needed.
        sample['count'] = sample.groupby(self.col_to_bin).cumcount() + 1
        sample['groups'] = pd.cut(sample['count'], n_groups, labels=False)
        return sample

    def withBands(self, df):
        """Return a copy of ``df`` with a ``<col>_bands`` cluster column.

        Missing categories become ``'None'``; categories absent from
        ``self.groups_dict`` get a NaN band.
        """
        result = df.copy()
        column = self.col_to_bin
        result[column] = result[column].fillna('None')
        result[f'{column}_bands'] = result[column].map(self.groups_dict['k_fold_groups'])
        return result

    def defaultRatesByGroups(self, df, col_list: list):
        """Aggregate the target per combination of ``col_list``.

        Returns:
            A frame with the grouping columns plus ``count``, ``sum``,
            ``total_count`` (sum of all counts) and ``m_rate`` (sum / count,
            floored at 0.00001).
        """
        result = df.groupby(col_list, observed=False)[self.target_col].agg(['count', 'sum'])
        result['total_count'] = result['count'].sum()
        result['m_rate'] = np.clip(result['sum'] / result['count'], 0.00001, None)
        return result.reset_index()

    def calculateWoe(self):
        """Compute the per-band WoE and the overall IV on ``self.data``.

        The result is stored in ``self.woe`` with the columns
        ``<col>_bands``, ``woe`` and ``iv`` (the IV is a single number
        repeated on every row).
        """
        grouped_data = self.data\
            .pipe(self.withBands)\
            .pipe(self.defaultRatesByGroups, [f"{self.col_to_bin}_bands"])
        grouped_data['non_defaults'] = grouped_data['count'] - grouped_data['sum']
        total_defaults = grouped_data['sum'].sum()
        total_non_defaults = grouped_data['non_defaults'].sum()
        # Shares are clipped away from 0 and 1 so the logarithm stays finite.
        grouped_data['perc_defaults'] = np.clip(grouped_data['sum'] / total_defaults, 0.00001, 0.99999)
        grouped_data['perc_non_defaults'] = np.clip(grouped_data['non_defaults'] / total_non_defaults, 0.00001, 0.99999)
        grouped_data['woe'] = np.log(grouped_data['perc_defaults'] / grouped_data['perc_non_defaults'])
        grouped_data['iv'] = np.sum(-grouped_data['woe'] * (grouped_data['perc_non_defaults'] - grouped_data['perc_defaults']))
        self.woe = grouped_data[[f'{self.col_to_bin}_bands', 'woe', 'iv']]

    def addWoeColumn(self, df, na_values=None):
        """Return ``df`` with the band column and a ``<col>_woe`` column added.

        Args:
            df: Frame containing ``col_to_bin``.
            na_values: Optional fill value for rows whose band has no WoE in
                ``self.woe`` (for example unseen categories). ``None`` leaves
                them as NaN; any other value, including ``0``, is used as
                the fill.
        """
        woe_col = f'{self.col_to_bin}_woe'
        result = df.pipe(self.withBands)
        result = result.merge(
            self.woe,
            how='left',
            on=f'{self.col_to_bin}_bands'
        )
        result = result.drop(columns=['iv'])
        result = result.rename(columns={'woe': woe_col})
        # ``is not None`` rather than truthiness, so a fill value of 0 is honoured.
        if na_values is not None:
            result[woe_col] = result[woe_col].fillna(na_values)
        return result


class binnerSelector:
    """Pick a bin count per column by IV, then filter columns by IV and correlation.

    On construction every column in ``cols_to_bin`` is binned (numeric
    columns with ``TreeClassifierBinner``, all others with
    ``categoryClassifierBinner``), a bin count is searched starting from 5,
    and columns whose best IV does not exceed ``iv_threshold`` are dropped.

    Attributes:
        results_per_column: ``{column: {n_bins: binner, ..., 'best': binner}}``.
        selected_cols: ``{column: best binner}`` for the retained columns.
        removed_cols: ``{column: reason}`` for the discarded columns.
        ivs: Frame with the columns ``variable`` and ``iv``.
    """

    def __init__(self,
                 data: pd.DataFrame,
                 cols_to_bin: list,
                 target_col: str,
                 iv_threshold: float = 0.02):
        self.data = data
        self.cols_to_bin = cols_to_bin
        self.cols_dict = self.data[cols_to_bin].dtypes.to_dict()
        self.target_col = target_col
        self.iv_threshold = iv_threshold
        self.removed_cols = {}
        self.selected_cols = {}
        self.runForAllCols()
        self.removeFeaturesWithLowIV()

    def _isNumeric(self, col_i):
        """Tell whether ``col_i`` should be binned as a numeric column."""
        dtype = self.cols_dict[col_i]
        # Booleans count as numeric for pandas but are binned as categories here.
        return pd.api.types.is_numeric_dtype(dtype) and not pd.api.types.is_bool_dtype(dtype)

    @staticmethod
    def _iv(binner):
        """Return the Information Value stored in a binner's WoE table."""
        return binner.woe['iv'].iloc[0]

    @staticmethod
    def _isMonotonic(binner):
        """Tell whether a binner's WoE values are monotonic across its bands."""
        woe = binner.woe['woe']
        return woe.is_monotonic_increasing or woe.is_monotonic_decreasing

    def binCol(self, col_i, n_splits):
        """Build the binner for ``col_i`` with ``n_splits`` bins."""
        if self._isNumeric(col_i):
            return TreeClassifierBinner(self.data, col_i, self.target_col, max_leaf_nodes=n_splits, min_samples_split=0.01)
        return categoryClassifierBinner(self.data, col_i, self.target_col, n_splits=n_splits, n_samples=10000, n_groups=50)

    def selectCategBestBin(self, col_i):
        """Search the bin count with the highest IV for a categorical column.

        Returns:
            A dict mapping each tried bin count to its binner, plus the key
            ``'best'``; or ``None`` when the column has a single value (the
            column is then recorded in ``removed_cols``).
        """
        results = {}
        n_split = 5
        max_splits = len(self.data[col_i].drop_duplicates())
        if max_splits <= 1:
            self.removed_cols[col_i] = 'unique value'
            return None
        if max_splits < 3:
            n_split = max_splits
            results[n_split] = self.binCol(col_i, n_split)
            print(f"No better adjustment found: bins: {n_split}, iv: {self._iv(results[n_split])}")
            results['best'] = results[n_split]
            return results
        if n_split >= max_splits:
            n_split = max_splits - 1
        print(f"starting binning for column: {col_i}")
        results[n_split] = self.binCol(col_i, n_split)
        results[n_split + 1] = self.binCol(col_i, n_split + 1)
        results[n_split - 1] = self.binCol(col_i, n_split - 1)
        # Walk upwards while one more bin improves the IV.
        while self._iv(results[n_split + 1]) > self._iv(results[n_split]):
            if n_split + 2 > max_splits:
                n_split = n_split + 1
                print("reached maximun splits")
                break
            print(f"bins: {n_split}, iv: {self._iv(results[n_split])}")
            print(f"bins: {n_split+1}, iv: {self._iv(results[n_split+1])}")
            n_split = n_split + 1
            results[n_split + 1] = self.binCol(col_i, n_split + 1)
            if n_split + 3 > max_splits:
                n_split = n_split + 1
                print("reached maximun splits")
                break
        # Walk downwards while one bin fewer improves the IV, never below 3 bins.
        while self._iv(results[n_split - 1]) > self._iv(results[n_split]):
            if n_split - 1 < 3:
                print("reached minimum splits")
                break
            print(f"bins: {n_split}, iv: {self._iv(results[n_split])}")
            print(f"bins: {n_split-1}, iv: {self._iv(results[n_split-1])}")
            n_split = n_split - 1
            results[n_split - 1] = self.binCol(col_i, n_split - 1)
        # 'best' is recorded on every exit path; it used to be skipped when
        # the downward walk stopped at the minimum.
        print(f"No better adjustment found: bins: {n_split}, iv: {self._iv(results[n_split])}")
        results['best'] = results[n_split]
        return results

    def selectMonotonicBestBin(self, col_i):
        """Search the bin count with the highest IV and monotonic WoE for a numeric column.

        Returns:
            A dict mapping each tried bin count to its binner, plus the key
            ``'best'``; or ``None`` when the column has a single value (the
            column is then recorded in ``removed_cols``).
        """
        results = {}
        n_split = 5
        max_splits = len(self.data[col_i].drop_duplicates())
        print(f"starting binning for column: {col_i}")
        if max_splits <= 1:
            self.removed_cols[col_i] = 'unique value'
            return None
        results[n_split] = self.binCol(col_i, n_split)
        results[n_split + 1] = self.binCol(col_i, n_split + 1)
        results[n_split - 1] = self.binCol(col_i, n_split - 1)
        # Walk upwards while one more bin improves the IV and stays monotonic.
        while self._iv(results[n_split + 1]) > self._iv(results[n_split]):
            print(f"bins: {n_split}, iv: {self._iv(results[n_split])}")
            print(f"bins: {n_split+1}, iv: {self._iv(results[n_split+1])}")
            if self._isMonotonic(results[n_split + 1]):
                n_split = n_split + 1
                results[n_split + 1] = self.binCol(col_i, n_split + 1)
            else:
                print(f"bins: {n_split + 1} has no monotonic behaviour")
                break
        # Walk downwards while fewer bins are better or the current WoE is not monotonic.
        while self._iv(results[n_split - 1]) > self._iv(results[n_split]) \
                or not self._isMonotonic(results[n_split]):
            if n_split - 1 <= 2:
                print("reached minimum split")
                n_split = n_split - 1
                break
            print(f"bins: {n_split}, iv: {self._iv(results[n_split])}")
            print(f"bins: {n_split-1}, iv: {self._iv(results[n_split-1])}")
            n_split = n_split - 1
            results[n_split - 1] = self.binCol(col_i, n_split - 1)

        print(f"No better adjustment found: bins: {n_split}, iv: {self._iv(results[n_split])}")
        results['best'] = results[n_split]
        return results

    def runForAllCols(self):
        """Run the bin search for every column and fill ``results_per_column``.

        Columns for which the search returns ``None`` are left out.
        """
        self.results_per_column = {}
        for col_i in self.cols_to_bin:
            if self._isNumeric(col_i):
                result = self.selectMonotonicBestBin(col_i)
            else:
                result = self.selectCategBestBin(col_i)
            if result:
                self.results_per_column[col_i] = result

    def removeFeaturesWithLowIV(self):
        """Split the binned columns into selected and removed by ``iv_threshold``.

        Also builds ``self.ivs`` with the best IV of every binned column.
        """
        iv_rows = []
        for col_i, col_results in self.results_per_column.items():
            best = col_results['best']
            best_iv = self._iv(best)
            iv_rows.append({'variable': col_i, 'iv': best_iv})
            if best_iv > self.iv_threshold:
                self.selected_cols[col_i] = best
            else:
                self.removed_cols[col_i] = 'low IV'
        self.ivs = pd.DataFrame(iv_rows, columns=['variable', 'iv'])
        print(f"selected cols: {len(self.selected_cols.keys())}")
        print(f"removed cols: {len(self.removed_cols.keys())}")

    def addWoeColumn(self, df):
        """Return a copy of ``df`` with the WoE column of every selected column added."""
        result = df.copy()
        for cat_binner_i in self.selected_cols.values():
            result = cat_binner_i.addWoeColumn(result)
        return result

    def removeHighlyCorrelatedFeatures(self):
        """Drop, from each highly correlated pair, the column with the lower IV.

        Two selected columns are a pair when the absolute correlation of
        their WoE columns on ``self.data`` exceeds 0.7. On an IV tie the
        column listed first in the pair is dropped. Sets ``self.woe_df``,
        ``self.corr`` and ``self.highCorr`` and updates ``selected_cols`` /
        ``removed_cols``.
        """
        woe_to_col = {f"{col_i}_woe": col_i for col_i in self.selected_cols.keys()}
        self.woe_df = self.addWoeColumn(self.data)[list(woe_to_col)]
        corr = self.woe_df.corr()
        self.corr = corr
        # Relabel with the original column names so they match ``ivs['variable']``.
        named = corr.rename(index=woe_to_col, columns=woe_to_col)
        # Keep the strict upper triangle: each unordered pair exactly once,
        # with the diagonal removed.
        upper = np.triu(np.ones(named.shape, dtype=bool), k=1)
        pairs = named.where(upper)\
            .rename_axis(index=None, columns=None)\
            .reset_index()\
            .melt(id_vars='index')
        highCorr = pairs[np.abs(pairs['value']) > 0.7]\
            .reset_index(drop=True)\
            .rename(columns={'variable': 'variable_1'})
        iv_by_col = self.ivs.set_index('variable')['iv']
        highCorr['iv_1'] = highCorr['variable_1'].map(iv_by_col)
        highCorr['variable_2'] = highCorr['index']
        highCorr['iv_2'] = highCorr['variable_2'].map(iv_by_col)
        highCorr['dropped_variables'] = np.where(highCorr['iv_1'] < highCorr['iv_2'], highCorr['variable_1'], highCorr['variable_2'])
        self.highCorr = highCorr
        dropped_cols = highCorr['dropped_variables'].drop_duplicates()
        for col_i in dropped_cols:
            self.removed_cols[col_i] = "high correl"
            del self.selected_cols[col_i]
        print(f"removed {len(dropped_cols)} cols due to high corr")
        print(f"selected cols: {len(self.selected_cols)}")
        