In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score
from sklearn import svm
from myfun_QNN import *
from torchvision import transforms
import matplotlib
matplotlib.use('Agg')  # 使用非交互式后端
import matplotlib.pyplot as plt

#############################
# ---------------------------
# 1. 数据加载与预处理
# ---------------------------
print("Loading Data...")
file_path = r'./data/data_koopman_6.txt'  # 请确保文件路径正确
data = np.loadtxt(file_path, delimiter=',')

# 分离特征和标签
X = data[:, :-1]
y = data[:, -1]

# 确保标签是二元的
unique_labels = np.unique(y)
if len(unique_labels) != 2:
    raise ValueError("该分类器仅支持二元分类。")

# 标签映射 {class_1, class_2} → {0,1}
label_map = {unique_labels[0]: 0, unique_labels[1]: 1}
y_mapped = np.vectorize(label_map.get)(y)


# 数据拆分 训练集/测试集
X_train, X_test, y_train, y_test = train_test_split(X, y_mapped, test_size=0.3, random_state=42)


# 标准化特征
scaler = MinMaxScaler(feature_range=(0, np.pi))
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# 转换为 PyTorch Tensor
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)



##############################
# ---------------------------
# 3. 定义 HQNN 模型
# ---------------------------
def r_quantum_layer(re, params, x):
    """
    通过 re 将 x 分成 re 份，每部分用一个 quantum_layer 处理
    """
    split_size = len(x) // re
    split_size2 = len(params) // re
    results = []
    for i in range(re):
        # x_sub = x[i * split_size: (i + 1) * split_size]
        # params_sub = params[i * len(x_sub) * 3: (i + 1) * len(x_sub) * 3]
        results.append(quantum_layer(params[i * split_size2 * 3: (i + 1) * split_size2 * 3], x[i * split_size: (i + 1) * split_size]))

    # 使用 torch.cat 来拼接 PyTorch 张量，而不是 np.concatenate
    return torch.cat(results, dim=0)



def quantum_layer(params, x):

    sigma_x, sigma_y, sigma_z, I, Ix, Iy, Iz, ST0, ST1, KIx, KIy, KIz = MultiPauli(1)

    num_qubits = len(x)
    state_dim = 2 ** num_qubits
    state = torch.zeros((state_dim, 1), dtype=torch.complex128)
    state[0, 0] = 1  # 初始态 |0...0>

    # 角度编码为量子态
    encoding_matrix = torch.eye(1, dtype=torch.complex128)  # 使用 PyTorch 的 eye 函数来生成单位矩阵
    for i in range(num_qubits):
        theta=x[i]
        # theta.float()
        rotation = torch.matrix_exp(-1j * sigma_y * theta)
        encoding_matrix = torch.kron(encoding_matrix, rotation)

    if encoding_matrix.shape == (state_dim, state_dim):
        state = torch.matmul(encoding_matrix, state)
    else:
        raise ValueError(f'encoding_matrix 形状错误: {encoding_matrix.shape}, 期望 ({state_dim}, {state_dim})')

    num_layers = len(params) // (num_qubits * 3)
    for layer in range(num_layers):

        state = cnot_layer(num_qubits)@state

        uni_matrix = torch.eye(1, dtype=torch.complex128)

        for i in range(num_qubits):

            Up=torch.matrix_exp(-1j * sigma_y * params[layer * num_qubits * 3 + i * 3])@\
            torch.matrix_exp(-1j * sigma_z * params[layer * num_qubits * 3 + i * 3 + 1])@\
            torch.matrix_exp(-1j * sigma_y * params[layer * num_qubits * 3 + i * 3 + 2])

            uni_matrix = torch.kron(uni_matrix, Up)

        state = torch.matmul(uni_matrix, state)

    # 进行 Pauli-Z 测量
    measurements = measure_pauli_z(state, num_qubits)

    return measurements


