In [1]:
import pennylane as qml
import torch
import torch.nn as nn
import numpy as np
import time
import os
import json

In [2]:
# 设置路径
PATH = "../DataSpace/csi_cmri/"
LOCAL = "./"
FOLDER = "model_30km_9_5/"

# 加载数据
data = np.load(PATH + "CSI_channel_30km.npy")  # shape=(80000, 2560)

# 数据划分参数
TOTAL_SAMPLES = 80000
TRAIN_SAMPLES = 56000  # 56000个训练样本
VAL_SAMPLES = 12000    # 12000个验证样本
TEST_SAMPLES = 12000   # 12000个测试样本

# 设置量子线路参数
INPUT_DIM = data.shape[1]
OUTPUT_DIM = 128
ORG_QUBITS = int(np.ceil(np.log2(INPUT_DIM)))
TAR_QUBITS = int(np.ceil(np.log2(OUTPUT_DIM)))
ALL_QUBITS = ORG_QUBITS + 1
ANS_QUBITS = ORG_QUBITS - TAR_QUBITS

# 验证和测试数据（固定）
VAL_DATA = data[56000:68000]  # 12000个验证样本
TEST_DATA = data[68000:80000] # 12000个测试样本

# 数据划分的起始索引
VAL_START_IDX = 56000
TEST_START_IDX = 68000

In [3]:
def sigmoid(x):
    return 1 / (1 + torch.exp(-x))

def normlize(x):
    norm = torch.norm(x)
    if norm == 0:
        return x
    return x / norm

def dense_layer(x, c_weight, c_bias):
    if isinstance(x, np.ndarray):
        x = torch.from_numpy(x).float()
    output = torch.matmul(x, c_weight) + c_bias
    output = sigmoid(output)
    output = normlize(output[0])  # 确保输出是一维的
    return output

# FRQI编码量子线路
def frqi_encoder(qubits, params, target_wire=0):
    # 对数据量子比特应用Hadamard门创建叠加态
    for i in range(1, qubits+1):
        qml.Hadamard(wires=i)
    # 使用受控PauliY旋转进行编码
    for index in range(min(2**qubits, len(params))):
        binary_str = bin(index)[2:].zfill(qubits)
        bits = [int(bit) for bit in binary_str]
        bits.reverse()
        qml.ctrl(qml.RY, control=range(1, qubits + 1), control_values=bits)(2*params[index], wires=target_wire)

# 构造强纠缠层ansatz线路
def strong_entangling_ansatz(q_weight, dist=[1, 2, 3, 4]):
    LAYERS = 4
    if q_weight.shape != (LAYERS, ALL_QUBITS, 3):
        print("Wrong q_weight!")
        return
    if len(dist) != LAYERS:
        print("Wrong distance!")
        return
    for i_layer in range(LAYERS):
        for i_qubit in range(ALL_QUBITS):
            qml.Rot(q_weight[i_layer, i_qubit, 0], q_weight[i_layer, i_qubit, 1], q_weight[i_layer, i_qubit, 2], wires=i_qubit)
        for i_qubit in range(ALL_QUBITS):
            target_qubit = (i_qubit + dist[i_layer]) % ALL_QUBITS
            qml.CNOT(wires=[i_qubit, target_qubit])

In [38]:
# 完整的经典-量子混合神经网络
Q_DEV = qml.device("lightning.gpu", wires=ALL_QUBITS)
@qml.qnode(Q_DEV, interface = "torch")
def cir(sample, c_weight, c_bias, q_weight):
    y = dense_layer(sample, c_weight, c_bias)
    frqi_encoder(qubits=TAR_QUBITS, params=y)
    strong_entangling_ansatz(q_weight)
    x = -sample
    frqi_encoder(qubits=ORG_QUBITS, params=x)
    Ham = qml.Hamiltonian([-1], [qml.PauliZ(0)])
    return qml.state()

