In [12]:
import pandas as pd
import numpy as np
import string

In [4]:
df = pd.read_csv("../dataproject2025.csv", index_col=0)

In [5]:
df.drop(columns=['Predictions', 'Predicted probabilities'], inplace=True)

In [7]:
df.dropna(inplace=True)

In [10]:
def get_features(df: pd.DataFrame()) -> pd.DataFrame():
    """
    Get additional features.
    """        

    df_with_features = (
        df
        .assign(

            # logs --> to money, not months or so
            annual_inc_log = np.log1p(df["annual_inc"]),
            avg_cur_bal_log = np.log1p(df["avg_cur_bal"]),
            fico_range_high_log = np.log1p(df["fico_range_high"]),
            revol_bal_log = np.log1p(df["revol_bal"]),

            # broader zip code area
            zip_code2 = np.round(df["zip_code"] / 10, 0),

            # total balance?
            cur_balance = df["avg_cur_bal"] * df["open_acc"],

            # flags
            delinq_2yrs_flag = df["delinq_2yrs"] >= 1,
            tax_liens_flag = df["tax_liens"] >= 1,

            # shares
            s_actv_bc_tl = df["num_actv_bc_tl"] / (df["open_acc"] + 1e-6),
            s_bc_tl = df["num_bc_tl"] / (df["open_acc"] + 1e-6),
            s_il_tl = df["num_il_tl"] / (df["open_acc"] + 1e-6),
            s_rev_accts = df["num_rev_accts"] / (df["open_acc"] + 1e-6),

            # interactions
            int_rate_x_duration = df["int_rate"] * df["loan duration"], # higher rates are even riskier on 60 vs 36
            dti_x_util = df["dti"] * (df["revol_util"] / 100.0), # debt burden (DTI) is more problematic if utilization of their cards/lines is also high
            revol_bal_income_ratio = df["revol_bal"] / (df["annual_inc"] + 1e-6), # leverage: outstanding revolving balance / income
            fico_x_dti = df["fico_range_high"] * df["dti"], # same DTI can mean different risk depending on FICO score; "do they manage well or not?"
        )
    )

    return df_with_features

df_engineered = get_features(df)

In [13]:
def categorical_encoding(df: pd.DataFrame) -> pd.DataFrame:
  """Encodings of categorical variables."""

  df_encoded = df.copy()

  # grade to numeric
  grade_map = {c: i+1 for i, c in enumerate(string.ascii_uppercase[:7])}
  df_encoded["grade_num"] = df_encoded["grade"].map(grade_map)

  # sub_grade to numeric
  sg = df_encoded["sub_grade"].astype(str).str.upper().str.strip()
  letter = sg.str[0]
  number = pd.to_numeric(sg.str[1:].str.extract(r"(\d+)", expand=False), errors="coerce")
  letter_map = {ch: i+1 for i, ch in enumerate("ABCDEFG")}
  base = letter.map(letter_map)
  sub_grade_num = (base - 1) * 5 + number
  df_encoded["sub_grade_num"] = sub_grade_num.astype("float32")

  # emp_length to numeric; map prob cleanest; maybe 10+ different?
  emp_length_map = {
    '< 1 year': 0,
    '1 year': 1,
    '2 years': 2,
    '3 years': 3,
    '4 years': 4,
    '5 years': 5,
    '6 years': 6,
    '7 years': 7,
    '8 years': 8,
    '9 years': 9,
    '10+ years': 10
  }

  df_encoded["emp_length_num"] = df_encoded["emp_length"].map(emp_length_map).astype("float32")

  # one-hot
  onehot_cols = ["home_ownership", "purpose", "emp_title"]
  df_encoded = pd.get_dummies(df_encoded, columns=onehot_cols, prefix=onehot_cols, drop_first=True)

  # drop originals
  df_encoded = df_encoded.drop(columns=["grade", "sub_grade", "emp_length"])

  return df_encoded

df_encoded = categorical_encoding(df_engineered)

In [None]:
from collections import Counter
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
import pandas as pd

