In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from typing import Iterable, Callable
from itertools import product
import matplotlib.pyplot as plt
import seaborn as sns
import scipy.stats as stats
from sklearn.base import BaseEstimator
import sklearn.metrics as metr
import sklearn.preprocessing as pre
import sklearn.ensemble as ens
import sklearn.linear_model as lin
import sklearn.model_selection as sel

import os
# Walk the Kaggle input directory and load the two credit CSV files by name.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        fullpath = os.path.join(dirname, filename)

        # A file name can match at most one of the two targets.
        if filename == 'credit_train.csv':
            train = pd.read_csv(fullpath)
        elif filename == 'credit_test.csv':
            test = pd.read_csv(fullpath)

In [None]:
# List every column name of the training frame for inspection.
cols = train.columns.to_list()
cols

In [None]:
# Columns of `train` that are carried forward as candidate features.
features_cols = [
 'Current Loan Amount',
 'Term',
 'Credit Score',
 'Annual Income',
 'Years in current job',
 'Home Ownership',
 'Purpose',
 'Monthly Debt',
 'Years of Credit History',
 'Months since last delinquent',
 'Number of Open Accounts',
 'Number of Credit Problems',
 'Current Credit Balance',
 'Maximum Open Credit',
 'Bankruptcies',
 'Tax Liens'
]

# Take an explicit copy: later cells add and overwrite columns on `features`,
# and doing that on a bare column selection of `train` triggers pandas'
# SettingWithCopyWarning.
features = train[features_cols].copy()
features

In [None]:
# Inspect the dtype of each feature column.
cols_types = features.dtypes
cols_types

In [None]:
# Count missing values per feature column.
col_nans = features.isna().sum()
col_nans

In [None]:
# Distinct values of 'Months since last delinquent' (including NaN, if any).
deliq_vals = features['Months since last delinquent'].unique()
deliq_vals

In [None]:
# Replace 'Months since last delinquent' with a score:
# 1 / log10(months + 10) when a value is present, 0 when it is missing.
# `assign` + `drop` build a new frame instead of writing a column into a frame
# that was selected out of `train`, which avoids SettingWithCopyWarning.
features = features.assign(
    delinquent_score=features['Months since last delinquent']
        .apply(lambda x: 1 / np.log10(x + 10) if pd.notna(x) else 0)
).drop(columns='Months since last delinquent')

In [None]:
# Distinct labels present in 'Years in current job'.
cur_job_d_vals = features['Years in current job'].unique()
cur_job_d_vals

In [None]:
def map_cur_job_d(cur_job: str) -> int:
    """Encode a 'Years in current job' label as an ordinal integer.

    Missing values map to 0. The known labels map to 1 ('< 1 year'),
    2 ('1 year'), ... up to 11 ('10+ years').

    Raises:
        KeyError: if ``cur_job`` is present but is not one of the known labels.
    """
    if pd.isna(cur_job):
        return 0

    # Labels in increasing tenure order; the position (1-based) is the code.
    ordered_labels = (
        '< 1 year',
        '1 year',
        '2 years',
        '3 years',
        '4 years',
        '5 years',
        '6 years',
        '7 years',
        '8 years',
        '9 years',
        '10+ years',
    )
    codes = {label: code for code, label in enumerate(ordered_labels, start=1)}

    return codes[cur_job]

# Swap the textual job-tenure column for its ordinal code.
features = features.assign(
    job_years_code=features['Years in current job'].apply(map_cur_job_d)
).drop(columns='Years in current job')

In [None]:
# Distinct values of 'Maximum Open Credit'.
op_cr_d_vals = features['Maximum Open Credit'].unique()
op_cr_d_vals

In [None]:
# Fill missing 'Maximum Open Credit' with the same row's 'Current Loan Amount'.
features['Maximum Open Credit'] = features['Maximum Open Credit']\
    .fillna(features['Current Loan Amount'])

In [None]:
# Distinct values of 'Bankruptcies'.
bnkrpt_d_vals = features['Bankruptcies'].unique()
bnkrpt_d_vals

In [None]:
# Treat a missing 'Bankruptcies' value as zero.
features['Bankruptcies'] = features['Bankruptcies'].fillna(0)

In [None]:
# Distinct values of 'Tax Liens'.
taxl_d_vals = features['Tax Liens'].unique()
taxl_d_vals

