### Imports

In [12]:
import torch
import torch_geometric.nn as pyg_nn
import torch.nn.functional as F

### Below we specify arbitrary constants or hiperparams

`TEST_TRAIN_SPLIT` - test size (0.2 == 20%) <br>
`ROWS_PER_EXAMPLE` - number of rows (key presses) per data chunk <br>
`NUM_HIDDEN_DIMS` - TODO <br>
`NUM_EPOCHS` - number of training epochs

In [13]:
TEST_TRAIN_SPLIT = 0.2
ROWS_PER_EXAMPLE = 150
NUM_HIDDEN_DIMS = 64
NUM_EPOCHS = 1000

file_names = ["../datasets/user1.tsv", "../datasets/user2.tsv", "../datasets/user3.tsv", "../datasets/user4.tsv"]

In [14]:

# Define a simple GCN model
class LetterGNN(torch.nn.Module):
    def __init__(self, num_node_features, hidden_dim, num_classes, num_layers=2):
        super(LetterGNN, self).__init__()

        self.convs = torch.nn.ModuleList()
        self.convs.append(pyg_nn.GCNConv(num_node_features, hidden_dim))
        for _ in range(num_layers - 1):
            self.convs.append(pyg_nn.GCNConv(hidden_dim, hidden_dim))

        self.fc = torch.nn.Linear(hidden_dim, num_classes)

    def forward(self, x, edge_index, batch):
        for conv in self.convs:
            x = x.float()
            edge_index = edge_index.long()
            x = conv.forward(x, edge_index)
            x = F.relu(x)
        
        # idk maybe use a different pool method 
        x = pyg_nn.global_mean_pool(x, batch)

        # classify
        x = self.fc(x)

        return x

Take a look at `dataset.x size`. Size (13, 5) means there is 13 different letters in data chunk, 5 attributes for each letter.

In [15]:
from torch_geometric.data import InMemoryDataset
from sklearn.model_selection import train_test_split
from utils import data_loader

mode = data_loader.LoadMode.DROP
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# This is a list of lists of Data objects - datasets
# each dataset comes from a different input file - a different user
list_of_datasets = [
    data_loader.load_from_file(filename, mode=mode, y=torch.tensor([i]), rows_per_example=ROWS_PER_EXAMPLE)
    for i, filename in enumerate(file_names)]

num_features = list_of_datasets[0][0].x.shape[1]

print("Number of examples: ", sum(len(l) for l in list_of_datasets))
print("Shape of dataset.x: ", tuple(list_of_datasets[0][0].x.shape))

Number of examples:  72
Shape of dataset.x:  (24, 2)


In [16]:
from collections import Counter

training_dataset_pos = []
testing_dataset_pos = []
training_dataset_neg = []
testing_dataset_neg = []

# !!! choose which one get the positive label
positive_index = 2 # just arbitrary

# relabel the datasets
for i, dataset in enumerate(list_of_datasets):
    for item in dataset:
        if i == positive_index:
            item.y = torch.tensor([1])
        else:
            item.y = torch.tensor([0])

    train_items, test_items = train_test_split(dataset, test_size=TEST_TRAIN_SPLIT)
    
    if i == positive_index:
        training_dataset_pos.extend(train_items)
        testing_dataset_pos.extend(test_items)
    else:
        training_dataset_neg.extend(train_items)
        testing_dataset_neg.extend(test_items)


class SimpleGraphDataset(InMemoryDataset):
    def __init__(self, data_list):
        super(SimpleGraphDataset, self).__init__('.', None, None, None)
        self.data, self.slices = self.collate(data_list)  # Collate all data objects

    def __len__(self):
        return len(self.data.y)  # Number of graphs in the dataset
    
    def statistics(self) -> str:
        class_counts = Counter(self.data.y.cpu().numpy())
        return " | ".join([f"{item_class}: {count}" for item_class, count in class_counts.items()]) + f" | Total: {sum(class_counts.values())}"
        

train_items = [e.to(device) for e in (training_dataset_pos + training_dataset_neg)]
test_items = [e.to(device) for e in (testing_dataset_pos + testing_dataset_neg)]

dataset = SimpleGraphDataset(train_items)
test_dataset = SimpleGraphDataset(test_items)

print("Train dataset statistics: ", dataset.statistics())
print("Test dataset statistics:  ", test_dataset.statistics())

Train dataset statistics:  1: 14 | 0: 42 | Total: 56
Test dataset statistics:   1: 4 | 0: 12 | Total: 16


In [17]:
# Train the model 

from torch_geometric.loader import DataLoader
# Q: How Do we choose hidden dims size ?? !!!!!!!!!!