class PLTR(BaseEstimator, RegressorMixin):
    def __init__(self, n_estimators=80, max_depth=3, random_state=None, granularity=2, k=5, feature_names=None):
        self.granularity = granularity #how much the rounding should be
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.random_state = random_state
        self.trees_ = []
        self.k = k
        self.feature_names = feature_names

    # def tree_fit(self, X, y):
    #     self.trees_ = []
    #     for i in range(self.n_estimators):
    #         tree = DecisionTreeClassifier(
    #             max_depth=self.max_depth,
    #             random_state=None if self.random_state is None else self.random_state + i
    #         )
    #         tree.fit(X, y)
    #         self.trees_.append(tree)
    #     return self
    
    def tree_fit(self, X, y):
        #boosting
        n_features = X.shape[1]
        n_hide = int(np.sqrt(n_features))
        self.trees_ = []
        self.hidden_features_ = []

        rng = np.random.RandomState(self.random_state)

        for i in range(self.n_estimators):
            # pick features to hide
            hide_idx = rng.choice(n_features, size=n_hide, replace=False)

            # make a copy and zero out hidden features
            X_mod = X.copy()
            X_mod.iloc[:, hide_idx] = 0  

            tree = DecisionTreeClassifier(
                max_depth=self.max_depth,
                random_state=None if self.random_state is None else self.random_state + i
            )
            tree.fit(X_mod, y)

            self.trees_.append(tree)
            self.hidden_features_.append(hide_idx)

        return self

    def tree_predict(self, X):
        preds = [tree.predict(X) for tree in self.trees_]
        return sum(preds) / len(preds)

    def identify(self, k=5, feature_names=None):
        """
        Identify the most common (root split, child split) pairs across trees.

        Parameters
        ----------
        k : int
            Number of most common split pairs to return.
        feature_names : list, optional
            Names of features. If None, indices are returned.
        """
        pair_counter = Counter()

        for tree in self.trees_:
            t = tree.tree_

            # Root node
            

            if t.feature[0] != -2:  # -2 means leaf
                f_root = t.feature[0]
                thr_root = round(t.threshold[0], self.granularity)
                f_root_name = feature_names[f_root] if feature_names is not None else f_root

                # Look at children
                for child in [t.children_left[0], t.children_right[0]]:
                    if child != -1 and t.feature[child] != -2:
                        f_child = t.feature[child]
                        thr_child = round(t.threshold[child], self.granularity)
                        f_child_name = feature_names[f_child] if feature_names is not None else f_child

                        pair_counter[((f_root_name, thr_root), (f_child_name, thr_child))] += 1

        #print(pair_counter)

        self.pairs_identified = pair_counter.most_common(k)
        self.pairs_identified = [pair for pair, count in self.pairs_identified]
        return self.pairs_identified
    

    def create_binary_vars(self, X, split_pairs, feature_names=None):
        """
        Create binary variables from root-child split pairs.
        
        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Input data.
        split_pairs : list of tuples
            Each element is ((root_feature, root_thr), (child_feature, child_thr)).
        feature_names : list, optional
            Names of features. If None, indices are used.
            
        Returns
        -------
        df_bin : pd.DataFrame
            DataFrame with 2 binary variables per pair.
        """
        X = np.array(X)
        n_samples = X.shape[0]
        df_bin = pd.DataFrame()
        
        for i, ((f_root, thr_root), (f_child, thr_child)) in enumerate(split_pairs):
            # Get feature indices if names are provided
            # f_root_idx = feature_names.index(f_root) if feature_names else f_root
            # f_child_idx = feature_names.index(f_child) if feature_names else f_child
            f_root_idx = f_root
            f_child_idx = f_child
            
            # Binary var 1: root split
            bin1 = (X[:, f_root_idx] > thr_root).astype(int)
            
            # Binary var 2: child split only where bin1 == 0
            bin2 = np.zeros(n_samples, dtype=int)
            mask = bin1 == 0
            bin2[mask] = (X[mask, f_child_idx] > thr_child).astype(int)
            
            df_bin[f"{feature_names[f_root]} > {thr_root}"] = bin1
            df_bin[f"{feature_names[f_root]} < {thr_root}x{feature_names[f_child]} > {thr_child}"] = bin2
        
        self.modified_df = df_bin
        return self.modified_df

    def fit(self, X, y, adaptive_lasso=True):

        if adaptive_lasso:
            self.tree_fit(X, y)
            self.identify(k=self.k)
            self.create_binary_vars(X, self.pairs_identified, feature_names=self.feature_names)
            modified_input_full = pd.concat([X, self.modified_df], axis=1)

            lr = LogisticRegression(penalty="l2", solver="liblinear", max_iter=3000)
            lr.fit(modified_input_full, y)

            theta0 = lr.coef_.flatten()
            self.weights_alasso = np.power(np.abs(theta0) + 1e-6, -1)  
            self.modified_input_full = modified_input_full / self.weights_alasso[np.newaxis, :]

            final_clf = LogisticRegression(penalty="l1", solver="liblinear",
                                        max_iter=3000)
            self.pltr = final_clf.fit(self.modified_input_full, y)

            return self
        
        else:
            self.pltr = LogisticRegression(max_iter=3000)
            self.tree_fit(X, y)
            self.identify(k=self.k)
            self.create_binary_vars(X, self.pairs_identified, feature_names=self.feature_names)
            self.modified_input_full = pd.concat([X, self.modified_df], axis=1)
            self.pltr.fit(self.modified_input_full, y)
            return self 
    
    def predict(self, X):
        return self.pltr.predict(X)

    def predict_proba(self, X):
        return self.pltr.predict_proba(X)