class HQNN(nn.Module):
    def __init__(self, num_features):
        super(HQNN, self).__init__()
        self.fc1 = nn.Linear(num_features, 6)
        self.bn1 = nn.BatchNorm1d(6)
        self.q_params = nn.Parameter(torch.tensor(2 * np.pi * np.random.rand(6*9, 1)))   # 量子层参数
        self.fc3 = nn.Linear(6, 2)  # 最终分类层
        self.bn3 = nn.BatchNorm1d(2)

        # # 冻结第一层卷积的参数
        # for param in self.fc1.parameters():
        #     param.requires_grad = False  # 让第一层的卷积层参数不更新
        for param in self.fc3.parameters():
            param.requires_grad = False  # 让第一层的卷积层参数不更新
        for param in self.bn3.parameters():
            param.requires_grad = False  # 让第一层的卷积层参数不更新


    def forward(self, x):
        # x = torch.relu(self.bn1(self.fc1(x)))

        x = self.fc1(x)
        # x = self.bn1(x)

        q_out = torch.stack([r_quantum_layer(2, self.q_params, x[i]) for i in range(x.shape[0])])
        # q_out = torch.stack([quantum_layer(self.q_params, x[i]) for i in range(x.shape[0])])
        # q_out=x

        out0 = q_out[:, ::2].mean(dim=1)
        out1 = q_out[:, 1::2].mean(dim=1)
        out = torch.stack([out0, out1], dim=1)
        # x = self.bn3(self.fc3(q_out))
        # q_out = q_out
        # x = self.fc3(q_out)
        # x = self.bn3(self.fc3(q_out))

        # x = x**2
        # x = x / torch.norm(x)
        return out



# ---------------------------
# 4. 训练和测试 HQNN
# ---------------------------
model = HQNN(num_features=6)
# 只优化 q_params
# optimizer = optim.Adam([model.q_params], lr=0.01)
# optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.1)  # 使用带有动量的 SGD 优化器
# optimizer = optim.Adam(model.parameters(), lr=0.002)
optimizer = optim.AdamW(model.parameters(), lr=0.0005)  # 使用 AdamW 优化器


criterion = nn.CrossEntropyLoss()

# 记录训练过程中的损失和准确率
losses = []
train_accuracies = []
test_accuracies = []


import matplotlib.pyplot as plt


def train_model(model, X_train, y_train, epochs=100, batch_size=20):
    model.train()

    # Mini-batch 训练循环
    num_samples = X_train.shape[0]
    num_batches = num_samples // batch_size

    for epoch in range(epochs):
        for batch_idx in range(num_batches):
            start_idx = batch_idx * batch_size
            end_idx = start_idx + batch_size
            X_batch = X_train[start_idx:end_idx]
            y_batch = y_train[start_idx:end_idx]

            # 将 mini-batch 输入到模型中
            optimizer.zero_grad()
            outputs = model(X_batch)  # 直接输入 1D 特征
            loss = criterion(outputs, y_batch)

            loss.backward()
            optimizer.step()

            # 计算训练集准确率
            _, preds = torch.max(outputs, 1)
            acc = accuracy_score(y_batch.numpy(), preds.numpy())


            # 每次迭代后计算测试集准确率
            model.eval()
            with torch.no_grad():
                outputs_test = model(X_test_tensor)
                _, preds_test = torch.max(outputs_test, 1)
                test_acc = accuracy_score(y_test_tensor.numpy(), preds_test.numpy())

            model.train()

            losses.append(loss.item())
            train_accuracies.append(acc)
            test_accuracies.append(test_acc)

            print(
                f"Epoch {epoch + 1}/{epochs}, Batch {batch_idx + 1}/{num_batches}, Loss: {loss.item():.4f}, Train Accuracy: {acc:.3f}, Test Accuracy: {test_acc:.3f}")

    # 训练完毕后，输出预测错误的样本并保存到文件

    misclassified_indices = np.where(preds_test.numpy() != y_test)[0]

    with open('misclassified_samples_cnn.txt', 'w') as f:
        # 写入表头，便于后续读取
        f.write("特征,真实标签,预测标签\n")
        for idx in misclassified_indices:
            # 获取错误预测样本的特征、真实标签和预测标签
            feature = X_test[idx].numpy() if torch.is_tensor(X_test) else X_test[idx]
            true_label = y_test[idx]
            predicted_label = preds_test[idx].item()
            # 将特征数组转换为以空格分隔的字符串
            feature_str = " ".join(map(str, feature))
            # 构造一行数据，注意所有内容在一行
            line = f"{feature_str},{true_label},{predicted_label}\n"
            f.write(line)

    print("预测错误的样本已保存到 'misclassified_samples_cnn.txt' 文件中。")



# 训练 HQNN 模型
train_model(model, X_train_tensor, y_train_tensor, epochs=500, batch_size=1083)


############################### 经典SVM部分
svm_clf = svm.SVC(kernel='linear', C=8)
# 训练SVM
svm_clf.fit(X_train_scaled, y_train)

# 计算 SVM 训练与测试准确率
svm_train_accuracy = svm_clf.score(X_train_scaled, y_train)
svm_test_accuracy = svm_clf.score(X_test_scaled, y_test)
svm_test_y_pred = svm_clf.predict(X_test_scaled)


print('--- 经典SVM分类器 ---')
print(f'训练集准确率: {svm_train_accuracy:.3f}')
print(f'测试集准确率: {svm_test_accuracy:.3f}\n')





