In [4]:
import os
from collections import defaultdict
from copy import deepcopy

import numpy as np
import pandas as pd
import torch
from torch_geometric.data import Data

from pypower.api import case30, rundcpf, ppoption
from pypower.idx_bus import PD

from algorithm import (
    gatcase30,
    gat_cnn_case30,
    gcncase30,
    cnn_transformer_locator,
    baseline_gae_30,
    FLDAlgorithm,
)
from utils import (
    observe_state,
    get_admittance_matrix,
    BFS_algorithm,
    find_edge_indices_within_nodes,
)



In [5]:
def calculate_metrics(TP, FP, TN, FN):
    """计算性能指标"""
    total = TP + FP + TN + FN
    if total == 0:
        return {
            'accuracy': 0.0,
            'false_alarm': 0.0,
            'miss_detection': 0.0,
            'precision': 0.0,
            'recall': 0.0,
            'f1_score': 0.0
        }
    
    accuracy = (TP + TN) / total
    false_alarm = FP / (FP + TN) if (FP + TN) > 0 else 0.0
    miss_detection = FN / (TP + FN) if (TP + FN) > 0 else 0.0
    precision = TP / (TP + FP) if (TP + FP) > 0 else 0.0
    recall = TP / (TP + FN) if (TP + FN) > 0 else 0.0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
    
    return {
        'accuracy': accuracy,
        'false_alarm': false_alarm,
        'miss_detection': miss_detection,
        'precision': precision,
        'recall': recall,
        'f1_score': f1_score
    }


def evaluate_model(model, input_tensor, y_bool_tensor, threshold=0.5):
    """评估模型预测"""
    with torch.no_grad():
        output = (model(input_tensor) > threshold).bool()
    
    TP = torch.sum(output & y_bool_tensor).item()
    FP = torch.sum(output & ~y_bool_tensor).item()
    TN = torch.sum(~output & ~y_bool_tensor).item()
    FN = torch.sum(~output & y_bool_tensor).item()
    
    return TP, FP, TN, FN


def evaluate_optimization(optimizer, va_post, pbus_post, results_origin, y_bool):
    """评估优化算法"""
    prediction = optimizer.eval(va_post, pbus_post, results_origin)
    # FLDAlgorithm返回整个分支数组，需要提取E_H对应的部分
    prediction_bool = (prediction[optimizer.e_set] > 0.5).astype(bool)
    
    TP = np.sum(prediction_bool & y_bool)
    FP = np.sum(prediction_bool & ~y_bool)
    TN = np.sum(~prediction_bool & ~y_bool)
    FN = np.sum(~prediction_bool & y_bool)
    
    return TP, FP, TN, FN


def generate_attack(mpc, random_index, attack_types=['cutting', 'weak attack']):
    """生成攻击向量"""
    attacks = np.zeros_like(mpc['branch'][:, 3])
    for idx in random_index:
        attack_type = np.random.choice(attack_types)
        if attack_type == 'cutting':
            attacks[idx] = np.random.randint(10000, 100000) / 100
        else:
            attacks[idx] = np.random.randint(150, 500) / 100
    return attacks

In [6]:
# ========== 初始化配置 ==========
mpc = deepcopy(case30())
B = get_admittance_matrix(mpc)
start_node = 6
length_V_H = 8
V_H = BFS_algorithm(mpc, start_node, length_V_H)
E_H = find_edge_indices_within_nodes(mpc['branch'], V_H)
# BFS_algorithm返回的是1-based节点编号，需要转换为0-based索引
V_H_zero = [int(v) - 1 for v in V_H]
V_H = [int(v) for v in V_H]

N = mpc['bus'].shape[0]
mean_load = 8
std_load = 10
num_load_samples = 50
num_trials_per_load = 4

# 构建图结构
edges = mpc["branch"][:, :2].astype(int) - 1
edge_index = torch.tensor(edges.T, dtype=torch.long)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
simulation_options = ppoption(PF_DC=True, VERBOSE=0, OUT_ALL=0)

print(f"设备: {device}")
print(f"隐藏节点数: {len(V_H)}, 隐藏边数: {len(E_H)}")
print(f"V_H (0-based): {V_H}")
print(f"E_H: {E_H}")

# ========== 初始化优化算法（使用 0-based V_H_zero）==========
fld_optimizer = FLDAlgorithm(case30(), V_H_zero, E_H, B)
print("✓ 成功初始化 FLD 优化算法")

