In [1]:
import pandas as pd
import numpy as np
from sklearn.datasets import load_iris

In [2]:
import warnings
warnings.filterwarnings('ignore')

In [3]:
pd.set_option('display.max_rows', 10)
pd.set_option('display.max_columns', 50)
pd.set_option('display.width', 1000)

# Загрузка Датасета на котором будут тестироваться функции

In [4]:
# save load_iris() sklearn dataset to iris
# if you'd like to check dataset type use: type(load_iris())
# if you'd like to view list of attributes use: dir(load_iris())
iris = load_iris()

# np.c_ is the numpy concatenate function
# which is used to concat iris['data'] and iris['target'] arrays 
# for pandas column argument: concat iris['feature_names'] list
# and string list (in this case one string); you can make this anything you'd like..  
# the original dataset would probably call this ['Species']
df_test = pd.DataFrame(data= np.c_[iris['data'], iris['target']],
                       columns= iris['feature_names'] + ['target'])

df_test.loc[df_test['target'] == 0, 'source_code'] = 'A'
df_test.loc[df_test['target'] == 1, 'source_code'] = 'B'
df_test.loc[df_test['target'] == 2, 'source_code'] = 'C'

df_test.loc[df_test['target'] == 0, 'confirm_date'] = pd.to_datetime('2018-01-01', format='%Y-%m-%d')
df_test.loc[df_test['target'] == 1, 'confirm_date'] = pd.to_datetime('2019-01-01', format='%Y-%m-%d')
df_test.loc[df_test['target'] == 2, 'confirm_date'] = pd.to_datetime('2020-01-01', format='%Y-%m-%d')

df_test.loc[df_test['target'] == 0, 'flag_maturity'] = True
df_test.loc[df_test['target'] == 1, 'flag_maturity'] = True
df_test.loc[df_test['target'] == 2, 'flag_maturity'] = True

# Фунция расчёта IV по переменным в тотале и по годам

