In [10]:
import math
import random
from dataclasses import dataclass
from typing import Tuple, Dict, Any, List

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam

from torch_geometric.datasets import Planetoid
from torch_geometric.transforms import NormalizeFeatures
from torch_geometric.nn import GCNConv

from sklearn.metrics import f1_score

from sklearn.metrics import f1_score
import itertools

In [11]:
def set_seed(seed: int = 42):
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)


def get_device() -> torch.device:
    return torch.device("cuda" if torch.cuda.is_available() else "cpu")


def f1_macro(logits: torch.Tensor, y: torch.Tensor, mask: torch.Tensor) -> float:
    preds = logits.argmax(dim=-1)[mask].detach().cpu().numpy()
    true = y[mask].detach().cpu().numpy()
    if len(true) == 0:
        return 0.0
    return f1_score(true, preds, average="macro")



def load_cora(device: torch.device):
    #Cora — классический датасет для классификации вершин.
    #В Planetoid уже есть train/val/test маски.

    dataset = Planetoid(
        root="data/Planetoid",
        name="Cora",
        transform=NormalizeFeatures()
    )
    data = dataset[0].to(device)
    return dataset, data



class GCN_PyG(nn.Module):

    def __init__(self, in_channels: int, hidden_channels: int,
                 out_channels: int, dropout: float):
        super().__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, out_channels)
        self.dropout = dropout

    def forward(self, x, edge_index):
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, p=self.dropout, training=self.training)
        x = self.conv2(x, edge_index)
        return x

class MyGCNConv(nn.Module):
    def __init__(self, in_channels: int, out_channels: int, bias: bool = True):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels

        self.weight = nn.Parameter(torch.Tensor(in_channels, out_channels))
        if bias:
            self.bias = nn.Parameter(torch.Tensor(out_channels))
        else:
            self.register_parameter("bias", None)

        self.reset_parameters()

    def reset_parameters(self):
        nn.init.xavier_uniform_(self.weight)
        if self.bias is not None:
            nn.init.zeros_(self.bias)

    @staticmethod
    def _build_normalized_adj(edge_index: torch.Tensor, num_nodes: int,
                              device: torch.device) -> torch.sparse.FloatTensor:

        loop_index = torch.arange(num_nodes, device=device)
        loop_index = loop_index.unsqueeze(0).repeat(2, 1)
        edge_index_full = torch.cat([edge_index, loop_index], dim=1)

        row, col = edge_index_full

        values = torch.ones(row.size(0), device=device)

        deg = torch.zeros(num_nodes, device=device)
        deg.scatter_add_(0, row, values)

        deg_inv_sqrt = deg.pow(-0.5)
        deg_inv_sqrt[torch.isinf(deg_inv_sqrt)] = 0.0

        norm_values = deg_inv_sqrt[row] * values * deg_inv_sqrt[col]

        adj = torch.sparse_coo_tensor(
            indices=edge_index_full,
            values=norm_values,
            size=(num_nodes, num_nodes),
            device=device
        )
        adj = adj.coalesce()
        return adj

    def forward(self, x: torch.Tensor, edge_index: torch.Tensor) -> torch.Tensor:
        num_nodes = x.size(0)
        device = x.device

        adj_norm = self._build_normalized_adj(edge_index, num_nodes, device)

        support = torch.sparse.mm(adj_norm, x)

        out = support @ self.weight

        if self.bias is not None:
            out = out + self.bias

        return out


class GCN_Custom(nn.Module):
    def __init__(self, in_channels: int, hidden_channels: int,
                 out_channels: int, dropout: float):
        super().__init__()
        self.conv1 = MyGCNConv(in_channels, hidden_channels)
        self.conv2 = MyGCNConv(hidden_channels, out_channels)
        self.dropout = dropout

    def forward(self, x, edge_index):
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, p=self.dropout, training=self.training)
        x = self.conv2(x, edge_index)
        return x



@dataclass
class TrainConfig:
    hidden_channels: int = 16
    dropout: float = 0.5
    lr: float = 0.01
    weight_decay: float = 5e-4
    epochs: int = 200

def train_one_epoch(model: nn.Module,
                    data,
                    optimizer: Adam) -> float:
    model.train()
    optimizer.zero_grad()
    out = model(data.x, data.edge_index)
    loss = F.cross_entropy(out[data.train_mask], data.y[data.train_mask])
    loss.backward()
    optimizer.step()
    return loss.item()

@torch.no_grad()
def evaluate_model(model: nn.Module, data) -> Dict[str, float]:
    model.eval()
    out = model(data.x, data.edge_index)

    train_f1 = f1_macro(out, data.y, data.train_mask)
    val_f1   = f1_macro(out, data.y, data.val_mask)
    test_f1  = f1_macro(out, data.y, data.test_mask)

    return {
        "train_f1": train_f1,
        "val_f1": val_f1,
        "test_f1": test_f1
    }