# ========== 加载模型（对齐 main.py 配置）==========
models = {}
model_configs = [
    (
        'GAT',
        gatcase30,
        'model/case30_gat_baseline.pth',
        {
            'e_h': E_H,
            'edge_index': edge_index,
            'gat_channels': (256, 256, 256),
            'gat_heads': (4, 4, 4),
            'mlp_dims': (256, 128, 128),
            'dropout': 0.2,
            'num_nodes': N,
        },
    ),
    (
        'GCN Baseline',
        gcncase30,
        'model/case30_gcn_baseline.pth',
        {
            'v_h': V_H_zero,
            'e_h': E_H,
            'edge_index': edge_index,
            'gcn_channels': (256, 256, 256),
            'mlp_dims': (256, 128, 128),
            'dropout': 0.2,
            'num_nodes': N,
        },
    ),
    (
        'GAT-CNN',
        gat_cnn_case30,
        'model/case30_gat_cnn.pth',
        {
            'v_h': V_H_zero,
            'e_h': E_H,
            'edge_index': edge_index,
            'gat_channels': (256, 256, 256),
            'gat_heads': (4, 4, 4),
            'dropout': 0.2,
            'num_nodes': N,
        },
    ),
    (
        'CNN Transformer',
        cnn_transformer_locator,
        'model/case30_cnn_transformer.pth',
        {
            'mlp_out_features': len(E_H),
            'num_nodes': N,
            'e_h': E_H,
            'edge_index': edge_index,
            # 使用默认 Transformer 配置
        },
    ),
    (
        'GAE',
        baseline_gae_30,
        'model/case30_gae.pth',
        {
            'mlp_out_features': len(E_H),
            'num_nodes': N,
            'e_h': E_H,
            'edge_index': edge_index,
            'dropout': 0.2,
        },
    ),
]

print("\n开始加载模型...")
for name, model_class, model_path, kwargs in model_configs:
    try:
        # 所有模型使用 3 维特征 (Va, Pbus, Load)
        model = model_class(in_features=3, **kwargs).to(device)

        possible_paths = [
            model_path,
            os.path.join('model', os.path.basename(model_path)),
            os.path.basename(model_path),
        ]
        loaded = False
        for path in possible_paths:
            if os.path.exists(path):
                state_dict = torch.load(path, map_location=device)
                model.load_state_dict(state_dict, strict=True)
                model.eval()
                models[name] = model
                print(f"✓ 成功加载模型: {name} (从 {path})")
                loaded = True
                break
        if not loaded:
            print(f"✗ 加载模型 {name} 失败: 找不到模型文件")
            print(f"  尝试的路径: {possible_paths}")
    except Exception as e:
        print(f"✗ 加载模型 {name} 失败: {e}")
        import traceback
        traceback.print_exc()

print(f"\n成功加载 {len(models)} 个模型: {list(models.keys())}")

# ========== 初始化优化算法（FLD）==========
fld_optimizer = FLDAlgorithm(case30(), V_H_zero, E_H, B)
print("✓ 成功初始化 FLD 优化算法")

# ========== 性能评估 ==========
if len(models) == 0:
    print("错误: 没有成功加载任何模型，无法进行测试！")