################################# 绘制损失曲线和准确率对比
fig, ax1 = plt.subplots()

# 画 HQNN 损失曲线
ax1.set_xlabel('Epochs')
ax1.set_ylabel('Loss', color='tab:red')
ax1.plot(range(1, len(losses) + 1), losses, color='tab:red', label="HQNN Loss")
ax1.tick_params(axis='y', labelcolor='tab:red')

# 共享 x 轴，画 HQNN 训练准确率 vs SVM
ax2 = ax1.twinx()
ax2.set_ylabel('Accuracy', color='tab:blue')
ax2.plot(range(1, len(train_accuracies) + 1), train_accuracies, color='tab:blue', linestyle='dashed',
         label=f"HQNN Train Accuracy:{train_accuracies[-1]:.3f}")
ax2.plot(range(1, len(test_accuracies) + 1), test_accuracies, color='tab:orange', linestyle='dashed',
         label=f"HQNN test Accuracy:{test_accuracies[-1]:.3f}")

ax2.axhline(y=svm_train_accuracy, color='green', linestyle='--', label=f"SVM Train Accuracy:{svm_train_accuracy:.3f}")
ax2.axhline(y=svm_test_accuracy, color='purple', linestyle='--', label=f"SVM Test Accuracy:{svm_test_accuracy:.3f}")
ax2.tick_params(axis='y', labelcolor='tab:blue')


# 标题包含特征数目
feature_count = X_train_tensor.shape[1]
plt.title(f"Training Convergence and Accuracy Comparison (Feature Count: {feature_count})")

fig.legend(loc="center right")
plt.savefig("hqnn_svm_comparison4_6q_batch.pdf")
# plt.show()




##################################画图
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D  # 3D 绘图需要

# 读取 CSV 文件
data = pd.read_csv('misclassified_samples_cnn.txt')
# CSV 文件的列分别为 "特征", "真实标签", "预测标签"
# 其中 "特征" 字段中，各个特征以空格分隔

# 解析特征，转换为数值列表
features = data['特征'].apply(lambda s: [float(x) for x in s.split()]).tolist()
features = np.array(features)  # shape: (n_samples, n_features)
true_labels = data['真实标签'].values
predicted_labels = data['预测标签'].values

n_samples, n_features = features.shape

# 构造新的数据矩阵，每一行依次包含：特征1, 特征2, ..., 特征n, 真实标签, 预测标签
data_matrix = np.hstack([features, true_labels.reshape(-1, 1), predicted_labels.reshape(-1, 1)])

# x 轴对应的标签名称：前 n_features 个为特征名称，后面两个分别为真实标签和预测标签
x_labels = [f'F{i+1}' for i in range(n_features)] + ['True', 'Pred']
x = np.arange(len(x_labels))  # 数值化 x 轴

# 创建 3D 图形
fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(111, projection='3d')

# 对于每个样本，在 x 轴上对应每个特征/标签的位置上绘制其数值，y 轴为样本序号
for i in range(n_samples):
    # 使用 plot 连接每个样本在各个位置的数值点，也可以用 scatter 单独绘制每个点
    ax.plot(x, [i] * len(x), data_matrix[i, :], marker='o', label=f"样本 {i}" if i==0 else "")
    # 如果希望样本之间颜色不同，也可以加入颜色控制

# 设置 x 轴为类别标签
ax.set_xticks(x)
ax.set_xticklabels(x_labels)

ax.set_xlabel('特征 / 标签')
ax.set_ylabel('样本序号')
ax.set_zlabel('值')
ax.set_title('预测错误样本的特征及标签可视化')
plt.savefig("features.pdf")
plt.show()


Loading Data...


  return z_observables.float()


In [27]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score
import matplotlib
matplotlib.use('Agg')
from myfun_QNN_fast import r_quantum_layer_fast

# 检测设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# ===========================
# 1. 数据加载 (全程 float64)
# ===========================
print("Loading Data...")
file_path = r'./data/data_koopman_6.txt'
try:
    data = np.loadtxt(file_path, delimiter=',')
except:
    # 兼容测试：如果没有数据文件，生成随机数据
    print("Warning: Data file not found, using random data.")
    data = np.random.rand(100, 7)
    data[:, -1] = np.random.randint(0, 2, 100)

X = data[:, :-1]
y = data[:, -1]

# 标签处理
unique_labels = np.unique(y)
label_map = {unique_labels[0]: 0, unique_labels[1]: 1}
y_mapped = np.vectorize(label_map.get)(y)

X_train, X_test, y_train, y_test = train_test_split(X, y_mapped, test_size=0.3, random_state=42)

