### Graph Neural Networks

Andrei Gabriel Popescu

In [None]:
import torch_geometric.nn as pyg_nn
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch import Tensor
import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
import torch.nn.functional as F
from torch.nn import Linear, Dropout
from torch_geometric.nn import GCNConv, GATv2Conv

# TSNE
from sklearn.manifold import TSNE

In [None]:
import torch_geometric
import torch

In [None]:
from torch_geometric.datasets import Planetoid
from torch_geometric.data import InMemoryDataset
from torch_geometric.utils import degree
from collections import Counter
import os

In [None]:
# Load the Planetoid "Cora" dataset (stored under the current directory)
# and keep its first entry as the working graph.
dataset = Planetoid(name="Cora", root=".")
data = dataset[0]


In [None]:
# Show the first entry of the dataset; as the last expression of the cell,
# the notebook renders its repr as the cell output.
dataset[0]

In [None]:
# Summarise the dataset, one statistic per output line.
print(
    f'Number of graphs: {len(dataset)}',
    f'Number of nodes: {data.x.shape[0]}',
    f'Number of features: {dataset.num_features}',
    f'Number of classes: {dataset.num_classes}',
    f'Has isolated nodes: {data.has_isolated_nodes()}',
    sep='\n',
)

In [None]:
# Per-node degree, computed from the first row of edge_index
degrees = degree(data.edge_index[0]).numpy()

# Histogram: degree value -> how many nodes have that degree
numbers = Counter(degrees)

# Draw the histogram as a bar chart on a dedicated axes object
fig, ax = plt.subplots(figsize=(18, 6))
ax.set(xlabel='Node degree', ylabel='Number of nodes')
ax.bar(numbers.keys(), numbers.values(), color='#0A047A')

In [None]:
class GCN(torch.nn.Module):
    """Graph Convolutional Network with a residual inner block.

    Architecture: ``gcn1 -> (gcn_inner + skip) x inner_num_layers -> gcn2``.
    GELU is used instead of ReLU as the activation, and dropout (p = 0.2) is
    applied to the input and after every hidden activation.

    The single ``gcn_inner`` layer is re-applied on every inner iteration, so
    its weights are shared across the ``inner_num_layers`` iterations.

    An Adam optimizer over the model's parameters (lr = 5e-3,
    weight_decay = 5e-4) is created in the constructor and stored as
    ``self.optimizer``.
    """

    def __init__(self, dim_in:int, dim_h: int, dim_out: int, inner_num_layers: int = 3):
        super().__init__()
        self.inner_num_layers = inner_num_layers
        self.dropout_rate = 0.2

        # Input -> hidden, hidden -> hidden (reused), hidden -> output.
        self.gcn1 = GCNConv(dim_in, dim_h)
        self.gcn_inner = GCNConv(dim_h, dim_h)
        self.gcn2 = GCNConv(dim_h, dim_out)

        # The optimizer must be built after the layers so it sees their parameters.
        self.optimizer = torch.optim.Adam(
            self.parameters(), lr=5e-3, weight_decay=5e-4
        )

    def _drop(self, tensor):
        """Apply dropout with the model's rate; a no-op outside training mode."""
        return F.dropout(tensor, p=self.dropout_rate, training=self.training)

    def forward(self, x, edge_index):
        """Return ``(logits, log_probs)`` for the node features ``x``.

        ``logits`` is the raw output of ``gcn2``; ``log_probs`` is its
        log-softmax along dimension 1.
        """
        hidden = self._drop(F.gelu(self.gcn1(self._drop(x), edge_index)))

        for _ in range(self.inner_num_layers):
            update = self._drop(F.gelu(self.gcn_inner(hidden, edge_index)))
            hidden = update + hidden  # residual connection

        logits = self.gcn2(hidden, edge_index)
        return logits, F.log_softmax(logits, dim=1)

