In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.data import (
    Dataset,
)  # 型アノテーション用にインポート（必須ではないが明示的に）
from torch_geometric.nn import GINConv  # GIN（Graph Isomorphism Network）層を使用
from torch_geometric.utils import scatter  # グラフのノードからグラフ単位への集約関数
from torch_geometric.loader import (
    DataLoader,
)  # PyGのDataLoader（複数のグラフをバッチ処理）
from torch_geometric.datasets import (
    TUDataset,
)  # TUDataset（MUTAGなどのグラフ分類データセット）

In [3]:
# ===============================
# データセットの準備と分割
# ===============================

# TUDataset から "NCI1" というグラフ分類用データセットを読み込む（各データが1つのグラフ）
dataset: TUDataset = TUDataset(root="/tmp/nci1", name="NCI1")

# データセット全体をランダムにシャッフル
dataset = dataset.shuffle()

# 全体の90%を学習用、残り10%をテスト用に使用
n_train: int = len(dataset) // 10 * 9  # 学習用データ数（90%）
n_test: int = len(dataset) - n_train  # テスト用データ数（残り10%）

# データセットを分割
train_dataset: TUDataset = dataset[:n_train]  # 学習用データセット
test_dataset: TUDataset = dataset[n_train:]  # テスト用データセット

Downloading https://www.chrsmrrs.com/graphkerneldatasets/NCI1.zip
Processing...
Done!


In [5]:
from torch_geometric.data import Data


# ===============================
# MLP：多層パーセプトロン（GINConv 内部で使用）
# ===============================
class MLP(nn.Module):
    def __init__(self, in_d: int, mid_d: int, out_d: int):
        """
        GINConv の中で使われる2層MLP（BatchNorm + ReLU付き）
        """
        super(MLP, self).__init__()
        self.main = nn.Sequential(
            nn.Linear(in_d, mid_d),  # 入力層 → 中間層
            nn.BatchNorm1d(mid_d),  # バッチ正規化
            nn.ReLU(),  # 活性化関数
            nn.Linear(mid_d, out_d),  # 中間層 → 出力層
            nn.BatchNorm1d(out_d),  # バッチ正規化
            nn.ReLU(),  # 活性化関数
        )

    def forward(self, input: torch.Tensor) -> torch.Tensor:
        return self.main(input)


# ===============================
# GIN：Graph Isomorphism Network
# ===============================
class GIN(nn.Module):
    def __init__(self, in_d: int, mid_d: int, out_d: int):
        """
        GINモデルの定義
        - 3層のGINConvを通じてノード埋め込みを生成し、
        - グラフレベルの分類を行う
        """
        super(GIN, self).__init__()

        # 入力特徴量を最初にMLPで変換
        self.first_layer = MLP(in_d, mid_d, mid_d)

        # GINConv 層（MLPベース）
        self.conv1 = GINConv(MLP(mid_d, mid_d, mid_d))
        self.conv2 = GINConv(MLP(mid_d, mid_d, mid_d))
        self.conv3 = GINConv(MLP(mid_d, mid_d, mid_d))

        # グラフ埋め込みを使って分類する予測器（2層MLP）
        self.predictor = nn.Sequential(
            nn.Linear(mid_d, mid_d),
            nn.BatchNorm1d(mid_d),
            nn.ReLU(),
            nn.Linear(mid_d, out_d),
        )

    def forward(self, data: Data) -> torch.Tensor:
        """
        順伝播処理
        data.x: ノード特徴行列 (shape: [num_nodes, in_d])
        data.edge_index: 隣接関係を示すインデックス (shape: [2, num_edges])
        data.batch: 各ノードが属するグラフのインデックス (shape: [num_nodes])
        """
        x, edge_index, batch = data.x, data.edge_index, data.batch

        # 入力特徴量をMLPで変換
        x = self.first_layer(x)

        # GINConv を3層通す（ノード埋め込みの更新）
        x = self.conv1(x, edge_index)
        x = self.conv2(x, edge_index)
        x = self.conv3(x, edge_index)

        # グラフごとに max pooling をしてグラフ特徴量を得る
        graph_embeddings = scatter(x, batch, dim=0, reduce="max")

        # グラフ埋め込みをMLPで分類
        res = self.predictor(graph_embeddings)

        # 出力を log_softmax で正規化して返す（分類用）
        return F.log_softmax(res, dim=1)

In [6]:
# GIN モデルのインスタンスを作成する
# - 入力次元数: dataset.num_node_features（各ノードの特徴量の次元数）
# - 中間層の次元数: 32（埋め込みの次元数。任意に設定）
# - 出力次元数: dataset.num_classes（分類対象のクラス数）

model = GIN(
    in_d=dataset.num_node_features,  # 入力特徴量の次元数（例：ノードの特徴ベクトルの長さ）
    mid_d=32,  # 中間表現の次元数（ハイパーパラメータとして調整可能）
    out_d=dataset.num_classes,  # クラス数（出力層のノード数）
)

In [7]:
# ===============================
# 最適化手法の定義（SGD: 確率的勾配降下法）
# ===============================

optimizer = torch.optim.SGD(
    model.parameters(),  # モデル内の全パラメータ（勾配更新対象）
    lr=1e-2,  # 学習率（learning rate）0.01
    weight_decay=1e-4,  # 重み減衰（L2正則化）係数：過学習防止のため
)

In [8]:
def train(epoch: int) -> None:
    """
    GINモデルの学習を行う関数

    Parameters:
        epoch (int): 学習エポック数（何回データセット全体を学習するか）
    """
    model.train()  # モデルを学習モードに設定（BatchNormやDropoutが有効になる）

    # 学習用データセットからバッチ単位でデータをロードするためのDataLoaderを作成
    loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

    # 指定されたエポック数だけ学習ループを回す
    for i in range(epoch):
        for data in loader:
            optimizer.zero_grad()  # 勾配を初期化（前エポックの勾配をクリア）
            out = model(
                data
            )  # モデルにデータを入力して出力を得る（ロジット or ログソフトマックス）
            loss = F.nll_loss(out, data.y)  # 負の対数尤度損失を計算（分類タスク）
            loss.backward()  # 損失関数に基づいて勾配を計算（誤差逆伝播）
            optimizer.step()  # パラメータを1ステップ更新

In [9]:
%time train(500)

CPU times: user 5min 39s, sys: 2min, total: 7min 39s
Wall time: 6min 7s


In [10]:
# モデルを評価モードに設定（BatchNormやDropoutが推論モードになる）
model.eval()

# 正解数を累積する変数
acc: int = 0

# テストデータセットを32件ずつバッチ処理
for data in DataLoader(test_dataset, batch_size=32):
    # モデルの出力から予測クラスを取得（最大スコアのインデックス）
    pred = model(data).max(1)[1]  # .max(1)[1] は argmax と同義（最大値のインデックス）

    # 予測と正解ラベルが一致した数を加算
    acc += pred.eq(data.y).sum().item()

# テスト全体に対する正解率（Accuracy）を計算
acc / len(test_dataset)

0.8077858880778589