scaler = MinMaxScaler(feature_range=(0, np.pi))
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# 【关键】全程使用 float64 (double) 以匹配量子精度
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float64).to(device)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float64).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(device)
y_test_tensor = torch.tensor(y_test, dtype=torch.long).to(device)

# ===========================
# 2. 模型定义
# ===========================

class HQNN(nn.Module):
    def __init__(self, num_features):
        super(HQNN, self).__init__()
        
        # 【新增】1. 批归一化层 (Batch Normalization)
        # 在数据进入第一层矩阵前，先做一次动态归一化
        self.bn_input = nn.BatchNorm1d(num_features).double()

        # 2. 投影层 (初始设为可训练)
        # bias=False 是因为 BN 层已经有了偏置项
        self.fixed_matrix = nn.Linear(num_features, 6, bias=False).double()

        # self.bn_input2 = nn.BatchNorm1d(6).double()

        self.ln_input = nn.LayerNorm(num_features).double()
        
        # 假设量子层输出维度等于量子比特数
        self.ln_output = nn.LayerNorm(6).double()
        

        # 【修改】不要在这里冻结它！我们要让它先练 10 轮。
        # for param in self.fixed_matrix.parameters():
        #     param.requires_grad = False
            
        # 3. 量子参数 (Global Unitary)
        split_re = 2               
        num_sub_qubits = 6 // split_re 
        dim = 2 ** num_sub_qubits      
        
        params_per_circuit = 2 * (dim * dim) 
        total_q_params = params_per_circuit * split_re
        
        print(f"Global Unitary Mode: {split_re} circuits, {dim}x{dim} each.")
        print(f"Total Quantum Params: {total_q_params}")

        # 初始化量子参数
        self.q_params = nn.Parameter(torch.randn(total_q_params, 1, dtype=torch.float64) * 0.1)
        
        # 4. 后处理 (保持冻结)
        self.fc3 = nn.Linear(6, 2)
        self.bn3 = nn.BatchNorm1d(2)
        for param in self.fc3.parameters(): param.requires_grad = False
        for param in self.bn3.parameters(): param.requires_grad = False

    def forward(self, x):
        # 确保输入精度匹配
        x = x.to(dtype=torch.float64)
        
        # 【新增】先通过 BN 层归一化
        x = self.ln_input(x)
        
        # 通过投影层 (前  轮可训练，之后冻结)
        x = self.fixed_matrix(x) 
        
        # 量子层
        q_out = r_quantum_layer_fast(2, self.q_params, x)      
        # q_out = self.bn_input2(q_out)
        q_out = self.ln_output(q_out)
        
        # 测量映射
        out0 = q_out[:, ::2].mean(dim=1)
        out1 = q_out[:, 1::2].mean(dim=1)
        out = torch.stack([out0, out1], dim=1).to(dtype=torch.float32)
        
        return out

# ===========================
# 3. 训练配置
# ===========================
model = HQNN(num_features=6).to(device)

# 【关键修复】学习率从 0.1 降为 0.01，防止震荡
optimizer = optim.AdamW(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# ===========================
# 4. 训练循环
# ===========================
def train_model(model, X_train, y_train, epochs=50, batch_size=32):
    # --- 【新增 1】初始化日志 ---
    # 计算参数量 (注意：这里计算的是总参数量，或者你可以只计算 requires_grad 的)
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    
    log_data = {
        'name': 'PQNN (Koopman 6D)',
        'params': total_params, # 或者 trainable_params，看你想展示哪个
        'loss': [],
        'train_acc': [],
        'test_acc': [],
        'epochs': []
    }
    print(f"PQNN 参数量: {total_params} (Trainable: {trainable_params})")
    # ---------------------------

    model.train()
    num_samples = X_train.shape[0]
    num_batches = num_samples // batch_size

    for epoch in range(epochs):
        
        # 策略：第1轮冻结经典层 (模拟从一开始就只有量子训练)
        if epoch == 0:
            print(f">>> Epoch {epoch+1}: Freezing fixed_matrix for Pure Quantum Training...")
            for param in model.fixed_matrix.parameters():
                param.requires_grad = False

        running_loss = 0.0
        correct = 0
        total = 0

        for batch_idx in range(num_batches):
            start_idx = batch_idx * batch_size
            end_idx = start_idx + batch_size
            X_batch = X_train[start_idx:end_idx]
            y_batch = y_train[start_idx:end_idx]

            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)

            loss.backward()
            optimizer.step()
            
            # 统计 Loss 和 训练精度
            running_loss += loss.item()
            _, preds = torch.max(outputs, 1)
            total += y_batch.size(0)
            correct += (preds == y_batch).sum().item()

        # 计算本轮指标
        avg_loss = running_loss / num_batches
        train_acc = 100 * correct / total
        
        # --- 计算测试集精度 (每轮都算，为了画平滑曲线) ---
        model.eval()
        with torch.no_grad():
            outputs_test = model(X_test_tensor)
            _, preds_test = torch.max(outputs_test, 1)
            test_acc = accuracy_score(y_test_tensor.cpu().numpy(), preds_test.cpu().numpy()) * 100
        model.train()
        
        # --- 【新增 2】记录数据 ---
        log_data['loss'].append(avg_loss)
        log_data['train_acc'].append(train_acc)
        log_data['test_acc'].append(test_acc)
        log_data['epochs'].append(epoch + 1)
        
        # 打印 (减少刷屏，每5轮打印一次，但数据每轮都存)
        if (epoch + 1) % 5 == 0 or epoch == 0:
            print(f"Epoch {epoch + 1}/{epochs} | Loss: {avg_loss:.4f} | Train: {train_acc:.2f}% | Test: {test_acc:.2f}%")

    # --- 【新增 3】保存日志 ---
    np.save('log_pqnn.npy', log_data)
    print(">>> PQNN 训练日志已保存至: log_pqnn.npy")
    
    return preds_test.cpu().numpy()