In [None]:
class GAT(torch.nn.Module):
    """Graph Attention Network (GATv2 layers) with a residual inner block.

    Architecture: ``gat1 -> (gat_inner + skip) x inner_num_layers -> gat2``.
    ``gat1`` uses ``heads`` attention heads; the inner and output layers take
    ``dim_h * heads`` input channels and use a single head. GELU is the
    activation, and dropout (p = 0.2) is applied to the input and after every
    hidden activation.

    The single ``gat_inner`` layer is re-applied on every inner iteration, so
    its weights are shared across the ``inner_num_layers`` iterations.

    An Adam optimizer over the model's parameters (lr = 5e-3,
    weight_decay = 5e-4) is created in the constructor and stored as
    ``self.optimizer``.
    """

    def __init__(self, dim_in: int, dim_h: int, dim_out: int, heads:int = 8, inner_num_layers: int = 3):
        super().__init__()
        self.inner_num_layers = inner_num_layers
        self.dropout_rate = 0.2

        # Width of the hidden representation consumed by the later layers.
        hidden_width = dim_h * heads

        self.gat1 = GATv2Conv(dim_in, dim_h, heads=heads)
        self.gat_inner = GATv2Conv(hidden_width, hidden_width, heads=1)
        self.gat2 = GATv2Conv(hidden_width, dim_out, heads=1)

        # The optimizer must be built after the layers so it sees their parameters.
        self.optimizer = torch.optim.Adam(
            self.parameters(), lr=5e-3, weight_decay=5e-4
        )

    def _drop(self, tensor):
        """Apply dropout with the model's rate; a no-op outside training mode."""
        return F.dropout(tensor, p=self.dropout_rate, training=self.training)

    def forward(self, x, edge_index):
        """Return ``(logits, log_probs)`` for the node features ``x``.

        ``logits`` is the raw output of ``gat2``; ``log_probs`` is its
        log-softmax along dimension 1.
        """
        hidden = self._drop(F.gelu(self.gat1(self._drop(x), edge_index)))

        for _ in range(self.inner_num_layers):
            update = self._drop(F.gelu(self.gat_inner(hidden, edge_index)))
            # Residual connection: both tensors have dim_h * heads channels.
            hidden = update + hidden

        logits = self.gat2(hidden, edge_index)
        return logits, F.log_softmax(logits, dim=1)

In [None]:
def accuracy(pred_y, y):
    """Return the fraction of predictions that equal their labels.

    The number of element-wise matches between ``pred_y`` and ``y`` is divided
    by ``len(y)`` and returned as a Python float.
    """
    n_correct = (pred_y == y).sum()
    ratio = n_correct / len(y)
    return ratio.item()