In [None]:
# Treat a missing 'Tax Liens' value as zero.
features['Tax Liens'] = features['Tax Liens'].fillna(0)

In [None]:
# Recount missing values per column after the fills above.
col_nans = features.isna().sum()
col_nans

In [None]:
# Distinct labels of 'Home Ownership'.
h_own_d_vals = features['Home Ownership'].unique()
h_own_d_vals

In [None]:
# Merge the 'HaveMortgage' spelling into the 'Home Mortgage' label.
features['Home Ownership'] = features['Home Ownership']\
    .replace('HaveMortgage', 'Home Mortgage')

In [None]:
# Distinct labels of 'Purpose'.
purp_d_vals = features['Purpose'].unique()
purp_d_vals

In [None]:
# Number of rows per 'Purpose' label.
purp_vals_c = features.groupby('Purpose').size()
purp_vals_c

In [None]:
def purpose_transformer(df: pd.DataFrame) -> pd.DataFrame:
    """Collapse selected 'Purpose' labels into broader categories.

    The 'Purpose' column of ``df`` is overwritten in place and the same frame
    object is returned. Labels that have no entry in the table are left
    unchanged.
    """
    merged_labels = {
        'small_business': 'Business Loan',
        'other': 'Other',
        'renewable_energy': 'Other',
        'major_purchase': 'Major Purchase',
        'Take a Trip': 'Recreation',
        'vacation': 'Recreation',
        'Educational Expenses': 'Other',
    }

    # dict.get with the value itself as fallback keeps unknown labels as they are.
    df['Purpose'] = df['Purpose'].apply(lambda v: merged_labels.get(v, v))
    return df

# Apply the label merge, then show the row count per resulting 'Purpose' label.
features = purpose_transformer(features)

purp_vals_c = features.groupby('Purpose').size()
purp_vals_c

In [None]:
# Binary target: 1 where 'Loan Status' equals 'Fully Paid', 0 for anything else.
status = (train['Loan Status'] == 'Fully Paid').astype('int64')

In [None]:
def mask_fs_and_stat(fs: pd.DataFrame, stat: pd.Series, mask: pd.Series)\
                    -> tuple[pd.DataFrame, pd.Series]:
    """Apply one boolean mask to both the feature frame and the status series.

    Returns:
        ``(fs[mask], stat[mask])``.
    """
    return fs[mask], stat[mask]

def split_fs_and_stat(fs: pd.DataFrame, stat: pd.Series, mask: pd.Series)\
                     -> tuple[pd.DataFrame, pd.Series, pd.DataFrame, pd.Series]:
    """Split features and status into the rows selected by ``mask`` and the rest.

    Args:
        fs: feature frame.
        stat: status series aligned with ``fs``.
        mask: boolean series; True marks the "positive" part.

    Returns:
        ``(fs_pos, stat_pos, fs_neg, stat_neg)`` where ``*_pos`` holds the rows
        with ``mask`` True and ``*_neg`` the rows with ``mask`` False.
    """
    # The previous version returned the negative half twice, so callers never
    # received the rows selected by the mask.
    neg_mask = ~mask

    fs_pos, stat_pos = fs[mask], stat[mask]
    fs_neg, stat_neg = fs[neg_mask], stat[neg_mask]

    return fs_pos, stat_pos, fs_neg, stat_neg

def outlier_clean_mask(fs: pd.DataFrame, col: str, sigmas=3.0, trans=lambda x: x)\
                      -> pd.Series:
    """Build a boolean mask that is True for rows that are not outliers in ``col``.

    Each value of ``fs[col]`` is passed through ``trans``; a row is kept when
    its transformed value lies strictly within ``sigmas`` standard deviations
    of the transformed mean.

    Args:
        fs: feature frame.
        col: name of the column to check.
        sigmas: half-width of the accepted band, in standard deviations.
        trans: element-wise transform applied before measuring the distance.

    Returns:
        Boolean series indexed like ``fs``.
    """
    transformed = fs[col].apply(trans)

    center = transformed.mean()
    spread = transformed.std()

    # Missing values are replaced by the mean, so their distance is zero and
    # they pass the check whenever the spread is positive.
    deviation_sq = (transformed.fillna(center) - center) ** 2
    limit_sq = (sigmas * spread) ** 2

    return deviation_sq < limit_sq


