(**Click the icon below to open this notebook in Colab**)

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/xiangshiyin/machine-learning-for-actuarial-science/blob/main/2025-spring/week09/notebook/demo.ipynb)

# Imbalanced Data

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, recall_score

def generate_imbalanced_dataset(n_sample=10000, minority_ratio=0.05, random_state=1):
    """Create a synthetic two-feature classification dataset with a chosen class skew.

    Args:
        n_sample: Number of rows to generate.
        minority_ratio: Value subtracted from 1 to obtain the first class weight.
        random_state: Seed forwarded to ``make_classification``.

    Returns:
        The ``(X, y)`` pair produced by ``make_classification``.
    """
    # Only the first class weight is passed explicitly.
    majority_weight = 1 - minority_ratio
    features, labels = make_classification(
        n_samples=n_sample,
        n_features=2,
        n_redundant=0,
        n_clusters_per_class=1,
        weights=[majority_weight],
        class_sep=1,
        flip_y=0,
        random_state=random_state,
    )
    return features, labels

# Function to plot decision boundaries
def plot_decision_boundaries(model, X, y, ax, title):
    """Shade a fitted classifier's predicted regions and overlay the samples.

    Args:
        model: Fitted estimator whose ``predict`` accepts two-column input.
        X: Array indexed as ``X[:, 0]`` and ``X[:, 1]`` (the two plotted features).
        y: Values used to color the scatter points.
        ax: Matplotlib axes to draw on.
        title: Title set on ``ax``.
    """
    # Pad the data range by one unit so points do not sit on the border
    x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
    y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
    xx, yy = np.meshgrid(np.linspace(x_min, x_max, 100),
                         np.linspace(y_min, y_max, 100))

    # Predict every grid node, then fold the flat result back into the grid shape
    Z = model.predict(np.c_[xx.ravel(), yy.ravel()]).reshape(xx.shape)

    ax.contourf(xx, yy, Z, alpha=0.3, cmap=plt.cm.coolwarm)
    # The scatter handle is never used afterwards, so it is not stored
    ax.scatter(X[:, 0], X[:, 1], c=y, cmap=plt.cm.coolwarm, edgecolors='k')
    ax.set_title(title)
    ax.set_xlim(xx.min(), xx.max())
    ax.set_ylim(yy.min(), yy.max())

# Fit one logistic regression per minority ratio and keep what the plots need
inputs_to_visualize = []
for minority_ratio in [0.001, 0.005, 0.01, 0.05, 0.1, 0.5]:
    # Synthetic data with the requested minority share
    X, y = generate_imbalanced_dataset(n_sample=10000, minority_ratio=minority_ratio)
    # Stratified 80/20 hold-out split
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=12, stratify=y)
    # fit() returns the estimator itself, so it can be bound in one step
    model = LogisticRegression().fit(X_train, y_train)
    # Test-set accuracy
    accuracy = accuracy_score(y_test, model.predict(X_test))
    inputs_to_visualize.append((minority_ratio, model, X_test, y_test, accuracy))

# Draw all six results on a 3x2 grid, filling it row by row
nrow, ncol = 3, 2
fig, axes = plt.subplots(nrow, ncol, figsize=(8, 8))
for ax, (minority_ratio, model, X_input, y_input, accuracy) in zip(axes.flat, inputs_to_visualize):
    plot_decision_boundaries(model, X_input, y_input, ax, f"Ratio: {minority_ratio}, Accuracy: {accuracy:.4f}")
plt.tight_layout()
plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, recall_score

def generate_imbalanced_dataset(n_sample=10000, minority_ratio=0.05, random_state=1):
    """Create a synthetic two-feature classification dataset with a chosen class skew.

    Args:
        n_sample: Number of rows to generate.
        minority_ratio: Value subtracted from 1 to obtain the first class weight.
        random_state: Seed forwarded to ``make_classification``.

    Returns:
        The ``(X, y)`` pair produced by ``make_classification``.
    """
    # Only the first class weight is passed explicitly.
    majority_weight = 1 - minority_ratio
    features, labels = make_classification(
        n_samples=n_sample,
        n_features=2,
        n_redundant=0,
        n_clusters_per_class=1,
        weights=[majority_weight],
        class_sep=1,
        flip_y=0,
        random_state=random_state,
    )
    return features, labels

# Function to plot decision boundaries
def plot_decision_boundaries(model, X, y, ax, title):
    """Shade a fitted classifier's predicted regions and overlay the samples.

    Args:
        model: Fitted estimator whose ``predict`` accepts two-column input.
        X: Array indexed as ``X[:, 0]`` and ``X[:, 1]`` (the two plotted features).
        y: Values used to color the scatter points.
        ax: Matplotlib axes to draw on.
        title: Title set on ``ax``.
    """
    # Pad the data range by one unit so points do not sit on the border
    x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
    y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
    xx, yy = np.meshgrid(np.linspace(x_min, x_max, 100),
                         np.linspace(y_min, y_max, 100))

    # Predict every grid node, then fold the flat result back into the grid shape
    Z = model.predict(np.c_[xx.ravel(), yy.ravel()]).reshape(xx.shape)

    ax.contourf(xx, yy, Z, alpha=0.3, cmap=plt.cm.coolwarm)
    # The scatter handle is never used afterwards, so it is not stored
    ax.scatter(X[:, 0], X[:, 1], c=y, cmap=plt.cm.coolwarm, edgecolors='k')
    ax.set_title(title)
    ax.set_xlim(xx.min(), xx.max())
    ax.set_ylim(yy.min(), yy.max())

inputs_to_visualize = []
for minority_ratio in [0.001, 0.005, 0.01, 0.05, 0.1, 0.5]:
    # Generate an imbalanced dataset
    X, y = generate_imbalanced_dataset(n_sample=10000, minority_ratio=minority_ratio)
    # Split the dataset
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=12, stratify=y)
    # Fit a logistic regression model
    model = LogisticRegression()
    model.fit(X_train, y_train)
    # Predict once and score both metrics on the same predictions
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    # Store the input data for visualization
    inputs_to_visualize.append((minority_ratio, model, X_test, y_test, accuracy, recall))