In [None]:
def train(model, data, epochs=200):
    """Train a GNN model on a single graph and return the trained model.

    Each epoch performs one full-batch optimisation step on the nodes selected
    by ``data.train_mask`` and then evaluates on ``data.val_mask``. Validation
    uses a separate forward pass in eval mode without gradients, so dropout
    does not distort the validation metrics.

    Side effects:
      * prints the metrics every 10 epochs;
      * saves the weights to ``GCN_models/<ModelClass>_<epochs>.pt``;
      * plots the loss and accuracy curves, saves them to
        ``GCN_plots/<ModelClass>_<epochs>.png`` and shows the figure.

    Args:
        model: module exposing ``optimizer`` and returning a pair whose second
            element holds per-node class scores when called as
            ``model(data.x, data.edge_index)``.
        data: graph object with ``x``, ``edge_index``, ``y``, ``train_mask``
            and ``val_mask``.
        epochs: index of the last epoch; ``epochs + 1`` optimisation steps are
            run so the final epoch number is printed as well.

    Returns:
        The same ``model`` instance, left in eval mode.
    """
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = model.optimizer

    train_losses = []
    val_losses = []
    train_accs = []
    val_accs = []

    for epoch in range(epochs + 1):
        # Training step (dropout active)
        model.train()
        optimizer.zero_grad()
        _, out = model(data.x, data.edge_index)
        loss = criterion(out[data.train_mask], data.y[data.train_mask])
        acc = accuracy(out[data.train_mask].argmax(dim=1), data.y[data.train_mask])
        loss.backward()
        optimizer.step()
        train_loss = loss.item()
        train_losses.append(train_loss)
        train_accs.append(acc)

        # Validation: fresh forward pass with dropout disabled and no autograd
        model.eval()
        with torch.no_grad():
            _, val_out = model(data.x, data.edge_index)
            val_loss = criterion(val_out[data.val_mask], data.y[data.val_mask]).item()
            val_acc = accuracy(val_out[data.val_mask].argmax(dim=1), data.y[data.val_mask])
        val_losses.append(val_loss)
        val_accs.append(val_acc)

        # Print metrics every 10 epochs
        if epoch % 10 == 0:
            print(f'Epoch {epoch:>3} | Train Loss: {train_loss:.3f} | Train Acc: '
                  f'{acc*100:>6.2f}% | Val Loss: {val_loss:.2f} | '
                  f'Val Acc: {val_acc*100:.2f}%')

    # Save the model into a folder called GCN_models (used for every model class)
    os.makedirs('GCN_models', exist_ok=True)
    torch.save(model.state_dict(), f'GCN_models/{model.__class__.__name__}_{epochs}.pt')

    # Plot the training and validation losses and accuracies
    fig, ax = plt.subplots(1, 2, figsize=(18, 6))
    ax[0].set_title('Loss')
    ax[0].set_xlabel('Epoch')
    ax[0].set_ylabel('Loss')
    ax[0].plot(train_losses, label='Train')
    ax[0].plot(val_losses, label='Validation')
    ax[0].legend()
    ax[1].set_title('Accuracy')
    ax[1].set_xlabel('Epoch')
    ax[1].set_ylabel('Accuracy')
    ax[1].plot(train_accs, label='Train')
    ax[1].plot(val_accs, label='Validation')
    ax[1].legend()

    # Save the plot into a folder called GCN_plots before showing it, because
    # some backends release the figure once it has been shown.
    os.makedirs('GCN_plots', exist_ok=True)
    fig.savefig(f'GCN_plots/{model.__class__.__name__}_{epochs}.png')
    plt.show()

    return model

In [None]:
@torch.no_grad()
def test(model, data):
    """Return the model's accuracy on the nodes selected by ``data.test_mask``.

    The model is switched to eval mode and stays in that mode afterwards.
    The second element of the model's output is used as per-node class scores.
    """
    model.eval()
    _, scores = model(data.x, data.edge_index)
    predicted = scores.argmax(dim=1)
    held_out = data.test_mask
    return accuracy(predicted[held_out], data.y[held_out])

In [None]:
# Experiment settings (only the first embedding size is used below)
embedding_dim = [128, 256]
batch_size = 64

# Build the GCN with four residual inner iterations
gcn = GCN(
    dim_in=dataset.num_features,
    dim_h=embedding_dim[0],
    dim_out=dataset.num_classes,
    inner_num_layers=4,
)
print(gcn)

# Fit on the training nodes, then score on the held-out test nodes
train(gcn, data)
acc = test(gcn, data)
print(f'\nGCN test accuracy: {acc*100:.2f}%\n')

In [None]:
# Build the GAT: 128 hidden channels per head, 8 heads, 3 inner iterations
gat = GAT(
    dataset.num_features,
    128,
    dataset.num_classes,
    heads=8,
    inner_num_layers=3,
)
print(gat)

# Fit on the training nodes, then score on the held-out test nodes
train(gat, data)
acc = test(gat, data)
print(f'\nGAT test accuracy: {acc*100:.2f}%\n')

Untrained GAT

In [None]:
untrained_gat = GAT(dataset.num_features, 128, dataset.num_classes, heads=8, inner_num_layers=3)

# A freshly built module is in training mode; switch to eval so dropout does
# not randomly perturb the embeddings we want to visualise.
untrained_gat.eval()

# Get embeddings (no gradients are needed for visualisation)
with torch.no_grad():
    h, _ = untrained_gat(data.x, data.edge_index)

# Train TSNE on the raw model outputs, projecting them down to 2-D
tsne = TSNE(n_components=2, learning_rate='auto',
         init='pca').fit_transform(h.cpu().numpy())