# Drop outliers one column at a time. Every mask is computed on log10(x + 1)
# of the column, using the frame as already filtered by the previous steps,
# so the order of these steps affects the result.
m_loan = outlier_clean_mask(features, 'Current Loan Amount',\
                           trans=lambda x: np.log10(x + 1))
features, status = mask_fs_and_stat(features, status, m_loan)

m_debt_mask = outlier_clean_mask(features, 'Monthly Debt',\
                                trans=lambda x: np.log10(x + 1))
features, status = mask_fs_and_stat(features, status, m_debt_mask)

o_acc_mask = outlier_clean_mask(features, 'Number of Open Accounts',\
                                trans=lambda x: np.log10(x + 1))
features, status = mask_fs_and_stat(features, status, o_acc_mask)

cr_h_mask = outlier_clean_mask(features, 'Years of Credit History',\
                                trans=lambda x: np.log10(x + 1))
features, status = mask_fs_and_stat(features, status, cr_h_mask)

cr_bl_mask = outlier_clean_mask(features, 'Current Credit Balance',\
                                trans=lambda x: np.log10(x + 1))
features, status = mask_fs_and_stat(features, status, cr_bl_mask)

max_cr_mask = outlier_clean_mask(features, 'Maximum Open Credit',\
                                trans=lambda x: np.log10(x + 1))
features, status = mask_fs_and_stat(features, status, max_cr_mask)

# 'Annual Income' uses a wider band (4 sigmas) than the default of 3.
inc_mask = outlier_clean_mask(features, 'Annual Income',\
                             trans=lambda x: np.log10(x + 1), sigmas=4)
features, status = mask_fs_and_stat(features, status, inc_mask)

# Number of rows left after all filters.
len(features)

In [None]:
# Overview figure: one panel per feature, coloured by loan status.
# Count-like features use histograms with a logarithmic y axis; selected
# continuous features use kernel density estimates.
fig = plt.figure(figsize=(16, 32))
gr = plt.GridSpec(6, 3, figure=fig)

fig.subplots_adjust(hspace=0.43)

term = fig.add_subplot(gr[0, 0])
sns.histplot(x=features['Term'], hue=status, ax=term, log_scale=(False, True))

h_own = fig.add_subplot(gr[0, 1])
sns.histplot(x=features['Home Ownership'], hue=status, ax=h_own, log_scale=(False, True))

# 'Purpose' has long labels, so it gets two grid columns and rotated ticks.
purp = fig.add_subplot(gr[1, :2])
sns.histplot(x=features['Purpose'], hue=status, ax=purp, log_scale=(False, True))
purp.set_xticklabels(purp.get_xticklabels(), rotation=25, ha='right')

jb_y = fig.add_subplot(gr[0, 2])
sns.histplot(x=features['job_years_code'], hue=status, ax=jb_y, log_scale=(False, True), binwidth=1)

m_debt = fig.add_subplot(gr[1, 2])
sns.kdeplot(x=features['Monthly Debt'], hue=status, ax=m_debt)

cr_h = fig.add_subplot(gr[2, 0])
sns.histplot(x=features['Years of Credit History'], hue=status, ax=cr_h, log_scale=(False, True), binwidth=4)

o_acc = fig.add_subplot(gr[2, 1])
sns.histplot(x=features['Number of Open Accounts'], hue=status, ax=o_acc, log_scale=(False, True), binwidth=4)

cr_pr = fig.add_subplot(gr[2, 2])
sns.histplot(x=features['Number of Credit Problems'], hue=status, ax=cr_pr, log_scale=(False, True), binwidth=1)

# Plain-ASCII identifier: the previous name started with a look-alike
# non-ASCII letter, which cannot be told apart from `cr_bl` by eye.
cr_bl = fig.add_subplot(gr[3, 0])
sns.kdeplot(x=features['Current Credit Balance'], hue=status, ax=cr_bl)

max_cr = fig.add_subplot(gr[3, 1])
sns.kdeplot(x=features['Maximum Open Credit'], hue=status, ax=max_cr)

bnkrts = fig.add_subplot(gr[3, 2])
sns.histplot(x=features['Bankruptcies'], hue=status, ax=bnkrts, log_scale=(False, True), binwidth=1)

txlys = fig.add_subplot(gr[4, 0])
sns.histplot(x=features['Tax Liens'], hue=status, ax=txlys, log_scale=(False, True), binwidth=1)

