# Task Four

Charlie wants to make her model work for future data sets, so she needs a general approach to generating the buckets. Given a set number of buckets corresponding to the number of input labels for the model, she would like to find out the boundaries that best summarize the data. You need to create a rating map that maps the FICO score of the borrowers to a rating where a lower rating signifies a better credit score.

The process of doing this is known as quantization. You could consider many ways of solving the problem by optimizing different properties of the resulting buckets, such as the mean squared error or log-likelihood (see below for definitions).

Mean squared error

You can view this question as an approximation problem and try to map all the entries in a bucket to one value, minimizing the associated squared error. We are now looking to minimize the following:

$$ \text{MSE} = \frac{1}{n} \sum_{i=1}^{n} ( Y_i - \hat{Y}_i)^2 $$

Log-likelihood

A more sophisticated possibility is to maximize the following log-likelihood function:

$$ LL(b_1,\ldots, b_{r-1}) = \sum_{i=1}^{r}[k_i \ln{p_i} + (n_i - k_i) \ln{(1 -p_i)}] $$

Where $b_i$ is the bucket boundaries, $n_i$ is the number of records in each bucket, $k_i$ is the number of defaults in each bucket, and $p_i = k_i / n_i$  is the probability of default in the bucket. This function considers how rough the discretization is and the density of defaults in each bucket. This problem could be addressed by splitting it into subproblems, which can be solved incrementally (i.e., through a dynamic programming approach). For example, you can break the problem into two subproblems, creating five buckets for FICO scores ranging from 0 to 600 and five buckets for FICO scores ranging from 600 to 850.

In [1]:
# Implement the above mentioned tasks to quantify FICO scores into seperate buckets

# Import the necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize

In [2]:
# Load the loan data and extract the FICO scores and default columns
loan_data = pd.read_csv('Loan_Data.csv')
fico_scores = loan_data['fico_score']
default_status = loan_data['default']

In [3]:
# Following the discussion above, we will employ different methods to bucket the FICO scores
# Method 1: Mean squared error minimization
def mse_bucketing(fico_scores, default_status, num_buckets):
    # Define the range of FICO scores
    min_fico = fico_scores.min()
    max_fico = fico_scores.max()
    
    # Initialize bucket boundaries
    bucket_boundaries = np.linspace(min_fico, max_fico, num_buckets + 1)
    
    # Function to calculate MSE for given bucket boundaries
    def calculate_mse(boundaries):
        mse = 0
        for i in range(len(boundaries) - 1):
            bucket_mask = (fico_scores >= boundaries[i]) & (fico_scores < boundaries[i + 1])
            bucket_defaults = default_status[bucket_mask]
            if len(bucket_defaults) > 0:
                default_rate = bucket_defaults.mean()
                mse += ((default_rate - default_status.mean()) ** 2) * len(bucket_defaults)
        return mse / len(fico_scores)
    
    # Optimize bucket boundaries to minimize MSE
    result = minimize(calculate_mse, bucket_boundaries, method='L-BFGS-B')
    
    optimized_boundaries = result.x
    return optimized_boundaries

In [4]:
# Method 2: Log-likelihood maximization
def log_likelihood_bucketing(fico_scores, default_status, num_buckets):
    # Define the range of FICO scores
    min_fico = fico_scores.min()
    max_fico = fico_scores.max()
    
    # Initialize bucket boundaries
    bucket_boundaries = np.linspace(min_fico, max_fico, num_buckets + 1)
    
    # Function to calculate negative log-likelihood for given bucket boundaries
    def calculate_neg_log_likelihood(boundaries):
        neg_log_likelihood = 0
        for i in range(len(boundaries) - 1):
            bucket_mask = (fico_scores >= boundaries[i]) & (fico_scores < boundaries[i + 1])
            bucket_defaults = default_status[bucket_mask]
            if len(bucket_defaults) > 0:
                default_rate = bucket_defaults.mean()
                # Avoid log(0) by adding a small constant
                default_rate = np.clip(default_rate, 1e-5, 1 - 1e-5)
                neg_log_likelihood -= np.sum(bucket_defaults * np.log(default_rate) + (1 - bucket_defaults) * np.log(1 - default_rate))
        return neg_log_likelihood
    
    # Optimize bucket boundaries to maximize log-likelihood (minimize negative log-likelihood)
    result = minimize(calculate_neg_log_likelihood, bucket_boundaries, method='L-BFGS-B')
    
    optimized_boundaries = result.x
    return optimized_boundaries