# visualize all in a 3x2 grid (row-major order)
nrow, ncol = 3, 2
fig, axes = plt.subplots(nrow, ncol, figsize=(8, 8))
for i, (minority_ratio, model, X_input, y_input, accuracy, recall) in enumerate(inputs_to_visualize):
    plot_decision_boundaries(model, X_input, y_input, axes[i//ncol][i%ncol], f"Ratio: {minority_ratio}, Accuracy: {accuracy:.4f}, Recall: {recall:.4f}")
plt.tight_layout()
plt.show()


# Model Evaluation with Imbalanced Data

- Cross-validation `scoring` parameter [[doc](https://scikit-learn.org/stable/modules/model_evaluation.html#the-scoring-parameter-defining-model-evaluation-rules)]

In [None]:
from sklearn.model_selection import cross_val_score, cross_validate
from sklearn.model_selection import RepeatedStratifiedKFold

X, y = generate_imbalanced_dataset(n_sample=10000, minority_ratio=0.01)
lr = LogisticRegression()

cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
# A single cross_validate run scores every metric on the same fitted models,
# instead of repeating the whole 30-fold fit once per metric.
scores = cross_validate(lr, X, y, scoring=['roc_auc', 'accuracy', 'recall', 'f1'], cv=cv, n_jobs=-1)
roc_aucs = scores['test_roc_auc']
accuracies = scores['test_accuracy']
recalls = scores['test_recall']
f1s = scores['test_f1']

print('Mean ROC AUC: %.4f' % np.mean(roc_aucs))
print('Mean Accuracy: %.4f' % np.mean(accuracies))
print('Mean Recall: %.4f' % np.mean(recalls))
print('Mean F1: %.4f' % np.mean(f1s))

## ROC-AUC

In [None]:
# example of a roc curve for a predictive model
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, roc_auc_score
import matplotlib.pyplot as plt

# Two-class dataset with a 0.5 minority share
X, y = generate_imbalanced_dataset(n_sample=10000, minority_ratio=0.5)
# Stratified 80/20 hold-out split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=2, stratify=y)
# fit() returns the estimator itself
lr = LogisticRegression().fit(X_train, y_train)
# Predicted probabilities for the test rows; column 1 is kept as the positive class
yhat = lr.predict_proba(X_test)
pos_probs = yhat[:, 1]
# ROC points and the area under the curve
fpr, tpr, thresholds = roc_curve(y_test, pos_probs)
roc_auc_lr = roc_auc_score(y_test, pos_probs)

# The diagonal reference is drawn first, then the model curve
plt.plot([0, 1], [0, 1], linestyle='--', label='No Skill')
plt.plot(fpr, tpr, marker='.', label='Logistic')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title(f'ROC curve for Logistic Regression Model. AUC = {roc_auc_lr:.2f}')
plt.legend()
plt.show()

In [None]:
from sklearn.dummy import DummyClassifier

# Seeded like the other estimators in this notebook so the baseline AUC is reproducible
dc = DummyClassifier(strategy='stratified', random_state=1)
dc.fit(X_train, y_train)
yhat_dummy = dc.predict_proba(X_test)
# keep the column for the positive class
pos_probs_dummy = yhat_dummy[:, 1]
# calculate roc auc
roc_auc_dummy = roc_auc_score(y_test, pos_probs_dummy)
print(f'ROC AUC = {roc_auc_dummy:.2f}')

In [None]:
# Column 1 of the dummy classifier's predict_proba output
pos_probs_dummy = yhat_dummy[:, 1]

# Histogram of those values, normalized to a density
plt.hist(pos_probs_dummy, bins=10, label='Positive Class Distribution', density=True)
plt.legend()
plt.show()

In [None]:
# Split the logistic model's predict_proba output by column
pos_probs = yhat[:, 1]
neg_probs = yhat[:, 0]  # not used again in this cell

# Density histogram of the column-1 probabilities, with the 0.5 cut-off marked
plt.hist(pos_probs, bins=100, label='Positive Class Distribution', density=True)
plt.axvline(x=0.5, color='red', linestyle='--', linewidth=2, label='Threshold = 0.5')
plt.legend()
plt.show()

In [None]:
# Recall of the fitted model's hard predictions on the test split
recall = recall_score(y_test, lr.predict(X_test))
print(f"Recall: {recall}")

In [None]:
# Number of test rows at or above the 0.5 threshold, and number below it
np.sum(pos_probs>=0.5), np.sum(pos_probs<0.5)

https://scikit-learn.org/stable/modules/generated/sklearn.metrics.confusion_matrix.html


|               | Negative Prediction | Positive Prediction |
|--------------|--------------------|--------------------|
| **Negative Class** | True Negative (TN)  | False Positive (FP) |
| **Positive Class** | False Negative (FN) | True Positive (TP) |

In [None]:
from sklearn.metrics import confusion_matrix

# Confusion matrix of true test labels vs. the model's hard predictions
matrix = confusion_matrix(y_test, lr.predict(X_test))
print(matrix)

## PR-AUC

In [None]:
from sklearn.metrics import precision_recall_curve

# retrieve just the probabilities for the positive class
pos_probs = yhat[:, 1]
# calculate the no skill line as the proportion of the positive class
# in the split the curve is evaluated on (the test labels, not the full y)
no_skill = len(y_test[y_test == 1]) / len(y_test)
# plot the no skill precision-recall curve
plt.plot([0, 1], [no_skill, no_skill], linestyle='--', label='No Skill')
# calculate model precision-recall curve
precision, recall, _ = precision_recall_curve(y_test, pos_probs)
# plot the model precision-recall curve
plt.plot(recall, precision, marker='.', label='Logistic')
# axis labels
plt.xlabel('Recall')
plt.ylabel('Precision')
# show the legend
plt.legend()
# show the plot
plt.show()

In [None]:
from sklearn.metrics import auc

# Area under the precision-recall points computed in the previous cell
auc_score = auc(recall, precision)
print('PR AUC: %.3f' % auc_score)

In [None]:


# Same PR AUC computation for the dummy classifier's scores; overwrites auc_score
precision_dummy, recall_dummy, _ = precision_recall_curve(y_test, pos_probs_dummy)
auc_score = auc(recall_dummy, precision_dummy)
print('PR AUC: %.3f' % auc_score)

## Severely Imbalanced Data

### ROC-AUC

In [None]:
# example of a roc curve for a predictive model
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, roc_auc_score
import matplotlib.pyplot as plt

# Two-class dataset with a 0.01 minority share
X, y = generate_imbalanced_dataset(n_sample=10000, minority_ratio=0.01)
# Stratified 80/20 hold-out split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=2, stratify=y)
# fit() returns the estimator itself
lr = LogisticRegression().fit(X_train, y_train)
# Predicted probabilities for the test rows; column 1 is kept as the positive class
yhat = lr.predict_proba(X_test)
pos_probs = yhat[:, 1]
# ROC points and the area under the curve
fpr, tpr, thresholds = roc_curve(y_test, pos_probs)
roc_auc_lr = roc_auc_score(y_test, pos_probs)

# The diagonal reference is drawn first, then the model curve
plt.plot([0, 1], [0, 1], linestyle='--', label='No Skill')
plt.plot(fpr, tpr, marker='.', label='Logistic')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title(f'ROC curve for Logistic Regression Model. AUC = {roc_auc_lr:.2f}')
plt.legend()
plt.show()

In [None]:
# Column 1 of the logistic model's predict_proba output
pos_probs = yhat[:, 1]

# Density histogram of those probabilities, with the 0.5 cut-off marked
plt.hist(pos_probs, bins=100, label='Positive Class Distribution', density=True)
plt.axvline(x=0.5, color='red', linestyle='--', linewidth=2, label='Threshold = 0.5')
plt.legend()
plt.show()

In [None]:
# Recall of the fitted model's hard predictions on the test split
recall = recall_score(y_test, lr.predict(X_test))
print(f"Recall: {recall}")

In [None]:
# Number of test rows at or above the 0.5 threshold, and number below it
np.sum(pos_probs>=0.5), np.sum(pos_probs<0.5)

In [None]:
from sklearn.metrics import confusion_matrix

# Confusion matrix of true test labels vs. the model's hard predictions
matrix = confusion_matrix(y_test, lr.predict(X_test))
print(matrix)

In [None]:
# NOTE(review): presumably recall = TP / (TP + FN) typed in by hand from the
# confusion matrix printed above — confirm the counts after re-running
7 / 20

### PR-AUC

In [None]:
from sklearn.metrics import precision_recall_curve

# retrieve just the probabilities for the positive class
pos_probs = yhat[:, 1]
# calculate the no skill line as the proportion of the positive class
# in the split the curve is evaluated on (the test labels, not the full y)
no_skill = len(y_test[y_test == 1]) / len(y_test)
# plot the no skill precision-recall curve
plt.plot([0, 1], [no_skill, no_skill], linestyle='--', label='No Skill')
# calculate model precision-recall curve
precision, recall, _ = precision_recall_curve(y_test, pos_probs)
# plot the model precision-recall curve
plt.plot(recall, precision, marker='.', label='Logistic')
# axis labels
plt.xlabel('Recall')
plt.ylabel('Precision')
# show the legend
plt.legend()
# show the plot
plt.show()

To explain why the ROC and PR curves tell a different story, recall that the PR curve focuses on the minority class, whereas the ROC curve covers both classes.

In [None]:
from sklearn.metrics import auc

# Area under the precision-recall points computed in the previous cell
auc_score = auc(recall, precision)
print('PR AUC: %.3f' % auc_score)

# SMOTE (Synthetic Minority Over-sampling Technique)

In [None]:
import imblearn

imblearn.__version__

In [None]:
from collections import Counter

# Build the dataset with a 0.01 minority share
X, y = generate_imbalanced_dataset(n_sample=10000, minority_ratio=0.01, random_state=1)
# Count the rows per class label
counter = Counter(y)
print(counter)

# One scatter series per label, selected with a boolean mask
for label in counter:
    mask = y == label
    plt.scatter(X[mask, 0], X[mask, 1], label=str(label))

plt.legend()
plt.show()

In [None]:
from imblearn.over_sampling import SMOTE

X, y = generate_imbalanced_dataset(n_sample=10000, minority_ratio=0.01, random_state=1)
# transform the dataset (seeded so the synthetic points are the same on every run)
oversample = SMOTE(random_state=1)
X, y = oversample.fit_resample(X, y)
# class counts after resampling
counter = Counter(y)
print(counter)

# one scatter series per class label
for label in counter:
    row_ix = np.where(y == label)[0]
    plt.scatter(X[row_ix, 0], X[row_ix, 1], label=str(label))

plt.legend()
plt.show()

The original paper on SMOTE suggested combining SMOTE with random undersampling of the majority class.

In [None]:
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from imblearn.pipeline import Pipeline

X, y = generate_imbalanced_dataset(n_sample=10000, minority_ratio=0.01, random_state=1)
# class counts before resampling
counter = Counter(y)
print(counter)

# oversample with sampling_strategy=0.1; seeded for reproducible synthetic points
over = SMOTE(sampling_strategy=0.1, random_state=1)
X, y = over.fit_resample(X, y)
# class counts after resampling
counter = Counter(y)
print(counter)

# one scatter series per class label
for label in counter:
    row_ix = np.where(y == label)[0]
    plt.scatter(X[row_ix, 0], X[row_ix, 1], label=str(label))

plt.legend()
plt.show()

In [None]:
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from imblearn.pipeline import Pipeline

X, y = generate_imbalanced_dataset(n_sample=10000, minority_ratio=0.01, random_state=1)
# class counts before resampling
counter = Counter(y)
print(counter)

# both samplers are seeded so the resampled set is the same on every run
over = SMOTE(sampling_strategy=0.1, random_state=1)
under = RandomUnderSampler(sampling_strategy=0.5, random_state=1)
steps = [('o', over), ('u', under)]
pipeline = Pipeline(steps=steps)
# transform the dataset
X, y = pipeline.fit_resample(X, y)
# summarize the new class distribution
counter = Counter(y)
print(counter)
# scatter plot of examples by class label
for label in counter:
    row_ix = np.where(y == label)[0]
    plt.scatter(X[row_ix, 0], X[row_ix, 1], label=str(label))
plt.legend()
plt.show()

# Classification with SMOTE

In [None]:
from sklearn.model_selection import cross_val_score, cross_validate
from sklearn.model_selection import RepeatedStratifiedKFold

X, y = generate_imbalanced_dataset(n_sample=10000, minority_ratio=0.01)
lr = LogisticRegression()

cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
# A single cross_validate run scores every metric on the same fitted models,
# instead of repeating the whole 30-fold fit once per metric.
scores = cross_validate(lr, X, y, scoring=['roc_auc', 'accuracy', 'recall', 'f1'], cv=cv, n_jobs=-1)
roc_aucs = scores['test_roc_auc']
accuracies = scores['test_accuracy']
recalls = scores['test_recall']
f1s = scores['test_f1']

print('Mean ROC AUC: %.4f' % np.mean(roc_aucs))
print('Mean Accuracy: %.4f' % np.mean(accuracies))
print('Mean Recall: %.4f' % np.mean(recalls))
print('Mean F1: %.4f' % np.mean(f1s))

In [None]:
from sklearn.model_selection import cross_val_score, cross_validate
from sklearn.model_selection import RepeatedStratifiedKFold
from imblearn.pipeline import Pipeline

X, y = generate_imbalanced_dataset(n_sample=10000, minority_ratio=0.01)
lr = LogisticRegression()
# seeded so the cross-validated scores are reproducible
oversample = SMOTE(random_state=1)

# the sampler is a pipeline step, so it is fitted on each training fold only
steps = [
    ('over', oversample),
    ('model', lr)
]
pipeline = Pipeline(steps=steps)

cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
# one pass over the folds; every metric is scored on the same fitted pipelines
scores = cross_validate(pipeline, X, y, scoring=['roc_auc', 'accuracy', 'recall', 'f1'], cv=cv, n_jobs=-1)
roc_aucs = scores['test_roc_auc']
accuracies = scores['test_accuracy']
recalls = scores['test_recall']
f1s = scores['test_f1']

print('Mean ROC AUC: %.4f' % np.mean(roc_aucs))
print('Mean Accuracy: %.4f' % np.mean(accuracies))
print('Mean Recall: %.4f' % np.mean(recalls))
print('Mean F1: %.4f' % np.mean(f1s))

In [None]:
from sklearn.model_selection import cross_val_score, cross_validate
from sklearn.model_selection import RepeatedStratifiedKFold
from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler

X, y = generate_imbalanced_dataset(n_sample=10000, minority_ratio=0.01)
lr = LogisticRegression()
# both samplers are seeded so the cross-validated scores are reproducible
oversample = SMOTE(sampling_strategy=0.1, random_state=1)
undersample = RandomUnderSampler(sampling_strategy=0.5, random_state=1)

# samplers are pipeline steps, so they are fitted on each training fold only
steps = [
    ('over', oversample),
    ('under', undersample),
    ('model', lr)
]
pipeline = Pipeline(steps=steps)

cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
# one pass over the folds; every metric is scored on the same fitted pipelines
scoring = ['roc_auc', 'accuracy', 'recall', 'precision', 'f1']
scores = cross_validate(pipeline, X, y, scoring=scoring, cv=cv, n_jobs=-1)
roc_aucs = scores['test_roc_auc']
accuracies = scores['test_accuracy']
recalls = scores['test_recall']
precisions = scores['test_precision']
f1s = scores['test_f1']

print('Mean ROC AUC: %.4f' % np.mean(roc_aucs))
print('Mean Accuracy: %.4f' % np.mean(accuracies))
print('Mean Recall: %.4f' % np.mean(recalls))
print('Mean Precision: %.4f' % np.mean(precisions))
print('Mean F1: %.4f' % np.mean(f1s))

In [None]:
from sklearn.model_selection import cross_val_score, cross_validate
from sklearn.model_selection import RepeatedStratifiedKFold
from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler

X, y = generate_imbalanced_dataset(n_sample=10000, minority_ratio=0.01)

k_values = [1, 2, 3, 4, 5, 6, 7]

# The splitter and metric list do not depend on k, so they are built once
cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
scoring = ['roc_auc', 'accuracy', 'recall', 'precision', 'f1']

for k in k_values:
    print('Testing k=%d' % k)
    lr = LogisticRegression()
    # seeded samplers: differences between k values are not sampling noise
    oversample = SMOTE(sampling_strategy=0.1, k_neighbors=k, random_state=1)
    undersample = RandomUnderSampler(sampling_strategy=0.5, random_state=1)

    steps = [
        ('over', oversample),
        ('under', undersample),
        ('model', lr)
    ]
    pipeline = Pipeline(steps=steps)

    # one pass over the folds; every metric is scored on the same fitted pipelines
    scores = cross_validate(pipeline, X, y, scoring=scoring, cv=cv, n_jobs=-1)
    roc_aucs = scores['test_roc_auc']
    accuracies = scores['test_accuracy']
    recalls = scores['test_recall']
    precisions = scores['test_precision']
    f1s = scores['test_f1']

    print('Mean ROC AUC: %.4f' % np.mean(roc_aucs))
    print('Mean Accuracy: %.4f' % np.mean(accuracies))
    print('Mean Recall: %.4f' % np.mean(recalls))
    print('Mean Precision: %.4f' % np.mean(precisions))
    print('Mean F1: %.4f' % np.mean(f1s))

# Example: German Credit Risk data
- https://www.kaggle.com/datasets/uciml/german-credit

In this dataset
- Each entry represents a person who takes credit from a bank.
- Each person is classified as a good or bad credit risk according to the set of attributes.

The selected attributes are:
- **Age** (numeric)
- **Sex** (text: male, female)
- **Job** (numeric: 0 - unskilled and non-resident, 1 - unskilled and resident, 2 - skilled, 3 - highly skilled)
- **Housing** (text: own, rent, or free)
- **Saving accounts** (text - little, moderate, quite rich, rich)
- **Checking account** (numeric, in DM - Deutsche Mark; note that the code below treats this column as a categorical text feature)
- **Credit amount** (numeric, in DM)
- **Duration** (numeric, in months)
- **Purpose** (text: car, furniture/equipment, radio/TV, domestic appliances, repairs, education, business, vacation/others)

In [None]:
import pandas as pd

# The path is relative, so it resolves against the notebook's working directory
credit = pd.read_csv('../data/credit_data_risk.csv')
# Preview the first three rows
credit.head(3)

In [None]:
# Drop the 'Unnamed: 0' column in place; errors='ignore' keeps the cell
# re-runnable after the column is already gone
credit.drop(columns=['Unnamed: 0'], inplace=True, errors='ignore')

## Data Exploration

In [None]:
# Column dtypes and non-null counts
credit.info()

In [None]:
# Frequency of each non-missing 'Saving accounts' value
credit['Saving accounts'].value_counts()

In [None]:
# Frequency of each non-missing 'Checking account' value
credit['Checking account'].value_counts()

In [None]:
# Class balance of the target column
credit['Risk'].value_counts()

In [None]:
# Keep only the non-object columns; this frame is what gets scaled and clustered later
numerical_credit = credit.select_dtypes(exclude='O')
numerical_credit.columns

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt


# One histogram per numeric column, filling the 2x2 grid row by row
fig, axes = plt.subplots(2, 2, figsize=(10, 10))
for ax, column in zip(axes.flat, ['Age', 'Job', 'Credit amount', 'Duration']):
    sns.histplot(x=column, data=credit, ax=ax)

plt.tight_layout()
plt.show()

In [None]:
# One density plot per numeric column, split by Risk; common_norm=False
# normalizes each Risk group separately
fig, axes = plt.subplots(2, 2, figsize=(10, 10))
for ax, column in zip(axes.flat, ['Age', 'Job', 'Credit amount', 'Duration']):
    sns.kdeplot(x=column, hue='Risk', data=credit, ax=ax, common_norm=False)

plt.tight_layout()
plt.show()

In [None]:
# Keep only the object-dtype columns
cat_credit = credit.select_dtypes(include='O')
cat_credit.columns

In [None]:
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
sns.countplot(x='Sex', data=credit, ax=axes[0, 0])
sns.countplot(x='Housing', data=credit, ax=axes[0, 1])
sns.countplot(x='Saving accounts', data=credit, ax=axes[1, 0])
sns.countplot(x='Checking account', data=credit, ax=axes[1, 1])
sns.countplot(x='Purpose', data=credit, ax=axes[1, 2])

# Only five of the six panels are used; hide the empty one
axes[0, 2].set_visible(False)
# Rotate the Purpose tick labels on their own axes explicitly (instead of relying
# on the implicit current axes) and before tight_layout so room is reserved for them
axes[1, 2].tick_params(axis='x', labelrotation=45)
plt.tight_layout()
plt.show()

In [None]:
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
sns.countplot(x='Sex', hue='Risk', data=credit, ax=axes[0, 0])
sns.countplot(x='Housing', hue='Risk', data=credit, ax=axes[0, 1])
sns.countplot(x='Saving accounts', hue='Risk', data=credit, ax=axes[1, 0])
sns.countplot(x='Checking account', hue='Risk', data=credit, ax=axes[1, 1])
sns.countplot(x='Purpose', hue='Risk', data=credit, ax=axes[1, 2])

# Only five of the six panels are used; hide the empty one
axes[0, 2].set_visible(False)
# Rotate the Purpose tick labels on their own axes explicitly (instead of relying
# on the implicit current axes) and before tight_layout so room is reserved for them
axes[1, 2].tick_params(axis='x', labelrotation=45)
plt.tight_layout()
plt.show()

## Customer Segmentation

In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans

# Standardize the numeric columns before clustering
scaler = StandardScaler()
scaled_credit = scaler.fit_transform(numerical_credit)

## try different k values
# Seeded with an explicit number of restarts so the inertia curve (and the
# hard-coded elbow annotation below) is the same on every run
kmeans_per_k = [KMeans(n_clusters=k, n_init=10, random_state=1).fit(scaled_credit) for k in range(1, 10)]
inertias = [model.inertia_ for model in kmeans_per_k]

plt.figure(figsize=(8, 3.5))
plt.plot(range(1, 10), inertias, "bo-")
plt.xlabel("$k$", fontsize=14)
plt.ylabel("Inertia", fontsize=14)
# The arrow points at k=4, i.e. inertias[3]
plt.annotate('Elbow',
             xy=(4, inertias[3]),
             xytext=(0.55, 0.55),
             textcoords='figure fraction',
             fontsize=16,
             arrowprops=dict(facecolor='black', shrink=0.1)
            )
# plt.ylim(0, 1300)
plt.show()

In [None]:
from sklearn.metrics import silhouette_score

# Silhouette score needs at least two clusters, so the k=1 model is skipped
silhouette_scores = [silhouette_score(scaled_credit, model.labels_) for model in kmeans_per_k[1:]]
plt.figure(figsize=(8, 3))
plt.plot(range(2, 10), silhouette_scores, "bo-")
plt.xlabel("$k$", fontsize=14)
plt.ylabel("Silhouette score", fontsize=14)
# NOTE(review): the arrow is hard-coded to k=2 (index 0); confirm that this is
# still the highest score whenever the clustering is re-run
plt.annotate('Optimal K',
             xy=(2, silhouette_scores[0]),
             xytext=(0.33, 0.33),
             textcoords='figure fraction',
             fontsize=12,
             arrowprops=dict(facecolor='black', shrink=0.1)
            )
plt.show()

### When `k=2`

In [None]:
# Seeded so the cluster assignment (and which cluster is 0 or 1) is reproducible
kmeans = KMeans(n_clusters=2, n_init=10, random_state=1)
clusters = kmeans.fit_predict(scaled_credit)

In [None]:
# Preview the numeric columns; their order is the column order of scaled_credit
numerical_credit.head(3)

In [None]:
# Column positions in scaled_credit. Assumes numerical_credit is ordered
# Age, Job, Credit amount, Duration — check numerical_credit.columns
AGE, CREDIT, DURATION = 0, 2, 3
centers = kmeans.cluster_centers_

plt.figure(figsize=(10, 12))
plt.subplot(311)
plt.scatter(scaled_credit[:, AGE], scaled_credit[:, CREDIT], c=kmeans.labels_, cmap='viridis')
plt.scatter(centers[:, AGE], centers[:, CREDIT], s=80, marker='x', color='k')
plt.xlabel('Age')
plt.ylabel('Credit')
plt.title('Age vs Credit')

plt.subplot(312)
# x = credit amount, y = duration, so the data matches the axis labels
plt.scatter(scaled_credit[:, CREDIT], scaled_credit[:, DURATION], c=kmeans.labels_, cmap='viridis')
plt.scatter(centers[:, CREDIT], centers[:, DURATION], s=80, marker='x', color='k')
plt.xlabel('Credit')
plt.ylabel('Duration')
plt.title('Credit vs Duration')

plt.subplot(313)
# x = duration, y = age, so the data matches the axis labels
plt.scatter(scaled_credit[:, DURATION], scaled_credit[:, AGE], c=kmeans.labels_, cmap='viridis')
plt.scatter(centers[:, DURATION], centers[:, AGE], s=120, marker='x', color='k')
plt.xlabel('Duration')
plt.ylabel('Age')
plt.title('Age vs Duration')

plt.tight_layout()
plt.show()

### When `k=4`

In [None]:
# Seeded so the four clusters are the same on every run
kmeans = KMeans(n_clusters=4, n_init=10, random_state=1)
clusters = kmeans.fit_predict(scaled_credit)

# Column positions in scaled_credit. Assumes numerical_credit is ordered
# Age, Job, Credit amount, Duration — check numerical_credit.columns
AGE, CREDIT, DURATION = 0, 2, 3
centers = kmeans.cluster_centers_

plt.figure(figsize=(10, 12))
plt.subplot(311)
plt.scatter(scaled_credit[:, AGE], scaled_credit[:, CREDIT], c=kmeans.labels_, cmap='viridis')
plt.scatter(centers[:, AGE], centers[:, CREDIT], s=80, marker='x', color='k')
plt.xlabel('Age')
plt.ylabel('Credit')
plt.title('Age vs Credit')

plt.subplot(312)
# x = credit amount, y = duration, so the data matches the axis labels
plt.scatter(scaled_credit[:, CREDIT], scaled_credit[:, DURATION], c=kmeans.labels_, cmap='viridis')
plt.scatter(centers[:, CREDIT], centers[:, DURATION], s=80, marker='x', color='k')
plt.xlabel('Credit')
plt.ylabel('Duration')
plt.title('Credit vs Duration')

plt.subplot(313)
# x = duration, y = age, so the data matches the axis labels
plt.scatter(scaled_credit[:, DURATION], scaled_credit[:, AGE], c=kmeans.labels_, cmap='viridis')
plt.scatter(centers[:, DURATION], centers[:, AGE], s=120, marker='x', color='k')
plt.xlabel('Duration')
plt.ylabel('Age')
plt.title('Age vs Duration')

plt.tight_layout()
plt.show()

## Data Transformations & Model Fit

In [None]:
# convert all column names to lowercase and remove spaces
def clean_column_names(df):
    """Normalize column labels to lowercase with underscores for whitespace.

    The frame is modified in place and also returned.

    Args:
        df: DataFrame whose column labels are strings.

    Returns:
        The same DataFrame object, with renamed columns.
    """
    lowered = df.columns.str.lower()
    # Each run of whitespace collapses into a single underscore
    df.columns = lowered.str.replace(r'\s+', '_', regex=True)
    return df

In [None]:
# Rename the columns of credit (the helper edits the frame in place and returns it)
credit = clean_column_names(credit)
credit.head(3)

In [None]:
# Column dtypes and non-null counts after renaming
credit.info()

In [None]:
# Frequency of each non-missing checking_account value
credit['checking_account'].value_counts()

In [None]:
# Frequency of each non-missing saving_accounts value
credit['saving_accounts'].value_counts()

In [None]:
# Append the cluster labels to the original dataframe
# Seeded so that cluster ids 0 and 1 refer to the same groups on every run;
# the per-cluster model fits further down select rows by these ids
kmeans = KMeans(n_clusters=2, n_init=10, random_state=1)
kmeans.fit(scaled_credit)

# Work on a copy so credit itself does not gain the extra column
df = credit.copy()
df['cluster'] = kmeans.labels_
df.head(3)

In [None]:
# Features: every column except the label
X = df.drop(columns=['risk'])
# Target: 1 where risk is 'bad', 0 for anything else
y = (df['risk'] == 'bad').astype('int64')

In [None]:
# Compare the first three encoded targets with the original labels
y[:3], df['risk'][:3]

### Transformation Pipeline

In [None]:
# Preview the feature frame that goes into the preprocessing pipeline
X.head(3)

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer

# Columns routed to each branch of the preprocessor
num_features = ['age', 'job', 'credit_amount', 'duration']
cat_features = ['sex', 'housing', 'saving_accounts', 'checking_account', 'purpose']

# Numeric branch: fill gaps with the column mean, then standardize
num_steps = [
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler()),
]
num_transformer = Pipeline(steps=num_steps)

# Categorical branch: missing values become the literal 'unknown', then one-hot
# encode; categories unseen at fit time are ignored at transform time
cat_steps = [
    ('imputer', SimpleImputer(strategy='constant', fill_value='unknown')),
    ('encoder', OneHotEncoder(handle_unknown='ignore')),
]
cat_transformer = Pipeline(steps=cat_steps)

# Columns not listed above are passed through untouched
branches = [
    ('num', num_transformer, num_features),
    ('cat', cat_transformer, cat_features),
]
preprocessor = ColumnTransformer(transformers=branches, remainder='passthrough')


### 1-Model fit

In [None]:
from sklearn.model_selection import cross_val_score, cross_validate
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline

# Preprocessing is part of the estimator, so imputation, scaling and encoding
# are learned from each training fold only and never see the held-out rows
lr = LogisticRegression()
lr_pipeline = make_pipeline(preprocessor, lr)

cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
# one pass over the folds; every metric is scored on the same fitted models
scoring = ['roc_auc', 'accuracy', 'recall', 'precision', 'f1']
scores = cross_validate(lr_pipeline, X, y, scoring=scoring, cv=cv, n_jobs=-1)
roc_aucs = scores['test_roc_auc']
accuracies = scores['test_accuracy']
recalls = scores['test_recall']
precisions = scores['test_precision']
f1s = scores['test_f1']

print('Mean ROC AUC: %.4f' % np.mean(roc_aucs))
print('Mean Accuracy: %.4f' % np.mean(accuracies))
print('Mean Recall: %.4f' % np.mean(recalls))
print('Mean Precision: %.4f' % np.mean(precisions))
print('Mean F1: %.4f' % np.mean(f1s))

In [None]:
import numpy as np
from sklearn.model_selection import cross_val_score, cross_validate, RepeatedStratifiedKFold
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import BaggingClassifier
from sklearn.pipeline import make_pipeline

# Create an ensemble of logistic regression models
# NOTE(review): with bootstrap=False and the default sample fraction, members
# appear to differ only in their 80% feature subset — confirm this is intended
base_lr = LogisticRegression(solver='liblinear')  # Use liblinear solver for small datasets
bagged_lr = BaggingClassifier(estimator=base_lr, n_estimators=50, random_state=1, max_features=0.8, bootstrap=False, n_jobs=-1)

# Preprocessing is part of the estimator, so it is learned from each training
# fold only and never sees the held-out rows
bagged_pipeline = make_pipeline(preprocessor, bagged_lr)

# Define cross-validation
cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)

# Evaluate performance: one pass over the folds for all metrics
scoring = ['roc_auc', 'accuracy', 'recall', 'precision', 'f1']
scores = cross_validate(bagged_pipeline, X, y, scoring=scoring, cv=cv, n_jobs=-1)
roc_aucs = scores['test_roc_auc']
accuracies = scores['test_accuracy']
recalls = scores['test_recall']
precisions = scores['test_precision']
f1s = scores['test_f1']

# Print results
print('Mean ROC AUC: %.4f' % np.mean(roc_aucs))
print('Mean Accuracy: %.4f' % np.mean(accuracies))
print('Mean Recall: %.4f' % np.mean(recalls))
print('Mean Precision: %.4f' % np.mean(precisions))
print('Mean F1: %.4f' % np.mean(f1s))


In [None]:
import numpy as np
from sklearn.model_selection import cross_val_score, cross_validate, RepeatedStratifiedKFold
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, StackingClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.pipeline import make_pipeline

# Define base models
base_models = [
    ('logreg', LogisticRegression(solver='liblinear')),  # Logistic Regression
    ('rf', RandomForestClassifier(n_estimators=100, random_state=1))  # Random Forest
]

# Define meta-classifier (final model)
meta_classifier = LogisticRegression(solver='liblinear')

# Create stacking classifier
stacked_model = StackingClassifier(estimators=base_models, final_estimator=meta_classifier, n_jobs=-1)

# Preprocessing is part of the estimator, so it is learned from each training
# fold only and never sees the held-out rows
stacked_pipeline = make_pipeline(preprocessor, stacked_model)

# Define cross-validation
cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)

# Evaluate performance: one pass over the folds for all metrics
scoring = ['roc_auc', 'accuracy', 'recall', 'precision', 'f1']
scores = cross_validate(stacked_pipeline, X, y, scoring=scoring, cv=cv, n_jobs=-1)
roc_aucs = scores['test_roc_auc']
accuracies = scores['test_accuracy']
recalls = scores['test_recall']
precisions = scores['test_precision']
f1s = scores['test_f1']

# Print results
print('Mean ROC AUC: %.4f' % np.mean(roc_aucs))
print('Mean Accuracy: %.4f' % np.mean(accuracies))
print('Mean Recall: %.4f' % np.mean(recalls))
print('Mean Precision: %.4f' % np.mean(precisions))
print('Mean F1: %.4f' % np.mean(f1s))


### 2-Cluster Fit

In [None]:
from sklearn.model_selection import cross_validate
from sklearn.pipeline import make_pipeline

# Rows assigned to cluster 0 and their targets
in_cluster_0 = X.cluster == 0
X_0 = X[in_cluster_0]
y_0 = y[in_cluster_0]

# Preprocessing is part of the estimator, so it is learned from each training
# fold only and never sees the held-out rows
lr = LogisticRegression()
lr_pipeline_0 = make_pipeline(preprocessor, lr)

cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
# one pass over the folds; every metric is scored on the same fitted models
scoring = ['roc_auc', 'accuracy', 'recall', 'precision', 'f1']
scores = cross_validate(lr_pipeline_0, X_0, y_0, scoring=scoring, cv=cv, n_jobs=-1)
roc_aucs = scores['test_roc_auc']
accuracies = scores['test_accuracy']
recalls = scores['test_recall']
precisions = scores['test_precision']
f1s = scores['test_f1']

print('Mean ROC AUC: %.4f' % np.mean(roc_aucs))
print('Mean Accuracy: %.4f' % np.mean(accuracies))
print('Mean Recall: %.4f' % np.mean(recalls))
print('Mean Precision: %.4f' % np.mean(precisions))
print('Mean F1: %.4f' % np.mean(f1s))

In [None]:
from sklearn.model_selection import cross_validate
from sklearn.pipeline import make_pipeline

# Rows assigned to cluster 1 and their targets
in_cluster_1 = X.cluster == 1
X_1 = X[in_cluster_1]
y_1 = y[in_cluster_1]

# Preprocessing is part of the estimator, so it is learned from each training
# fold only and never sees the held-out rows
lr = LogisticRegression()
lr_pipeline_1 = make_pipeline(preprocessor, lr)

cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
# one pass over the folds; every metric is scored on the same fitted models
scoring = ['roc_auc', 'accuracy', 'recall', 'precision', 'f1']
scores = cross_validate(lr_pipeline_1, X_1, y_1, scoring=scoring, cv=cv, n_jobs=-1)
roc_aucs = scores['test_roc_auc']
accuracies = scores['test_accuracy']
recalls = scores['test_recall']
precisions = scores['test_precision']
f1s = scores['test_f1']

print('Mean ROC AUC: %.4f' % np.mean(roc_aucs))
print('Mean Accuracy: %.4f' % np.mean(accuracies))
print('Mean Recall: %.4f' % np.mean(recalls))
print('Mean Precision: %.4f' % np.mean(precisions))
print('Mean F1: %.4f' % np.mean(f1s))

### `cluster_id` as a feature

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_validate

# Cross-validate a random forest on the full feature matrix.
# NOTE: the preprocessor is fit on all rows before the CV split, so the
# fitted transformation has already seen each validation fold.
X_transformed = preprocessor.fit_transform(X)
rf = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=1)

cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
# One multi-metric pass: each fold's forest is trained once and scored on all
# five metrics, rather than being retrained for every metric. Splitter and
# forest are both seeded, so the scores match the per-metric calls.
cv_scores = cross_validate(
    rf, X_transformed, y,
    scoring=['roc_auc', 'accuracy', 'recall', 'precision', 'f1'],
    cv=cv, n_jobs=-1,
)
roc_aucs = cv_scores['test_roc_auc']
accuracies = cv_scores['test_accuracy']
recalls = cv_scores['test_recall']
precisions = cv_scores['test_precision']
f1s = cv_scores['test_f1']

print('Mean ROC AUC: %.4f' % np.mean(roc_aucs))
print('Mean Accuracy: %.4f' % np.mean(accuracies))
print('Mean Recall: %.4f' % np.mean(recalls))
print('Mean Precision: %.4f' % np.mean(precisions))
print('Mean F1: %.4f' % np.mean(f1s))

In [None]:
## Grid search

from sklearn.model_selection import GridSearchCV

# Hyperparameter grid for the random forest.
# 'auto' is intentionally absent from max_features: for classifiers it was an
# alias of 'sqrt', and scikit-learn 1.3 removed it, so every candidate using
# it would fail to fit and be scored as NaN.
param_grid = {
    'n_estimators': [100, 200, 300],
    'max_depth': [10, 20, None],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4],
    'max_features': ['sqrt', 'log2']
}