deliq = fig.add_subplot(gr[5, 0])
sns.kdeplot(x=features['delinquent_score'], hue=status, ax=deliq)

cr_sc = fig.add_subplot(gr[4, 1:])
sns.histplot(x=features['Credit Score'], hue=status, ax=cr_sc, log_scale=(False, True), bins=100)

inc = fig.add_subplot(gr[5, 1:])
sns.histplot(x=features['Annual Income'], hue=status, ax=inc, log_scale=(False, True), bins=50)

plt.show()

In [None]:
# Split rows by whether 'Credit Score' is missing. The unpacked names expect
# the missing-score rows first and the rows with a score second.
csna_mask = features['Credit Score'].isna()
fs_csna, stat_csna, fs_csnona, stat_csnona = split_fs_and_stat(features, status, csna_mask)

In [None]:
def draw_dist_curve(fs: pd.DataFrame, col: str, ax=plt, dist=stats.norm) -> None:
    """Draw a probability (Q-Q) plot of ``fs[col]`` against a fitted distribution.

    Args:
        fs: feature frame.
        col: name of the column to plot.
        ax: plotting target handed to ``scipy.stats.probplot`` (``plt`` by default).
        dist: scipy distribution; its parameters are estimated from the
            column with ``dist.fit`` before plotting.
    """
    sample = fs[col]
    fitted_params = dist.fit(sample)

    stats.probplot(sample, dist=dist, sparams=fitted_params, plot=ax)
    
# Q-Q figure: compare selected columns against a fitted lognormal or gamma
# distribution, one panel per column on a 3 x 2 grid.
dist_f = plt.figure(figsize=(16, 18))
dist_g = plt.GridSpec(3, 2, figure=dist_f)

dplt_m_debt = dist_f.add_subplot(dist_g[0, 0])
draw_dist_curve(features, 'Monthly Debt', ax=dplt_m_debt, dist=stats.lognorm)
dplt_m_debt.set_title('Monthly Debt lognormal Q-Q')

dplt_cr_h = dist_f.add_subplot(dist_g[0, 1])
draw_dist_curve(features, 'Years of Credit History', ax=dplt_cr_h, dist=stats.gamma)
dplt_cr_h.set_title('Years of Credit History gamma Q-Q')

dplt_cr_bl = dist_f.add_subplot(dist_g[1, 0])
draw_dist_curve(features, 'Current Credit Balance', ax=dplt_cr_bl, dist=stats.lognorm)
dplt_cr_bl.set_title('Current Credit Balance lognormal Q-Q')

dplt_max_cr = dist_f.add_subplot(dist_g[1, 1])
draw_dist_curve(features, 'Maximum Open Credit', ax=dplt_max_cr, dist=stats.lognorm)
dplt_max_cr.set_title('Maximum Open Credit lognormal Q-Q')

# 'Annual Income' is taken from `fs_csnona` rather than from `features`.
dplt_inc = dist_f.add_subplot(dist_g[2, 0])
draw_dist_curve(fs_csnona, 'Annual Income', ax=dplt_inc, dist=stats.lognorm)
dplt_inc.set_title('Annual Income lognormal Q-Q')

plt.show()

