### IMPORTANT: this file contains generated content by Gemini 2.5

# QNN with Torch Connector

## Regression: EstimatorQNN

### Imports

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split, RepeatedKFold
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, r2_score

from qiskit import QuantumCircuit
from qiskit.circuit import ParameterVector
from qiskit.circuit.library import PauliFeatureMap, RealAmplitudes
from qiskit.primitives import Estimator, Sampler
from qiskit_machine_learning.neural_networks import EstimatorQNN, SamplerQNN
from qiskit_machine_learning.connectors import TorchConnector
from qiskit_machine_learning.utils.loss_functions import L2Loss # Qiskit ML loss, or use PyTorch's
from qiskit_machine_learning.circuit.library import QNNCircuit

### Globals

In [2]:
# For reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Fixed feature sizes
NUM_FEATURES = 3
NUM_QUBITS = NUM_FEATURES 
NUM_TARGETS = 1 

# Quantum circuit parameters
FEATURE_MAP_REPS = 1 # ADJUST AS NEEDED
ANSATZ_REPS = 3 # ADJUST AS NEEDED

### Feature Map, Ansatz, then QNN Constructor

In [3]:
# a. Feature Map: Encodes NUM_FEATURES into NUM_QUBITS
# ParameterVector for input features
input_params = ParameterVector("x", NUM_FEATURES)

feature_map_template = PauliFeatureMap(
    feature_dimension=NUM_FEATURES, # This tells the template how many input parameters it structurally needs
    reps=FEATURE_MAP_REPS,
    entanglement='linear'
)

# Assign the *specific* input parameters from the vector to the template's parameter slots
# This creates a new circuit instance containing parameters ONLY from input_params (size NUM_FEATURES)
feature_map = feature_map_template.assign_parameters(input_params)
print(f"Assigned feature map parameters: {feature_map.num_parameters}")

# Create a template to find out how many parameters it needs structurally
ansatz_template = RealAmplitudes(NUM_QUBITS, reps=ANSATZ_REPS, entanglement="linear")
# ParameterVector for trainable weights - sized based on the template's structural parameters
num_ansatz_params = ansatz_template.num_parameters # This was correctly calculated as 12
weight_params = ParameterVector("θ", num_ansatz_params)

# Create the ansatz circuit instance by assigning the weight parameters to the template
ansatz = ansatz_template.assign_parameters(weight_params)
print(f"Assigned ansatz parameters: {ansatz.num_parameters}") 

# c. Combine into a full quantum circuit
qc = QuantumCircuit(NUM_QUBITS)
qc.compose(feature_map, inplace=True)
qc.compose(ansatz, inplace=True)

print(f"Total circuit parameters in qc: {qc.num_parameters}")


# d. Define Observable(s)
# For a single output, measure the expectation value of Pauli Z on the first qubit
# The output of EstimatorQNN will be in the range [-1, 1] for Pauli observables
from qiskit.quantum_info import SparsePauliOp
observable = SparsePauliOp.from_list([("Z" + "I" * (NUM_QUBITS - 1), 1.0)])
# If you have multiple qubits and want to combine their measurements, you can define multiple observables
# or a more complex one. For instance, if NUM_TARGETS > 1 or you want a richer output from QNN:
# observables = [SparsePauliOp(f"{'I'*i}Z{'I'*(NUM_QUBITS-1-i)}") for i in range(NUM_QUBITS)]
# This would give NUM_QUBITS outputs from the QNN.

# --- 3. EstimatorQNN ---
# Uses Qiskit's Estimator primitive for expectation value computations
# By default, Estimator uses a local statevector simulator.
# For real hardware or more advanced simulation, configure the Estimator.
estimator = Estimator()

qnn = EstimatorQNN(
    circuit=qc,
    estimator=estimator,
    input_params=input_params,
    weight_params=weight_params, # Parameters for trainable weights
    observables=observable,      # Observable to measure
    input_gradients=False       # Set to True if you need gradients w.r.t. inputs
)

# --- 4. TorchConnector ---
# Wrap the QNN into a PyTorch module
initial_weights = 0.01 * (2 * np.random.rand(qnn.num_weights) - 1)


