In [13]:
import torch 
import pandas as pd 
import networkx as nx 
from itertools import combinations
from torch_geometric.data import Data 
from model import GAT

In [14]:
# Read the dataset into a DataFrame.
# NOTE(review): this is an absolute, machine-specific Windows path; the notebook
# will not run on another machine until it points at a local copy of the CSV.
file_path = r'D:\Năm 3 - HK2\Mạng xã hội\traffic-accident-analysis\data\new_data_template.csv'
df = pd.read_csv(file_path)

In [16]:
# Start from an empty undirected graph and add one node per DataFrame row.
# Each node is keyed by the row's index label and carries every column of
# that row as a node attribute.
G = nx.Graph()
for node_id, record in df.iterrows():
    node_attrs = record.to_dict()
    G.add_node(node_id, **node_attrs)

def is_similar(acc1, acc2):
    """Decide whether two accident records should be linked in the graph.

    Two records are considered similar when AT LEAST ONE of the following
    holds:

    * their ``crash_hour`` values are at most one hour apart, measured on a
      24-hour clock so that 23 and 0 count as adjacent;
    * they share the same ``crash_month``;
    * they share the same ``crash_day_of_week``;
    * they share the same ``trafficway_type``;
    * they share the same ``first_crash_type``.

    Args:
        acc1: Mapping of attribute name to value for the first accident.
        acc2: Mapping of attribute name to value for the second accident.

    Returns:
        bool: True when any of the criteria above is satisfied.

    Raises:
        KeyError: If either mapping lacks one of the attributes listed above.
    """
    # Circular distance on the clock face: a plain abs() difference would put
    # 23:00 and 00:00 twenty-three hours apart even though they are adjacent.
    # Assumes crash_hour is encoded as 0-23 -- TODO confirm against the CSV.
    hour_gap = abs(acc1['crash_hour'] - acc2['crash_hour']) % 24
    hours_adjacent = min(hour_gap, 24 - hour_gap) <= 1

    # NOTE(review): the criteria are OR-ed, so a single shared attribute is
    # enough to link two accidents; confirm this graph density is intended.
    return bool(
        hours_adjacent or
        acc1['crash_month'] == acc2['crash_month'] or
        acc1['crash_day_of_week'] == acc2['crash_day_of_week'] or
        acc1['trafficway_type'] == acc2['trafficway_type'] or
        acc1['first_crash_type'] == acc2['first_crash_type']
    )

# Compare every unordered pair of nodes exactly once and connect the pairs
# that is_similar() accepts. This examines n*(n-1)/2 pairs for n nodes.
for (left_id, left_attrs), (right_id, right_attrs) in combinations(G.nodes(data=True), 2):
    if not is_similar(left_attrs, right_attrs):
        continue
    G.add_edge(left_id, right_id)

In [17]:
def networkx_to_pyg(G):
    """Convert a NetworkX graph into a PyTorch Geometric ``Data`` object.

    Nodes are re-indexed to consecutive integers in ``G.nodes()`` iteration
    order. Every node attribute value, in the attribute dict's own order, is
    used as one feature column.

    Args:
        G: NetworkX graph whose node attributes are all numeric and whose
            nodes all carry the same attributes in the same order
            (otherwise the feature tensor cannot be built).

    Returns:
        Data: object with ``x`` as a float tensor holding one row of
        attribute values per node, and ``edge_index`` as a long tensor of
        shape ``(2, num_edges)``.
    """
    node_mapping = {node: i for i, node in enumerate(G.nodes())}

    # NOTE(review): G.edges() is emitted as-is, so for an undirected graph each
    # edge is stored in one direction only. PyG conventionally stores both
    # directions for undirected graphs -- confirm what the trained model
    # expects before changing this.
    edge_pairs = [[node_mapping[u], node_mapping[v]] for u, v in G.edges()]
    if edge_pairs:
        edge_index = torch.tensor(edge_pairs, dtype=torch.long).t().contiguous()
    else:
        # An empty list would become a 1-D tensor of shape (0,); build the
        # (2, 0) layout explicitly so an edgeless graph is still well-formed.
        edge_index = torch.empty((2, 0), dtype=torch.long)

    features = [list(attrs.values()) for _, attrs in G.nodes(data=True)]

    x = torch.tensor(features, dtype=torch.float)
    return Data(x=x, edge_index=edge_index)

# Build the PyG graph object (node features + edge index) from the similarity graph G.
data = networkx_to_pyg(G)

In [23]:
from model import GAT
import torch
from torch.nn import LayerNorm

# Rebuild the architecture, restore the trained weights, and switch to
# inference mode.
# NOTE(review): the recorded run of this cell ended with
# "NameError: name 'LayerNorm' is not defined". LayerNorm is imported just
# above, so the failing lookup presumably happens inside model.py's own
# namespace; the import most likely needs to be added there -- confirm.
model = GAT(in_features=23, hidden_dim=16, out_features=3)

# map_location='cpu' lets a checkpoint saved from a GPU session load on a
# CPU-only machine; load_state_dict then copies the values into the model's
# own parameters.
# NOTE: torch.load relies on pickle -- only load checkpoint files you trust.
state_dict = torch.load('gat_model.pth', map_location='cpu')
model.load_state_dict(state_dict)
model.eval()


NameError: name 'LayerNorm' is not defined

In [10]:
from torch_geometric.nn import GATConv
import torch.nn.functional as F
import torch.nn as nn