In [None]:
def sep_by_status(fs: pd.DataFrame, stat: pd.Series) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Split ``fs`` by status value.

    Returns:
        ``(rows where stat == 1, rows where stat == 0)``.
    """
    is_pos = stat == 1
    is_neg = stat == 0

    return fs[is_pos], fs[is_neg]

def ttest_col_by_status(fs: pd.DataFrame, col: str, stat: pd.Series, logx=False) -> float:
    """Two-sample t-test of ``col`` between status-1 rows and status-0 rows.

    Args:
        fs: feature frame.
        col: name of the column to compare.
        stat: status series aligned with ``fs``.
        logx: when True, the natural log of the values is compared instead
            of the raw values.

    Returns:
        The p-value reported by ``scipy.stats.ttest_ind``.
    """
    pos, neg = sep_by_status(fs, stat)
    samples = [pos[col], neg[col]]

    if logx:
        samples = [sample.apply(np.log) for sample in samples]

    return stats.ttest_ind(*samples).pvalue

def manwhi_col_by_status(fs: pd.DataFrame, col: str, stat: pd.Series) -> float:
    """Mann-Whitney U test of ``col`` between status-1 rows and status-0 rows.

    Returns:
        The p-value reported by ``scipy.stats.mannwhitneyu``.
    """
    pos, neg = sep_by_status(fs, stat)

    return stats.mannwhitneyu(pos[col], neg[col]).pvalue
    

# Significance of each continuous feature with respect to loan status.
# p-values are printed as percentages.
tt_m_debt = ttest_col_by_status(features, 'Monthly Debt', status, logx=True)
print("Monthly Debt ttest p_value {0:.3f}%".format(tt_m_debt * 100))

mn_cr_h = manwhi_col_by_status(features, 'Years of Credit History', status)
print("Years of Credit History Mann-Whitney U rank test p_value {0:.3e}%".format(mn_cr_h * 100))

# Plain-ASCII identifier: the previous name contained a look-alike non-ASCII
# letter, which cannot be told apart from `mn_cr_bl` by eye.
mn_cr_bl = manwhi_col_by_status(features, 'Current Credit Balance', status)
print("Current Credit Balance Mann-Whitney U rank test p_value {0:.3f}%".format(mn_cr_bl * 100))

mn_max_cr = manwhi_col_by_status(features, 'Maximum Open Credit', status)
print("Maximum Open Credit Mann-Whitney U rank test p_value {0:.3e}%".format(mn_max_cr * 100))

# 'Annual Income' is tested on `fs_csnona` / `stat_csnona` rather than on the full frame.
mn_inc = manwhi_col_by_status(fs_csnona, 'Annual Income', stat_csnona)
print("Annual Income p_value Mann-Whitney U rank test {0:.3e}%".format(mn_inc * 100))

In [None]:
def sep_minors(fs: pd.DataFrame, col: str, stat: pd.Series, trshld: int)\
               -> tuple[pd.DataFrame, pd.Series, pd.DataFrame, pd.Series]:
    """Split rows by how frequent their ``col`` category is.

    A row is marked as "minor" when its category occurs fewer than ``trshld``
    times in ``fs``. The split itself is delegated to ``split_fs_and_stat``
    with that mask, and its four results are returned unchanged.
    """
    category_counts = fs.groupby(col).size()
    is_minor = fs[col].apply(lambda value: category_counts[value] < trshld)

    return split_fs_and_stat(fs, stat, is_minor)

def x2test_col_by_status(fs: pd.DataFrame, col: str, stat: pd.Series, trshld=100) -> float:
    """Chi-squared independence test between a categorical column and status.

    Rows are first divided with ``sep_minors``: categories with at least
    ``trshld`` rows each get their own column in the contingency table, while
    all rarer categories are pooled into one extra column.

    Args:
        fs: feature frame.
        col: name of the categorical column.
        stat: 0/1 status series aligned with ``fs``.
        trshld: minimum row count for a category to keep its own column.

    Returns:
        The p-value reported by ``scipy.stats.chi2_contingency``.
    """
    fs_minor, stat_minor, fs_major, stat_major = sep_minors(fs, col, stat, trshld=trshld)

    if len(fs_major) > 0:
        # Status (rows) by category (columns) counts. Plain arrays are passed
        # so that index labels play no part in the tabulation.
        table = pd.crosstab(stat_major.to_numpy(), fs_major[col].to_numpy())
        # Always keep both status rows, in (0, 1) order, so the pooled column
        # below lines up even when one status is absent.
        matr = table.reindex([0, 1], fill_value=0).to_numpy(dtype=np.int64)
    else:
        matr = np.zeros((2, 0), dtype=np.int64)

    if len(fs_minor) > 0:
        minor_pos = stat_minor.sum()
        minor_neg = len(fs_minor) - minor_pos

        # Same row order as the table: status 0 first, status 1 second.
        matr = np.column_stack([matr, [minor_neg, minor_pos]])

    x2t = stats.chi2_contingency(matr)

    return x2t.pvalue

# Chi-squared tests of the categorical features against loan status.
# p-values are printed as percentages.
x2_term = x2test_col_by_status(features, 'Term', status)
print('Term x2 p_value {0:.3e}%'.format(x2_term * 100))

x2_h_own = x2test_col_by_status(features, 'Home Ownership', status)
# Label fixed: the words were scrambled ("Home x2 Ownership").
print('Home Ownership x2 p_value {0:.3e}%'.format(x2_h_own * 100))

x2_purp = x2test_col_by_status(features, 'Purpose', status)
print('Purpose x2 p_value {0:.3e}%'.format(x2_purp * 100))

In [None]:
def get_freq_matrs(fs: pd.DataFrame, col: str, stat: pd.Series) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Count rows per value of ``col``, overall and among status-1 rows.

    Returns:
        ``(x, whole_m, pos_m)`` where ``x`` is the index of distinct ``col``
        values (a pandas Index), ``whole_m`` the row count for each value and
        ``pos_m`` the count of status-1 rows for each value (0 where a value
        has none).
    """
    total_counts = fs.groupby(col).size()
    pos_counts = fs[stat == 1].groupby(col).size()

    # Align the status-1 counts to the values seen in the whole frame;
    # values with no status-1 rows become 0.
    pos_aligned = pos_counts.reindex(total_counts.index).fillna(0)

    return total_counts.index, total_counts.to_numpy(), pos_aligned.to_numpy()