In [5]:
def calc_iv(df, feature, target, dict_bin_split, list_cat_feature, list_numeric_feature, pr=False):
    """
    Данная функция выводит удобную для анализа таблицу по заданным вручную бакетам.

    Аргументы функции:
    df - DataFrame в котором мы хотим трансформировать / посчитать IV признаки.
    feature - название признака, который мы хотим трансформировать.
    target - наименование целевой переменной
    dict_bin_split - словарь, где по ключу находится значение признака, а по значению пороги бакетов, на которые мы хотим разбить переменную

    Для ручных порогов лучше использовать эту функцию, а не OptBinning. Потому что OptBinning может схлопывать несколько бакетов в один, если в одном из бакетов отсутствуют дефолты. Это неудобно для однофакторного анализа и презентации результатов.
    ВАЖНО!!! Если в df класть оригинал DataFrame, а не его копию (т.е. DataFrame.copy()), то значения признаков в исходном DataFrame автоматически заменяться на бакеты
    """

    if feature in list_cat_feature: # если признак является категориальным, то идем сюда
        lst = []
        for i_bin in dict_bin_split[feature]:
            df.loc[df[feature].isin(i_bin), feature] = f"{i_bin}"

        for i in range(pd.Series(dict_bin_split[feature]).astype(str).nunique()): # сразу иду циклом по всем возможным бакетам для разбиения указанных в dict_bin_split
            val = list(pd.Series(dict_bin_split[feature]).astype(str).unique())[i] # сразу иду циклом по всем возможным бакетам для разбиения указанных в dict_bin_split
            lst.append([feature, # Variable
                        val, # Value
                        df[df[feature] == val].count()[feature], # All
                        df[(df[feature] == val) & (df[target] == 0)].count()[feature], # Good
                        df[(df[feature] == val) & (df[target] == 1)].count()[feature]]) # BAD
        data = pd.DataFrame(lst, columns=['Variable', 'Bin', 'All', 'Good', 'Bad'])

        data['Share'] = data['All'] / data['All'].sum()
        data['Bad Rate'] = data['Bad'] / data['All']
        data['Distribution Good'] = (data['All'] - data['Bad']) / (data['All'].sum() - data['Bad'].sum())
        data['Distribution Bad'] = data['Bad'] / data['Bad'].sum()
        data['WoE'] = np.log(data['Distribution Good'] / data['Distribution Bad'])

        data = data.replace({'WoE': {np.inf: 0, -np.inf: 0}})

        data['IV'] = data['WoE'] * (data['Distribution Good'] - data['Distribution Bad'])

        data = data.sort_values(by=['Variable', 'Bin'], ascending=[True, True])

        if pr:
            print(data)
            print('IV = ', data['IV'].sum())

        iv = data['IV'].sum()
        data.loc['Total'] = data.sum(numeric_only=True, axis=0)

        for i_col_format in ['Share', 'Bad Rate', 'Distribution Good', 'Distribution Bad']:
            data[i_col_format] = (data[i_col_format]*100).apply('{:.2f}%'.format)

        data.loc['Total', ['Variable', 'Bin', 'Bad Rate', 'WoE']] = '-'
        data[['All', 'Good', 'Bad']] = data[['All', 'Good', 'Bad']].astype(int)

    if feature in list_numeric_feature: # Если признак является непрерывным, то идём сюда
        df[feature] = pd.cut(df[feature], [-np.inf]+dict_bin_split[feature]+[np.inf], right=False).to_frame()
        lst = []

        for i in range(len(pd.arrays.IntervalArray.from_breaks([-np.inf]+dict_bin_split[feature]+[np.inf], closed='left').unique())): # Сразу иду циклом по всем возможным бакетам для разбиения указанных в dict_bin_split
            val  = list(pd.arrays.IntervalArray.from_breaks([-np.inf]+dict_bin_split[feature]+[np.inf], closed='left').unique())[i]
            lst.append([feature, # Variable
                        val, # Value
                        df[df[feature] == val].count()[feature], # All
                        df[(df[feature] == val) & (df[target] == 0)].count()[feature], # Good
                        df[(df[feature] == val) & (df[target] == 1)].count()[feature]]) # BAD
        data = pd.DataFrame(lst, columns=['Variable', 'Bin', 'All', 'Good', 'Bad'])

        data['Share'] = data['All'] / data['All'].sum()
        data['Bad Rate'] = data['Bad'] / data['All']
        data['Distribution Good'] = (data['All'] - data['Bad']) / (data['All'].sum() - data['Bad'].sum())
        data['Distribution Bad'] = data['Bad'] / data['Bad'].sum()
        data['WoE'] = np.log(data['Distribution Good'] / data['Distribution Bad'])

        data = data.replace({'WoE': {np.inf: 0, -np.inf: 0}})

        data['IV'] = data['WoE'] * (data['Distribution Good'] - data['Distribution Bad'])

        data = data.sort_values(by=['Variable', 'Bin'], ascending=[True, True])
        data.index = range(len(data.index))

        if pr:
            print(data)
            print('IV = ', data['IV'].sum())

        iv = data['IV'].sum()
        data.loc['Total'] = data.sum(numeric_only=True, axis=0)

        for i_col_format in ['Share', 'Bad Rate', 'Distribution Good', 'Distribution Bad']:
            data[i_col_format] = (data[i_col_format]*100).apply('{:.2f}%'.format)

        data.loc['Total', ['Variable', 'Bin', 'Bad Rate', 'WoE']] = '-'
        data[['All', 'Good', 'Bad']] = data[['All', 'Good', 'Bad']].astype(int)

    return data[['Variable', 'Bin', 'All', 'Share', 'Good', 'Bad', 'Distribution Bad', 'Bad Rate', 'WoE', 'IV']]

# Пример использования:

In [6]:
df_test['confirm_date'].dt.year.unique()

array([2018, 2019, 2020], dtype=int32)

