# GNNs

## GNNs with StatQuest

In [14]:
from typing import Any

import torch
import torch.nn as nn
import torch.nn.functional as F
from pytorch_lightning.utilities.types import OptimizerLRScheduler
from torch.optim import SGD
import matplotlib.pyplot as plt
import seaborn as sns
import pytorch_lightning as L
from torch.utils.data import DataLoader


In [None]:
class BasicNN(nn.Module):
    """Toy network that holds three scalar parameters.

    Every parameter is created with ``requires_grad=False``, so an optimizer
    built from ``parameters()`` has nothing it can update.
    """

    def __init__(self):
        super().__init__()
        # NOTE(review): all three values are frozen. If one of them is meant
        # to be learned by the training loop, it needs requires_grad=True.
        self.w00 = nn.Parameter(torch.tensor(1.7), requires_grad=False)
        self.b00 = nn.Parameter(torch.tensor(1.7), requires_grad=False)
        self.w11 = nn.Parameter(torch.tensor(1.7), requires_grad=False)

    def forward(self, x):
        """Compute the network output for ``x`` (not written yet).

        Raises:
            NotImplementedError: always, until the forward pass is filled in.
        """
        # The notebook cell stopped right after the signature, which is a
        # syntax error. Fail with a clear message instead.
        raise NotImplementedError("BasicNN.forward has not been implemented yet")


In [None]:
# create optimizer
optimizer = SGD(model.parameters(), lr=0.01)

# Gradient descent: gradients from every sample are accumulated, then one
# optimizer step is taken per epoch.
for epoch in range(100):
    total_loss = 0
    for input_i, label_i in zip(inputs, labels):
        output_i = model(input_i)
        loss = (output_i - label_i)**2  # squared residual for this sample
        loss.backward()  # adds this sample's gradient to the stored ones
        total_loss += float(loss)

    # Stop before stepping once the summed loss is small enough.
    if total_loss < 0.0001:
        print("num steps: ", epoch)
        break
    optimizer.step()
    optimizer.zero_grad() # without this, would add new derivative to the derivatives of the previous loop
# NOTE(review): the message ends after "final bias: " without printing a value.
print(f"total loss: {total_loss}, final bias: ")

In [None]:
class BasicLight(L.LightningModule):
    """Lightning version of the toy scalar network.

    Holds three frozen scalar parameters and a learning rate, and provides
    the optimizer and the per-batch squared-error loss.
    """

    def __init__(self):
        super().__init__()
        # NOTE(review): every parameter is frozen (requires_grad=False), so
        # SGD has nothing to update — confirm which value should be trained.
        self.w00 = nn.Parameter(torch.tensor(1.7), requires_grad=False)
        self.b00 = nn.Parameter(torch.tensor(1.7), requires_grad=False)
        self.w11 = nn.Parameter(torch.tensor(1.7), requires_grad=False)

        # new compared to the plain nn.Module version: the learning rate
        # is stored on the module
        self.learning_rate = 0.01

    def forward(self, input):
        """Compute the network output for ``input`` (only partly written).

        Raises:
            NotImplementedError: always, until the rest of the forward pass
                is filled in.
        """
        input_to_top_relu = input * self.w00 + self.b00
        # ... the remaining layers were elided in the notebook. Fail loudly
        # rather than return None to training_step.
        raise NotImplementedError("BasicLight.forward is incomplete")

    def configure_optimizers(self) -> OptimizerLRScheduler:
        """Return an SGD optimizer over this module's parameters."""
        return SGD(self.parameters(), lr=self.learning_rate)

    def training_step(self, batch, batch_idx):
        """Return the squared error between prediction and label for ``batch``."""
        input_i, label_i = batch
        output_i = self.forward(input_i)
        loss = (output_i - label_i)**2
        return loss


In [None]:
# wrap training data in dataloader!
# TensorDataset is imported here because the notebook's import cell only
# brings in DataLoader.
from torch.utils.data import TensorDataset

dataset = TensorDataset(inputs, labels)
dataloader = DataLoader(dataset)
# easier access for batches, possibility to shuffle data each epoch, easy for small fraction of data for debugging

In [None]:
# Lightning runs the optimisation loop: fit() takes care of zero_grad,
# backward and optimizer.step, calling training_step once per batch.
model = BasicLight()
trainer = L.Trainer(max_epochs=34)
trainer.fit(model, train_dataloaders=dataloader)

In [None]:
# gpu accelerator

## GNN with Planetoid - translate into Lightning

In [2]:
from torch_geometric.datasets import Planetoid
from torch.utils.data import DataLoader
import torch
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
import pytorch_lightning as L

In [4]:
# PyG's own loader is needed here: torch's default collate function cannot
# batch torch_geometric Data objects. Imported under an alias so the torch
# DataLoader name stays untouched.
from torch_geometric.loader import DataLoader as GeoDataLoader

dataset = Planetoid(root="tutorial1", name="Cora")
train_loader = GeoDataLoader(dataset, batch_size=15, shuffle=True)

# Read the graph through indexing instead of the `dataset.data` attribute.
data = dataset[0]
print(len(dataset))               # number of graphs in the dataset
print(data.num_nodes)             # nodes in that graph
print(dataset.num_node_features)  # feature dimension per node
print(data)
print(data.y)                     # node labels

1
2708
1433
Data(x=[2708, 1433], edge_index=[2, 10556], y=[2708], train_mask=[2708], val_mask=[2708], test_mask=[2708])
tensor([3, 4, 4,  ..., 3, 3, 3])


In [15]:
class GCN(nn.Module):
    """Two-layer graph convolutional network returning log-probabilities.

    Input and output widths are read from the module-level ``dataset``.
    """

    def __init__(self):
        super().__init__()
        hidden_channels = 16
        self.conv1 = GCNConv(dataset.num_node_features, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, dataset.num_classes)

    def forward(self, data):
        """Run both convolutions on ``data.x`` / ``data.edge_index``."""
        features, edge_index = data.x, data.edge_index

        hidden = F.relu(self.conv1(features, edge_index))
        # dropout is active only while the module is in training mode
        hidden = F.dropout(hidden, training=self.training)
        logits = self.conv2(hidden, edge_index)

        return F.log_softmax(logits, dim=1)

# Use the GPU when CUDA is available, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Model and graph are moved to the chosen device.
model = GCN().to(device)
data = dataset[0].to(device)
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=0.01,
    weight_decay=5e-4,
)

In [16]:
# Full-graph training: each epoch is one forward/backward pass, with the
# loss computed on the nodes selected by data.train_mask.
model.train()
for epoch in range(200):
    optimizer.zero_grad()
    out = model(data)
    train_mask = data.train_mask
    loss = F.nll_loss(out[train_mask], data.y[train_mask])
    loss.backward()
    optimizer.step()

In [17]:
# Evaluation mode turns off training-only behaviour in the model.
model.eval()
# Inference only: skip autograd bookkeeping for the forward pass.
with torch.no_grad():
    pred = model(data).argmax(dim=1)
# Accuracy is measured on the nodes selected by data.test_mask.
correct = (pred[data.test_mask] == data.y[data.test_mask]).sum()
acc = int(correct) / int(data.test_mask.sum())
print(f'Accuracy: {acc:.4f}')

Accuracy: 0.7910


### Lightning approach
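- can suggest a learning rate (learning-rate finder)
- simplifies the training loops (no hand-written zero_grad / backward / step)
- runs on GPU or TPU by choosing an accelerator in the Trainer, without changing the model code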

Possibilities to change: DataModule (instead of lists use Datasets?)

In [None]:
class LightNN(L.LightningModule):
    """Lightning port of the two-layer GCN.

    Layer widths are read from the module-level ``dataset``. ``forward``
    expects a graph object with ``x`` and ``edge_index`` and returns
    per-node log-probabilities.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = GCNConv(dataset.num_node_features, 16)
        self.conv2 = GCNConv(16, dataset.num_classes)

    def forward(self, data):
        """Apply both graph convolutions and return log-softmax scores."""
        x, edge_index = data.x, data.edge_index

        x = self.conv1(x, edge_index)
        x = F.relu(x)
        # dropout is active only while the module is in training mode
        x = F.dropout(x, training=self.training)
        x = self.conv2(x, edge_index)

        return F.log_softmax(x, dim=1)

    def training_step(self, batch, batch_idx):
        """Return the loss on the nodes selected by ``train_mask``."""
        loss, _, _ = self._common_step(batch, batch_idx, mask_name="train_mask")
        return loss

    def validation_step(self, batch, batch_idx):
        """Return the loss on the nodes selected by ``val_mask``."""
        loss, _, _ = self._common_step(batch, batch_idx, mask_name="val_mask")
        return loss

    def test_step(self, batch, batch_idx):
        """Return the loss on the nodes selected by ``test_mask``."""
        loss, _, _ = self._common_step(batch, batch_idx, mask_name="test_mask")
        return loss

    def _common_step(self, batch, batch_idx, mask_name=None):
        """Run the forward pass and compute the NLL loss for one batch.

        Args:
            batch: either a ``(graph, labels)`` pair, or a single graph
                object carrying its labels in ``y``.
            batch_idx: index of the batch (unused).
            mask_name: name of a boolean node mask on the graph object. When
                given and present, predictions and labels are restricted to
                it. Ignored for ``(graph, labels)`` pairs.

        Returns:
            Tuple ``(loss, output, y)``.
        """
        if isinstance(batch, (tuple, list)):
            # Layout assumed by the original unpacking: (graph, labels).
            graph, y = batch
            output = self.forward(graph)
        else:
            # Single graph object: forward() needs the whole object, and the
            # labels live on it.
            graph = batch
            output = self.forward(graph)
            y = graph.y
            mask = getattr(graph, mask_name, None) if mask_name else None
            if mask is not None:
                output, y = output[mask], y[mask]
        loss = F.nll_loss(output, y)
        return loss, output, y

    def configure_optimizers(self):
        """Return an Adam optimizer over this module's own parameters."""
        # Uses self rather than the notebook-global `model`, and the already
        # imported torch namespace rather than an unimported `optim`.
        return torch.optim.Adam(self.parameters(), lr=0.01)





In [None]:
# NOTE(review): this cell sits in the GNN section, so the graph model is
# instantiated here rather than the scalar BasicLight — confirm.
model = LightNN()
# The Trainer takes only configuration; the model is passed later to
# trainer.fit(model, <dataloader>).
trainer = L.Trainer(max_epochs=34)