def get_ratio(fs: pd.DataFrame, col: str, stat: pd.Series) -> tuple[pd.Index, np.ndarray]:
    """Share of status-1 rows for every distinct value of ``col``.

    Returns:
        ``(x, ratio)``: ``x`` is the first result of ``get_freq_matrs`` (the
        distinct values) and ``ratio`` is the status-1 count divided by the
        total count for each value.
    """
    x, whole_m, pos_m = get_freq_matrs(fs, col, stat)
    
    ratio = pos_m / whole_m
    return x, ratio

# Scatter the status-1 ratio against each discrete feature value.
# The x_* / r_* pairs are kept as separate names for use by later cells.
r_fig = plt.figure(figsize=(16, 12))
r_grid = plt.GridSpec(2, 3, figure=r_fig)

x_o_acc, r_o_acc = get_ratio(features, 'Number of Open Accounts', status)
rplt_o_acc = r_fig.add_subplot(r_grid[0, 0])
sns.scatterplot(x=x_o_acc, y=r_o_acc, ax=rplt_o_acc)

x_cr_pr, r_cr_pr = get_ratio(features, 'Number of Credit Problems', status)
rplt_cr_pr = r_fig.add_subplot(r_grid[0, 1])
sns.scatterplot(x=x_cr_pr, y=r_cr_pr, ax=rplt_cr_pr)

x_bnkrts, r_bnkrts = get_ratio(features, 'Bankruptcies', status)
rplt_bnkrts = r_fig.add_subplot(r_grid[0, 2])
sns.scatterplot(x=x_bnkrts, y=r_bnkrts, ax=rplt_bnkrts)

x_txlys, r_txlys = get_ratio(features, 'Tax Liens', status)
rplt_txlys = r_fig.add_subplot(r_grid[1, 0])
sns.scatterplot(x=x_txlys, y=r_txlys, ax=rplt_txlys)

x_jb_y, r_jb_y = get_ratio(features, 'job_years_code', status)
rplt_jb_y = r_fig.add_subplot(r_grid[1, 1])
sns.scatterplot(x=x_jb_y, y=r_jb_y, ax=rplt_jb_y)

# Same y label and y range on every panel so the ratios are comparable.
for ax in r_fig.get_axes():
    ax.set_ylabel('Pay Ratio')
    ax.set_ylim(-0.05, 1.05)

In [None]:
def get_ratio_corr(x_col: np.ndarray, r_col: np.ndarray) -> tuple[float, float]:
    """Spearman rank correlation between feature values and their ratios.

    Returns:
        ``(correlation, p-value)`` as reported by ``scipy.stats.spearmanr``.
    """
    # The spearmanr result unpacks as (correlation, p-value).
    rank_corr, p_value = stats.spearmanr(x_col, r_col)

    return rank_corr, p_value
    
# Rank correlation between each discrete feature's values and its status-1
# ratio. p-values are printed as percentages.
o_acc_corr, o_acc_pval = get_ratio_corr(x_o_acc, r_o_acc)
print('Number of Open Accounts Rank correlation {0:.3f}, p_value {1:.2f}%'.format(o_acc_corr, o_acc_pval * 100))

