# MML Project

Import Libraries

In [51]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split

Load the dataset into tensors

In [52]:
# dimensionality of the dataset
n = 9

In [53]:
# Load the dataset
train_data = np.load(f'../Kryptonite-N-main/Datasets/kryptonite-{n}-X.npy')
train_labels = np.load(f'../Kryptonite-N-main/Datasets/kryptonite-{n}-y.npy')

# Split the data into training (70%), validation (15%), and test (15%) sets
X_train, X_temp, y_train, y_temp = train_test_split(train_data, train_labels, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# Convert numpy arrays to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.long)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.long)

# Create DataLoader
train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

Get device for training:

In [54]:
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

Using cpu device


Design the neural network:

In [55]:
# Define a simple neural network
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(train_data.shape[1], 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid() # Binary Classification
        )

    def forward(self, x):
        res = self.model(x)
        
        return torch.round(res)

In [56]:
print(X_train.size())

hidden_1 = nn.Linear(train_data.shape[1], 128)(X_train)
print(hidden_1.size())

hidden_1 = nn.ReLU()(hidden_1)
print(hidden_1.size())

hidden_2 = nn.Linear(128, 64)(hidden_1)
print(hidden_2.size())

hidden_2 = nn.ReLU()(hidden_2)
print(hidden_2.size())

hidden_3 = nn.Linear(64, 1)(hidden_2)
print(hidden_3.size())

hidden_3 = nn.Sigmoid()(hidden_3)
print(hidden_3.size())

print(torch.flatten(hidden_3).size())

torch.Size([12600, 9])
torch.Size([12600, 128])
torch.Size([12600, 128])
torch.Size([12600, 64])
torch.Size([12600, 64])
torch.Size([12600, 1])
torch.Size([12600, 1])
torch.Size([12600])


Initialize the model, loss function, and optimizer

In [57]:
model = NeuralNetwork().to(device)
print(model)

learning_rate = 0.001
batch_size = 64

loss_fn = nn.BCELoss() # For Binary classification
optimizer = optim.SGD(model.parameters(), lr=learning_rate)

NeuralNetwork(
  (model): Sequential(
    (0): Linear(in_features=9, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=64, bias=True)
    (3): ReLU()
    (4): Linear(in_features=64, out_features=1, bias=True)
    (5): Sigmoid()
  )
)


Other relevant methods:

In [58]:
# Training function
def train_model(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    
    # Set the model to training mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.train(True)
    
    running_loss = 0.0
    last_loss = 0
    
    for batch, (inputs, labels) in enumerate(dataloader):
        inputs, labels = inputs.to(device), labels.to(device)
        
        # Reset gradients of model parameters. This prevents double-counting
        optimizer.zero_grad() 
        
        # Compute prediction
        pred = model(inputs)
        
        # Reshape labels to match the shape of pred
        labels = labels.view(-1, 1).float()
        
        # Compute the loss and its gradients
        loss = loss_fn(pred, labels)
        loss.backward() # Backpropagate the prediction loss
        
        # Adjust learning weights
        optimizer.step()
        
        # Gather data and report
        running_loss += loss.item()

        if batch % 100 == 999:
            last_loss = running_loss / 1000 # loss per batch
            print('  batch {} loss: {}'.format(batch + 1, last_loss))
            running_loss = 0.
            
    return last_loss

# Testing function
def test_model(dataloader, model, loss_fn):
    # Set the model to evaluation mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0
    
    # Evaluating the model with torch.no_grad() ensures that no gradients are computed during test mode
    # also serves to reduce unnecessary gradient computations and memory usage for tensors with requires_grad=True
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            pred = model(inputs)
            
            # Reshape labels to match the shape of pred
            labels = labels.view(-1, 1).float()
            
            test_loss += loss_fn(pred, labels).item()
            correct += (pred == labels).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= (size * num_batches)
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    
    return test_loss

# Save the model
def save_model(model, path='model.pth'):
    torch.save(model.state_dict(), path)

# Load the model
def load_model_weights(model, path='model.pth'):
    model.load_state_dict(torch.load(path, weights_only=True))
    model.eval()
    
def load_model(path='model.pth'):
    return torch.load(path, weights_only=True)

We train and test the model:

In [59]:
epochs = 10
best_val_loss = 1_000_000
# timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
model_path = 'model'

In [60]:
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    
    avg_loss = train_model(train_loader, model, loss_fn, optimizer)
    
    avg_val_loss = test_model(val_loader, model, loss_fn)
    
    # Track best performance, and save the model's state
    if avg_val_loss < best_val_loss:
        best_vloss = avg_val_loss
        # model_path = 'model_{}_{}'.format(timestamp, t)
        torch.save(model.state_dict(), model_path)

print("Done!")

Epoch 1
-------------------------------
Test Error: 
 Accuracy: 0.6%, Avg loss: 50.686274 

Epoch 2
-------------------------------
Test Error: 
 Accuracy: 0.6%, Avg loss: 50.686274 

Epoch 3
-------------------------------
Test Error: 
 Accuracy: 0.6%, Avg loss: 50.686274 

Epoch 4
-------------------------------
Test Error: 
 Accuracy: 0.6%, Avg loss: 50.686274 

Epoch 5
-------------------------------
Test Error: 
 Accuracy: 0.6%, Avg loss: 50.686274 

Epoch 6
-------------------------------
Test Error: 
 Accuracy: 0.6%, Avg loss: 50.686274 

Epoch 7
-------------------------------
Test Error: 
 Accuracy: 0.6%, Avg loss: 50.686274 

Epoch 8
-------------------------------
Test Error: 
 Accuracy: 0.6%, Avg loss: 50.686274 

Epoch 9
-------------------------------
Test Error: 
 Accuracy: 0.6%, Avg loss: 50.686274 

Epoch 10
-------------------------------
Test Error: 
 Accuracy: 0.6%, Avg loss: 50.686274 

Done!


In [61]:
# Load the best model
load_model_weights(model, model_path)

print("Test performance in test data set")
test_model(test_loader, model, loss_fn)

# Save the model
# save_model(model)

# Load the model (example usage)
# load_model(model)
# test_model(test_loader, model, loss_fn)

Test performance in test data set
Test Error: 
 Accuracy: 0.6%, Avg loss: 51.127451 



51.12745096543256