In [9]:
# Method 3: Dynamic programming approach

def dp_bucketing_nll_prebinned(
    fico_scores,
    default_status,
    num_buckets,
    num_prebins=100
):
    # Step 1: Sort data
    sorted_idx = np.argsort(fico_scores)
    fico = np.asarray(fico_scores)[sorted_idx]
    y = np.asarray(default_status)[sorted_idx]

    n = len(fico)

    # Step 2: Pre-binning (quantile-based)
    bin_edges = np.linspace(0, n, num_prebins + 1, dtype=int)

    prebinned_counts = []
    prebinned_defaults = []
    prebinned_fico = []

    for i in range(num_prebins):
        start, end = bin_edges[i], bin_edges[i + 1]
        if start == end:
            continue

        cnt = end - start
        dsum = y[start:end].sum()

        prebinned_counts.append(cnt)
        prebinned_defaults.append(dsum)
        prebinned_fico.append(fico[start])

    prebinned_counts = np.array(prebinned_counts)
    prebinned_defaults = np.array(prebinned_defaults)
    prebinned_fico = np.array(prebinned_fico)

    m = len(prebinned_counts)

    # Step 3: DP tables
    dp = np.full((m + 1, num_buckets + 1), np.inf)
    split = np.zeros((m + 1, num_buckets + 1), dtype=int)
    dp[0, 0] = 0.0

    eps = 1e-8

    # Step 4: DP recurrence with NLL cost
    for j in range(1, num_buckets + 1):
        for i in range(j, m + 1):
            best_cost = dp[i, j]

            for k in range(j - 1, i):
                total_count = prebinned_counts[k:i].sum()
                total_defaults = prebinned_defaults[k:i].sum()

                if total_count == 0:
                    continue

                p = np.clip(total_defaults / total_count, eps, 1 - eps)

                # Bernoulli negative log-likelihood
                nll = (
                    - total_defaults * np.log(p)
                    - (total_count - total_defaults) * np.log(1 - p)
                )

                cost = dp[k, j - 1] + nll

                if cost < best_cost:
                    best_cost = cost
                    split[i, j] = k

            dp[i, j] = best_cost

    # Step 5: Backtrack boundaries
    boundaries = [prebinned_fico[0]]
    idx = m

    for j in range(num_buckets, 0, -1):
        idx = split[idx, j]
        boundaries.append(prebinned_fico[idx])

    return sorted(set(boundaries))


In [6]:
# Compare the results from the three methods
num_buckets = 5
mse_boundaries = mse_bucketing(fico_scores, default_status, num_buckets)
print("MSE Bucketing Boundaries:", mse_boundaries)


MSE Bucketing Boundaries: [408.  496.4 584.8 673.2 761.6 850. ]


In [7]:
ll_boundaries = log_likelihood_bucketing(fico_scores, default_status, num_buckets)
print("Log-Likelihood Bucketing Boundaries:", ll_boundaries)

Log-Likelihood Bucketing Boundaries: [408.00000556 496.4        584.8        673.2        761.6
 849.99999991]


In [11]:
dp_boundaries = dp_bucketing_nll_prebinned(fico_scores, default_status, num_buckets, num_prebins=500)
print("Dynamic Programming Bucketing Boundaries:", dp_boundaries)

Dynamic Programming Bucketing Boundaries: [np.int64(408), np.int64(553), np.int64(611), np.int64(649), np.int64(720)]