# 运行训练
final_preds = train_model(model, X_train_tensor, y_train_tensor, epochs=500, batch_size=64)

# 保存误分类样本 (保持原样)
y_test_cpu = y_test_tensor.cpu().numpy()
misclassified_indices = np.where(final_preds != y_test_cpu)[0]
with open('misclassified_samples_cnn.txt', 'w') as f:
    f.write("Feature,True,Pred\n")
    for idx in misclassified_indices:
        feat_str = " ".join(map(str, X_test_tensor[idx].cpu().numpy()))
        f.write(f"{feat_str},{y_test_cpu[idx]},{final_preds[idx]}\n")
print(f"Saved {len(misclassified_indices)} misclassified samples.")



Using device: cpu
Loading Data...
Global Unitary Mode: 2 circuits, 8x8 each.
Total Quantum Params: 256
PQNN 参数量: 346 (Trainable: 328)
>>> Epoch 1: Freezing fixed_matrix for Pure Quantum Training...
Epoch 1/500 | Loss: 0.3426 | Train: 90.35% | Test: 89.78%
Epoch 5/500 | Loss: 0.1981 | Train: 94.59% | Test: 95.59%
Epoch 10/500 | Loss: 0.1397 | Train: 96.91% | Test: 96.85%
Epoch 15/500 | Loss: 0.1291 | Train: 97.09% | Test: 96.71%
Epoch 20/500 | Loss: 0.1238 | Train: 97.18% | Test: 96.85%
Epoch 25/500 | Loss: 0.1186 | Train: 97.21% | Test: 96.71%
Epoch 30/500 | Loss: 0.1161 | Train: 97.21% | Test: 96.64%
Epoch 35/500 | Loss: 0.1147 | Train: 97.21% | Test: 96.71%
Epoch 40/500 | Loss: 0.1155 | Train: 97.12% | Test: 96.78%
Epoch 45/500 | Loss: 0.1215 | Train: 97.06% | Test: 96.71%
Epoch 50/500 | Loss: 0.1139 | Train: 97.18% | Test: 96.78%
Epoch 55/500 | Loss: 0.1136 | Train: 97.21% | Test: 96.64%
Epoch 60/500 | Loss: 0.1127 | Train: 97.21% | Test: 96.64%
Epoch 65/500 | Loss: 0.1124 | Train: 

In [39]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib
matplotlib.use('Agg') # 服务器端绘图，不显示窗口
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, silhouette_score
from sklearn.manifold import TSNE
import seaborn as sns

# 引入您的快速量子层函数
try:
    from myfun_QNN_fast import r_quantum_layer_fast
except ImportError:
    print("【错误】未找到 myfun_QNN_fast.py，请确保该文件在当前目录下。")

# 配置
CONFIG = {
    'device': torch.device("cuda" if torch.cuda.is_available() else "cpu"),
    'raw_data_path': r'./data/raw_dataset_standardized.npy',
    'koopman_data_path': r'./data/data_koopman_6.txt',
    'seed': 42,
    'epochs': 100,          # 总训练轮数
    'freeze_epoch': 10,    # 第几轮冻结第一层
    'batch_size': 64,
    'lr': 0.01
}

# ==============================================================================
# PART 1: 模型定义与训练流程 (Training Pipeline)
# ==============================================================================


