In [1]:
import torch
from torch import nn
import pandas as pd
from torch.utils.data import DataLoader, random_split, Dataset
from sklearn.impute import SimpleImputer
import numpy as np
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE
from torch.utils.tensorboard import SummaryWriter
import time
import matplotlib.pyplot as plt
import seaborn as sns


In [None]:
device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
writer = SummaryWriter(log_dir=f"runs/experiment_initial")
class CustomDataset(Dataset):
    def __init__(self, dataFrane):
        self.data = dataFrane
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        row = self.data.iloc[index]
        features = torch.tensor(row[:-1].values, dtype=torch.float32)
        label = torch.tensor(row[-1], dtype=torch.float32).unsqueeze(0)  # shape [1]
        return features, label

def train(dataLoader, model, loss_fn, optimizer, epoch, writer):
        size = len(dataLoader.dataset)
        model.train() #set model to the training mode, it just switches internal setting
        running_loss = 0.0
        correct = 0
        for batch, (X, y) in enumerate(dataLoader):
            X, y = X.to(device), y.to(device)
            predict = model(X)
            loss = loss_fn(predict, y)
            #backpropagation
            loss.backward()
            optimizer.step()
            optimizer.zero_grad() #?
            # Calculate accuracy for this batch
            pred_binary = (predict >= 0.5).float()
            correct += (pred_binary == y).sum().item()
            running_loss += loss.item()
            
            if batch % 100 == 0:
                loss, current = loss.item(), (batch + 1) * len(X)
                print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
        # Log training metrics to TensorBoard
        avg_loss = running_loss / len(dataLoader)
        accuracy = (correct / size) * 100
        
        writer.add_scalar('Loss/Train', avg_loss, epoch)
        writer.add_scalar('Accuracy/Train', accuracy, epoch)
                
def test(dataloader, model, loss_fn, epoch, writer):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            pred = pred.flatten()
            y = y.flatten()
            for i in range(len(pred)):
                predict = 1 if pred[i] >= 0.5 else 0
                if predict == y[i]:
                   correct += 1 
    test_loss /= num_batches
    correct /= size
    writer.add_scalar('Loss/Test', test_loss, epoch)
    writer.add_scalar('Accuracy/Test', correct, epoch)
    print(f"Accuracy: {correct}%, Avg loss: {test_loss:>8f} \n")
    return correct
    
    
customDataset = CustomDataset(preprocessed_df)

trainData, testData = random_split(customDataset, [0.8, 0.2])
print(len(trainData))
print(len(testData))
trainDataLoader = DataLoader(trainData, batch_size=64, shuffle=True)
testDataLoader = DataLoader(testData, batch_size=64, shuffle=False)
model = NeuralNetwork().to(device)
loss_fn = nn.BCELoss() # use binary cross entropy because we have 2 classes in target
optimizer = torch.optim.SGD(model.parameters(), lr= 0.1) #??? changed SGD to adam

epochs = 20
for i in range(epochs):
    print(f'Epoch {i + 1}')
    train(trainDataLoader, model, loss_fn, optimizer, i, writer)
    test(testDataLoader, model, loss_fn, i, writer)
writer.close()
print("DONE!")