class GAT(torch.nn.Module):
    """Two-layer graph attention network built from ``GATConv`` layers.

    The first layer maps ``in_channels`` to ``hidden_channels`` with ``heads``
    attention heads; the second layer takes an input of width
    ``hidden_channels * heads`` and maps it to ``out_channels`` with a single
    head.

    NOTE(review): this definition shadows the ``GAT`` imported from ``model``
    and takes different constructor keywords (``in_channels`` /
    ``hidden_channels`` / ``out_channels``) than the
    ``GAT(in_features=..., hidden_dim=..., out_features=...)`` call used
    elsewhere in this notebook.
    """

    def __init__(self, in_channels, hidden_channels, out_channels, heads=2):
        super().__init__()
        self.gat1 = GATConv(in_channels, hidden_channels, heads=heads)
        # Input width must match the first layer's output: hidden_channels * heads.
        self.gat2 = GATConv(hidden_channels * heads, out_channels, heads=1)

    def forward(self, x, edge_index):
        """Return the second layer's raw output for every node.

        Args:
            x: Node feature tensor passed to the first attention layer.
            edge_index: Edge index tensor shared by both layers.
        """
        hidden = F.elu(self.gat1(x, edge_index))
        return self.gat2(hidden, edge_index)


In [11]:
# Prefer the GPU when torch can see one; otherwise stay on the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device('cuda' if use_cuda else 'cpu')

# The input width follows the graph's node feature count; the output has
# 4 classes, one per damage level.
model = GAT(
    in_channels=data.num_node_features,
    hidden_channels=32,
    out_channels=4,
)
model = model.to(device)
data = data.to(device)

# Adam with a small L2 penalty (weight_decay).
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=0.005,
    weight_decay=5e-4,
)

def train():
    """Run one full-graph optimisation step and return the loss as a float.

    Uses the notebook-level ``model``, ``optimizer`` and ``data``. The
    cross-entropy loss is computed only on the nodes selected by
    ``data.train_mask``.
    """
    model.train()
    optimizer.zero_grad()

    logits = model(data.x, data.edge_index)
    train_mask = data.train_mask
    loss = F.cross_entropy(logits[train_mask], data.y[train_mask])

    loss.backward()
    optimizer.step()
    return loss.item()

def test():
    """Evaluate the notebook-level ``model`` on the nodes in ``data.test_mask``.

    Returns:
        tuple: ``(acc, f1)`` -- the accuracy and the macro-averaged F1 score
        of the arg-max predictions on the test nodes.

    NOTE(review): ``accuracy_score`` and ``f1_score`` are not imported in the
    visible cells (presumably from ``sklearn.metrics``); they must already be
    in the kernel namespace for this function to run.
    """
    model.eval()
    # Evaluation needs no gradients; skipping autograd bookkeeping saves
    # memory and time without changing the predictions.
    with torch.no_grad():
        out = model(data.x, data.edge_index)
    pred = out.argmax(dim=1)

    y_true = data.y[data.test_mask].cpu()
    y_pred = pred[data.test_mask].cpu()
    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average='macro')
    return acc, f1

# Train for 100 epochs and report test metrics every 10th epoch.
# Evaluation runs only on the epochs that are actually printed: test() does
# not update the model, so evaluating on the other epochs was wasted work.
# NOTE(review): the recorded output shows identical Test Acc / F1 at every
# logged epoch, which suggests the predictions are not changing -- worth
# checking the class balance of data.y.
for epoch in range(1, 101):
    loss = train()
    if epoch % 10 == 0:
        acc, f1 = test()
        print(f"Epoch {epoch:03d} | Loss: {loss:.4f} | Test Acc: {acc:.4f} | F1: {f1:.4f}")


Epoch 010 | Loss: 2.6882 | Test Acc: 0.7025 | F1: 0.2751
Epoch 020 | Loss: 1.1694 | Test Acc: 0.7025 | F1: 0.2751
Epoch 030 | Loss: 0.7998 | Test Acc: 0.7025 | F1: 0.2751
Epoch 040 | Loss: 0.8707 | Test Acc: 0.7025 | F1: 0.2751
Epoch 050 | Loss: 0.8049 | Test Acc: 0.7025 | F1: 0.2751
Epoch 060 | Loss: 0.7995 | Test Acc: 0.7025 | F1: 0.2751
Epoch 070 | Loss: 0.7996 | Test Acc: 0.7025 | F1: 0.2751
Epoch 080 | Loss: 0.7970 | Test Acc: 0.7025 | F1: 0.2751
Epoch 090 | Loss: 0.7959 | Test Acc: 0.7025 | F1: 0.2751
Epoch 100 | Loss: 0.7956 | Test Acc: 0.7025 | F1: 0.2751


In [12]:
# Reverse mapping from encoded class id to the original damage label
# (assumes the original encoding was as listed below -- adjust if it differs).
damage_label_map = dict(enumerate([
    "$500 OR LESS",
    "$501 - $1,500",
    "OVER $1,500",
    "0",
]))

# Predict on the entire dataset, with gradients disabled.
model.eval()
with torch.no_grad():
    out = model(data.x, data.edge_index)
pred = out.argmax(dim=1).cpu().numpy()

# Convert the numeric class ids back to their original labels.
decoded_preds = [damage_label_map[class_id] for class_id in pred]

# Example: show the first ten decoded predictions.
print(f"Dự đoán damage (dạng text): {decoded_preds[:10]}")


Dự đoán damage (dạng text): ['OVER $1,500', 'OVER $1,500', 'OVER $1,500', 'OVER $1,500', 'OVER $1,500', 'OVER $1,500', 'OVER $1,500', 'OVER $1,500', 'OVER $1,500', 'OVER $1,500']