else:
    results_summary = defaultdict(lambda: defaultdict(lambda: {'TP': 0, 'FP': 0, 'TN': 0, 'FN': 0}))
    
    for rand_num in range(1, len(E_H) + 1):
        print(f"\n{'='*60}")
        print(f"测试 {rand_num} 条故障线路")
        print(f"{'='*60}")
        
        # 重置当前故障数的计数器（包括FLD）
        for algo_name in list(models.keys()) + ['FLD']:
            results_summary[algo_name][rand_num] = {'TP': 0, 'FP': 0, 'TN': 0, 'FN': 0}
        
        # 生成负载数据
        Load = pd.DataFrame(np.round(np.clip(
            np.random.normal(mean_load, std_load, (num_load_samples, N)), 0.1, 20
        ), 2))
        
        for load_idx, load in enumerate(Load.values):
            for trial in range(num_trials_per_load):
                # 随机选择故障线路
                random_index = np.random.choice(E_H, rand_num, replace=False)
                
                # 生成攻击
                attacks = generate_attack(mpc, random_index)
                
                # 运行原始潮流
                mpc_test = deepcopy(case30())
                mpc_test['bus'][:, PD] = load
                results_origin, _ = rundcpf(mpc_test, simulation_options)
                Va_origin, Pbus_origin = observe_state(results_origin)
                
                # 应用攻击并运行故障后潮流
                mpc_test['branch'][random_index, 3] *= attacks[random_index]
                results_post, _ = rundcpf(mpc_test, simulation_options)
                Va_post, Pbus_post = observe_state(results_post)
                
                # 准备输入数据（对齐训练/推理：Va, Pbus, Load）
                load_norm = (mpc_test['bus'][:, PD].reshape(-1, 1) / results_origin['baseMVA'])
                input_array = np.concatenate((Va_post, Pbus_post, load_norm), axis=1)
                input_array[V_H_zero, 1] = 0.0  # 隐藏V_H节点的功率注入
                input_array[V_H_zero, 2] = 0.0  # 隐藏V_H节点的负荷特征
                
                # 准备真实标签
                y = np.zeros(mpc_test['branch'].shape[0])
                y[random_index] = 1
                y_bool = y[E_H].astype(bool)
                y_bool_tensor = torch.tensor(y_bool, dtype=torch.bool, device=device)
                
                # 转换为PyTorch张量
                input_tensor = Data(
                    x=torch.tensor(input_array, dtype=torch.float).to(device),
                    edge_index=edge_index.to(device)
                )
                
                # 评估所有模型
                for model_name, model in models.items():
                    try:
                        TP, FP, TN, FN = evaluate_model(model, input_tensor, y_bool_tensor)
                        results_summary[model_name][rand_num]['TP'] += TP
                        results_summary[model_name][rand_num]['FP'] += FP
                        results_summary[model_name][rand_num]['TN'] += TN
                        results_summary[model_name][rand_num]['FN'] += FN
                    except Exception as e:
                        print(f"评估模型 {model_name} 时出错: {e}")
                        import traceback
                        traceback.print_exc()
                
                # 评估FLD优化算法（使用 0-based V_H_zero 配置）
                try:
                    Va_post_flat = Va_post.reshape(-1)
                    Pbus_post_flat = Pbus_post.reshape(-1)
                    TP, FP, TN, FN = evaluate_optimization(
                        fld_optimizer, Va_post_flat, Pbus_post_flat, results_origin, y_bool
                    )
                    results_summary['FLD'][rand_num]['TP'] += TP
                    results_summary['FLD'][rand_num]['FP'] += FP
                    results_summary['FLD'][rand_num]['TN'] += TN
                    results_summary['FLD'][rand_num]['FN'] += FN
                except Exception as e:
                    print(f"评估FLD算法时出错: {e}")
                    import traceback
                    traceback.print_exc()
        
        # 计算并打印所有算法的性能指标
        print(f"\n性能指标汇总 (故障线路数: {rand_num}):")
        print("-" * 100)
        print(f"{'算法':<20} {'准确率':<10} {'误报率':<10} {'漏检率':<10} {'精确率':<10} {'召回率':<10} {'F1分数':<10}")
        print("-" * 100)
        
        for algo_name in sorted(results_summary.keys()):
            metrics = results_summary[algo_name][rand_num]
            perf = calculate_metrics(metrics['TP'], metrics['FP'], metrics['TN'], metrics['FN'])
            print(f"{algo_name:<20} "
                  f"{perf['accuracy']:<10.4f} "
                  f"{perf['false_alarm']:<10.4f} "
                  f"{perf['miss_detection']:<10.4f} "
                  f"{perf['precision']:<10.4f} "
                  f"{perf['recall']:<10.4f} "
                  f"{perf['f1_score']:<10.4f}")
    
    # 打印最终汇总
    print(f"\n{'='*100}")
    print("所有故障线路数的性能汇总")
    print(f"{'='*100}")
    print(f"{'算法':<20} {'故障数':<10} {'准确率':<10} {'误报率':<10} {'漏检率':<10} {'精确率':<10} {'召回率':<10} {'F1分数':<10}")
    print("-" * 100)
    
    for algo_name in sorted(results_summary.keys()):
        for fault_num in sorted(results_summary[algo_name].keys()):
            metrics = results_summary[algo_name][fault_num]
            perf = calculate_metrics(metrics['TP'], metrics['FP'], metrics['TN'], metrics['FN'])
            print(f"{algo_name:<20} "
                  f"{fault_num:<10} "
                  f"{perf['accuracy']:<10.4f} "
                  f"{perf['false_alarm']:<10.4f} "
                  f"{perf['miss_detection']:<10.4f} "
                  f"{perf['precision']:<10.4f} "
                  f"{perf['recall']:<10.4f} "
                  f"{perf['f1_score']:<10.4f}")

设备: cuda
隐藏节点数: 8, 隐藏边数: 8
V_H (0-based): [6, 10, 2, 4, 12, 15, 22, 24]
E_H: [2, 5, 6, 11, 14, 17, 27, 30]
✓ 成功初始化 FLD 优化算法

开始加载模型...
✓ 成功加载模型: GAT (从 model/case30_gat_baseline.pth)
✓ 成功加载模型: GCN Baseline (从 model/case30_gcn_baseline.pth)
✓ 成功加载模型: GAT-CNN (从 model/case30_gat_cnn.pth)
✓ 成功加载模型: CNN Transformer (从 model/case30_cnn_transformer.pth)
✓ 成功加载模型: GAE (从 model/case30_gae.pth)

成功加载 5 个模型: ['GAT', 'GCN Baseline', 'GAT-CNN', 'CNN Transformer', 'GAE']
✓ 成功初始化 FLD 优化算法

测试 1 条故障线路

性能指标汇总 (故障线路数: 1):
----------------------------------------------------------------------------------------------------
算法                   准确率        误报率        漏检率        精确率        召回率        F1分数      
----------------------------------------------------------------------------------------------------
CNN Transformer      0.9544     0.0093     0.3000     0.9150     0.7000     0.7932    
FLD                  0.9856     0.0000     0.1150     1.0000     0.8850     0.9390    
GAE                  0.93