# Base estimator; the seed keeps every candidate reproducible
rf = RandomForestClassifier(random_state=1)

# Exhaustive search over the grid, ranked by 3-fold ROC AUC
grid_search = GridSearchCV(estimator=rf, param_grid=param_grid, cv=3, n_jobs=-1, scoring='roc_auc')

# Refit the preprocessing on the full feature frame so this cell is self-contained
X_transformed = preprocessor.fit_transform(X)

# Run the search (also refits the best candidate on all rows by default)
grid_search.fit(X_transformed, y)

# Print the best parameters found by GridSearchCV
print("Best Hyperparameters:", grid_search.best_params_)

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_validate

# Cross-validate the forest selected by the grid search.
# NOTE: these are the same rows the grid search tuned on, and the preprocessor
# is fit before the CV split, so the scores are optimistic estimates.
X_transformed = preprocessor.fit_transform(X)
# Hyperparameters from an earlier run, kept for reference if the search is skipped:
# rf = RandomForestClassifier(n_estimators=200, max_depth=10, random_state=1, max_features='sqrt', min_samples_leaf=4, min_samples_split=2)
rf = grid_search.best_estimator_

cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
# One multi-metric pass instead of five single-metric runs: each fold's model
# is trained once and scored on every metric. Cross-validation works on clones,
# so `rf` itself keeps the fit it received from the grid search.
cv_scores = cross_validate(
    rf, X_transformed, y,
    scoring=['roc_auc', 'accuracy', 'recall', 'precision', 'f1'],
    cv=cv, n_jobs=-1,
)
roc_aucs = cv_scores['test_roc_auc']
accuracies = cv_scores['test_accuracy']
recalls = cv_scores['test_recall']
precisions = cv_scores['test_precision']
f1s = cv_scores['test_f1']