In [7]:
dict_bin_split = {
    'petal length (cm)': [1, 3],
    'source_code': np.array([['A', 'B'], ['C']], dtype=object)
}
list_numeric_feature = ['petal length (cm)']
list_cat_feature = ['source_code']

## Отчёт за всё время

In [8]:
df_woe_iv_report = pd.DataFrame()
for i_feat in list(dict_bin_split.keys()):
    data_iv = calc_iv(df=df_test.copy(), feature=i_feat, target='target', dict_bin_split=dict_bin_split,
                      list_cat_feature=list_cat_feature, list_numeric_feature=list_numeric_feature)
    df_woe_iv_report = pd.concat([df_woe_iv_report, data_iv], axis=0)

df_woe_iv_report

Unnamed: 0,Variable,Bin,All,Share,Good,Bad,Distribution Bad,Bad Rate,WoE,IV
0,petal length (cm),"[-inf, 1.0)",0,0.00%,0,0,0.00%,nan%,,
1,petal length (cm),"[1.0, 3.0)",50,33.33%,50,0,0.00%,0.00%,0.0,0.0
2,petal length (cm),"[3.0, inf)",100,66.67%,0,50,100.00%,50.00%,-0.693147,0.346574
Total,-,-,150,100.00%,50,50,100.00%,-,-,0.346574
0,source_code,"['A', 'B']",100,66.67%,50,50,100.00%,50.00%,-0.693147,0.346574
1,source_code,['C'],50,33.33%,0,0,0.00%,0.00%,0.0,0.0
Total,-,-,150,100.00%,50,50,100.00%,-,-,0.346574


## Отчёт по каждому году

In [9]:
df_woe_iv_report_by_year = pd.DataFrame()
cnt = -1
for i_year in df_test['confirm_date'].dt.year.unique():
    df_woe_iv_report = pd.DataFrame()
    for i_feat in list(dict_bin_split.keys()):
        data_iv = calc_iv(df=df_test[df_test['confirm_date'].dt.year == i_year].copy(), feature=i_feat, target='target', 
                          dict_bin_split=dict_bin_split, list_cat_feature=list_cat_feature, list_numeric_feature=list_numeric_feature)
        df_woe_iv_report = pd.concat([df_woe_iv_report, data_iv], axis=0)
    if cnt < 0:
        df_woe_iv_report = df_woe_iv_report[['Variable', 'Bin', 'Share', 'All', 'Bad', 'Bad Rate']]
        df_woe_iv_report[f"{i_year}"] = "<---"
        df_woe_iv_report_by_year = pd.concat([df_woe_iv_report_by_year, df_woe_iv_report], axis=1)
    else:
        df_woe_iv_report = df_woe_iv_report[['Share', 'Variable', 'Bin', 'Share', 'All', 'Bad', 'Bad Rate']]
        df_woe_iv_report[f"{i_year}"] = "<---"
        df_woe_iv_report_by_year = pd.concat([df_woe_iv_report_by_year, df_woe_iv_report], axis=1)
    cnt += 1

In [10]:
df_woe_iv_report_by_year