In [27]:
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import r2_score, accuracy_score, classification_report, confusion_matrix


target_col = 'target'
# Separate features and target
X = df_encoded.drop(columns=[target_col]).reset_index(drop=True)
y = df_encoded[target_col]

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

print(f"Training set size: {X_train.shape}")
print(f"Test set size: {X_test.shape}")
print(f"Target distribution in training set:")
print(y_train.value_counts(normalize=True))
print("\n" + "="*50 + "\n")


Training set size: (478378, 112)
Test set size: (119595, 112)
Target distribution in training set:
target
0.0    0.797888
1.0    0.202112
Name: proportion, dtype: float64




In [28]:
cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)


In [29]:
import warnings
warnings.simplefilter(action="ignore", category=FutureWarning)
pltr = PLTR(k=10, random_state=10, feature_names=X.columns)

In [41]:
pltr.modified_input_full

Unnamed: 0,issue_d,loan duration,annual_inc,avg_cur_bal,bc_open_to_buy,bc_util,delinq_2yrs,dti,fico_range_high,funded_amnt,...,sub_grade_num < 13.5xsub_grade_num > 8.5,sub_grade_num < 13.5xint_rate_x_duration > 17.82,grade_num > 3.5,grade_num < 3.5xgrade_num > 2.5,grade_num < 3.5xint_rate_x_duration > 17.82,sub_grade_num < 13.5xgrade_num > 4.5,sub_grade_num < 13.5xsub_grade_num > 20.5,int_rate > 15.36,int_rate < 15.36xint_rate > 11.18,int_rate < 15.36xint_rate_x_duration > 17.82
0,2.309374,0.0,0.152838,0.063752,0.052091,0.008290,0.000000,0.002336,4.783109,0.206971,...,0.000000,0.0,0.000000,0.000000,0.0,0.0,0.0,0.00000,0.000000,0.0
1,2.309374,0.0,0.097973,0.014269,0.213545,0.003225,0.000000,0.007281,4.783109,0.289759,...,0.000000,0.0,0.001829,0.000000,0.0,0.0,0.0,0.00178,0.000000,0.0
2,2.308227,0.0,0.032658,0.039336,0.000792,0.010343,0.000000,0.013609,4.616798,0.103485,...,0.000000,0.0,0.001829,0.000000,0.0,0.0,0.0,0.00178,0.000000,0.0
3,2.309374,0.0,0.125405,0.006850,0.043109,0.003576,0.000000,0.000762,4.716584,0.103485,...,0.000436,0.0,0.000000,0.000215,0.0,0.0,0.0,0.00000,0.000146,0.0
4,2.307081,0.0,0.043108,0.126359,0.025414,0.006836,0.000724,0.004574,4.550273,0.108660,...,0.000000,0.0,0.000000,0.000000,0.0,0.0,0.0,0.00000,0.000000,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
478373,2.309374,0.0,0.150225,0.048770,0.005759,0.010870,0.000000,0.008462,4.550273,0.351851,...,0.000000,0.0,0.000000,0.000215,0.0,0.0,0.0,0.00000,0.000146,0.0
478374,2.307081,0.0,0.267793,0.233735,0.056797,0.008384,0.000000,0.006706,4.916158,0.144880,...,0.000000,0.0,0.000000,0.000000,0.0,0.0,0.0,0.00000,0.000000,0.0
478375,2.310520,0.0,0.150225,0.113393,0.029676,0.011492,0.000000,0.008313,4.683322,0.362199,...,0.000436,0.0,0.000000,0.000000,0.0,0.0,0.0,0.00000,0.000146,0.0
478376,2.310520,0.0,0.043108,0.033062,0.000893,0.011468,0.000000,0.005822,4.483749,0.103485,...,0.000000,0.0,0.000000,0.000215,0.0,0.0,0.0,0.00000,0.000146,0.0


