In [1]:
import torch
import torch.nn as nn
from torch_geometric.datasets import TUDataset
from torch_geometric.data import DataLoader
import torch_geometric.transforms as T
from torch_geometric.nn import MessagePassing, global_mean_pool
from torch_geometric.utils import add_self_loops, degree
import torch.nn.functional as F

In [2]:
from manifolds import PoincareBall
from utils import tanh

In [3]:
dataset = TUDataset(root='data/TUDataset', name='PROTEINS', transform=T.NormalizeFeatures())
dataset

PROTEINS(1113)

In [4]:
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, val_size, test_size])

In [5]:
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)



In [6]:
num_features = dataset.num_features
num_classes = dataset.num_classes
print(f"Number of features: {num_features} \nNumber of classes: {num_classes}")

Number of features: 3 
Number of classes: 2


In [7]:
class HyperbolicGCNLayer(MessagePassing):
    def __init__(self, in_channels, out_channels):
        super(HyperbolicGCNLayer, self).__init__(aggr='add')
        self.weight = nn.Parameter(torch.Tensor(in_channels, out_channels))
        self.bias = nn.Parameter(torch.Tensor(out_channels))
        self.reset_parameters()

    def reset_parameters(self):
        nn.init.xavier_uniform_(self.weight)
        nn.init.zeros_(self.bias)
        
    def forward(self, x, edge_index):
        edge_index, _ = add_self_loops(edge_index, num_nodes=x.size(0))
        
        poincare = PoincareBall(dimension=3)
        x = poincare.mobius_matrix_vector_mul(self.weight, x)

        x = self.propagate(edge_index, x=x)
        x = poincare.mobius_add(x, self.bias)
        return tanh(x)

    def message(self, x_j):
        return x_j


In [8]:
class HyperbolicGCN(nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels):
        super(HyperbolicGCN, self).__init__()
        self.conv1 = HyperbolicGCNLayer(in_channels, hidden_channels)
        self.conv2 = HyperbolicGCNLayer(hidden_channels, hidden_channels)
        self.conv3 = HyperbolicGCNLayer(hidden_channels, hidden_channels)
        self.global_pool = global_mean_pool
        self.fc = nn.Linear(hidden_channels, out_channels)

    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch
        x = self.conv1(x, edge_index)
        x = self.conv2(x, edge_index)
        x = self.conv3(x, edge_index)
        x = self.global_pool(x, batch)
        x = self.fc(x)
        return x


In [9]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = HyperbolicGCN(in_channels=num_features, hidden_channels=64, out_channels=num_classes).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)
criterion = nn.CrossEntropyLoss()

def train():
    model.train()
    total_loss = 0
    for data in train_loader:
        data = data.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, data.y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(train_loader)

def evaluate(loader):
    model.eval()
    correct = 0
    for data in loader:
        data = data.to(device)
        output = model(data)
        pred = output.argmax(dim=1)
        correct += pred.eq(data.y).sum().item()
    return correct / len(loader.dataset)

best_val_acc = 0.0
best_model_path = 'best_hyperbolic_gcn.pth'
for epoch in range(50):
    loss = train()
    train_acc = evaluate(train_loader)
    val_acc = evaluate(val_loader)
    print(f"Epoch {epoch+1}: Loss: {loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")
    
    # Save best model based on validation accuracy
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), best_model_path)
        

Epoch 1: Loss: 0.7302, Train Acc: 0.5955, Val Acc: 0.5766
Epoch 2: Loss: 0.6780, Train Acc: 0.5955, Val Acc: 0.5766
Epoch 3: Loss: 0.6816, Train Acc: 0.5955, Val Acc: 0.5766
Epoch 4: Loss: 0.6767, Train Acc: 0.5955, Val Acc: 0.5766
Epoch 5: Loss: 0.6533, Train Acc: 0.6416, Val Acc: 0.7117
Epoch 6: Loss: 0.6321, Train Acc: 0.7090, Val Acc: 0.6667
Epoch 7: Loss: 0.6247, Train Acc: 0.6820, Val Acc: 0.7207
Epoch 8: Loss: 0.6306, Train Acc: 0.6652, Val Acc: 0.6126
Epoch 9: Loss: 0.6383, Train Acc: 0.7067, Val Acc: 0.7027
Epoch 10: Loss: 0.6351, Train Acc: 0.7022, Val Acc: 0.6577
Epoch 11: Loss: 0.6211, Train Acc: 0.7101, Val Acc: 0.6577
Epoch 12: Loss: 0.6465, Train Acc: 0.7112, Val Acc: 0.6937
Epoch 13: Loss: 0.6265, Train Acc: 0.6270, Val Acc: 0.6577
Epoch 14: Loss: 0.6524, Train Acc: 0.6022, Val Acc: 0.5946
Epoch 15: Loss: 0.6330, Train Acc: 0.6798, Val Acc: 0.7027
Epoch 16: Loss: 0.6282, Train Acc: 0.7180, Val Acc: 0.6937
Epoch 17: Loss: 0.6125, Train Acc: 0.7000, Val Acc: 0.7027
Epoch 

In [10]:
test_model = HyperbolicGCN(in_channels=num_features, hidden_channels=64, out_channels=num_classes).to(device)
test_model.load_state_dict(torch.load(best_model_path))
test_acc = evaluate(test_loader)
print(f"Test Accuracy: {test_acc * 100:.2f}%")

Test Accuracy: 79.46%