class HQNN(nn.Module):
    def __init__(self, num_features):
        super(HQNN, self).__init__()
        
        # 【新增】1. 批归一化层 (Batch Normalization)
        # 在数据进入第一层矩阵前，先做一次动态归一化
        self.bn_input = nn.BatchNorm1d(num_features).double()

        # 2. 投影层 (初始设为可训练)
        # bias=False 是因为 BN 层已经有了偏置项
        self.fixed_matrix = nn.Linear(num_features, 6, bias=False).double()

        # self.bn_input2 = nn.BatchNorm1d(6).double()

        self.ln_input = nn.LayerNorm(num_features).double()
        
        # 假设量子层输出维度等于量子比特数
        self.ln_output = nn.LayerNorm(6).double()
        

        # 【修改】不要在这里冻结它！我们要让它先练 10 轮。
        # for param in self.fixed_matrix.parameters():
        #     param.requires_grad = False
            
        # 3. 量子参数 (Global Unitary)
        split_re = 2               
        num_sub_qubits = 6 // split_re 
        dim = 2 ** num_sub_qubits      
        
        params_per_circuit = 2 * (dim * dim) 
        total_q_params = params_per_circuit * split_re
        
        print(f"Global Unitary Mode: {split_re} circuits, {dim}x{dim} each.")
        print(f"Total Quantum Params: {total_q_params}")

        # 初始化量子参数
        self.q_params = nn.Parameter(torch.randn(total_q_params, 1, dtype=torch.float64) * 0.1)
        
        # 4. 后处理 (保持冻结)
        self.fc3 = nn.Linear(6, 2)
        self.bn3 = nn.BatchNorm1d(2)
        for param in self.fc3.parameters(): param.requires_grad = False
        for param in self.bn3.parameters(): param.requires_grad = False

    def forward(self, x):
        # 确保输入精度匹配
        x = x.to(dtype=torch.float64)
        
        # 【新增】先通过 BN 层归一化
        x = self.ln_input(x)
        
        # 通过投影层 (前  轮可训练，之后冻结)
        x = self.fixed_matrix(x) 
        
        # 量子层
        q_out = r_quantum_layer_fast(2, self.q_params, x)      
        q_out = self.ln_output(q_out)
        
        # 测量映射
        out0 = q_out[:, ::2].mean(dim=1)
        out1 = q_out[:, 1::2].mean(dim=1)
        out = torch.stack([out0, out1], dim=1).to(dtype=torch.float32)
        
        return out
        

def load_and_process_data():
    print(f"Loading Koopman Data from {CONFIG['koopman_data_path']}...")
    try:
        data = np.loadtxt(CONFIG['koopman_data_path'], delimiter=',')
    except:
        print("Warning: Data file not found, creating random mock data.")
        data = np.random.rand(100, 7)
        data[:, -1] = np.random.randint(0, 2, 100)

    X = data[:, :-1]
    y = data[:, -1]

    # 标签映射
    unique_labels = np.unique(y)
    label_map = {unique_labels[0]: 0, unique_labels[1]: 1}
    y_mapped = np.vectorize(label_map.get)(y)

    # 划分数据集 (Random State 必须固定为 42，以便后续与 Raw Data 对齐)
    X_train, X_test, y_train, y_test = train_test_split(X, y_mapped, test_size=0.3, random_state=CONFIG['seed'])

    # 归一化
    scaler = MinMaxScaler(feature_range=(0, np.pi))
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # 转 Tensor (Double Precision)
    device = CONFIG['device']
    data_dict = {
        'X_train': torch.tensor(X_train_scaled, dtype=torch.float64).to(device),
        'X_test': torch.tensor(X_test_scaled, dtype=torch.float64).to(device),
        'y_train': torch.tensor(y_train, dtype=torch.long).to(device),
        'y_test_cpu': y_test, # 用于后续绘图
        'y_test_tensor': torch.tensor(y_test, dtype=torch.long).to(device)
    }
    return data_dict