Unnamed: 0,Variable,Bin,Share,All,Bad,Bad Rate,2018,Share.1,Variable.1,Bin.1,Share.2,All.1,Bad.1,Bad Rate.1,2019,Share.3,Variable.2,Bin.2,Share.4,All.2,Bad.2,Bad Rate.2,2020
0,petal length (cm),"[-inf, 1.0)",0.00%,0,0,nan%,<---,0.00%,petal length (cm),"[-inf, 1.0)",0.00%,0,0,nan%,<---,0.00%,petal length (cm),"[-inf, 1.0)",0.00%,0,0,nan%,<---
1,petal length (cm),"[1.0, 3.0)",100.00%,50,0,0.00%,<---,0.00%,petal length (cm),"[1.0, 3.0)",0.00%,0,0,nan%,<---,0.00%,petal length (cm),"[1.0, 3.0)",0.00%,0,0,nan%,<---
2,petal length (cm),"[3.0, inf)",0.00%,0,0,nan%,<---,100.00%,petal length (cm),"[3.0, inf)",100.00%,50,50,100.00%,<---,100.00%,petal length (cm),"[3.0, inf)",100.00%,50,0,0.00%,<---
Total,-,-,100.00%,50,0,-,<---,100.00%,-,-,100.00%,50,50,-,<---,100.00%,-,-,100.00%,50,0,-,<---
0,source_code,"['A', 'B']",100.00%,50,0,0.00%,<---,100.00%,source_code,"['A', 'B']",100.00%,50,50,100.00%,<---,0.00%,source_code,"['A', 'B']",0.00%,0,0,nan%,<---
1,source_code,['C'],0.00%,0,0,nan%,<---,0.00%,source_code,['C'],0.00%,0,0,nan%,<---,100.00%,source_code,['C'],100.00%,50,0,0.00%,<---
Total,-,-,100.00%,50,0,-,<---,100.00%,-,-,100.00%,50,50,-,<---,100.00%,-,-,100.00%,50,0,-,<---


---
---
---

# Функция WoE-трансформации

In [11]:
def transform_calc_iv(df, feature, data_iv, list_cat_feature, list_numeric_feature):
    """
    Данная функция производит WoE-трансформацию признаков

    Аргументы функции:
    df - DataFrame, в котором мы хотим трансформировать признаки. ВАЖНО!!! Помещайте в df исключительно копию исходного датафрейма (через .copy())
    feature - название признака, который мы хотим трансформировать.
    data_iv - результат работы функции calc_iv().
    В зависимости от того является ли трансформируемый признак категориальным (list_cat_feature) или непрерывным (list_numeric_function), алгоритм трансформации будет разным.
    Функция возвращает объект Series - трансформированные в WoE значения исходного признака feature.
    """
    dict_transform = data_iv.loc[~data_iv.index.isin(['Total']), ['Bin', 'WoE']].set_index('Bin')['WoE'].to_dict()

    if feature in list_numeric_feature:
        return df[feature].replace(dict_transform).astype(float)

    if feature in list_cat_feature:
        for i in dict_transform.keys():
            df.loc[df[feature].isin([i]), feature] = dict_transform[i]
        return df[feature].astype(float)

## Пример использования

In [12]:
df_test_fit = df_test.copy()

In [13]:
for i_feat in list(dict_bin_split.keys()):
    data_iv = calc_iv(df=df_test_fit, feature=i_feat, target='target', dict_bin_split=dict_bin_split,
                      list_cat_feature=list_cat_feature, list_numeric_feature=list_numeric_feature)
    df_test_fit[i_feat] = transform_calc_iv(df=df_test_fit.copy(), 
                                           feature=i_feat, 
                                           data_iv=data_iv,
                                           list_cat_feature=list_cat_feature,
                                           list_numeric_feature=list_numeric_feature)
    

In [14]:
df_test_fit[list(dict_bin_split.keys()) + ['target']].dtypes

petal length (cm)    float64
source_code          float64
target               float64
dtype: object

In [15]:
df_test_fit[list(dict_bin_split.keys()) + ['target']].sample(5)

Unnamed: 0,petal length (cm),source_code,target
41,0.0,-0.693147,0.0
47,0.0,-0.693147,0.0
149,-0.693147,0.0,2.0
86,-0.693147,-0.693147,1.0
14,0.0,-0.693147,0.0


---
---
---

# Функция расчёта PSI по переменным