In [44]:
# Cross-validation for XGBoost
# print("Performing cross-validation...")

X_train.reset_index(drop=True, inplace=True)
X_test.reset_index(drop=True, inplace=True)
y_train.reset_index(drop=True, inplace=True)
y_test.reset_index(drop=True, inplace=True)


#the implementation is too sensitive to changing indices - and so this type of cross val doesn't work
# pltr_cv_scores = cross_val_score(pltr, X_train, y_train, cv=cv, scoring='accuracy')

# print(f"PLTR CV Scores: {pltr_cv_scores}")
# print(f"PLTR CV Mean: {pltr_cv_scores.mean():.4f} (+/- {pltr_cv_scores.std() * 2:.4f})")

# Train XGBoost on full training set
print("Training PLTR on full training set...")
pltr.fit(X_train, y_train)


# # Plot confusion matrices
# plt.subplot(2, 2, 2)
# xgb_cm = confusion_matrix(y_test, xgb_test_pred)
# sns.heatmap(xgb_cm, annot=True, fmt='d', cmap='Blues')
# plt.title('XGBoost Confusion Matrix')
# plt.ylabel('True Label')
# plt.xlabel('Predicted Label')

# # Feature importance comparison (top 10 features)
# plt.subplot(2, 2, 4)
# xgb_importance = xgb_model.feature_importances_
# rf_importance = rf_model.feature_importances_

# # Get top 10 features from XGBoost
# top_features_idx = np.argsort(xgb_importance)[-10:]
# top_features = X.columns[top_features_idx]

# x_pos = np.arange(len(top_features))
# plt.barh(x_pos - 0.2, xgb_importance[top_features_idx], 0.4, label='XGBoost', alpha=0.8)
# plt.barh(x_pos + 0.2, rf_importance[top_features_idx], 0.4, label='Random Forest', alpha=0.8)
# plt.yticks(x_pos, top_features)
# plt.xlabel('Feature Importance')
# plt.title('Top 10 Feature Importance Comparison')
# plt.legend()

# plt.tight_layout()
# plt.show()


Training PLTR on full training set...


0,1,2
,n_estimators,80
,max_depth,3
,random_state,10
,granularity,2
,k,10
,feature_names,"Index(['issue...', length=112)"


In [45]:
#we need to encode the test set with the new featuers:
# Predictions
pltr_train_pred = pltr.predict(pltr.modified_input_full)

pltr.create_binary_vars(X_test, pltr.pairs_identified, feature_names=pltr.feature_names)
modified_input_full = pd.concat([X_test, pltr.modified_df], axis=1)
pltr_test_pred = pltr.predict(modified_input_full)

# Evaluate XGBoost
pltr_train_acc = accuracy_score(y_train, pltr_train_pred)
pltr_test_acc = accuracy_score(y_test, pltr_test_pred)

print(f"PLTR Training Accuracy: {pltr_train_acc:.4f}")
print(f"PLTR Test Accuracy: {pltr_test_acc:.4f}")

print("\nPLTR Classification Report (Test Set):")
print(classification_report(y_test, pltr_test_pred))


PLTR Training Accuracy: 0.7992
PLTR Test Accuracy: 0.7851

PLTR Classification Report (Test Set):
              precision    recall  f1-score   support

         0.0       0.80      0.97      0.88     95423
         1.0       0.31      0.05      0.09     24172

    accuracy                           0.79    119595
   macro avg       0.56      0.51      0.48    119595
weighted avg       0.70      0.79      0.72    119595