def train_pipeline(model, data_dict):
    X_train = data_dict['X_train']
    y_train = data_dict['y_train']
    X_test = data_dict['X_test']
    y_test = data_dict['y_test_tensor']

    optimizer = optim.AdamW(model.parameters(), lr=CONFIG['lr'])
    criterion = nn.CrossEntropyLoss()
    
    num_samples = X_train.shape[0]
    num_batches = num_samples // CONFIG['batch_size']
    
    print("\n>>> Starting Training...")
    
    for epoch in range(CONFIG['epochs']):
        # --- 策略：第 10 轮冻结第一层 ---
        if epoch == CONFIG['freeze_epoch']:
            print(f">>> [Strategy] Epoch {epoch}: Freezing Projection Layer (fixed_matrix) now!")
            for param in model.fixed_matrix.parameters():
                param.requires_grad = False
            # 可选：微调阶段降低学习率
            # for g in optimizer.param_groups: g['lr'] *= 0.1 

        model.train()
        for batch_idx in range(num_batches):
            start = batch_idx * CONFIG['batch_size']
            end = start + CONFIG['batch_size']
            
            optimizer.zero_grad()
            outputs = model(X_train[start:end])
            loss = criterion(outputs, y_train[start:end])
            loss.backward()
            optimizer.step()

        # 评估
        if (epoch + 1) % 10 == 0 or epoch == 0:
            model.eval()
            with torch.no_grad():
                # Train Acc
                _, preds_tr = torch.max(outputs, 1)
                train_acc = accuracy_score(y_train[start:end].cpu().numpy(), preds_tr.cpu().numpy())
                # Test Acc
                out_test = model(X_test)
                _, preds_te = torch.max(out_test, 1)
                test_acc = accuracy_score(y_test.cpu().numpy(), preds_te.cpu().numpy())
            
            status = "Frozen" if not model.fixed_matrix.weight.requires_grad else "Trainable"
            print(f"Epoch {epoch+1:03d} | L1: {status:<9} | Loss: {loss.item():.4f} | Train Acc: {train_acc:.3f} | Test Acc: {test_acc:.3f}")

    print(">>> Training Finished.")
    return model

# ==============================================================================
# PART 2: 可视化流程 (Visualization Pipeline)
# ==============================================================================

def pad_sequences(sequences, downsample=10):
    """辅助函数：将变长序列填充为矩阵，并降采样加速 t-SNE"""
    # 简单降采样
    seqs_ds = [s[::downsample] for s in sequences]
    max_len = max(len(s) for s in seqs_ds)
    # 填充
    matrix = np.zeros((len(seqs_ds), max_len))
    for i, s in enumerate(seqs_ds):
        matrix[i, :len(s)] = s
    return matrix

def visualize_feature_evolution(model, data_dict):
    """
    绘制图 5：Raw Data -> Koopman -> Quantum Latent 的演化
    """
    print("\n>>> Starting Feature Visualization Pipeline...")
    
    # 1. 准备标签和颜色
    y_test = data_dict['y_test_cpu']
    
    # -------------------------------------------------
    # 阶段 A: 获取原始数据 (Raw Data)
    # -------------------------------------------------
    print(f"(1/3) Loading Raw Data from {CONFIG['raw_data_path']}...")
    try:
        raw_dict = np.load(CONFIG['raw_data_path'], allow_pickle=True).item()
        raw_seqs = raw_dict['sequences']
        raw_labels = raw_dict['labels']
        
        # 关键：必须执行完全相同的 split 以对齐数据
        _, X_test_raw_seq, _, _ = train_test_split(raw_seqs, raw_labels, test_size=0.3, random_state=CONFIG['seed'])
        
        # 处理变长序列 -> 矩阵
        print("      Processing sequences (padding & downsampling)...")
        X_raw_vis = pad_sequences(X_test_raw_seq, downsample=20) # 20倍降采样
        
    except Exception as e:
        print(f"      [Error] Could not load raw data: {e}")
        print("      Skipping Raw Data visualization (creating dummy noise for layout).")
        X_raw_vis = np.random.randn(len(y_test), 50) # Dummy

    # -------------------------------------------------
    # 阶段 B: 获取 Koopman 特征 (QNN 输入)
    # -------------------------------------------------
    print("(2/3) Extracting Koopman Features...")
    X_test_tensor = data_dict['X_test']
    X_koopman_vis = X_test_tensor.cpu().numpy()

    # -------------------------------------------------
    # 阶段 C: 获取量子潜层特征 (Quantum Latent)
    # -------------------------------------------------
    print("(3/3) Extracting Quantum Latent Features...")
    model.eval()
    with torch.no_grad():
        x = X_test_tensor.to(dtype=torch.float64)
        # x = model.bn_input(x)
        x = model.ln_input(x)
        x = model.fixed_matrix(x)
        # 获取 Quantum Layer 的输出 (在 Mean 之前)
        q_out = r_quantum_layer_fast(2, model.q_params, x)
        q_out = model.ln_output(q_out)
        
        out0 = q_out[:, ::2].mean(dim=1)
        out1 = q_out[:, 1::2].mean(dim=1)
        out = torch.stack([out0, out1], dim=1).to(dtype=torch.float32)

        X_quantum_vis = out.cpu().numpy()

    # -------------------------------------------------
    # 绘图逻辑 (Three Panels)
    # -------------------------------------------------
    data_map = [
        ('Raw Data Space', X_raw_vis),
        ('Koopman Features', X_koopman_vis),
        ('Quantum Latent Space', X_quantum_vis)
    ]
    
    plt.style.use('seaborn-v0_8-paper')
    fig, axes = plt.subplots(1, 3, figsize=(18, 5.5))
    colors = ['#1f77b4', '#ff7f0e'] # Blue, Orange
    class_names = ['Normal', 'Disruption']
    
    print("\n>>> Running t-SNE and plotting...")
    for i, (title, data) in enumerate(data_map):
        ax = axes[i]
        
        # t-SNE 降维
        # tsne = TSNE(n_components=2, perplexity=50, n_iter=2000, random_state=42)
        tsne = TSNE(
            n_components=2, 
            perplexity=50,          # 建议尝试 50 或 80，消除长条纹，让簇更圆润
            early_exaggeration=20,  # 增大此值，强行拉大类间距离，视觉更震撼
            learning_rate='auto',   # 自动学习率
            init='pca',             # 使用 PCA 初始化，保留全局结构，图更整齐
            max_iter=1000,            # 增加迭代次数，确保收敛
            random_state=42
        )
        emb = tsne.fit_transform(data)
        
        # 计算可分性指标 (Silhouette Score)
        try:
            score = silhouette_score(data, y_test)
        except: score = 0
        
        # 散点图
        for lbl_idx, color in enumerate(colors):
            mask = (y_test == lbl_idx)
            # ax.scatter(emb[mask, 0], emb[mask, 1], c=color, label=class_names[lbl_idx],
            #            alpha=0.6, s=20, edgecolors='none')
            ax.scatter(
                emb[mask, 0], emb[mask, 1], 
                c=color, 
                label=class_names[lbl_idx],
                alpha=0.75,   # 透明度从 0.6 提高到 0.75，让颜色更实，对比度更高
                s=30,         # 点的大小从 20 提高到 30，让点更清晰
                edgecolors='w', # 加白色描边
                linewidth=0.3   # 描边细一点
            )            
            
        ax.set_title(f"({chr(97+i)}) {title}", fontsize=12, fontweight='bold')
        ax.set_xticks([])
        ax.set_yticks([])
        
        # 添加指标框
        ax.text(0.05, 0.92, f'S-Score: {score:.3f}', transform=ax.transAxes,
                bbox=dict(facecolor='white', alpha=0.9, edgecolor='gray', boxstyle='round'))

    # 全局图例
    handles, _ = axes[0].get_legend_handles_labels()
    fig.legend(handles, class_names, loc='lower center', ncol=2, bbox_to_anchor=(0.5, -0.05), fontsize=12)
    
    plt.tight_layout()
    plt.savefig('Figure5_Evolution.pdf', dpi=300, bbox_inches='tight')
    print(f">>> Figure saved successfully to 'Figure5_Evolution.pdf'")

