In [1]:
!unzip dsb-24-german-credit.zip

Archive:  dsb-24-german-credit.zip
  inflating: german_credit_test.csv  
  inflating: german_credit_test_submission.csv  
  inflating: german_credit_train.csv  


In [2]:
numerical_cols = [
    'LoanDuration',
    'LoanAmount',
    'InstallmentPercent',
    'CurrentResidenceDuration',
    'Age',
    'ExistingCreditsCount',
]
categorical_cols = [
  'CheckingStatus',
  'CreditHistory',
 'LoanPurpose',
 'ExistingSavings',
 'EmploymentDuration',
 'Sex',
 'OthersOnLoan',
 'OwnsProperty',
 'InstallmentPlans',
 'Housing',
 'Job',
 'Dependents',
 'Telephone',
 'ForeignWorker',]

In [3]:
import pandas as pd
df = pd.read_csv('german_credit_train.csv')

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from xgboost import XGBClassifier, XGBRegressor
from sklearn.metrics import accuracy_score, mean_squared_error

# Load dataset (replace with actual data)

# Define target and features
target = "Risk"
X = df.drop(columns=[target])
y = df[target]

# Binary encode target variable if classification
y = LabelEncoder().fit_transform(y) if y.nunique() == 2 else y

# Identify categorical and numerical columns
cat_cols = categorical_cols
num_cols = numerical_cols

# Preprocessing for numerical data
num_transformer = Pipeline([
    #('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler())
])

# Preprocessing for categorical data
cat_transformer = Pipeline([
    #('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
])

# Combine preprocessors in a column transformer
preprocessor = ColumnTransformer([
    ('num', num_transformer, num_cols),
    ('cat', cat_transformer, cat_cols)
])


In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import numpy as np


In [63]:
# Split into train and test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

X_train, y_train = X,y

loan_amounts_train = np.array(X_train['LoanAmount'])
loan_amounts_test = np.array(X_test['LoanAmount'])
X_train = preprocessor.fit_transform(X_train)
X_test = preprocessor.transform(X_test)


In [64]:
# PyTorch Dataset class
class CustomDataset(Dataset):
    def __init__(self, X, y, loan_amounts):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32).unsqueeze(1)
        self.loan_amounts = torch.tensor(loan_amounts, dtype=torch.float32).unsqueeze(1)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx], self.loan_amounts[idx]

  # Load data into PyTorch Dataloader
train_dataset = CustomDataset(X_train, y_train, loan_amounts_train)
test_dataset = CustomDataset(X_test, y_test, loan_amounts_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [65]:
# Define a simple feedforward network
class BinaryClassifier(nn.Module):
    def __init__(self, input_dim):
        super(BinaryClassifier, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(16, 12),
            nn.ReLU(),
            nn.Linear(12, 8),
            nn.ReLU(),
            nn.Linear(8, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.fc(x)

In [98]:
from sklearn.preprocessing import MinMaxScaler
# Custom Cost Function based on Loan Amount
def compute_costs(loan_amount):
    #loan_amount = StandardScaler().fit_transform(loan_amount.reshape(-1, 1))
    #loan_amount = loan_amount.flatten()
    loan_amount = MinMaxScaler().fit_transform(loan_amount.reshape(-1, 1))
    loan_amount = loan_amount.flatten()
    return {
        'Risk_No Risk': 5.0 + 0.6 * loan_amount, #'Risk_No Risk': 5, # 'Risk_No Risk': 5.0 + 0.6 * loan_amount,
        'No Risk_No Risk': 1.0 - 0.05 * loan_amount, #'No Risk_No Risk': -1, # 'No Risk_No Risk': 1.0 - 0.05 * loan_amount,
        'Risk_Risk': 1.0,
        'No Risk_Risk': 1.0
    }

class LoanWeightedLoss(nn.Module):
    def __init__(self):
        super(LoanWeightedLoss, self).__init__()
        self.bce = nn.BCELoss(reduction='none')

    def forward(self, outputs, targets, loan_amounts):
        outputs = torch.clamp(outputs, 1e-7, 1 - 1e-7)  # Prevent log(0) errors
        costs = compute_costs(loan_amounts)

        real_prop = {'Risk': 0.02, 'No Risk': 0.98}
        train_prop = {'Risk': 1/3, 'No Risk': 2/3}
        custom_weight = {
            'Risk': real_prop['Risk'] / train_prop['Risk'],
            'No Risk': real_prop['No Risk'] / train_prop['No Risk']
        }

        base_loss = self.bce(outputs, targets)

        weight_matrix = (
            (targets == 1) * custom_weight['Risk'] *
            ((outputs >= 0.5) * costs['Risk_Risk'] + (outputs < 0.5) * costs['Risk_No Risk']) +
            (targets == 0) * custom_weight['No Risk'] *
            ((outputs >= 0.5) * costs['No Risk_Risk'] + (outputs < 0.5) * costs['No Risk_No Risk'])
        )

        loss = weight_matrix * base_loss
        return loss.mean()

class proportionWeightedLoss(nn.Module):
    def __init__(self):
        super(proportionWeightedLoss, self).__init__()
        self.bce = nn.BCELoss(reduction='none')  # Compute BCE loss without reduction

    def forward(self, outputs, targets):
        outputs = torch.clamp(outputs, 1e-7, 1 - 1e-7)  # Avoid log(0) issues

        # Define custom weight proportions
        real_prop = {'Risk': 0.02, 'No Risk': 0.98}
        train_prop = {'Risk': 1/3, 'No Risk': 2/3}
        custom_weight = {
            'Risk': real_prop['Risk'] / train_prop['Risk'],
            'No Risk': real_prop['No Risk'] / train_prop['No Risk']
        }

        # Compute weighted BCE loss
        weights = (targets * custom_weight['Risk']) + ((1 - targets) * custom_weight['No Risk'])
        loss = self.bce(outputs, targets) * weights

        return loss.mean()


In [99]:
# Model setup
model = BinaryClassifier(input_dim=X_train.shape[1])
criterion = LoanWeightedLoss()
#criterion = nn.BCELoss()
#criterion = proportionWeightedLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

def train_model(model, train_loader, criterion, optimizer, epochs=10):
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for X_batch, y_batch, loan_amounts in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch,
                             loan_amounts
                             )
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader):.4f}")

