In [69]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch_geometric.nn import GCNConv
from torch_geometric.data import Data, DataLoader
import numpy as np
import matplotlib.pyplot as plt

First extract the data like in Task II. I'm not sure why, but the first time you run the data cell, or any cell with DataLoader, it comes up with a warning. It's fine if you run it again though and doesn't seem to affect the model.

In [166]:
# from torch_geometric.nn import knn_graph
from torch_geometric.transforms import KNNGraph

class JetDataset(torch.utils.data.Dataset):
    def __init__(self, data_path, train=False):
        self.data = np.load(data_path, allow_pickle=True)
        self.samples = []
        self.num_classes = 2
        self.train = train

        for key, value in self.data.items():
            if key == 'X':
                self.samples.extend([(sample, label) for sample, label in zip(value, self.data['y'])])
        
        # 80/20 split for training and testing
        split_index = int(0.8 * len(self.samples))
        if self.train:
            self.samples = self.samples[:split_index]
        else:
            self.samples = self.samples[split_index:]
            
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        sample, label = self.samples[idx]
        sample = torch.tensor(sample.transpose(), dtype=torch.float32)
        label = torch.tensor(label, dtype=torch.long)
        
        # Construct k nearest neighbors graph
        pos = sample  
        # edge_index = knn_graph(pos, k=6, batch=None, loop=False)
        edge_index = KNNGraph(k=6)
        
        return Data(x=sample, edge_index=edge_index, y=label)


Now we want to transform the points into a mesh on a surface.

In [78]:
# Modified to take in a training/testing boolean parameter
class JetDataset(torch.utils.data.Dataset):
    def __init__(self, data_path, train=False):
        self.data = np.load(data_path, allow_pickle=True)
        self.samples = []
        self.num_classes = 2
        self.train = train

        for key, value in self.data.items():
            if key == 'X':
                self.samples.extend([(sample, label) for sample, label in zip(value, self.data['y'])])
        
        # 80/20 split for training and testing
        split_index = int(0.8 * len(self.samples))
        if self.train:
            self.samples = self.samples[:split_index]
        else:
            self.samples = self.samples[split_index:]
            
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        sample, label = self.samples[idx]
        sample = torch.tensor(sample.transpose(), dtype=torch.float32)
        label = torch.tensor(label, dtype=torch.long) 
        return sample, label



In [167]:
import torch_geometric.transforms as T

# Load data and create DataLoader
data_path = 'qg_dataset/QG_jets.npz'
train_dataset = JetDataset(data_path, train=True)
test_dataset = JetDataset(data_path, train=False)

batch_size = 128
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)



In [168]:
from torch import Tensor
from torch.nn import Sequential, Linear, ReLU
from torch_geometric.nn import MessagePassing

class PointNetLayer(MessagePassing):
    def __init__(self, in_channels: int, out_channels: int):
        super().__init__(aggr='max')

        self.mlp = Sequential(
            Linear(in_channels + 3, out_channels),
            ReLU(),
            Linear(out_channels, out_channels),
        )

    def forward(self,
        h: Tensor,
        pos: Tensor,
        edge_index: Tensor,
    ) -> Tensor:
        return self.propagate(edge_index, h=h, pos=pos)

    def message(self,
        h_j: Tensor,
        pos_j: Tensor,
        pos_i: Tensor,
    ) -> Tensor:

        edge_feat = torch.cat([h_j, pos_j - pos_i], dim=-1)
        return self.mlp(edge_feat)

In [169]:
from torch_geometric.nn import global_max_pool

class PointNet(torch.nn.Module):
    def __init__(self):
        super().__init__()

        self.conv1 = PointNetLayer(3, 32)
        self.conv2 = PointNetLayer(32, 32)
        self.classifier = Linear(32, train_dataset.num_classes)

    def forward(self, pos: Tensor, edge_index: Tensor, batch: Tensor) -> Tensor:

        # Perform two-layers of message passing
        h = self.conv1(h=pos, pos=pos, edge_index=edge_index)
        h = h.relu()
        h = self.conv2(h=h, pos=pos, edge_index=edge_index)
        h = h.relu()

        h = global_max_pool(h, batch)

        return self.classifier(h)