# Define the model, loss, and optimizer
model = LetterGNN(num_node_features=dataset.num_node_features, hidden_dim=NUM_HIDDEN_DIMS, num_classes=dataset.num_classes).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = torch.nn.CrossEntropyLoss()

# Train loop
def train(model, data_loader):
    model.train()
    total_loss = 0
    for data in data_loader:
        optimizer.zero_grad()
        output = model(data.x, data.edge_index, data.batch)  # Forward pass
        loss = criterion(output, data.y)  # Compute the loss
        loss.backward()  # Backpropagation
        optimizer.step()  # Update the model parameters
        total_loss += loss.item()
    return total_loss / len(data_loader)


# Training over epochs
data_loader = DataLoader(dataset, batch_size=32, shuffle=True)
prev_loss = 100000
for epoch in range(1, NUM_EPOCHS):
    loss = train(model, data_loader)
    if epoch % 50 == 0 or epoch == NUM_EPOCHS-1:
        print(f"Epoch: {epoch:03d}, Loss: {loss:.4f}")
    if round(loss, 3) == 0:
        print(f"Epoch: {epoch:03d}, Loss: {loss:.4f}")
        break



Epoch: 050, Loss: 0.4527
Epoch: 100, Loss: 0.4031
Epoch: 150, Loss: 0.3313
Epoch: 200, Loss: 0.3245
Epoch: 250, Loss: 0.2954
Epoch: 300, Loss: 0.2870
Epoch: 350, Loss: 0.5434
Epoch: 400, Loss: 0.2684
Epoch: 450, Loss: 0.2408
Epoch: 500, Loss: 0.2272
Epoch: 550, Loss: 0.2435
Epoch: 600, Loss: 0.2282
Epoch: 650, Loss: 0.1972
Epoch: 700, Loss: 0.2583
Epoch: 750, Loss: 0.2294
Epoch: 800, Loss: 0.1800
Epoch: 850, Loss: 0.2123
Epoch: 900, Loss: 0.2493
Epoch: 950, Loss: 0.1799
Epoch: 999, Loss: 0.2048


In [18]:
# test 
def test(model, data_loader):
    model.eval()
    correct = 0
    for data in data_loader:
        output = model(data.x, data.edge_index, data.batch)
        pred = output.argmax(dim=1)  # Get the index of the max log-probability
        # print(f"Correct {data.y[0]} Pred {pred[0]}")
        if pred != data.y:
            print(output[0])
        correct += (pred == data.y).sum().item()
    return correct / len(data_loader.dataset)

def test_acc():
    # Test the model
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)
    accuracy = test(model, test_loader)
    print(f"Test Accuracy: {accuracy:.4f}")

test_acc()

tensor([-14.8833, -15.0566], device='cuda:0', grad_fn=<SelectBackward0>)
Test Accuracy: 0.9375


### Saving

In [19]:
MODEL_PATH = '../models/experimental.pth'
torch.save(model.state_dict(), MODEL_PATH)
print(f"Model saved to {MODEL_PATH}")

Model saved to ../models/experimental.pth


In [20]:
# Load the model (for inference)
loaded_model = LetterGNN(num_node_features=dataset.num_node_features, hidden_dim=NUM_HIDDEN_DIMS, num_classes=dataset.num_classes).to(device)
loaded_model.load_state_dict(torch.load(MODEL_PATH))
loaded_model.eval()  # Set the model to evaluation mode
print("Model loaded for inference.")

Model loaded for inference.


  loaded_model.load_state_dict(torch.load(MODEL_PATH))


In [21]:
# Assuming you have a new data loader for inference data
inference_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Inference loop
with torch.no_grad():  # Disable gradient computation for inference
    for data in inference_loader:
        data = data.to(device)
        output = loaded_model(data.x, data.edge_index, data.batch)
        pred = output.argmax(dim=1)

In [22]:
# test
def test(model, data_loader):
    model.eval()
    correct = 0
    for data in data_loader:
        output = model(data.x, data.edge_index, data.batch)
        pred = output.argmax(dim=1)  # Get the index of the max log-probability
        # print(f"Correct {data.y[0]} Pred {pred[0]}")
        if pred != data.y:
            print(output[0])
        correct += (pred == data.y).sum().item()
    return correct / len(data_loader.dataset)

def test_acc():
    # Test the model
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)
    accuracy = test(loaded_model, test_loader)
    print(f"Test Accuracy: {accuracy:.4f}")

test_acc()

tensor([-14.8833, -15.0566], device='cuda:0', grad_fn=<SelectBackward0>)
Test Accuracy: 0.9375