# Plot TSNE, colouring every node by its ground-truth class
plt.figure(figsize=(10, 10))
plt.axis('off')
plt.scatter(tsne[:, 0], tsne[:, 1], s=50, c=data.y)
plt.show()

Trained GAT

In [None]:
# Embeddings produced by the trained GAT (first element of its output)
h, _ = gat(data.x, data.edge_index)

# Fit t-SNE on the detached embeddings and project them to 2-D
reducer = TSNE(n_components=2, learning_rate='auto', init='pca')
tsne = reducer.fit_transform(h.detach())

# Scatter the 2-D projection, colouring every node by its ground-truth class
plt.figure(figsize=(10, 10))
plt.axis('off')
plt.scatter(*tsne.T, s=50, c=data.y)
plt.show()

In [None]:
# Get model's classifications (eval mode, no gradients: inference only)
gat.eval()
with torch.no_grad():
    _, out = gat(data.x, data.edge_index)
pred = out.argmax(dim=1)

# Calculate the degree of each node; num_nodes keeps trailing nodes that never
# appear in edge_index so the array lines up with data.y
degrees = degree(data.edge_index[0], num_nodes=data.num_nodes).numpy()

# One bucket per degree 0..5 plus a final bucket for degrees > 5
labels = ['0','1','2','3','4','5','>5']
masks = [np.where(degrees == i)[0] for i in range(0, 6)]
masks.append(np.where(degrees > 5)[0])

# Store accuracy scores and sample sizes
accuracies = []
sizes = []
for mask in masks:
    sizes.append(len(mask))
    # An empty bucket would give 0/0 = NaN; report 0.0 so the plot stays finite
    accuracies.append(accuracy(pred[mask], data.y[mask]) if len(mask) else 0.0)

# Bar plot
fig, ax = plt.subplots(figsize=(18, 9))
ax.set_xlabel('Node degree')
ax.set_ylabel('Accuracy score')
ax.set_facecolor('#EFEEEA')
ax.bar(labels, accuracies, color='#0A047A')
for i, (bucket_acc, bucket_size) in enumerate(zip(accuracies, sizes)):
    # Accuracy label at the top of the bar
    ax.text(i, bucket_acc, f'{bucket_acc*100:.2f}%',
            ha='center', color='#0A047A')
    # Sample size at mid-height of the bar (true division: `//` would always
    # give 0.0 for accuracies in [0, 1])
    ax.text(i, bucket_acc / 2, bucket_size,
            ha='center', color='white')

In [None]:
# Load the train / validation / test splits of the PPI dataset from the
# 'ppi' directory using PyTorch Geometric
train_ppi, val_ppi, test_ppi = (
    torch_geometric.datasets.PPI(root='ppi', split=split_name)
    for split_name in ('train', 'val', 'test')
)

In [None]:
# Print the first sample of the PPI training split to inspect its fields
print(train_ppi[0])


In [None]:
# Dataloader with 64 batch size
from torch_geometric.loader import DataLoader as GeometricDataLoader
from torch_geometric.data import Data

# All three loaders share the same settings: batches of 64, shuffled.
loader_options = {'batch_size': 64, 'shuffle': True}
train_loader = GeometricDataLoader(train_ppi, **loader_options)
val_loader = GeometricDataLoader(val_ppi, **loader_options)
test_loader = GeometricDataLoader(test_ppi, **loader_options)


# Print the first element of the first training batch, then stop iterating
for batch in train_loader:
    print(batch[0])
    break

