In [2]:
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
import pandas as pd
import numpy as np
import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader
import torch.nn.functional as F
from sklearn.preprocessing import MinMaxScaler
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, ConfusionMatrixDisplay

In [1]:
# create a function to evaluate the models
def evaluate(y_test, y_pred):
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)

    return [accuracy, precision, recall, f1]

In [4]:
# Import data
data = pd.read_excel("C:\\Users\\Phil\\CaseStudy\\PSP_DATA_PREP.xlsx")
scaler = MinMaxScaler()
data['hour'] = scaler.fit_transform(data['hour'].to_numpy().reshape(-1,1))
data['day_of_week'] = scaler.fit_transform(data['day_of_week'].to_numpy().reshape(-1,1))
data['month'] = scaler.fit_transform(data['month'].to_numpy().reshape(-1,1))

data_PSP = pd.get_dummies(data, columns=['PSP'], dtype=int)

data_PSP.head()

Unnamed: 0,tmsp,attempts,success,3D_secured,fee,hour,day_of_week,month,amount_norm,country_Austria,country_Germany,country_Switzerland,card_Diners,card_Master,card_Visa,PSP_Goldcard,PSP_Moneycard,PSP_Simplecard,PSP_UK_Card
0,2019-01-01 00:01:11,2,1,0,4.0,0.0,0.166667,0.0,0.133013,0,1,0,0,0,1,0,0,0,1
1,2019-01-01 00:02:49,2,1,1,4.0,0.0,0.166667,0.0,0.371795,0,1,0,1,0,0,0,0,0,1
2,2019-01-01 00:04:33,1,0,0,0.5,0.0,0.166667,0.0,0.189103,1,0,0,1,0,0,0,0,1,0
3,2019-01-01 00:06:41,2,0,0,1.5,0.0,0.166667,0.0,0.442308,0,0,1,0,1,0,0,0,1,0
4,2019-01-01 00:08:46,1,1,0,3.0,0.0,0.166667,0.0,0.177885,0,1,0,0,1,0,0,0,0,1


In [7]:
# Split data into test and train and convert to tensors



X = data_PSP.drop(columns=['success', 'attempts', 'fee', 'tmsp'])
y = data_PSP['success']
X = torch.from_numpy(X.to_numpy()).type(torch.float)
y = torch.from_numpy(y.to_numpy()).type(torch.float)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [8]:
#smote = SMOTE(random_state=42)
#X_train_smote, y_train_smote = smote.fit_resample(X_train, y_train)
#X_train = torch.from_numpy(X_train_smote).type(torch.float)
#y_train = torch.from_numpy(y_train_smote).type(torch.float)

undersampler = RandomUnderSampler(sampling_strategy='auto', random_state=42)
X_resampled, y_resampled = undersampler.fit_resample(X_train, y_train)
X_train = torch.from_numpy(X_resampled).type(torch.float)
y_train = torch.from_numpy(y_resampled).type(torch.float)


In [9]:
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cpu'

In [19]:
# 1. Construct a model class that subclasses nn.Module
class ModelV0(nn.Module):
    def __init__(self):
        super().__init__()
        # 2. Create nn.Linear layers capable of handling X and y input and output shapes
        self.layer_1 = nn.Linear(in_features=15, out_features=20)
        self.layer_2 = nn.Linear(in_features=20, out_features=7)
        self.layer_3 = nn.Linear(in_features=7, out_features=5)
        self.layer_4 = nn.Linear(in_features=5, out_features=1)
        self.relu = nn.ReLU()
    
    # 3. Define a forward method containing the forward pass computation
    def forward(self, x):
        out = self.relu(self.layer_1(x))
        out = self.relu(self.layer_2(out))
        out = self.relu(self.layer_3(out))
        out = self.layer_4(out)
        return out

# 4. Create an instance of the model and send it to target device
model_0 = ModelV0().to(device)
model_0

ModelV0(
  (layer_1): Linear(in_features=15, out_features=20, bias=True)
  (layer_2): Linear(in_features=20, out_features=7, bias=True)
  (layer_3): Linear(in_features=7, out_features=5, bias=True)
  (layer_4): Linear(in_features=5, out_features=1, bias=True)
  (relu): ReLU()
)

In [23]:
# Create a loss function
# loss_fn = nn.BCELoss() # BCELoss = no sigmoid built-in
loss_fn = nn.BCEWithLogitsLoss() # BCEWithLogitsLoss = sigmoid built-in

# Create an optimizer
optimizer = torch.optim.Adam(params=model_0.parameters(), lr=0.001)


In [21]:
# define function for evaluating accuracy
def accuracy_fn(y_true, y_pred):
    correct = torch.eq(y_true, y_pred).sum().item()
    acc = (correct / len(y_pred)) * 100 
    return acc


In [24]:
torch.manual_seed(42)

losses = []
# Set the number of epochs
epochs = 50

# send data to target device
X_train, y_train = X_train.to(device), y_train.to(device)
X_test, y_test = X_test.to(device), y_test.to(device)

# Create datasets and dataloaders
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