print('Mean ROC AUC: %.4f' % np.mean(roc_aucs))
print('Mean Accuracy: %.4f' % np.mean(accuracies))
print('Mean Recall: %.4f' % np.mean(recalls))
print('Mean Precision: %.4f' % np.mean(precisions))
print('Mean F1: %.4f' % np.mean(f1s))

In [None]:
# Column labels for the transformed matrix: the numeric features, then
# 'cluster', then the one-hot names reported by the fitted encoder.
# NOTE(review): this order is assumed to match the preprocessor's output
# column order — confirm against the ColumnTransformer definition.
_onehot = preprocessor.named_transformers_['cat'].named_steps['encoder']
_onehot_names = list(_onehot.get_feature_names_out(cat_features))
output_columns = num_features + ['cluster'] + _onehot_names

In [None]:
# Rank the features by the importance scores stored on the fitted model `rf`
feature_importances = rf.feature_importances_

# Pair each output column with its score, ordered from most to least important
importance_df = pd.DataFrame(
    {'Feature': output_columns, 'Importance': feature_importances}
).sort_values(by='Importance', ascending=False)

# Show the ranking as the cell output
importance_df

## Other Considerations

In [None]:
# Work on a copy so the `credit` frame keeps its original labels.
credit2 = credit.copy()
# Binary target: 'good' becomes 0, every other value becomes 1.
credit2['risk'] = credit2['risk'].apply(lambda label: int(label != 'good'))
credit2.head(3)