# 定义损失函数
def loss_fn(c_weight, c_bias, q_weight, batch_data):
    total_loss = 0
    for sample in batch_data:
        loss = cir(sample, c_weight, c_bias, q_weight)
        total_loss += loss
    return total_loss / len(batch_data)

In [5]:
def load_final_model():
    """加载最终训练好的模型参数"""
    model_file = f"{LOCAL}{FOLDER}hybrid_qnn_model_final.pth"
    
    if not os.path.exists(model_file):
        print(f"错误: 模型文件 {model_file} 不存在!")
        return None, None, None, None
    
    try:
        print(f"正在加载模型: {model_file}")
        checkpoint = torch.load(model_file, map_location='cpu')
        
        # 初始化参数结构
        C_WEIGHT = torch.randn(INPUT_DIM, OUTPUT_DIM, requires_grad=False)
        C_BIAS = torch.randn(1, OUTPUT_DIM, requires_grad=False)
        LAYERS = 4
        Q_WEIGHT = torch.randn(LAYERS, ALL_QUBITS, 3, requires_grad=False)
        
        # 加载保存的参数
        if 'C_WEIGHT' in checkpoint:
            C_WEIGHT.data = checkpoint['C_WEIGHT'].data.clone()
            print(f"  已加载 C_WEIGHT, 形状: {C_WEIGHT.shape}")
        
        if 'C_BIAS' in checkpoint:
            C_BIAS.data = checkpoint['C_BIAS'].data.clone()
            print(f"  已加载 C_BIAS, 形状: {C_BIAS.shape}")
        
        if 'Q_WEIGHT' in checkpoint:
            Q_WEIGHT.data = checkpoint['Q_WEIGHT'].data.clone()
            print(f"  已加载 Q_WEIGHT, 形状: {Q_WEIGHT.shape}")
        
        history = checkpoint.get('training_history', {})
        print(f"  已加载训练历史，包含 {len(history.get('phase_history', []))} 个阶段")
        
        # 显示模型信息
        if 'all_phases' in checkpoint:
            print(f"  训练阶段: {checkpoint['all_phases']}")
        if 'completion_time' in checkpoint:
            print(f"  完成时间: {checkpoint['completion_time']}")
        
        return C_WEIGHT, C_BIAS, Q_WEIGHT, history
        
    except Exception as e:
        print(f"加载模型时出错: {e}")
        return None, None, None, None

In [18]:
# 加载模型
C_WEIGHT, C_BIAS, Q_WEIGHT, history = load_final_model()

正在加载模型: ./model_30km_9_5/hybrid_qnn_model_final.pth
  已加载 C_WEIGHT, 形状: torch.Size([2560, 128])
  已加载 C_BIAS, 形状: torch.Size([1, 128])
  已加载 Q_WEIGHT, 形状: torch.Size([4, 13, 3])
  已加载训练历史，包含 3 个阶段
  训练阶段: ['phase_1', 'phase_2', 'phase_3', 'phase_4', 'phase_5', 'phase_6', 'phase_7']
  完成时间: 2025-10-20 12:02:04


In [39]:
out1 = cir(data[11], C_WEIGHT, C_BIAS, Q_WEIGHT)

In [28]:
data[11]

array([-5.52105e-06,  4.61857e-05,  1.13332e-05, ...,  2.61349e-05,
       -3.25311e-05,  3.61780e-05], shape=(2560,))

In [26]:
dev = qml.device("default.qubit", wires=2)
@qml.qnode(dev)
def cir_st():
    qml.Hadamard(wires=0)
    return qml.state()

In [27]:
cir_st()

array([0.70710678+0.j, 0.        +0.j, 0.70710678+0.j, 0.        +0.j])

In [36]:
len(out1)

8192

In [40]:
out1

tensor([-1.4922e-02+2.0432e-03j, -2.6717e-03-1.5758e-02j,
        -1.2305e-02+4.6108e-03j,  ...,
        -3.0652e-05+9.8120e-06j, -9.5097e-07+3.5577e-06j,
         8.1824e-06+3.6866e-05j], dtype=torch.complex128)