Assigned feature map parameters: 3
Assigned ansatz parameters: 12
Total circuit parameters in qc: 15


  estimator = Estimator()


### HybridModel Class (add classical PyTorch pre-processing or customizations, right now too big/small SFE need more penalty)

In [4]:
class HybridModel(nn.Module):
    def __init__(self, qnn_model):
        super().__init__()
        # Example: Add classical layers if needed
        # self.classical_pre = nn.Linear(NUM_FEATURES, NUM_FEATURES) # If you want to pre-process features
        self.qnn = qnn_model
        # Example: Add classical layers after the QNN
        # self.classical_post = nn.Linear(qnn_model.output_shape[0], NUM_TARGETS) # output_shape[0] is num_observables
                                                                                # If qnn_model.output_shape is (1,), then it's 1.

    def forward(self, x):
        # x = self.classical_pre(x) # If using classical_pre
        x = self.qnn(x)
        # x = self.classical_post(x) # If using classical_post
        return x

### Prepare Dataset

In [5]:
def prepare_dataset_k_fold(X, y, train_indices, test_indices):
    # Separate train/test split
    X_train_raw, X_test_raw = X[train_indices], X[test_indices]
    y_train, y_test = y[train_indices], y[test_indices]
    
    # Separate element column from the actual features
    element_test = X_test_raw[:, 0]
    
    # Drop the element column (first column)
    X_train = X_train_raw[:, 1:]
    X_test = X_test_raw[:, 1:]

    full_X = np.vstack([X_train, X_test])

    scaler = MinMaxScaler(feature_range=(-1, 1))
    scaler.fit(full_X)

    X_train_scaled = scaler.transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    return X_train_scaled, y_train, X_test_scaled, y_test, element_test

### Custom Weighted Loss

In [6]:
class WeightedLoss(nn.Module):
    def __init__(self, small_threshold=0.1, large_threshold=10.0, weight_small=2.0, weight_large=2.0):
        super(WeightedLoss, self).__init__()
        
        # Thresholds to define small and large values
        self.small_threshold = small_threshold
        self.large_threshold = large_threshold
        
        # Weights for small and large targets
        self.weight_small = weight_small
        self.weight_large = weight_large

    def forward(self, y_pred, y_true):
        # Calculate absolute error between predicted and true values
        error = torch.abs(y_pred - y_true)
        
        # Define weights based on the thresholds
        weights = torch.ones_like(y_true)  # Start with 1.0 (no extra weighting)
        
        # Apply higher weight for small targets
        weights[y_true < self.small_threshold] = self.weight_small
        
        # Apply higher weight for large targets
        weights[y_true > self.large_threshold] = self.weight_large
        
        # Calculate weighted loss
        weighted_error = weights * error
        
        # Return the mean of the weighted error
        return torch.mean(weighted_error)

### Main(): Data Loading, Training, and Testing