In [None]:
# The binary 'risk' column is the target; all remaining columns are predictors.
y = credit2['risk']
X = credit2.drop(columns=['risk'])

In [None]:
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Columns routed to the numeric and categorical branches respectively
num_features = ['age', 'job', 'credit_amount', 'duration']
cat_features = ['sex', 'housing', 'saving_accounts', 'checking_account', 'purpose']

# Numeric branch: mean-impute missing values, then standardize
num_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler()),
])

# Categorical branch: label missing values 'unknown', then one-hot encode.
# Categories not seen during fit are ignored at transform time.
cat_transformer = Pipeline([
    ('imputer', SimpleImputer(fill_value='unknown', strategy='constant')),
    ('encoder', OneHotEncoder(handle_unknown='ignore')),
])

# Apply each branch to its columns; columns in neither list pass through unchanged
preprocessor = ColumnTransformer(
    [
        ('num', num_transformer, num_features),
        ('cat', cat_transformer, cat_features),
    ],
    remainder='passthrough',
)


In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split

# Stratified 80/20 hold-out split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

# Fit the preprocessing on the training rows only, then reuse it on the test rows
X_train_transformed = preprocessor.fit_transform(X_train)
X_test_transformed = preprocessor.transform(X_test)

# Train on the training split and predict the held-out rows
lr = LogisticRegression()
lr.fit(X_train_transformed, y_train)
y_test_pred = lr.predict(X_test_transformed)