In [16]:
def _calc_bucket_stat(X, features, year_dev_start, year_dev_end, 
                      dt_col='confirm_date', feature_types='discrete', bin_count=5):
    df = X.copy()
    df = df.reset_index()
    df['moment_year'] = df[dt_col].dt.year
    years = sorted(df['moment_year'].unique())
    total_bucket = pd.DataFrame()
    for feature in features:
        # В случае, если переменная непрерывная то бьем её на бины через qcut
        if feature_types == 'cont':
            df.loc[:, feature] = pd.qcut(df.loc[:, feature], bin_count)
        # Если фича имеет категориальные значения (например, после WOE-трансформации)
        feature_bucket = pd.DataFrame(columns=['feature']+years, index=sorted(df[feature].unique()))
        feature_bucket.index = feature_bucket.index.set_names(['var_group'])
        feature_bucket['feature'] = feature
        # Распределение в total рассчитывается на основании базового периода
        feature_bucket['total'] = df.loc[(df['moment_year'] >= year_dev_start) & (df['moment_year'] <= year_dev_end) & df.flag_maturity, feature].value_counts(normalize=True)

        for year in years:
            df_year = df.loc[df['moment_year'] == year, :]
            feature_bucket[year] = df_year[feature].value_counts(normalize=True)

        feature_bucket = feature_bucket.reset_index()
        total_bucket = pd.concat([total_bucket, feature_bucket])
    total_bucket = total_bucket[['feature', 'var_group', 'total'] + years]
    total_bucket = total_bucket.reset_index(drop=True)
    return total_bucket

In [17]:
def calc_psi_features(X, features, year_dev_start, year_dev_end,
                      year_target, dt_col='confirm_date', 
                      feature_types='descrete', bin_counts=5):
    """
    Данная функция рассчитывает PSI для признаков из датасета X.

    Аргументы фунции:
    X - исходные датасет на основании которого мы хотим рассчитать PSI по переменной.
    year_dev_start - год начала базового периода
    year_dev_end - год конца базового периода
    year_target - целевой период (за этот год считается PSI по переменной относительно распределения этой переменной в базовом периоде)
    feature_types
    """
    psi_buckets = pd.DataFrame()
    years = sorted(X.reset_index()[dt_col].dt.year.unique())
    buckets = _calc_bucket_stat(X, features, year_dev_start, year_dev_end, dt_col, feature_types, bin_counts)
    for feat in features:
        feature_buckets = buckets.loc[buckets['feature'] == feat, :]
        psi_feature = {
            col: ((feature_buckets[col] - feature_buckets['total']) * (np.log(feature_buckets[col] / feature_buckets['total']))).sum() * 100 for col in years
        }
        temp = pd.DataFrame({f'{feat}': psi_feature}).T
        psi_buckets = pd.concat([psi_buckets, temp])
    return psi_buckets.loc[:, year_target], buckets

## Пример использования

In [18]:
df_psi = df_test_fit.copy()

In [19]:
df_psi.head()

Unnamed: 0,sepal length (cm),sepal width (cm),petal length (cm),petal width (cm),target,source_code,confirm_date,flag_maturity
0,5.1,3.5,0.0,0.2,0.0,-0.693147,2018-01-01,True
1,4.9,3.0,0.0,0.2,0.0,-0.693147,2018-01-01,True
2,4.7,3.2,0.0,0.2,0.0,-0.693147,2018-01-01,True
3,4.6,3.1,0.0,0.2,0.0,-0.693147,2018-01-01,True
4,5.0,3.6,0.0,0.2,0.0,-0.693147,2018-01-01,True


In [20]:
feat_psi, feat_buckets = calc_psi_features(X=df_psi,
                                           features=list(dict_bin_split.keys()),
                                           year_dev_start=2018,
                                           year_dev_end=2019,
                                           year_target=[2020]
                                          )

In [21]:
feat_buckets

Unnamed: 0,feature,var_group,total,2018,2019,2020
0,petal length (cm),-0.693147,0.5,,1.0,1.0
1,petal length (cm),0.0,0.5,1.0,,
2,source_code,-0.693147,1.0,1.0,1.0,
3,source_code,0.0,,,,1.0


In [22]:
feat_psi

Unnamed: 0,2020
petal length (cm),34.657359
source_code,0.0
