In [44]:
import cudaq
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Function
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import shap
from lime import lime_tabular
import os


In [45]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
cudaq.set_target("qpp-cpu")

In [46]:
def ry(theta, qubit):
    cudaq.ry(theta, qubit)

def rx(theta, qubit):
    cudaq.rx(theta, qubit)

In [47]:
class QuantumFunction(Function):
    def __init__(self, qubit_count: int, hamiltonian: cudaq.SpinOperator):
        # Define a parameterized quantum kernel
        @cudaq.kernel
        def kernel(qubit_count: int, thetas: np.ndarray):
            qubits = cudaq.qvector(qubit_count)
            ry(thetas[0], qubits[0])
            rx(thetas[1], qubits[0])

        self.kernel = kernel
        self.qubit_count = qubit_count
        self.hamiltonian = hamiltonian

    def run(self, theta_vals: torch.Tensor) -> torch.Tensor:
        theta_vals_np = theta_vals.cpu().numpy()
        q_counts = [self.qubit_count] * theta_vals_np.shape[0]
        results = cudaq.observe(self.kernel, self.hamiltonian, q_counts, theta_vals_np)
        exp_vals = [res.expectation() for res in results]

        return torch.tensor(exp_vals, dtype=torch.float32, device=device)

    @staticmethod
    def forward(ctx, thetas: torch.Tensor, quantum_circuit, shift) -> torch.Tensor:
        ctx.shift = shift
        ctx.quantum_circuit = quantum_circuit

        exp_vals = ctx.quantum_circuit.run(thetas).reshape(-1, 1)

        # Save values for backward
        ctx.save_for_backward(thetas, exp_vals)
        return exp_vals

    @staticmethod
    def backward(ctx, grad_output):
        thetas, _ = ctx.saved_tensors
        gradients = torch.zeros_like(thetas, device=device)

        # Parameter-shift rule
        for i in range(thetas.shape[1]):
            thetas_plus = thetas.clone()
            thetas_minus = thetas.clone()

            thetas_plus[:, i] += ctx.shift
            thetas_minus[:, i] -= ctx.shift

            exp_vals_plus = ctx.quantum_circuit.run(thetas_plus)
            exp_vals_minus = ctx.quantum_circuit.run(thetas_minus)

            gradients[:, i] = (exp_vals_plus - exp_vals_minus) / (2.0 * ctx.shift)

        # Multiply by incoming gradient from chain rule
        return gradients * grad_output, None, None

class QuantumLayer(nn.Module):
    def __init__(self, qubit_count: int, hamiltonian, shift: torch.Tensor):
        super().__init__()
        self.quantum_circuit = QuantumFunction(qubit_count, hamiltonian)
        self.shift = shift

    def forward(self, input: torch.Tensor):
        return QuantumFunction.apply(input, self.quantum_circuit, self.shift)

In [56]:
class Hybrid_QNN(nn.Module):
    def __init__(self, num_features):
        super(Hybrid_QNN, self).__init__()

        self.lstm_hidden_dim = 64
        self.gru_hidden_dim = 32

        # LSTM
        self.lstm = nn.LSTM(input_size=num_features,
                            hidden_size=self.lstm_hidden_dim,
                            num_layers=1, batch_first=True)

        # GRU
        self.gru = nn.GRU(input_size=self.lstm_hidden_dim,
                          hidden_size=self.gru_hidden_dim,
                          num_layers=1, batch_first=True)

        # Fully connected
        self.fc1 = nn.Linear(self.gru_hidden_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.dropout = nn.Dropout(0.25)
        self.fc3 = nn.Linear(64, 32)
        self.fc4 = nn.Linear(32, 2)  # intermediate 2D before quantum

        # Two separate quantum layers for two outputs
        self.quantum1 = QuantumLayer(
            qubit_count=2,
            hamiltonian=cudaq.spin.z(0),
            shift=torch.tensor(np.pi / 2)
        )
        self.quantum2 = QuantumLayer(
            qubit_count=2,
            hamiltonian=cudaq.spin.z(1),
            shift=torch.tensor(np.pi / 2)
        )

    def forward(self, x):
        x = x.unsqueeze(1)  # [batch, 1, num_features]

        lstm_out, _ = self.lstm(x)
        gru_out, _ = self.gru(lstm_out)
        x = gru_out[:, -1, :]

        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.dropout(x)
        x = torch.relu(self.fc3(x))
        x = self.fc4(x)  # [batch, 2]

        # Split into two parts for two quantum circuits
        x1_in = x[:, 0].unsqueeze(1)  # [batch, 1]
        x2_in = x[:, 1].unsqueeze(1)  # [batch, 1]

        x1_out = self.quantum1(x1_in)  # [batch, 1]
        x2_out = self.quantum2(x2_in)  # [batch, 1]

        # Concatenate outputs back
        x_out = torch.cat([x1_out, x2_out], dim=1)  # [batch, 2]
        return x_out

In [57]:

import pandas as pd
import numpy as np

def load_data(path):
    data = pd.read_csv(path)
    return data

dataset_path = "../data/noalpha.csv"  # Adjust to your CSV path
data = load_data(dataset_path)

# Identify only the numeric columns
numeric_cols = data.select_dtypes(include=[np.number]).columns

# Fill missing values in numeric columns with their column-wise mean
data[numeric_cols] = data[numeric_cols].fillna(data[numeric_cols].mean())

features = ['RAdeg', 'DEdeg', 'e_RAdeg', 'e_DEdeg', 'RApeak',
            'DEpeak', 'Sint', 'e_Sint', 'Speak', 'e_Speak', 'rmspeak', 'e_rmspeak', 'PA']

target = ['thetamaj', 'thetamin']
X = data[features].values
y = data[target].values

# Normalize
scaler_X = StandardScaler()
scaler_y = StandardScaler()

X = scaler_X.fit_transform(X)
y = scaler_y.fit_transform(y)

# Convert to torch tensors
X = torch.tensor(X, dtype=torch.float32).to(device)
y = torch.tensor(y, dtype=torch.float32).to(device)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.1,
    random_state=42
)
print("Train shape:", X_train.shape, y_train.shape)
print("Test shape: ", X_test.shape, y_test.shape)