# Hold-out metrics
accuracy = accuracy_score(y_test, y_test_pred)
precision = precision_score(y_test, y_test_pred)
recall = recall_score(y_test, y_test_pred)
f1 = f1_score(y_test, y_test_pred)

for _label, _value in (
    ("Accuracy:", accuracy),
    ("Precision:", precision),
    ("Recall:", recall),
    ("F1 Score:", f1),
):
    print(_label, _value)


In [None]:
# Score the fitted model on the rows it was trained on, for comparison
# with the hold-out metrics.
y_train_pred = lr.predict(X_train_transformed)

accuracy = accuracy_score(y_train, y_train_pred)
precision = precision_score(y_train, y_train_pred)
recall = recall_score(y_train, y_train_pred)
f1 = f1_score(y_train, y_train_pred)

print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1-score: {f1:.2f}")

In [None]:
# Training rows with the true label, the prediction, and a 0/1 flag marking
# whether the prediction matched the label.
df_train = pd.concat([X_train, y_train], axis=1).assign(
    risk_pred=y_train_pred,
    accuracy=(y_train_pred == y_train).astype(int),
)
df_train.head()

In [None]:
# Mean of the 0/1 accuracy flag and row count per loan purpose (training split)
df_train.groupby('purpose')['accuracy'].agg(accuracy_mean='mean', row_count='count').reset_index()