In [7]:
def main(file_name, date):

    print("\n--- Loading and Preprocessing Data ---")

    dataset_name = "qml_training-validation-data.csv"
    df = pd.read_csv(dataset_name)
    X = df[['Element', 'el_neg', 'B/GPa', 'Volume/A^3']].values
    y = df['SFE/mJm^-3'].values

    y_scaler = MinMaxScaler(feature_range=(-1, 1))
    y = y_scaler.fit_transform(y.reshape(-1,1))
    
    rkf = RepeatedKFold(n_splits=X.shape[0]//3, n_repeats=3)

    df = pd.DataFrame(columns=['element test', 'actual', 'predicted'])

    LEARNING_RATE = 0.01
    BATCH_SIZE = 30
    NUM_EPOCHS = 50 
    # LOSS = nn.MSELoss()
    LOSS = nn.HuberLoss() # maybe less prone to extreme values
    # LOSS = nn.SmoothL1Loss()
    # LOSS = WeightedLoss(small_threshold=5, large_threshold=30, weight_small=16.0, weight_large=4.0)
    
    i = 0

    print("\n--- Start K-Fold Loop ---")

    for train_indices, test_indices in rkf.split(X):
        qnn_torch_model = TorchConnector(qnn, initial_weights=torch.tensor(initial_weights, dtype=torch.float32))
        # model = qnn_torch_model # for purely quantum nn
        model = HybridModel(qnn_torch_model) # classical modifications in the HybridQNN class
        
        # X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        X_train, y_train, X_test, y_test, element_test = prepare_dataset_k_fold(X, y, train_indices, test_indices)

        X_train_t = torch.tensor(X_train, dtype=torch.float32)
        y_train_t = torch.tensor(y_train, dtype=torch.float32)
        X_test_t = torch.tensor(X_test, dtype=torch.float32)
        y_test_t = torch.tensor(y_test, dtype=torch.float32)
        
        # Create DataLoaders
        train_dataset = TensorDataset(X_train_t, y_train_t)
        train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
        test_dataset = TensorDataset(X_test_t, y_test_t)
        test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
        
        print(f"Training data shape: X_train_t: {X_train_t.shape}, y_train_t: {y_train_t.shape}")
        print(f"Testing data shape: X_test_t: {X_test_t.shape}, y_test_t: {y_test_t.shape}")
        
        
        optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
        
        print(f"\n--- Starting Training {i}th---")
        train_losses = []
        test_losses = []
        
        for epoch in range(NUM_EPOCHS):
            # Training phase
            model.train()
            running_loss = 0.0
            for batch_X, batch_y in train_loader:
                optimizer.zero_grad()    # Clear gradients
                outputs = model(batch_X) # Forward pass
                loss = LOSS(outputs, batch_y) # Calculate loss
                loss.backward()          # Backward pass (compute gradients)
                optimizer.step()         # Update weights
                running_loss += loss.item() * batch_X.size(0)
        
            epoch_loss = running_loss / len(train_loader.dataset)
            train_losses.append(epoch_loss)
        
            # Validation/Test phase
            model.eval()
            test_loss = 0.0
            with torch.no_grad(): # Disable gradient calculations
                for batch_X_test, batch_y_test in test_loader:
                    outputs_test = model(batch_X_test)
                    loss_test = LOSS(outputs_test, batch_y_test)
                    test_loss += loss_test.item() * batch_X_test.size(0)
        
            epoch_test_loss = test_loss / len(test_loader.dataset)
            test_losses.append(epoch_test_loss)
        
            print(f"Epoch {epoch+1}/{NUM_EPOCHS}, Train Loss: {epoch_loss:.4f}, Test Loss: {epoch_test_loss:.4f}")
        
        print("--- Training Finished ---")
        
        # --- 9. Plotting Training History (Optional) ---
        plt.figure(figsize=(10, 5))
        plt.plot(train_losses, label='Training Loss')
        plt.plot(test_losses, label='Test Loss')
        plt.title('Training and Test Loss Over Epochs')
        plt.xlabel('Epoch')
        plt.ylabel('MSE Loss')
        plt.legend()
        plt.grid(True)
        # plt.show()
        plt.savefig(f"QNN/regression/figure/{date}/{date}_training_history_#{i}.png")
        plt.close()
        
        # --- 10. Evaluation on Test Set (Example) ---
        model.eval()
        all_preds = []
        all_targets = []
        with torch.no_grad():
            for batch_X_test, batch_y_test in test_loader:
                outputs_test = model(batch_X_test)
                all_preds.extend(outputs_test.cpu().numpy())
                all_targets.extend(batch_y_test.cpu().numpy())
        
        all_preds = np.array(all_preds)
        all_targets = np.array(all_targets)
        all_preds = y_scaler.inverse_transform(all_preds.reshape(-1,1))
        all_targets = y_scaler.inverse_transform(all_targets.reshape(-1,1))
        
        new_row = {'element test': element_test,
                   'actual': np.array(all_targets).flatten(),
                   'predicted': np.array(all_preds).flatten()}
        df.loc[len(df)] = new_row
        
        # Example: Scatter plot for regression
        if NUM_TARGETS == 1: # Simple plot if single target variable
            plt.figure(figsize=(8, 8))
            plt.scatter(all_targets, all_preds, alpha=0.5)
            plt.plot([min(all_targets.min(), all_preds.min()), max(all_targets.max(), all_preds.max())],
                     [min(all_targets.min(), all_preds.min()), max(all_targets.max(), all_preds.max())],
                     'k--', lw=2, label='Ideal')
            plt.xlabel('Actual Values')
            plt.ylabel('Predicted Values')
            plt.title('Actual vs. Predicted Values on Test Set')
            plt.legend()
            plt.grid(True)
            # plt.show()
            plt.savefig(f"QNN/regression/figure/{date}/{date}_test_result_#{i}.png")
            plt.close()
        
        # Further evaluation metrics can be added here (e.g., R-squared for regression, accuracy for classification)
        # final_mse = mean_squared_error(all_targets, all_preds)
        # final_r2 = r2_score(all_targets, all_preds)
        # print(f"\n--- Final Test Set Evaluation ---")
        # print(f"Mean Squared Error (MSE): {final_mse:.4f}")
        # print(f"R-squared (R2 Score): {final_r2:.4f}")

        i += 1
        
    df.at[0, "info"] = [f"DATASET: {dataset_name}, FEATURE_MAP_REPS = {FEATURE_MAP_REPS}, ANSATZ_REPS = {ANSATZ_REPS}, LEARNING_RATE = {LEARNING_RATE}, BATCH_SIZE = {BATCH_SIZE}, NUM_EPOCHS = {NUM_EPOCHS}, LOSS: {LOSS}"]
    df.to_csv(file_name, index=False) 

# To make predictions on new data:
# new_data_np = np.array([[val1, val2, val3], ...]) # Your new data
# new_data_scaled = scaler_X.transform(new_data_np) # Don't forget to scale
# new_data_t = torch.tensor(new_data_scaled, dtype=torch.float32)
# with torch.no_grad():
#     predictions = model(new_data_t)
# print(f"Predictions for new data: {predictions.numpy()}")

### Run Main() to start

In [None]:
date = '05_20_25_0'
file_name = f'QNN/regression/result/{date}.csv'
!mkdir QNN/regression/figure/$date

main(file_name, date)

# 14_0 ansatz reps=2, _1 reps=3
# 15_0 HuberLoss, 15_1 SmoothL1Loss

## Classification： 

### Feature Map, Ansatz, then QNN Constructor

In [8]:
input_params = ParameterVector("x", NUM_FEATURES)

feature_map_template = PauliFeatureMap(
    feature_dimension=NUM_FEATURES, # This tells the template how many input parameters it structurally needs
    reps=FEATURE_MAP_REPS,
    entanglement='linear'
)

# Assign the *specific* input parameters from the vector to the template's parameter slots
# This creates a new circuit instance containing parameters ONLY from input_params (size NUM_FEATURES)
feature_map = feature_map_template.assign_parameters(input_params)
print(f"Assigned feature map parameters: {feature_map.num_parameters}")

# Create a template to find out how many parameters it needs structurally
ansatz_template = RealAmplitudes(NUM_QUBITS, reps=ANSATZ_REPS, entanglement="linear")
# ParameterVector for trainable weights - sized based on the template's structural parameters
num_ansatz_params = ansatz_template.num_parameters # This was correctly calculated as 12
weight_params = ParameterVector("θ", num_ansatz_params)

# Create the ansatz circuit instance by assigning the weight parameters to the template
ansatz = ansatz_template.assign_parameters(weight_params)
print(f"Assigned ansatz parameters: {ansatz.num_parameters}") 

qc = QNNCircuit(
    feature_map=feature_map,
    ansatz=ansatz_template,
)

# example 5.2 from Qiskit guide on binary classification
parity = lambda x: "{:b}".format(x).count("1") % 2
output_shape = 2  # parity = 0, 1

sampler = Sampler()

qnn = SamplerQNN(
    circuit=qc,
    interpret=parity,
    output_shape=output_shape,
    sampler=sampler,
    sparse=False,
    input_gradients=False,
)

initial_weights = 0.01 * (2 * np.random.rand(qnn.num_weights) - 1)
qnn_torch_model = TorchConnector(qnn, initial_weights=torch.tensor(initial_weights, dtype=torch.float32))

Assigned feature map parameters: 3
Assigned ansatz parameters: 12


  sampler = Sampler()
  self._weights.data = torch.tensor(initial_weights, dtype=torch.float)


### Process the Target Bariable for Binary Classification

In [9]:
def process(y):
    for i in range(0,len(y)): 
        if y[i]>19: y[i]=0
        else: y[i]=1
    return y

### Main

In [15]:
def main(file_name, date):

    print("\n--- Loading and Preprocessing Data ---")

    dataset_name = "qml_training-validation-data.csv"
    df = pd.read_csv(dataset_name)
    X = df[['Element', 'el_neg', 'B/GPa', 'Volume/A^3']].values
    y = df['SFE/mJm^-3'].values

    y = process(y)
    
    # y_scaler = MinMaxScaler(feature_range=(-1, 1))
    # y = y_scaler.fit_transform(y.reshape(-1,1))
    
    rkf = RepeatedKFold(n_splits=X.shape[0]//3, n_repeats=3)

    df = pd.DataFrame(columns=['element test', 'actual', 'predicted'])

    LEARNING_RATE = 0.01
    BATCH_SIZE = 30
    NUM_EPOCHS = 50 

    LOSS = nn.CrossEntropyLoss() # use torch.long
    # LOSS = nn.MSELoss() # haven't tried
    # LOSS = nn.BCELoss() # use torch.float
    
    i = 0

    print("\n--- Start K-Fold Loop ---")

    for train_indices, test_indices in rkf.split(X):
        model = HybridModel(qnn_torch_model) 
        
        X_train, y_train, X_test, y_test, element_test = prepare_dataset_k_fold(X, y, train_indices, test_indices)

        X_train_t = torch.tensor(X_train, dtype=torch.float32)
        y_train_t = torch.tensor(y_train, dtype=torch.long) # may need to change this
        X_test_t = torch.tensor(X_test, dtype=torch.float32)
        y_test_t = torch.tensor(y_test, dtype=torch.long)# and this if using a different Loss function. BCELoss should use float and CrossEntropy should use be long
        
        # Create DataLoaders
        train_dataset = TensorDataset(X_train_t, y_train_t)
        train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
        test_dataset = TensorDataset(X_test_t, y_test_t)
        test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
        
        print(f"Training data shape: X_train_t: {X_train_t.shape}, y_train_t: {y_train_t.shape}")
        print(f"Testing data shape: X_test_t: {X_test_t.shape}, y_test_t: {y_test_t.shape}")
    
        
        optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
        
        print(f"\n--- Starting Training {i}th---")
        train_losses = []
        test_losses = []
        
        for epoch in range(NUM_EPOCHS):
            # Training phase
            model.train()
            running_loss = 0.0
            for batch_X, batch_y in train_loader:
                optimizer.zero_grad(set_to_none=True)    # Clear gradients
                outputs = model(batch_X) # Forward pass

                
                
                # print(outputs)
                # print(batch_y)
                loss = LOSS(outputs, batch_y) # Calculate loss
                loss.backward()          # Backward pass (compute gradients)
                optimizer.step()         # Update weights
                running_loss += loss.item() * batch_X.size(0)
        
            epoch_loss = running_loss / len(train_loader.dataset)
            train_losses.append(epoch_loss)
        
            # Validation/Test phase
            model.eval()
            test_loss = 0.0
            with torch.no_grad(): # Disable gradient calculations
                for batch_X_test, batch_y_test in test_loader:
                    outputs_test = model(batch_X_test)
                    loss_test = LOSS(outputs_test, batch_y_test)
                    test_loss += loss_test.item() * batch_X_test.size(0)
        
            epoch_test_loss = test_loss / len(test_loader.dataset)
            test_losses.append(epoch_test_loss)
        
            print(f"Epoch {epoch+1}/{NUM_EPOCHS}, Train Loss: {epoch_loss:.4f}, Test Loss: {epoch_test_loss:.4f}")
        
        print("--- Training Finished ---")
        
        plt.figure(figsize=(10, 5))
        plt.plot(train_losses, label='Training Loss')
        plt.plot(test_losses, label='Test Loss')
        plt.title('Training and Test Loss Over Epochs')
        plt.xlabel('Epoch')
        plt.ylabel('MSE Loss')
        plt.legend()
        plt.grid(True)
        # plt.show()
        plt.savefig(f"QNN/classification/figure/{date}/{date}_training_history_#{i}.png")
        plt.close()
        
        model.eval()
        all_preds = []
        all_targets = []
        with torch.no_grad():
            for batch_X_test, batch_y_test in test_loader:
                outputs_test = model(batch_X_test)
                all_preds.extend(outputs_test.cpu().numpy())
                all_targets.extend(batch_y_test.cpu().numpy())
        
        all_preds = np.array(all_preds)
        all_targets = np.array(all_targets)
        all_preds_temp = []
        for item in all_preds:
            if(item[1] > item[0]):
                all_preds_temp.append(1)
            else:
                all_preds_temp.append(0)
        print(all_preds)
        all_preds = np.array(all_preds_temp)
        
        new_row = {'element test': element_test,
                   'actual': np.array(all_targets).flatten(),
                   'predicted': np.array(all_preds).flatten()}
        df.loc[len(df)] = new_row
        
        if NUM_TARGETS == 1:
            plt.figure(figsize=(8, 8))
            plt.scatter(all_targets, all_preds, alpha=0.5)
            plt.plot([0, 0, 1, 1],
                     [0, 0, 1, 1],
                     'k--', lw=2, label='Ideal')
            plt.xlabel('Actual Values')
            plt.ylabel('Predicted Values')
            plt.title('Actual vs. Predicted Values on Test Set')
            plt.legend()
            plt.grid(True)
            # plt.show()
            plt.savefig(f"QNN/classification/figure/{date}/{date}_test_result_#{i}.png")
            plt.close()

        i += 1
        
    df.at[0, "info"] = [f"DATASET: {dataset_name}, FEATURE_MAP_REPS = {FEATURE_MAP_REPS}, ANSATZ_REPS = {ANSATZ_REPS}, LEARNING_RATE = {LEARNING_RATE}, BATCH_SIZE = {BATCH_SIZE}, NUM_EPOCHS = {NUM_EPOCHS}, LOSS: {LOSS}"]
    df.to_csv(file_name, index=False) 


### run main() to start

In [16]:
date = '05_19_25_1'
file_name = f'QNN/classification/result/{date}.csv'
!mkdir QNN/classification/figure/$date

main(file_name, date)


mkdir: cannot create directory 'QNN/classification/figure/05_19_25_1': File exists

--- Loading and Preprocessing Data ---

--- Start K-Fold Loop ---
Training data shape: X_train_t: torch.Size([18, 3]), y_train_t: torch.Size([18])
Testing data shape: X_test_t: torch.Size([3, 3]), y_test_t: torch.Size([3])

--- Starting Training 0th---
Epoch 1/50, Train Loss: 0.6921, Test Loss: 0.6942
Epoch 2/50, Train Loss: 0.6890, Test Loss: 0.6946
Epoch 3/50, Train Loss: 0.6857, Test Loss: 0.6951
Epoch 4/50, Train Loss: 0.6823, Test Loss: 0.6954
Epoch 5/50, Train Loss: 0.6788, Test Loss: 0.6957
Epoch 6/50, Train Loss: 0.6753, Test Loss: 0.6959
Epoch 7/50, Train Loss: 0.6716, Test Loss: 0.6960
Epoch 8/50, Train Loss: 0.6679, Test Loss: 0.6961
Epoch 9/50, Train Loss: 0.6641, Test Loss: 0.6961
Epoch 10/50, Train Loss: 0.6602, Test Loss: 0.6961
Epoch 11/50, Train Loss: 0.6563, Test Loss: 0.6960


KeyboardInterrupt: 