def evaluate_model(model, test_loader):
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for X_batch, y_batch, loan_amounts in test_loader:
            outputs = model(X_batch)
            predicted = (outputs > 0.5).float()
            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()
    print(f"Test Accuracy: {100 * correct / total:.2f}%")

# Train and evaluate
train_model(model, train_loader, criterion, optimizer, epochs=100)
evaluate_model(model, test_loader)


Epoch 1/100, Loss: 0.2929
Epoch 2/100, Loss: 0.2536
Epoch 3/100, Loss: 0.2455
Epoch 4/100, Loss: 0.2310
Epoch 5/100, Loss: 0.2209
Epoch 6/100, Loss: 0.2188
Epoch 7/100, Loss: 0.2174
Epoch 8/100, Loss: 0.2125
Epoch 9/100, Loss: 0.2160
Epoch 10/100, Loss: 0.2110
Epoch 11/100, Loss: 0.2104
Epoch 12/100, Loss: 0.2077
Epoch 13/100, Loss: 0.2142
Epoch 14/100, Loss: 0.2064
Epoch 15/100, Loss: 0.2051
Epoch 16/100, Loss: 0.1984
Epoch 17/100, Loss: 0.2012
Epoch 18/100, Loss: 0.1980
Epoch 19/100, Loss: 0.1971
Epoch 20/100, Loss: 0.1918
Epoch 21/100, Loss: 0.1970
Epoch 22/100, Loss: 0.1894
Epoch 23/100, Loss: 0.1850
Epoch 24/100, Loss: 0.1867
Epoch 25/100, Loss: 0.1881
Epoch 26/100, Loss: 0.1777
Epoch 27/100, Loss: 0.1815
Epoch 28/100, Loss: 0.1757
Epoch 29/100, Loss: 0.1730
Epoch 30/100, Loss: 0.1712
Epoch 31/100, Loss: 0.1763
Epoch 32/100, Loss: 0.1656
Epoch 33/100, Loss: 0.1741
Epoch 34/100, Loss: 0.1657
Epoch 35/100, Loss: 0.1607
Epoch 36/100, Loss: 0.1650
Epoch 37/100, Loss: 0.1634
Epoch 38/1

In [86]:
# prompt: open the german credit test file, and run the model above on it. make sure to use the loan amounts separately

import pandas as pd
import numpy as np
# Load the German Credit test dataset
df_test = pd.read_csv('german_credit_test.csv')

# Separate loan amounts for the test set
loan_amounts_test_new = np.array(df_test['LoanAmount'])

# Preprocess the test data using the same preprocessor fitted on the training data
X_test_new = preprocessor.transform(df_test)

# Create a PyTorch Dataset for the test data
test_dataset_new = CustomDataset(X_test_new, np.zeros(len(X_test_new)), loan_amounts_test_new) # Placeholder for y_test, as it's not used in evaluation.
test_loader_new = DataLoader(test_dataset_new, batch_size=32, shuffle=False)

# Evaluate the model on the new test data
def predict_on_test_data(model, test_loader):
    model.eval()
    predictions = []
    with torch.no_grad():
        for X_batch, _, loan_amounts in test_loader:
            outputs = model(X_batch)
            predicted = (outputs > 0.5).int().tolist()
            predictions.append(predicted)
    return predictions
def flatten_list(nested_list):
    flat_list = []
    for item in nested_list:
        if isinstance(item, list):
            flat_list.extend(flatten_list(item))  # Recursively flatten
        else:
            flat_list.append(item)
    return flat_list
# Make predictions on the new test data
predictions = predict_on_test_data(model, test_loader_new)
predictions = flatten_list(predictions)

In [87]:
# Optionally save predictions to a file
predictions_df = pd.DataFrame({
    'Id': df_test.index,
    'Risk': predictions}
                              )
predictions_df.to_csv('german_credit_test_predictions.csv', index=False)
predictions_df.Risk.value_counts()

Unnamed: 0_level_0,count
Risk,Unnamed: 1_level_1
1,1001


In [88]:
le = LabelEncoder()
le.fit(df['Risk'])
y_pred_final_labels = le.inverse_transform(predictions)


# Optionally save predictions to a file
predictions_df = pd.DataFrame({
    'Id': df_test.index,
    'Risk': y_pred_final_labels # Use 'TARGET' as column name
})
#print(predictions_df['TARGET'].value_counts())
predictions_df.to_csv('german_credit_test_predictions.csv', index=False)

In [89]:
predictions_df.Risk.value_counts()

Unnamed: 0_level_0,count
Risk,Unnamed: 1_level_1
Risk,1001