In [None]:
# Test rows with the true label, the prediction, and a 0/1 flag marking
# whether the prediction matched the label.
df_test = pd.concat([X_test, y_test], axis=1).assign(
    risk_pred=y_test_pred,
    accuracy=(y_test_pred == y_test).astype(int),
)
df_test.head()

In [None]:
# Mean of the 0/1 accuracy flag and row count per loan purpose (test split)
df_test.groupby('purpose')['accuracy'].agg(accuracy_mean='mean', row_count='count').reset_index()

In [None]:
# Per-purpose accuracy and row counts for the training and test splits, side by side.
summary_train = df_train.groupby('purpose').agg(accuracy_mean=('accuracy', 'mean'), row_count=('accuracy', 'count')).reset_index()
summary_test = df_test.groupby('purpose').agg(accuracy_mean=('accuracy', 'mean'), row_count=('accuracy', 'count')).reset_index()

# Outer join: a purpose that occurs in only one split stays in the table
# rather than being silently dropped, as the default inner join would do.
# The suffixes give the same column names as renaming each side by hand.
summary = pd.merge(
    summary_train,
    summary_test,
    on='purpose',
    how='outer',
    suffixes=('_train', '_test'),
)

# A purpose missing from a split has zero rows there; its accuracy stays NaN.
_count_columns = ['row_count_train', 'row_count_test']
summary[_count_columns] = summary[_count_columns].fillna(0).astype(int)
summary