In [None]:
def train_protein(model, train_loader, val_loader, test_loader, epochs=100):
    """Train a multi-label model on batched graphs and return it.

    The model is optimised with ``BCEWithLogitsLoss`` on its raw logits (the
    first element of its output), since that loss applies the sigmoid itself
    and must not be fed log-softmax values. Validation runs once per epoch via
    ``test``; because ``test`` switches the model to eval mode, training mode
    is re-enabled at the start of every epoch so dropout stays active.

    Args:
        model: module exposing ``optimizer`` and returning ``(logits, ...)``
            when called as ``model(data.x, data.edge_index)``.
        train_loader: iterable of training batches with ``x``, ``edge_index``
            and ``y``; must support ``len()``.
        val_loader: loader passed to ``test`` after every epoch.
        test_loader: loader passed to ``test`` once training is finished.
        epochs: index of the last epoch; ``epochs + 1`` passes over the
            training data are made so the final epoch number is printed too.

    Returns:
        The same ``model`` instance.
    """
    optimizer = model.optimizer
    criterion = nn.BCEWithLogitsLoss()
    num_batches = len(train_loader)

    for epoch in range(epochs + 1):
        # The previous validation pass left the model in eval mode.
        model.train()
        total_loss = 0.0
        acc = 0.0

        # Train on batches
        for data in train_loader:
            optimizer.zero_grad()
            logits, _ = model(data.x, data.edge_index)
            loss = criterion(logits, data.y)
            loss.backward()
            optimizer.step()

            # Accumulate plain floats so no autograd history is kept alive.
            total_loss += loss.item() / num_batches
            # logit > 0 is the same as sigmoid > 0.5.
            # NOTE(review): assumes data.y holds 0/1 multi-label targets.
            predicted = logits.detach() > 0
            acc += (predicted == (data.y > 0.5)).float().mean().item() / num_batches

        # Validation, once per epoch
        val_loss, val_acc = test(model, val_loader)

        # Print metrics every 10 epochs
        if epoch % 10 == 0:
            print(f'Epoch {epoch:>3} | Train Loss: {total_loss:.2f} '
                f'| Train Acc: {acc*100:>5.2f}% '
                f'| Val Loss: {val_loss:.2f} '
                f'| Val Acc: {val_acc*100:.2f}%')

    test_loss, test_acc = test(model, test_loader)
    print(f'Test Loss: {test_loss:.2f} | Test Acc: {test_acc*100:.2f}%')

    return model

@torch.no_grad()
def test(model, loader):
    """Evaluate a multi-label model on every batch of ``loader``.

    The loss is ``BCEWithLogitsLoss`` on the model's raw logits (the first
    element of its output); that loss applies the sigmoid itself, so it must
    not be fed log-softmax values. Accuracy is the fraction of individual
    label entries predicted correctly, with a logit above 0 (sigmoid above
    0.5) counted as a positive prediction.

    The model is switched to eval mode and stays in that mode afterwards.

    Args:
        model: module returning ``(logits, ...)`` when called as
            ``model(data.x, data.edge_index)``.
        loader: iterable of batches with ``x``, ``edge_index`` and ``y``;
            must support ``len()``.

    Returns:
        ``(loss, acc)``: both averaged over the batches. ``loss`` is a 0-dim
        tensor (the int ``0`` for an empty loader), ``acc`` a Python number.
    """
    criterion = torch.nn.BCEWithLogitsLoss()
    model.eval()
    loss = 0
    acc = 0
    num_batches = len(loader)

    for data in loader:
        logits, _ = model(data.x, data.edge_index)
        loss += criterion(logits, data.y) / num_batches
        # NOTE(review): assumes data.y holds 0/1 multi-label targets.
        hits = (logits > 0) == (data.y > 0.5)
        acc += hits.float().mean().item() / num_batches

    return loss, acc

def accuracy(pred_y, y):
    """Return the fraction of entries of ``pred_y`` that equal ``y``.

    The mean is taken over every element, so the result stays in [0, 1] for
    multi-label (2-D) targets as well. For 1-D inputs this is the same as
    dividing the number of matches by ``len(y)``.

    Args:
        pred_y: tensor of predicted labels.
        y: tensor of ground-truth labels, broadcast-compatible with ``pred_y``.

    Returns:
        The accuracy as a Python float (NaN when the inputs are empty).
    """
    return (pred_y == y).float().mean().item()


In [None]:
# Build a GCN sized for the PPI features and labels
model = GCN(
    dim_in=train_ppi.num_features,
    dim_h=256,
    dim_out=train_ppi.num_classes,
    inner_num_layers=3,
)

# Train it on the PPI loaders; train_protein returns the same model
model = train_protein(model, train_loader, val_loader, test_loader)