In [None]:
import sys
sys.path.append("/home/arc/Development/DeepHypergraph/") 

In [None]:
import time
from copy import deepcopy

import torch
import torch.optim as optim
import torch.nn.functional as F

from dhg import Hypergraph
from dhg.data import DBLP8k
from dhg.models import HGNNPLinkPred
from dhg.random import set_seed
from dhg.metrics import LinkPredictionEvaluator as Evaluator

In [None]:
def train(net, X, hypergraph, negative_hypergraph, optimizer, epoch):
    net.train()

    st = time.time()
    optimizer.zero_grad()
    pos_score = net(X, hypergraph)
    neg_score = net(X, negative_hypergraph)

    scores = torch.cat([pos_score, neg_score]).squeeze()
    labels = torch.cat(
        [torch.ones(pos_score.shape[0]), torch.zeros(neg_score.shape[0])]
    ).to(device)

    loss = F.binary_cross_entropy_with_logits(scores, labels)
    loss.backward()
    optimizer.step()
    print(f"Epoch: {epoch}, Time: {time.time()-st:.5f}s, Loss: {loss.item():.5f}")
    return loss.item()

In [None]:
@torch.no_grad()
def infer(net, X, hypergraph, negative_hypergraph, test=False):
    net.eval()
    pos_score = net(X, hypergraph)
    neg_score = net(X, negative_hypergraph)

    scores = torch.cat([pos_score, neg_score]).squeeze()
    labels = torch.cat(
        [torch.ones(pos_score.shape[0]), torch.zeros(neg_score.shape[0])]
    ).to(device)

    if not test:
        res = evaluator.validate(labels, scores)
    else:
        res = evaluator.test(labels, scores)
    return res

In [None]:
import csv
from pathlib import Path

def load_data(file_path: Path):
    hyperedge_list = []
    neg_hyperedge_list = []
    with open(file_path / "hyperedges.csv", "r") as file:
        reader = csv.reader(file)
        for row in reader:
            # 读取每个超边的顶点列表，并将它们添加到 hyperedge_list 中
            hyperedge_list.append(row)
    
    hyperedge_list = [[int(v) for v in edge] for edge in hyperedge_list]

    with open(file_path / "minimal_unschedulable_combinations.csv", "r") as file:
        reader = csv.reader(file)
        for row in reader:
            neg_hyperedge_list.append(row) 

    neg_hyperedge_list = [[int(v) for v in edge] for edge in neg_hyperedge_list]

    with open(file_path / "task_quadruples.csv", 'r') as csvfile:
        reader = csv.reader(csvfile)
        data = [list(map(float, row)) for row in reader]

    # 将数据转换为 Tensor
    features = torch.tensor(data)

    data = {"hyperedge_list": hyperedge_list, "num_edges" : len(hyperedge_list)}
    neg_data = {"hyperedge_list": neg_hyperedge_list, "num_edges" : len(neg_hyperedge_list)}

    return {"pos":data, "neg": neg_data, "vertices_feature" : features, "num_vertices" : features.shape[0]}

In [None]:
set_seed(2021)
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
evaluator = Evaluator(["accuracy", "auc"])
train_data = load_data(Path("../EDF/data/data_s2233_p10_t21/"))
test_data = load_data(Path("../EDF/data/data_s2234_p10_t21/"))

X = train_data["vertices_feature"]
HG = Hypergraph(train_data["num_vertices"], train_data["pos"]["hyperedge_list"])
neg_HG = Hypergraph(train_data["num_vertices"], train_data["neg"]["hyperedge_list"])

test_X = test_data["vertices_feature"]
test_HG = Hypergraph(test_data["num_vertices"], test_data["pos"]["hyperedge_list"])
test_neg_HG = Hypergraph(test_data["num_vertices"], test_data["neg"]["hyperedge_list"])

In [None]:
import numpy as np

def calculate_sparsity(matrix):
    nonzero_elements = np.count_nonzero(matrix)
    total_elements = matrix.size

    nonzero_ratio = nonzero_elements / total_elements
    zero_ratio = 1 - nonzero_ratio

    print(f"非零元素比例：{nonzero_ratio:.2%}")
    print(f"零元素比例：{zero_ratio:.2%}")

# calculate_sparsity(HG.H.to_dense().cpu().numpy())

In [None]:
net = HGNNPLinkPred(X.shape[1], 64, 32, use_bn=True)

In [None]:
optimizer = optim.Adam(net.parameters(), lr=0.0001, weight_decay=5e-4)

In [None]:
X = X.to(device)
HG = HG.to(device)
neg_HG = neg_HG.to(device)
net = net.to(device)

test_X = test_X.to(device)
test_HG = test_HG.to(device)
test_neg_HG = test_neg_HG.to(device)

In [None]:
print(f"X: {X.device}")
print(f"HG: {HG.device}")
print(f"neg_HG: {neg_HG.device}")
print(f"net: {next(net.parameters()).device}")

In [None]:
best_state = None
best_epoch, best_val = 0, 0
for epoch in range(200):
    # train
    train(net, X, HG, neg_HG, optimizer, epoch)
    # validation
    if epoch % 1 == 0:
        with torch.no_grad():
            val_res = infer(net, test_X, test_HG, test_neg_HG)
        if val_res > best_val:
            print(f"update best: {val_res:.5f}")
            best_epoch = epoch
            best_val = val_res
            best_state = deepcopy(net.state_dict())
print("\ntrain finished!")
print(f"best val: {best_val:.5f}")


In [None]:
# test
print("test...")
net.load_state_dict(best_state)
res = infer(net, test_X, test_HG, test_neg_HG, test=True)
print(f"final result: epoch: {best_epoch}")
print(res)