In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.metrics import confusion_matrix
import datetime as dt

In [2]:

# Load the dataset
df = pd.read_csv('risk-train.txt', delimiter='\t')  

# Replace missing values (represented by '?') with NaN
df.replace('?', pd.NA, inplace=True)

# Handle missing values
df.dropna(subset=['ORDER_ID', 'CLASS'], inplace=True)

# Convert date columns to datetime format
date_columns = ['B_BIRTHDATE', 'DATE_LORDER']
for col in date_columns:
    df[col] = pd.to_datetime(df[col], errors='coerce')

# Feature engineering on date columns
# Convert B_BIRTHDATE to age and DATE_LORDER to days since last order
df['AGE'] = df['B_BIRTHDATE'].apply(lambda x: (dt.datetime.now() - x).days // 365 if pd.notna(x) else 0)
df['DAYS_SINCE_LAST_ORDER'] = df['DATE_LORDER'].apply(lambda x: (dt.datetime.now() - x).days if pd.notna(x) else 0)

# Fill missing numerical values with 0 
numerical_columns = ['VALUE_ORDER', 'AMOUNT_ORDER', 'AMOUNT_ORDER_PRE', 'VALUE_ORDER_PRE', 'TIME_ORDER', 'AGE', 'DAYS_SINCE_LAST_ORDER', 'MAHN_AKT', 'MAHN_HOECHST']
df[numerical_columns] = df[numerical_columns].fillna(0)

# Fill missing values with unknown
df['Z_CARD_ART'].fillna('Unknown', inplace=True)
df['Z_LAST_NAME'].fillna('Unknown', inplace=True)

# Remove unnecessary columns
drop_columns = ['ANUMMER_01', 'ANUMMER_02', 'ANUMMER_03', 'ANUMMER_04', 'ANUMMER_05', 'ANUMMER_06', 'ANUMMER_07', 'ANUMMER_08', 'ANUMMER_09', 'ANUMMER_10', 'B_BIRTHDATE', 'DATE_LORDER']
df.drop(columns=drop_columns, inplace=True)

# Convert binary columns to boolean (using 1 and 0)
df['B_EMAIL'] = df['B_EMAIL'].map({'yes': 1, 'no': 0})
df['B_TELEFON'] = df['B_TELEFON'].map({'yes': 1, 'no': 0})
df['FLAG_LRIDENTISCH'] = df['FLAG_LRIDENTISCH'].map({'yes': 1, 'no': 0})
df['FLAG_NEWSLETTER'] = df['FLAG_NEWSLETTER'].map({'yes': 1, 'no': 0})
df['CHK_LADR'] = df['CHK_LADR'].map({'yes': 1, 'no': 0})
df['CHK_RADR'] = df['CHK_RADR'].map({'yes': 1, 'no': 0})
df['CHK_KTO'] = df['CHK_KTO'].map({'yes': 1, 'no': 0})
df['CHK_COOKIE'] = df['CHK_COOKIE'].map({'yes': 1, 'no': 0})
df['CHK_CARD'] = df['CHK_CARD'].map({'yes': 1, 'no': 0})
df['CHK_IP'] = df['CHK_IP'].map({'yes': 1, 'no': 0})
df['FAIL_LPLZ'] = df['FAIL_LPLZ'].map({'yes': 1, 'no': 0})
df['FAIL_LORT'] = df['FAIL_LORT'].map({'yes': 1, 'no': 0})
df['FAIL_LPLZORTMATCH'] = df['FAIL_LPLZORTMATCH'].map({'yes': 1, 'no': 0})
df['FAIL_RPLZ'] = df['FAIL_RPLZ'].map({'yes': 1, 'no': 0})
df['FAIL_RORT'] = df['FAIL_RORT'].map({'yes': 1, 'no': 0})
df['FAIL_RPLZORTMATCH'] = df['FAIL_RPLZORTMATCH'].map({'yes': 1, 'no': 0})
df['NEUKUNDE'] = df['NEUKUNDE'].map({'yes': 1, 'no': 0})
df['CLASS'] = df['CLASS'].map({'yes': 1, 'no': 0})

# Separate the target variable 
dep_var = df['CLASS']

# Drop the target column from the main data
indp_var = df.drop(columns=['CLASS'])

# Perform one-hot encoding on the remaining features
indp_var = pd.get_dummies(indp_var, prefix=None, drop_first=True)

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df['Z_CARD_ART'].fillna('Unknown', inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df['Z_LAST_NAME'].fillna('Unknown', inplace=True)


In [3]:
# TN = confusion_matrix[0,0]
# FP = confusion_matrix[0,1]
# FN = confusion_matrix[1,0]
# TP = confusion_matrix[1,1]

def custom_precision(y_true, y_pred):
    cm = confusion_matrix(y_true, y_pred)
    
    TP = cm[1, 1]  # True Positives
    FP = cm[0, 1]  # False Positives
    precision = TP / (TP + FP) if (TP + FP) > 0 else 0
    return precision

def custom_recall(y_true, y_pred):
    # Generate the confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    # Calculate True Positives and False Negatives
    TP = cm[1, 1]  # True Positives (actual high-risk correctly predicted as high-risk)
    FN = cm[1, 0]  # False Negatives (actual high-risk incorrectly predicted as low-risk)
    
    # Calculate Recall
    recall = TP / (TP + FN) if (TP + FN) > 0 else 0
    return recall

def custom_f1_score(y_true, y_pred):
    precision = custom_precision(y_true, y_pred) * 100
    recall = custom_recall(y_true, y_pred) * 100
    f1_score = 2 * ((precision * recall) / (precision + recall)) if (precision + recall) > 0 else 0
    return f1_score

In [4]:
from sklearn.tree import DecisionTreeClassifier
# Define cost-sensitive metric function as you have it
def cost_sensitive_metric(y_true, y_pred):
    cm = confusion_matrix(y_true, y_pred)
    print("Confusion Matrix \n", cm)
    FN = cm[1, 0]
    FP = cm[0, 1]
    
    # Applying the given cost matrix
    cost = (FN * 50) + (FP * 5)
    return cost

class CostSensitiveDecisionTree(BaseEstimator, ClassifierMixin):
    def __init__(self, max_depth=15, min_samples_split=3, threshold=0.5, random_state=42):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.threshold = threshold
        self.tree = DecisionTreeClassifier(max_depth=self.max_depth, min_samples_split=self.min_samples_split, class_weight={0: 1, 1: 17}, random_state=random_state)

    def fit(self, X, y):
        # Fit the internal decision tree model
        self.tree.fit(X, y)
        return self

    def predict(self, X):
        # Predict probabilities and apply threshold for cost-sensitivity
        raw_preds = self.tree.predict_proba(X)[:, 1]
        return np.where(raw_preds > self.threshold, 1, 0)  # Threshold is set at 0.5

    def recall(self, X, y):
        y_pred = self.predict(X)
        return custom_recall(y, y_pred)
    
    def precision(self, X, y):
        y_pred = self.predict(X)
        return custom_precision(y, y_pred)
    
    def f1_score(self, X, y):
        y_pred = self.predict(X)
        return custom_f1_score(y, y_pred)
    
    def cost_metric(self, X, y):
        y_pred = self.predict(X)
        return cost_sensitive_metric(y, y_pred)

In [5]:
# Train-test split
x_train, x_test, y_train, y_test = train_test_split(indp_var, dep_var, test_size=0.2, random_state=42, stratify=dep_var)

# Instantiate the model with desired hyperparameters
model = CostSensitiveDecisionTree(max_depth=5, min_samples_split=10, threshold=0.66, random_state=42)
model.fit(x_train, y_train)

recall_train = model.recall(x_train,y_train)
recall_test = model.recall(x_test, y_test)

print("Training Data:")
cost_train = model.cost_metric(x_train,y_train)
print(f"Cost on training data: {cost_train}")
print(f"recall on training data: {recall_train*100:.2f} %")


print("\nTesting Data:")
cost_test = model.cost_metric(x_test, y_test)
print(f"Cost on testing data: {cost_test}")
print(f"recall on testing data: {recall_test*100:.2f} %")


Training Data:
Confusion Matrix 
 [[18765  3838]
 [  673   724]]
Cost on training data: 52840
recall on training data: 51.83 %

Testing Data:
Confusion Matrix 
 [[4692  959]
 [ 164  185]]
Cost on testing data: 12995
recall on testing data: 53.01 %