Train shape: torch.Size([5185, 13]) torch.Size([5185, 2])
Test shape:  torch.Size([577, 13]) torch.Size([577, 2])


In [58]:
model = Hybrid_QNN(num_features=len(features)).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_function = nn.MSELoss()

epochs = 50
losses = []

for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()

    y_hat_train = model(X_train)
    loss = loss_function(y_hat_train, y_train)
    loss.backward()
    optimizer.step()

    losses.append(loss.item())
    std_loss = np.std(losses)

    print(f"Epoch {epoch + 1}/{epochs}, Loss: {loss.item():.6f}, Std Dev: {std_loss:.6f}")
     

Epoch 1/50, Loss: 1.993080, Std Dev: 0.000000
Epoch 2/50, Loss: 1.992085, Std Dev: 0.000498
Epoch 3/50, Loss: 1.990792, Std Dev: 0.000937
Epoch 4/50, Loss: 1.989176, Std Dev: 0.001462
Epoch 5/50, Loss: 1.987215, Std Dev: 0.002088
Epoch 6/50, Loss: 1.984910, Std Dev: 0.002815
Epoch 7/50, Loss: 1.982338, Std Dev: 0.003626
Epoch 8/50, Loss: 1.979312, Std Dev: 0.004557
Epoch 9/50, Loss: 1.975771, Std Dev: 0.005633
Epoch 10/50, Loss: 1.971398, Std Dev: 0.006924
Epoch 11/50, Loss: 1.966471, Std Dev: 0.008413
Epoch 12/50, Loss: 1.960780, Std Dev: 0.010122
Epoch 13/50, Loss: 1.954242, Std Dev: 0.012076
Epoch 14/50, Loss: 1.946420, Std Dev: 0.014353
Epoch 15/50, Loss: 1.937231, Std Dev: 0.017008
Epoch 16/50, Loss: 1.926677, Std Dev: 0.020072
Epoch 17/50, Loss: 1.913996, Std Dev: 0.023660
Epoch 18/50, Loss: 1.899322, Std Dev: 0.027829
Epoch 19/50, Loss: 1.882379, Std Dev: 0.032642
Epoch 20/50, Loss: 1.861427, Std Dev: 0.038349
Epoch 21/50, Loss: 1.836769, Std Dev: 0.045076
Epoch 22/50, Loss: 1.8

In [59]:
import torch
import numpy as np
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error

# ===== EVALUATION =====
model.eval()
with torch.no_grad():
    y_hat_test = model(X_test)

# ---- Shape checks ----
if y_hat_test is None:
    raise ValueError("Model returned None. Check QuantumLayer.forward().")

if y_hat_test.shape != y_test.shape:
    raise ValueError(
        f"Shape mismatch: y_hat_test {y_hat_test.shape} vs y_test {y_test.shape}"
    )

# ---- Loss ----
test_loss = loss_function(y_hat_test, y_test).item()

# Convert to numpy for sklearn metrics
y_true_np = y_test.cpu().numpy()
y_pred_np = y_hat_test.cpu().numpy()

# Metrics
r2_score_value = r2_score(y_true_np, y_pred_np, multioutput='uniform_average')
mae_score_value = mean_absolute_error(y_true_np, y_pred_np)
rmse_score_value = np.sqrt(mean_squared_error(y_true_np, y_pred_np))
mse_score_value = mean_squared_error(y_true_np, y_pred_np)

# Accuracy-like metrics
y_max = np.max(y_true_np, axis=0)
y_min = np.min(y_true_np, axis=0)
alpha = y_max - y_min

mae_accuracy = (1 - mae_score_value / alpha.mean()) * 100
rmse_accuracy = (1 - rmse_score_value / alpha.mean()) * 100
mse_accuracy  = (1 - mse_score_value / alpha.mean()) * 100

# Model size in MB
model_size_mb = sum(p.numel() * p.element_size() for p in model.parameters()) / (1024**2)

# ---- Print results ----
print("\n===== MODEL EVALUATION =====")
print(f"Test Loss (MSELoss): {test_loss:.6f}")
print(f"Test MSE:  {mse_score_value:.4f}, Accuracy by error: {mse_accuracy:.2f}%")
print(f"Test RMSE: {rmse_score_value:.4f}, Accuracy by error: {rmse_accuracy:.2f}%")
print(f"Test MAE:  {mae_score_value:.4f}, Accuracy by error: {mae_accuracy:.2f}%")
print(f"Test R^2:  {r2_score_value:.4f}")
print(f"Model Size: {model_size_mb:.2f} MB")
print("=============================")



===== MODEL EVALUATION =====
Test Loss (MSELoss): 1.307831
Test MSE:  1.3078, Accuracy by error: 88.79%
Test RMSE: 1.1436, Accuracy by error: 90.20%
Test MAE:  0.7646, Accuracy by error: 93.45%
Test R^2:  -0.2323
Model Size: 0.17 MB