model = PointNet()
print(model)

PointNet(
  (conv1): PointNetLayer()
  (conv2): PointNetLayer()
  (classifier): Linear(in_features=32, out_features=2, bias=True)
)


Now, we can load the train and test data similarly to how the data was loaded at the start. The KNN graph transforms the point cloud to a graph structure, connecting nearest neighbors points. 

The model we are initializing is the above defined PointNet with 2 classes (gluons and jets). I chose the standard Adam optimizer, as I've used it before and it's very versatile, and a cross entropy loss function.

In [170]:
model = PointNet()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion=torch.nn.CrossEntropyLoss()

train_dataset = JetDataset(data_path=data_path, train=True)
train_dataset.transform = T.Compose([T.SamplePoints(num=128), T.KNNGraph(k=6)])
test_dataset = JetDataset(data_path=data_path, train=False)
test_dataset.transform = T.Compose([T.SamplePoints(num=128), T.KNNGraph(k=6)])

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

In [187]:
def train():
    model.train()

    total_loss = 0
    for data in train_loader:
        optimizer.zero_grad()
        
        # Ensure edge_index is a tuple of long tensors
        edge_index = (data.edge_index[0].long(), data.edge_index[1].long())
        
        logits = model(data.pos, edge_index, data.batch)
        loss = criterion(logits, data.y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * data.num_graphs  # Use item() to get the loss value
    return total_loss / len(train_loader.dataset)

@torch.no_grad()
def test():
    model.eval()

    total_correct = 0
    for data in test_loader:
        logits = model(data.pos, data.edge_index, data.batch)
        pred = logits.argmax(dim=-1)
        total_correct += int((pred == data.y).sum())

    return total_correct / len(test_loader.dataset)

for epoch in range(1, 51):
    loss = train()
    test_acc = test()
    print(f'Epoch: {epoch:02d}, Loss: {loss:.4f}, Test Acc: {test_acc:.4f}')



AttributeError: 'KNNGraph' object has no attribute 'long'

In [174]:
import torch_geometric.transforms as T

def train(model, train_loader, optimizer, criterion):
    model.train()
    train_loss = 0
    correct = 0
    total = 0

    for data in train_loader:
        optimizer.zero_grad()
        samples, edge_index, labels = data.x, data.edge_index, data.y
        outputs = model(samples, edge_index, batch_size)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

    train_loss /= len(train_loader)
    accuracy = 100.0 * correct / total
    return train_loss, accuracy


def test(model, test_loader, criterion):
    model.eval()
    test_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for data in test_loader:
            samples, labels = data
            pos = samples.transpose(1, 2)
            
            edge_index = T.KNNGraph(pos, k=6, batch=None, loop=False)
            
            batch = torch.zeros(len(samples), dtype=torch.long)
            outputs = model(pos, edge_index, batch)
            loss = criterion(outputs, labels)

            test_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    test_loss /= len(test_loader)
    accuracy = 100.0 * correct / total
    return test_loss, accuracy

num_epochs = 10
for epoch in range(num_epochs):
    train_loss, train_accuracy = train(model, train_loader, optimizer, criterion)
    test_loss, test_accuracy = test(model, test_loader, criterion)

    print(f"Epoch {epoch + 1}/{num_epochs}:")
    print(f"Train Loss: {train_loss:.4f} | Train Accuracy: {train_accuracy:.2f}%")
    print(f"Test Loss: {test_loss:.4f} | Test Accuracy: {test_accuracy:.2f}%")


ValueError: `MessagePassing.propagate` only supports integer tensors of shape `[2, num_messages]`, `torch_sparse.SparseTensor` or `torch.sparse.Tensor` for argument `edge_index`.