In [1]:
# Install PyTorch Geometric plus its optional compiled extensions.
# `%pip` (instead of `!pip`) installs into the environment of the running kernel.
# torch-geometric is pinned to the version this notebook was last executed with.
# The extension wheels are taken from the index built for torch 2.6.0 + CUDA 12.4;
# adjust that URL if the runtime ships a different torch / CUDA build.
%pip install torch-geometric==2.6.1
%pip install pyg_lib torch_scatter torch_sparse -f https://data.pyg.org/whl/torch-2.6.0+cu124.html

Collecting torch-geometric
  Downloading torch_geometric-2.6.1-py3-none-any.whl.metadata (63 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.1/63.1 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
Downloading torch_geometric-2.6.1-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m19.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: torch-geometric
Successfully installed torch-geometric-2.6.1
Looking in links: https://data.pyg.org/whl/torch-2.6.0+cu124.html
Collecting pyg_lib
  Downloading https://data.pyg.org/whl/torch-2.6.0%2Bcu124/pyg_lib-0.4.0%2Bpt26cu124-cp311-cp311-linux_x86_64.whl (4.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.7/4.7 MB[0m [31m42.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting torch_scatter
  Downloading https://data.pyg.org/whl/torch-2.6.0%2Bcu124/torch_scatter-2.1.2%2Bpt26cu124-cp311-cp311-linux_x86_64.whl (10.8 MB)
[2K     [90m━━━━━━

In [2]:
import torch
import torch.nn.functional as F
from torch.nn import ModuleList
from torch.optim import Adam

from torch_geometric.datasets import Planetoid, BAShapes
from torch_geometric.transforms import NormalizeFeatures

from torch_geometric.nn import GCNConv
from torch_geometric.nn import HypergraphConv
from torch_geometric.data import Data
from sklearn.decomposition import PCA

from sklearn.metrics.pairwise import cosine_similarity

import numpy as np
import pandas as pd

from collections import defaultdict
from collections import Counter

import matplotlib.pyplot as plt
import seaborn as sns

In [3]:
# Mask-generation helper
def create_scaled_masks(data, train_ratio=0.1, val_ratio=0.15, test_ratio=0.25, seed=42):
    """Attach random, mutually disjoint train/val/test boolean masks to ``data``.

    * train: ``max(1, int(train_ratio * num_nodes / num_classes))`` nodes are drawn
      from every class (or the whole class if it is smaller than that).
    * val:   ``int(val_ratio * num_nodes)`` nodes drawn from the non-training nodes.
    * test:  ``int(test_ratio * num_nodes)`` nodes drawn from what is left after
      the train and validation draws.

    Args:
        data: Object exposing ``num_nodes`` and an integer label tensor ``y``.
        train_ratio: Fraction of all nodes used for training (split evenly per class).
        val_ratio: Fraction of all nodes used for validation.
        test_ratio: Fraction of all nodes used for testing.
        seed: Seed of the private random generator used for the draws.

    Returns:
        The same ``data`` object with ``train_mask``, ``val_mask`` and ``test_mask``
        set to CPU boolean tensors of length ``num_nodes``.
    """
    # A private generator produces the same draws as ``np.random.seed(seed)`` followed
    # by ``np.random.choice`` but leaves NumPy's global random state untouched.
    rng = np.random.RandomState(seed)

    num_nodes = data.num_nodes
    labels = data.y.cpu().numpy()
    num_classes = int(labels.max()) + 1

    train_per_class = max(1, int(train_ratio * num_nodes / num_classes))
    val_total = int(val_ratio * num_nodes)
    test_total = int(test_ratio * num_nodes)

    train_mask = torch.zeros(num_nodes, dtype=torch.bool)
    val_mask = torch.zeros(num_nodes, dtype=torch.bool)
    test_mask = torch.zeros(num_nodes, dtype=torch.bool)

    # Class-balanced training set.
    for c in range(num_classes):
        idx = np.where(labels == c)[0]
        chosen = rng.choice(idx, size=min(train_per_class, len(idx)), replace=False)
        train_mask[chosen] = True

    # Validation nodes come from everything that is not a training node.
    rest_idx = np.where(~train_mask.cpu().numpy())[0]
    chosen_val = rng.choice(rest_idx, size=min(val_total, len(rest_idx)), replace=False)
    val_mask[chosen_val] = True

    # Test nodes come from what remains, so the three masks never overlap.
    rest_idx2 = np.setdiff1d(rest_idx, chosen_val)
    chosen_test = rng.choice(rest_idx2, size=min(test_total, len(rest_idx2)), replace=False)
    test_mask[chosen_test] = True

    data.train_mask = train_mask
    data.val_mask = val_mask
    data.test_mask = test_mask

    return data

In [5]:
# Synthetic feature vectors for BAShapes
def create_mean_shifted_features(data, shift_scale=4.0, seed=42):
    """Replace ``data.x`` with Gaussian features whose mean depends on the node class.

    Nodes of class ``c`` receive unit-variance normal samples shifted by
    ``c * shift_scale``. The feature width (``data.num_features``) and the device of
    the previous ``data.x`` are kept.

    Args:
        data: Object exposing ``x``, ``y``, ``num_nodes`` and ``num_features``.
        shift_scale: Distance between the means of two consecutive classes.
        seed: Value passed to ``torch.manual_seed`` (this seeds the global torch RNG).

    Returns:
        The same ``data`` object with ``x`` overwritten.
    """
    torch.manual_seed(seed)

    device = data.x.device
    feat_dim = data.num_features
    n_classes = data.y.max().item() + 1
    features = torch.zeros(data.num_nodes, feat_dim, device=device)

    for label in range(n_classes):
        # Row indices of the nodes that carry this label
        rows = torch.nonzero(data.y == label, as_tuple=True)[0]
        noise = torch.randn(rows.size(0), feat_dim, device=device)
        # Shift the standard-normal samples to the class-specific mean
        features[rows] = noise + label * shift_scale

    data.x = features
    return data

In [6]:
# 1-hop hyperedge construction
def generate_1hop_hyperedge_index(data):
    """Turn every node's closed 1-hop neighbourhood into one hyperedge.

    Edges in ``data.edge_index`` are treated as undirected. Hyperedge ``i`` contains
    node ``i`` together with all of its neighbours, so exactly ``data.num_nodes``
    hyperedges are produced (an isolated node forms a hyperedge of size one).

    Returns:
        A ``torch.long`` tensor of shape ``(2, total_memberships)``; row 0 holds node
        ids and row 1 the id of the hyperedge each node belongs to.
    """
    neighbours = defaultdict(set)
    for u, v in data.edge_index.t().tolist():
        # Record the relation in both directions
        neighbours[u].add(v)
        neighbours[v].add(u)

    members, owners = [], []
    for center in range(data.num_nodes):
        closed_nbhd = neighbours[center] | {center}  # neighbours plus the node itself
        members.extend(closed_nbhd)
        owners.extend([center] * len(closed_nbhd))

    return torch.tensor([members, owners], dtype=torch.long)

In [7]:
# PCA + k-NN hyperedge construction
def generate_pca_knn_hyperedge_index(data: Data, k: int, pca_dim: int = 0, seed: int = 42) -> torch.Tensor:
    """Build one feature-similarity hyperedge per node.

    Node features are optionally reduced with PCA, pairwise cosine similarities are
    computed, and hyperedge ``i`` is filled with the ``k + 1`` nodes most similar to
    node ``i``. Node ``i`` itself is pushed to the bottom of its own ranking, so it
    is only selected when ``k + 1`` covers every node.

    NOTE(review): the call sites visible in this notebook describe ``k`` as
    "average degree + the node itself", yet the node is excluded here and ``k + 1``
    *other* nodes are picked. Confirm whether the centre node was meant to be a
    member of its own hyperedge.

    Args:
        data: Graph whose ``x`` holds the node features (dense or sparse tensor).
        k: Neighbourhood size parameter; ``k + 1`` nodes are selected per hyperedge.
        pca_dim: Number of PCA components. ``0`` (or less) skips PCA, which is what
            callers use for low-dimensional features.
        seed: ``random_state`` handed to PCA so that a randomized SVD solver gives
            the same projection, and therefore the same hyperedges, on every run.

    Returns:
        A ``torch.long`` tensor of shape ``(2, total_memberships)``; row 0 holds node
        ids and row 1 the hyperedge id (the index of the centre node).
    """
    x = data.x
    if x.is_sparse:
        x = x.to_dense()
    x = x.cpu().numpy()

    if pca_dim > 0:
        # Without a fixed random_state the projection can differ between runs.
        x_reduced = PCA(n_components=pca_dim, random_state=seed).fit_transform(x)
    else:
        x_reduced = x

    sim_matrix = cosine_similarity(x_reduced)
    num_nodes = sim_matrix.shape[0]

    node_list, hyperedge_list = [], []
    for i in range(num_nodes):
        # -inf (rather than -1, which is itself a valid cosine value) guarantees the
        # node never outranks a real neighbour.
        sim_matrix[i, i] = -np.inf
        top_k = sim_matrix[i].argsort()[-(k + 1):]
        node_list.extend(top_k.tolist())
        hyperedge_list.extend([i] * len(top_k))

    return torch.tensor([node_list, hyperedge_list], dtype=torch.long)

In [8]:
# Build every dataset variant used in the experiments: Cora, Citeseer, Pubmed, BAShapes.
# Each graph is stored twice in `datasets`, once with 1-hop hyperedges ("<name>_1hop")
# and once with feature-similarity hyperedges ("<name>_k-nn").

normalize = NormalizeFeatures()
datasets = {}

# Planetoid
planetoid_names = ['Cora', 'Citeseer', 'Pubmed']
knn_config = {  # rounded average degree + the node itself
    'Cora': 4+1,
    'Citeseer': 3+1,
    'Pubmed': 4+1
}

for name in planetoid_names:
    # The Planetoid datasets are used with the masks they ship with; only the
    # hyperedge_index attribute is added below.
    data = Planetoid(root=f'data/{name}', name=name, transform=normalize)[0]

    data_1hop = data.clone()
    data_1hop.hyperedge_index = generate_1hop_hyperedge_index(data_1hop)
    datasets[f'{name}_1hop'] = data_1hop

    data_knn = data.clone()
    k = knn_config[name]
    # Features are reduced to 100 PCA components before the similarity search.
    data_knn.hyperedge_index = generate_pca_knn_hyperedge_index(data_knn, k=k, pca_dim=100)
    datasets[f'{name}_k-nn'] = data_knn


# BAShapes - structure-based (feature: all ones)
bas_struct = BAShapes()[0]
bas_struct = normalize(bas_struct)
bas_struct = create_scaled_masks(bas_struct)
# x is overwritten with a single constant feature, so whatever `normalize` did to
# the original features is discarded for this variant.
bas_struct.x = torch.ones(bas_struct.num_nodes, 1)

bas_struct_1hop = bas_struct.clone()
bas_struct_1hop.hyperedge_index = generate_1hop_hyperedge_index(bas_struct_1hop)
datasets['BAShapes_1hop'] = bas_struct_1hop

bas_struct_knn = bas_struct.clone()
# pca_dim is left at its default of 0, so no PCA is applied.
# k=6+1: presumably average degree + the node itself, mirroring knn_config - TODO confirm.
bas_struct_knn.hyperedge_index = generate_pca_knn_hyperedge_index(bas_struct_knn, k=6+1)
datasets['BAShapes_k-nn'] = bas_struct_knn


# BAShapes - feature-injected
# NOTE(review): a second BAShapes() instance is created here; confirm it yields the
# same graph as `bas_struct` if the two variants are meant to be directly comparable.
bas_feat = BAShapes()[0]
bas_feat = normalize(bas_feat)
bas_feat = create_scaled_masks(bas_feat)
# Replaces x with class-dependent Gaussian features (and seeds the global torch RNG).
bas_feat = create_mean_shifted_features(bas_feat)

bas_feat_1hop = bas_feat.clone()
bas_feat_1hop.hyperedge_index = generate_1hop_hyperedge_index(bas_feat_1hop)
datasets['BAShapes_features_1hop'] = bas_feat_1hop

bas_feat_knn = bas_feat.clone()
bas_feat_knn.hyperedge_index = generate_pca_knn_hyperedge_index(bas_feat_knn, k=6+1)
datasets['BAShapes_features_k-nn'] = bas_feat_knn

Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.cora.x
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.cora.tx
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.cora.allx
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.cora.y
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.cora.ty
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.cora.ally
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.cora.graph
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.cora.test.index
Processing...
Done!
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.citeseer.x
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.citeseer.tx
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.citeseer.allx
Downloading https://github.com/kimiyoung/planetoid/raw/master/data/ind.citeseer.y
Dow

In [9]:
# Hypergraph summary
def summarize_hypergraph(data):
    """Return a one-line description of ``data.hyperedge_index``.

    Args:
        data: Any object; only its ``hyperedge_index`` attribute is read. The
            attribute is expected to be a ``(2, M)`` tensor whose second row holds
            hyperedge ids.

    Returns:
        ``"No hyperedge_index"`` when the attribute is missing or ``None``;
        otherwise ``"<n> hyperedges, <avg> avg size"`` where ``n`` is the largest
        hyperedge id plus one (0 for an empty index) and ``avg`` is the mean number
        of member nodes over the hyperedge ids that actually occur.
    """
    he = getattr(data, 'hyperedge_index', None)
    if he is None:
        return "No hyperedge_index"

    num_hyperedges = he[1].max().item() + 1 if he.numel() > 0 else 0

    # Number of member nodes per hyperedge id
    edge_sizes = Counter(he[1].tolist())
    avg_size = sum(edge_sizes.values()) / len(edge_sizes) if edge_sizes else 0

    return f"{num_hyperedges} hyperedges, {avg_size:.2f} avg size"

# Dataset overview: one short report per entry of `datasets`
for name, data in datasets.items():
    report_lines = (
        f"\n Dataset: {name}",
        f" - Nodes         : {data.num_nodes}",
        f" - Edges         : {data.num_edges}",
        f" - Features      : {data.num_node_features}",
        f" - Classes       : {data.y.unique().numel()}",
        f" - Hypergraph    : {summarize_hypergraph(data)}",
    )
    print("\n".join(report_lines))


 Dataset: Cora_1hop
 - Nodes         : 2708
 - Edges         : 10556
 - Features      : 1433
 - Classes       : 7
 - Hypergraph    : 2708 hyperedges, 4.90 avg size

 Dataset: Cora_k-nn
 - Nodes         : 2708
 - Edges         : 10556
 - Features      : 1433
 - Classes       : 7
 - Hypergraph    : 2708 hyperedges, 6.00 avg size

 Dataset: Citeseer_1hop
 - Nodes         : 3327
 - Edges         : 9104
 - Features      : 3703
 - Classes       : 6
 - Hypergraph    : 3327 hyperedges, 3.74 avg size

 Dataset: Citeseer_k-nn
 - Nodes         : 3327
 - Edges         : 9104
 - Features      : 3703
 - Classes       : 6
 - Hypergraph    : 3327 hyperedges, 5.00 avg size

 Dataset: Pubmed_1hop
 - Nodes         : 19717
 - Edges         : 88648
 - Features      : 500
 - Classes       : 3
 - Hypergraph    : 19717 hyperedges, 5.50 avg size

 Dataset: Pubmed_k-nn
 - Nodes         : 19717
 - Edges         : 88648
 - Features      : 500
 - Classes       : 3
 - Hypergraph    : 19717 hyperedges, 6.00 avg siz

In [10]:
# GCN
class GCN(torch.nn.Module):
    """Stack of ``GCNConv`` layers operating on ``data.edge_index``.

    Layout: ``in -> hidden -> ... -> hidden -> out``. At least two convolutions are
    always created; ``num_layers`` above 2 adds hidden-to-hidden layers in between.
    ReLU and dropout follow every layer except the last one.
    """

    def __init__(self, in_channels, out_channels, hidden_channels=16, num_layers=2, dropout=0.5):
        super().__init__()
        n_hidden_to_hidden = max(num_layers - 2, 0)
        widths = [in_channels] + [hidden_channels] * (n_hidden_to_hidden + 1) + [out_channels]
        # Consecutive width pairs give the (in, out) sizes of each convolution
        self.convs = ModuleList([GCNConv(w_in, w_out) for w_in, w_out in zip(widths[:-1], widths[1:])])
        self.dropout = dropout

    def _hidden(self, data, training):
        """Run every layer but the last; dropout is active only if ``training``."""
        h, edge_index = data.x, data.edge_index
        for layer in self.convs[:-1]:
            h = F.relu(layer(h, edge_index))
            h = F.dropout(h, p=self.dropout, training=training)
        return h, edge_index

    def forward(self, data):
        """Return the output of the final convolution (raw class scores)."""
        h, edge_index = self._hidden(data, self.training)
        return self.convs[-1](h, edge_index)

    def get_hidden_embeddings(self, data):
        """Return the activations that feed the final convolution, without dropout."""
        h, _ = self._hidden(data, False)
        return h

# HyperGCN
class HyperGCN(torch.nn.Module):
    """Stack of ``HypergraphConv`` layers operating on ``data.hyperedge_index``.

    Layout: ``in -> hidden -> ... -> hidden -> out``. At least two convolutions are
    always created; ``num_layers`` above 2 adds hidden-to-hidden layers in between.
    ReLU and dropout follow every layer except the last one.
    """

    def __init__(self, in_channels, out_channels, hidden_channels=16, num_layers=2, dropout=0.5):
        super().__init__()
        n_hidden_to_hidden = max(num_layers - 2, 0)
        widths = [in_channels] + [hidden_channels] * (n_hidden_to_hidden + 1) + [out_channels]
        # Consecutive width pairs give the (in, out) sizes of each convolution
        self.convs = ModuleList([HypergraphConv(w_in, w_out) for w_in, w_out in zip(widths[:-1], widths[1:])])
        self.dropout = dropout

    def _hidden(self, data, training):
        """Run every layer but the last; dropout is active only if ``training``."""
        h, hyperedge_index = data.x, data.hyperedge_index
        for layer in self.convs[:-1]:
            h = F.relu(layer(h, hyperedge_index))
            h = F.dropout(h, p=self.dropout, training=training)
        return h, hyperedge_index

    def forward(self, data):
        """Return the output of the final convolution (raw class scores)."""
        h, hyperedge_index = self._hidden(data, self.training)
        return self.convs[-1](h, hyperedge_index)

    def get_hidden_embeddings(self, data):
        """Return the activations that feed the final convolution, without dropout."""
        h, _ = self._hidden(data, False)
        return h

In [11]:
# Single optimisation step
def train(model, data, optimizer, epoch=None):
    """Run one full-batch training step on the nodes selected by ``data.train_mask``.

    Args:
        model: Module called as ``model(data)`` that returns per-node class scores.
        data: Object exposing ``y`` and ``train_mask``.
        optimizer: Optimizer that updates the model parameters.
        epoch: When given and divisible by 10, the training loss is printed.
    """
    model.train()
    optimizer.zero_grad()

    selected = data.train_mask
    logits = model(data)
    loss = F.cross_entropy(logits[selected], data.y[selected])

    loss.backward()
    optimizer.step()

    should_log = epoch is not None and epoch % 10 == 0
    if should_log:
        print(f"[Epoch {epoch:>3}] Loss: {loss.item():.4f}")

@torch.no_grad()
def test(model, data):
    model.eval()
    out = model(data)
    accs = []
    for mask in [data.train_mask, data.val_mask, data.test_mask]:
        pred = out[mask].argmax(dim=1)
        acc = (pred == data.y[mask]).sum().item() / mask.sum().item()
        accs.append(acc)
    return accs

In [12]:
# Train / validate / test one model on one dataset
def run_experiment(name="Cora", model_class=GCN, epochs=200, lr=0.01, weight_decay=5e-4, verbose=False):
    """Train ``model_class`` on ``datasets[name]`` and report accuracy at the best validation epoch.

    The model is evaluated after every epoch. Whenever the validation accuracy
    strictly improves, the train and test accuracies of that epoch are remembered.

    NOTE(review): the dataset keys registered in the visible cells carry a suffix
    (e.g. ``"Cora_1hop"``), so the default ``name="Cora"`` is not one of them.

    Args:
        name: Key into the module-level ``datasets`` dict.
        model_class: Class instantiated as ``model_class(num_features, num_classes)``.
        epochs: Number of training epochs.
        lr: Adam learning rate.
        weight_decay: Adam weight decay.
        verbose: Print loss and accuracies every 10 epochs.

    Returns:
        Dict with the keys ``"Model"``, ``"Dataset"``, ``"Train Acc @ Best Val"``,
        ``"Best Val Acc"`` and ``"Test Acc @ Best Val"``.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    data = datasets[name].to(device)

    n_features = data.num_features
    n_classes = data.y.unique().numel()  # number of distinct labels present in y

    model = model_class(n_features, n_classes).to(device)
    optimizer = Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    # Accuracies recorded at the epoch with the best validation accuracy so far
    best = {"train": 0.0, "val": 0.0, "test": 0.0}

    for epoch in range(1, epochs + 1):
        train(model, data, optimizer, epoch if verbose else None)
        train_acc, val_acc, test_acc = test(model, data)

        if val_acc > best["val"]:
            best = {"train": train_acc, "val": val_acc, "test": test_acc}

        if verbose and epoch % 10 == 0:
            print(f"[Epoch {epoch:>3}] Val Acc: {val_acc:.4f} | Test Acc: {test_acc:.4f}")

    return {
        "Model": model_class.__name__,
        "Dataset": name,
        "Train Acc @ Best Val": best["train"],
        "Best Val Acc": best["val"],
        "Test Acc @ Best Val": best["test"]
    }

In [13]:
# Run every model / dataset combination
def run_all_experiments(model_classes=[GCN, HyperGCN], dataset_dict=datasets, verbose=False):
    """Run ``run_experiment`` for each model class on each dataset key.

    For the class named ``"GCN"`` the ``'_k-nn'`` dataset variants are skipped, so
    GCN is not run twice on the same graph.

    Args:
        model_classes: Model classes to evaluate (the default list is not modified).
        dataset_dict: Mapping whose keys are the dataset names to run.
        verbose: Forwarded to ``run_experiment``.

    Returns:
        A ``pandas.DataFrame`` with one row per executed experiment.
    """
    rows = []

    for model_cls in model_classes:
        is_gcn = model_cls.__name__ == "GCN"
        # Keep a dataset unless it is a k-NN variant paired with GCN
        selected = [name for name in dataset_dict.keys() if not is_gcn or '_k-nn' not in name]
        for name in selected:
            print(f"\n---------- {model_cls.__name__} | {name} ----------")
            rows.append(run_experiment(name=name, model_class=model_cls, verbose=verbose))

    return pd.DataFrame(rows)

# Store the results table in `df` and print it
df = run_all_experiments(verbose=True)
print("Final Results:", df, sep="\n")


---------- GCN | Cora_1hop ----------
[Epoch  10] Loss: 1.8656
[Epoch  10] Val Acc: 0.3080 | Test Acc: 0.3600
[Epoch  20] Loss: 1.7324
[Epoch  20] Val Acc: 0.6180 | Test Acc: 0.6260
[Epoch  30] Loss: 1.5253
[Epoch  30] Val Acc: 0.6620 | Test Acc: 0.6750
[Epoch  40] Loss: 1.3173
[Epoch  40] Val Acc: 0.7320 | Test Acc: 0.7390
[Epoch  50] Loss: 1.0889
[Epoch  50] Val Acc: 0.7600 | Test Acc: 0.7690
[Epoch  60] Loss: 0.9093
[Epoch  60] Val Acc: 0.7740 | Test Acc: 0.7860
[Epoch  70] Loss: 0.7308
[Epoch  70] Val Acc: 0.7680 | Test Acc: 0.7920
[Epoch  80] Loss: 0.6568
[Epoch  80] Val Acc: 0.7780 | Test Acc: 0.7990
[Epoch  90] Loss: 0.6320
[Epoch  90] Val Acc: 0.7840 | Test Acc: 0.8020
[Epoch 100] Loss: 0.5222
[Epoch 100] Val Acc: 0.7880 | Test Acc: 0.8120
[Epoch 110] Loss: 0.4943
[Epoch 110] Val Acc: 0.7860 | Test Acc: 0.8170
[Epoch 120] Loss: 0.4261
[Epoch 120] Val Acc: 0.7840 | Test Acc: 0.8000
[Epoch 130] Loss: 0.4046
[Epoch 130] Val Acc: 0.7800 | Test Acc: 0.7960
[Epoch 140] Loss: 0.3975


In [14]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

def extract_base_dataset(name):
    """Drop the last ``_``-separated suffix from ``name``.

    ``"Cora_1hop"`` becomes ``"Cora"``; a name without an underscore is returned
    unchanged.
    """
    head, sep, _tail = name.rpartition('_')
    # An empty separator means no underscore was found
    return head if sep else name

def extract_edge_type(name):
    """Map a dataset key to the plot label of its hyperedge construction.

    Keys containing ``'_1hop'`` give ``'HGConv-1hop'``, keys containing ``'_k-nn'``
    give ``'HGConv-k-NN'`` (``'_1hop'`` is checked first), anything else ``'GCN'``.
    """
    markers = (('_1hop', 'HGConv-1hop'), ('_k-nn', 'HGConv-k-NN'))
    for marker, label in markers:
        if marker in name:
            return label
    return 'GCN'

def plot_grouped_test_accuracy(df, save_path="test_accuracy_plot.png"):
    """Save a grouped bar chart of ``"Test Acc @ Best Val"`` per dataset.

    Bars are grouped by base dataset name and coloured by edge type, in the order
    GCN, HGConv-1hop, HGConv-k-NN. The input frame is not modified. The figure is
    written to ``save_path`` and closed right after saving.

    Args:
        df: Results table with at least the columns ``"Dataset"``, ``"Model"`` and
            ``"Test Acc @ Best Val"``.
        save_path: Destination file of the saved image.
    """
    plot_df = df.copy()
    plot_df['DatasetType'] = plot_df['Dataset'].apply(extract_base_dataset)
    plot_df['EdgeType'] = plot_df['Dataset'].apply(extract_edge_type)

    # Rows produced by the GCN model are always labelled 'GCN', whatever the suffix says
    is_gcn_row = plot_df['Model'] == 'GCN'
    plot_df.loc[is_gcn_row, 'EdgeType'] = 'GCN'

    # Ordered categories fix the order of the bars within each group
    bar_order = ['GCN', 'HGConv-1hop', 'HGConv-k-NN']
    plot_df['EdgeType'] = pd.Categorical(plot_df['EdgeType'], bar_order, ordered=True)

    plt.figure(figsize=(14, 6))
    barplot_kwargs = dict(
        data=plot_df,
        x="DatasetType",
        y="Test Acc @ Best Val",
        hue="EdgeType",
        dodge=True,
    )
    sns.barplot(**barplot_kwargs)

    plt.xlabel("Dataset")
    plt.ylabel("Accuracy")
    plt.ylim(0, 1)
    plt.title("Test Accuracy by Dataset")
    plt.legend(title="Edge Type")
    plt.grid(True, linestyle='--', alpha=0.3)
    plt.tight_layout()

    # Write to disk, then release the figure
    plt.savefig(save_path, dpi=300)
    plt.close()

# Usage example: save the grouped accuracy chart for the results table `df`
plot_grouped_test_accuracy(df, save_path="grouped_test_accuracy.png")