def run_experiments(ModelClass,
                    data,
                    device: torch.device,
                    config_grid: List[TrainConfig],
                    label: str):
    print(f"\nExperiments for {label}")

    best_val_acc = -1.0
    best_metrics = None
    best_config = None
    best_state_dict = None

    for i, cfg in enumerate(config_grid, start=1):
        print(f"\nConfig {i}/{len(config_grid)}: {cfg}")
        model = ModelClass(
            in_channels=data.num_features,
            hidden_channels=cfg.hidden_channels,
            out_channels=int(data.y.max().item()) + 1,
            dropout=cfg.dropout
        ).to(device)

        optimizer = Adam(
            model.parameters(),
            lr=cfg.lr,
            weight_decay=cfg.weight_decay
        )

        for epoch in range(1, cfg.epochs + 1):
            loss = train_one_epoch(model, data, optimizer)

        # Оценка после обучения
        final_metrics = evaluate_model(model, data)
        print(
            f"Final metrics (Config {i}): "
            f"train_f1={final_metrics['train_f1']*100:.2f}%, "
            f"val_f1={final_metrics['val_f1']*100:.2f}%, "
            f"test_f1={final_metrics['test_f1']*100:.2f}%"
        )

        if final_metrics["val_f1"] > best_val_acc:
          best_val_acc = final_metrics["val_f1"]
          best_metrics = final_metrics
          best_config = cfg
          best_state_dict = model.state_dict()

    print(f"\nBest config for {label}")
    print(best_config)
    print(
        f"Best metrics: "
        f"train_f1={best_metrics['train_f1']*100:.2f}%, "
        f"val_f1={best_metrics['val_f1']*100:.2f}%, "
        f"test_f1={best_metrics['test_f1']*100:.2f}%"
    )

    return best_config, best_metrics, best_state_dict


In [13]:
set_seed(42)
device = get_device()
print("Device:", device)

dataset, data = load_cora(device)
print(f"Dataset: {dataset}")
print(f"Num nodes: {data.num_nodes}")
print(f"Num features: {data.num_features}")
print(f"Num classes: {int(data.y.max().item()) + 1}")

hidden_choices  = [16, 32, 64]
dropout_choices = [0.5]
lr_choices      = [0.01, 0.005, 0.001]
wd_choices      = [5e-4, 5e-3]
epochs_choices  = [200]

config_grid = [
    TrainConfig(
        hidden_channels=h,
        dropout=d,
        lr=lr,
        weight_decay=wd,
        epochs=ep
    )
    for h, d, lr, wd, ep in itertools.product(
        hidden_choices,
        dropout_choices,
        lr_choices,
        wd_choices,
        epochs_choices,
    )
]

pyg_best_cfg, pyg_best_metrics, pyg_state = run_experiments(
    ModelClass=GCN_PyG,
    data=data,
    device=device,
    config_grid=config_grid,
    label="GCN (PyG GCNConv)"
)

custom_best_cfg, custom_best_metrics, custom_state = run_experiments(
    ModelClass=GCN_Custom,
    data=data,
    device=device,
    config_grid=config_grid,
    label="GCN (Custom MyGCNConv)"
)

print("PyG GCNConv best:")
print(f"  config: {pyg_best_cfg}")
print(
    f"  metrics: train_f1={pyg_best_metrics['train_f1']*100:.2f}%, "
    f"val_f1={pyg_best_metrics['val_f1']*100:.2f}%, "
    f"test_f1={pyg_best_metrics['test_f1']*100:.2f}%"
)

print("\nCustom MyGCNConv best:")
print(f"  config: {custom_best_cfg}")
print(
    f"  metrics: train_f1={custom_best_metrics['train_f1']*100:.2f}%, "
    f"val_f1={custom_best_metrics['val_f1']*100:.2f}%, "
    f"test_f1={custom_best_metrics['test_f1']*100:.2f}%"
)


Device: cpu
Dataset: Cora()
Num nodes: 2708
Num features: 1433
Num classes: 7

Experiments for GCN (PyG GCNConv)

Config 1/18: TrainConfig(hidden_channels=16, dropout=0.5, lr=0.01, weight_decay=0.0005, epochs=200)
Final metrics (Config 1): train_f1=100.00%, val_f1=76.84%, test_f1=78.78%

Config 2/18: TrainConfig(hidden_channels=16, dropout=0.5, lr=0.01, weight_decay=0.005, epochs=200)
Final metrics (Config 2): train_f1=77.95%, val_f1=66.70%, test_f1=63.84%

Config 3/18: TrainConfig(hidden_channels=16, dropout=0.5, lr=0.005, weight_decay=0.0005, epochs=200)
Final metrics (Config 3): train_f1=99.29%, val_f1=77.79%, test_f1=80.27%

Config 4/18: TrainConfig(hidden_channels=16, dropout=0.5, lr=0.005, weight_decay=0.005, epochs=200)
Final metrics (Config 4): train_f1=75.78%, val_f1=56.90%, test_f1=56.35%

Config 5/18: TrainConfig(hidden_channels=16, dropout=0.5, lr=0.001, weight_decay=0.0005, epochs=200)
Final metrics (Config 5): train_f1=95.69%, val_f1=74.73%, test_f1=76.88%

Config 6/18: T