# ==============================================================================
# Main Execution
# ==============================================================================
if __name__ == "__main__":
    print(f"Using device: {CONFIG['device']}")
    
    # 1. 准备数据
    data = load_and_process_data()
    
    # 2. 初始化模型
    model = HQNN(num_features=6).to(CONFIG['device'])
    
    # 3. 运行训练 (Part 1)
    trained_model = train_pipeline(model, data)

    # 4. 运行可视化 (Part 2)
    visualize_feature_evolution(trained_model, data)

Using device: cpu
Loading Koopman Data from ./data/data_koopman_6.txt...
Global Unitary Mode: 2 circuits, 8x8 each.
Total Quantum Params: 256

>>> Starting Training...
Epoch 001 | L1: Trainable | Loss: 0.1529 | Train Acc: 0.969 | Test Acc: 0.964
Epoch 010 | L1: Trainable | Loss: 0.0790 | Train Acc: 0.984 | Test Acc: 0.964
>>> [Strategy] Epoch 10: Freezing Projection Layer (fixed_matrix) now!
Epoch 020 | L1: Frozen    | Loss: 0.0833 | Train Acc: 0.969 | Test Acc: 0.967
Epoch 030 | L1: Frozen    | Loss: 0.0859 | Train Acc: 0.969 | Test Acc: 0.968
Epoch 040 | L1: Frozen    | Loss: 0.0876 | Train Acc: 0.969 | Test Acc: 0.967
Epoch 050 | L1: Frozen    | Loss: 0.0937 | Train Acc: 0.969 | Test Acc: 0.967
Epoch 060 | L1: Frozen    | Loss: 0.0954 | Train Acc: 0.969 | Test Acc: 0.969
Epoch 070 | L1: Frozen    | Loss: 0.0804 | Train Acc: 0.984 | Test Acc: 0.966
Epoch 080 | L1: Frozen    | Loss: 0.0867 | Train Acc: 0.984 | Test Acc: 0.967
Epoch 090 | L1: Frozen    | Loss: 0.0903 | Train Acc: 0.969