cr_pr_corr, cr_pr_pval = get_ratio_corr(x_cr_pr, r_cr_pr)
print('Number of Credit Problems Rank correlation {0:.3f}, p_value {1:.2f}%'.format(cr_pr_corr, cr_pr_pval * 100))

bnkrts_corr, bnkrts_pval = get_ratio_corr(x_bnkrts, r_bnkrts)
print('Bankruptcies Rank correlation {0:.3f}, p_value {1:.2f}%'.format(bnkrts_corr, bnkrts_pval * 100))

txlys_corr, txlys_pval = get_ratio_corr(x_txlys, r_txlys)
print('Tax Liens Rank correlation {0:.3f}, p_value {1:.2f}%'.format(txlys_corr, txlys_pval * 100))

jb_y_corr, jb_y_pval = get_ratio_corr(x_jb_y, r_jb_y)
print('job_years_code correlation {0:.3f}, p_value {1:.2f}%'.format(jb_y_corr, jb_y_pval * 100))

In [None]:
# Chi-squared tests of the discrete numeric features against loan status,
# treating every distinct value as a category. p-values are printed as percentages.
x2_cr_pr = x2test_col_by_status(features, 'Number of Credit Problems', status)
print('Number of Credit Problems x2 p_value {0:.2f}%'.format(x2_cr_pr * 100))

x2_bnkrts = x2test_col_by_status(features, 'Bankruptcies', status)
print('Bankruptcies x2 p_value {0:.2f}%'.format(x2_bnkrts * 100))

x2_txlys = x2test_col_by_status(features, 'Tax Liens', status)
print('Tax Liens x2 p_value {0:.2f}%'.format(x2_txlys * 100))

x2_jb_y = x2test_col_by_status(features, 'job_years_code', status)
print('job_years_code x2 p_value {0:.2e}%'.format(x2_jb_y * 100))

In [None]:
# Binary flag: 1 where 'delinquent_score' is positive, 0 otherwise.
is_deliq = (features['delinquent_score'] > 0)\
    .astype('int64')\
    .to_frame('is_delinquent')

# Chi-squared test of the flag against loan status; p-value printed as a percentage.
x2_is_deliq = x2test_col_by_status(is_deliq, 'is_delinquent', status)
print('is_delinquent x2 p_value {0:.2f}%'.format(x2_is_deliq * 100))

In [None]:
# Keep only the rows flagged as delinquent and test whether their score
# differs between the two status groups.
deliq_mask = is_deliq['is_delinquent'] == 1
fs_deliq, stat_deliq = mask_fs_and_stat(features, status, deliq_mask)

mn_deliq = manwhi_col_by_status(fs_deliq, 'delinquent_score', stat_deliq)
print('non-zero delinquent_score Mann-Whitney U rank test {0:.2f}%'.format(mn_deliq * 100))

In [None]:
# Restrict to rows with a present 'Credit Score' below 1000, test the score
# against loan status and plot its density per status.
norm_cr_sc = (features['Credit Score'] < 1000) & (features['Credit Score'].notna())
fs_norm_csc, stat_norm_csc = mask_fs_and_stat(features, status, norm_cr_sc) 

nm_norm_csc = manwhi_col_by_status(fs_norm_csc, 'Credit Score', stat_norm_csc)
print('normal Credit Score Mann-Whitney U rank test {0:.2e}%'.format(nm_norm_csc * 100))

sns.kdeplot(x=fs_norm_csc['Credit Score'], hue=stat_norm_csc)
plt.show()

In [None]:
# Wide histogram of 'Current Loan Amount' by status, with a logarithmic y axis.
loan_fig, loan_ax = plt.subplots(figsize=(18, 6))

sns.histplot(
    x=features['Current Loan Amount'],
    hue=status,
    log_scale=(False, True),
    bins=200,
    ax=loan_ax,
)
plt.show()

In [None]:
# Restrict to loans below 1e7, test the amount against loan status and plot
# its density per status.
minr_loan_m = features['Current Loan Amount'] < 1e+7
fs_minr_loan, stat_minr_loan = mask_fs_and_stat(features, status, minr_loan_m)

nm_minr_loan = manwhi_col_by_status(fs_minr_loan, 'Current Loan Amount', stat_minr_loan)
print('Mann-Whitney U rank test for minor Current Loan Amount {0:.2e}%'.format(nm_minr_loan * 100))

loan_minr_f, loan_minr_ax = plt.subplots(figsize=(12, 6))