batch_size = 512  # You can adjust this depending on your specific needs
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Build training and evaluation loop
for epoch in range(epochs):
    ### Training
    model_0.train()
    for inputs, labels in train_loader:
        outputs = model_0(inputs).squeeze() # forward pass
        y_pred = torch.round(torch.sigmoid(outputs)) # convert raw output to probabilities and round to 0/1
        
        loss = loss_fn(y_pred, labels) # calculate loss
        acc = accuracy_fn(labels, y_pred) 

        optimizer.zero_grad() # zero the gradients
        loss.backward() # backward pass
        optimizer.step()
    print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')
    losses.append(loss.item())
    # 1. Forward pass (model outputs raw logits)
    # y_logits = model_0(X_train).squeeze() # squeeze to remove extra `1` dimensions, this won't work unless model and data are on same device 
    # y_pred = torch.round(torch.sigmoid(y_logits)) # turn logits -> pred probs -> pred labls
  
    # 2. Calculate loss/accuracy

    # loss = loss_fn(y_logits, y_train) 
    # acc = accuracy_fn(y_train, y_pred) 

    # # 3. Optimizer zero grad
    # optimizer.zero_grad()

    # # 4. Loss backwards
    # loss.backward()

    # # 5. Optimizer step
    # optimizer.step()

    ### Testing
    # model_0.eval()
    # with torch.inference_mode():
    #     # 1. Forward pass
    #     test_logits = model_0(X_test).squeeze() 
    #     test_pred = torch.round(torch.sigmoid(test_logits))
    #     # 2. Caculate loss/accuracy
    #     test_loss = loss_fn(test_logits, y_test)
    #     test_acc = accuracy_fn(y_test, test_pred)

    # Print out what's happening every 10 epochs
    # if epoch % 100 == 0:
    #     print(f"Epoch: {epoch} | Loss: {loss:.5f}, Accuracy: {acc:.2f}% | Test loss: {test_loss:.5f}, Test acc: {test_acc:.2f}%")

model_0.eval()  # Set the model to evaluation mode
with torch.no_grad():
    correct = 0
    total = 0
    for inputs, labels in test_loader:
        outputs = model_0(inputs)
        predicted = (outputs.squeeze() > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f'Accuracy: {accuracy:.2f}%')

Epoch [1/50], Loss: 0.8255
Epoch [2/50], Loss: 0.8133
Epoch [3/50], Loss: 0.8480
Epoch [4/50], Loss: 0.8520
Epoch [5/50], Loss: 0.8194
Epoch [6/50], Loss: 0.7929
Epoch [7/50], Loss: 0.8500
Epoch [8/50], Loss: 0.7663
Epoch [9/50], Loss: 0.8173
Epoch [10/50], Loss: 0.8133
Epoch [11/50], Loss: 0.8704
Epoch [12/50], Loss: 0.8031
Epoch [13/50], Loss: 0.8010
Epoch [14/50], Loss: 0.8153
Epoch [15/50], Loss: 0.8214
Epoch [16/50], Loss: 0.8398
Epoch [17/50], Loss: 0.8051
Epoch [18/50], Loss: 0.8173
Epoch [19/50], Loss: 0.8092
Epoch [20/50], Loss: 0.7990
Epoch [21/50], Loss: 0.8071
Epoch [22/50], Loss: 0.8194
Epoch [23/50], Loss: 0.8031
Epoch [24/50], Loss: 0.7786
Epoch [25/50], Loss: 0.8194
Epoch [26/50], Loss: 0.7704
Epoch [27/50], Loss: 0.7847
Epoch [28/50], Loss: 0.8541
Epoch [29/50], Loss: 0.8275
Epoch [30/50], Loss: 0.8398
Epoch [31/50], Loss: 0.7847
Epoch [32/50], Loss: 0.7990
Epoch [33/50], Loss: 0.8173
Epoch [34/50], Loss: 0.8255
Epoch [35/50], Loss: 0.8724
Epoch [36/50], Loss: 0.7684
E

In [32]:
test_pred = torch.sigmoid(model_0(X_test).squeeze())



# cm = confusion_matrix(y_test, test_pred)
# disp = ConfusionMatrixDisplay(cm)
# disp.plot(cmap='RdPu')

In [None]:
class BinaryClassifier(nn.Module):
    def __init__(self):
        super(BinaryClassifier, self).__init__()
        self.fc1 = nn.Linear(X_train.shape[1], 64)
        self.fc2 = nn.Linear(64, 32)
        self.output = nn.Linear(32, 1)
        self.dropout = nn.Dropout(0.5)  # Added dropout

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.dropout(x)  # Apply dropout
        x = F.relu(self.fc2(x))
        x = torch.sigmoid(self.output(x))
        return x

model = BinaryClassifier()
optimizer = torch.optim.Adam(model.parameters(), lr=0.00001)  # Adjusted learning rate
criterion = nn.BCELoss()

# Training loop with modifications
num_epochs = 500
for epoch in range(num_epochs):
    model.train()
    for inputs, labels in train_loader:
        outputs = model(inputs)
        loss = loss_fn(outputs.squeeze(), labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    # Print loss every epoch to monitor training progress
    if epoch % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

In [None]:
model.eval()  # Set the model to evaluation mode
with torch.no_grad():
    correct = 0
    total = 0
    for inputs, labels in test_loader:
        outputs = model(inputs)
        predicted = (outputs.squeeze() > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f'Accuracy: {accuracy:.2f}%')

cm = confusion_matrix(labels, predicted)
disp = ConfusionMatrixDisplay(cm)
disp.plot(cmap='RdPu')