sns.kdeplot(x=fs_minr_loan['Current Loan Amount'], hue=stat_minr_loan, ax=loan_minr_ax)
plt.show()

In [None]:
# Columns selected for modelling. Compared with the columns prepared above,
# 'Number of Credit Problems' and 'Bankruptcies' are not included.
fs_sel_cols = [
 'Current Loan Amount',
 'Term',
 'Credit Score',
 'Annual Income',
 'Home Ownership',
 'Purpose',
 'Monthly Debt',
 'Years of Credit History',
 'Number of Open Accounts',
 'Current Credit Balance',
 'Maximum Open Credit',
 'Tax Liens',
 'job_years_code',
 'delinquent_score'
]

fs_sel = features[fs_sel_cols]

In [None]:
def fit_model_col(col: pd.Series, stat: pd.Series, model: BaseEstimator, metr: str,\
                  param_sets: dict[str, Iterable[float]]) -> dict:
    """Grid-search ``model`` on a single feature column against the status.

    Args:
        col: feature column; used as a one-column design matrix.
        stat: target series aligned with ``col``.
        model: estimator handed to ``GridSearchCV``.
        metr: scoring name handed to ``GridSearchCV`` (inside this function the
            parameter shadows the module-level ``metr`` alias).
        param_sets: parameter grid to search.

    Returns:
        A dict with the keys ``'predictor'`` (callable mapping a 1-D array of
        feature values to the predicted probability in column 1 of
        ``predict_proba``), ``'params'`` (best parameters) and ``'score'``
        (best cross-validated score).
    """
    folds = sel.KFold(n_splits=5)
    search = sel.GridSearchCV(model, param_sets, cv=folds, scoring=metr)
    search.fit(col.to_numpy().reshape(-1, 1), stat.to_numpy())

    def predictor(col: np.ndarray) -> np.ndarray:
        """Predicted probability (column 1 of ``predict_proba``) for each value."""
        probas = search.predict_proba(col.reshape(-1, 1))
        return probas[:, 1].transpose()

    return {
        'predictor': predictor,
        'params': search.best_params_,
        'score': search.best_score_,
    }

def get_graph_grid(col: pd.Series, n=500) -> np.ndarray:
    """Evenly spaced evaluation grid over the value range of ``col``.

    Args:
        col: numeric series whose minimum and maximum bound the grid.
        n: number of grid points.

    Returns:
        Array of exactly ``n`` points starting at ``col.min()`` and spaced by
        ``(max - min) / n``; the maximum itself is not included.
    """
    gmin = col.min()
    gmax = col.max()

    # linspace with endpoint=False yields the same points as
    # arange(gmin, gmax, (gmax - gmin) / n), but always exactly n of them
    # (arange with a float step may return n + 1), and it also works when
    # gmin == gmax, where the step would be zero.
    grid = np.linspace(gmin, gmax, n, endpoint=False)
    return grid

In [None]:
# Hyper-parameter grid searched for the single-feature random forest.
loan_ens_params = {
    'n_estimators': [10, 25, 50],
    'min_samples_leaf': [5, 20, 50, 100],
    'max_depth': [2, 5, 10]
}

# Same loan-amount restriction as `minr_loan_m`, applied to the selected features.
fssl_mnr_ln, stat_mnr_ln = mask_fs_and_stat(fs_sel, status, minr_loan_m)

# A fixed random_state makes the forest, and therefore the selected
# parameters, the score and the curve below, reproducible between runs.
loan_rf = fit_model_col(
    fssl_mnr_ln['Current Loan Amount'], 
    stat_mnr_ln, 
    ens.RandomForestClassifier(random_state=42),
    'neg_brier_score', 
    loan_ens_params
)

# The scorer returns the negated Brier score; report its magnitude.
loan_rf_sc = abs(loan_rf['score'])
print(f'Current Loan Amount Random Forest Brier score {loan_rf_sc:.2f}')

# Predicted probability over an even grid of loan amounts.
loan_xgr = get_graph_grid(fssl_mnr_ln['Current Loan Amount'])
loan_rf_p = loan_rf['predictor'](loan_xgr)

sns.lineplot(x=loan_xgr, y=loan_rf_p)
plt.show()

In [None]:
# Show the best hyper-parameters found by the grid search.
loan_rf['params']