In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import importlib
import rfm_q
importlib.reload(rfm_q)  # Reload the updated module
from rfm import *
from qiskit import QuantumCircuit
from qiskit_aer import Aer
import numpy as np
import torch
import torchvision
import torchvision.transforms as transforms
import csv
import scipy
from torch.utils.data import TensorDataset, DataLoader
import hickle
from numpy.linalg import solve
import time
from qiskit.circuit.library import ZZFeatureMap
from qiskit_aer import Aer
from qiskit_machine_learning.kernels import FidelityQuantumKernel
import numpy as np
from typing import Optional
from qiskit import QuantumCircuit
from qiskit_machine_learning.state_fidelities import BaseStateFidelity

In [3]:
def pre_process(torchset,n_samples,num_classes=10):
    indices = list(np.random.choice(len(torchset),n_samples))

    trainset = []
    for ix in indices:
        x,y = torchset[ix]
        ohe_y = torch.zeros(num_classes)
        ohe_y[y] = 1
        trainset.append(((x/np.linalg.norm(x)).reshape(-1),ohe_y))
    return trainset

In [4]:
train_data = scipy.io.loadmat('MNIST_4x4/4x4MNIST_Train&Test/MNIST_Train_Nox16.mat')
test_data = scipy.io.loadmat('MNIST_4x4/4x4MNIST_Train&Test/MNIST_Test_Nox16.mat')

X_train = train_data['VV'][:50]
X_test = test_data['UU'][:50]

print(X_train.shape)
print(X_test.shape)

(50, 16)
(50, 16)


In [5]:
y_train = []
y_test = []

csv_file_path1 = 'MNIST_4x4/4x4MNIST_Train&Test/mnist_train.csv'
csv_file_path2 = 'MNIST_4x4/4x4MNIST_Train&Test/mnist_test.csv'

# Open the CSV file in read mode.
with open(csv_file_path1, newline='', encoding='utf-8') as csvfile:
    csvreader = csv.reader(csvfile)

    for row in csvreader:
        if row:  
            y_train.append(int(row[0]))

with open(csv_file_path2, newline='', encoding='utf-8') as csvfile:
    csvreader = csv.reader(csvfile)

    for row in csvreader:
        if row:  
            y_test.append(int(row[0]))

num_classes = 10
y_train = np.eye(num_classes)[y_train[:50]]
y_test = np.eye(num_classes)[y_test[:50]]

print(y_train.shape)
print(y_test.shape)

(50, 10)
(50, 10)


In [6]:
def get_data(loader):
    X_list, y_list = [], []
    for batch in loader:
        inputs, labels = batch
        X_list.append(inputs)
        y_list.append(labels)
    X = torch.cat(X_list, dim=0)
    y = torch.cat(y_list, dim=0)
    return X, y

In [7]:
def encode_features(X):
    """Scales each column of X to [0, 1] then multiplies by pi."""
    X = X.copy().astype(np.float64)  # ensure float type
    for j in range(X.shape[1]):
        col_min = X[:, j].min()
        col_max = X[:, j].max()
        if abs(col_max - col_min) < 1e-12:
            X[:, j] = 0.0
        else:
            X[:, j] = (X[:, j] - col_min) / (col_max - col_min)
    X *= np.pi
    return X

In [8]:
def make_kernel_batches(x_vec, y_vec, batch_size, is_symmetric, evaluate_duplicates):
    n_x, n_y = x_vec.shape[0], y_vec.shape[0]
    current_batch = []
    batch_count = 0

    def generator():
        nonlocal current_batch, batch_count
        for i in range(n_x):
            js = range(i, n_y) if is_symmetric else range(n_y)
            for j in js:
                if evaluate_duplicates == "off_diagonal" and is_symmetric and i == j:
                    continue
                if evaluate_duplicates == "none" and np.array_equal(x_vec[i], y_vec[j]):
                    continue
                current_batch.append((i, j, x_vec[i], y_vec[j]))
                if len(current_batch) == batch_size:
                    batch_count += 1
                    yield current_batch
                    current_batch = []
        if current_batch:
            batch_count += 1
            yield current_batch

    # Materialize all batches to count them
    all_batches = list(generator())
    return iter(all_batches), batch_count


In [9]:
def evaluate_kernel_matrix(x_vec, y_vec, feature_map, fidelity, enforce_psd=True, evaluate_duplicates="off_diagonal", batch_size=256):
    if y_vec is None:
        y_vec = x_vec
        is_symmetric = True
    else:
        is_symmetric = np.array_equal(x_vec, y_vec)

    n_x, n_y = x_vec.shape[0], y_vec.shape[0] # n_x, n_x is num sample at this point
    kernel_matrix = np.ones((n_x, n_y))

    print(f"kernel_matrix shape: {kernel_matrix.shape}")
    # Get batch generator and total count
    batch_gen, total_batches = make_kernel_batches(x_vec, y_vec, batch_size, is_symmetric, evaluate_duplicates)

    # Process batches with tqdm progress bar
    for batch in tqdm(batch_gen, total=total_batches, desc="[Quantum Kernel Eval]"):
        i_indices, j_indices, lefts, rights = zip(*batch)
        lefts = np.array(lefts)
        rights = np.array(rights)

        job = fidelity.run(
            [feature_map] * len(batch),
            [feature_map] * len(batch),
            lefts,
            rights
        )
        results = job.result().fidelities

        for k in range(len(batch)):
            i, j = i_indices[k], j_indices[k]
            kernel_matrix[i, j] = results[k]
            if is_symmetric:
                kernel_matrix[j, i] = results[k]

    if is_symmetric and enforce_psd:
        eigvals, eigvecs = np.linalg.eigh(kernel_matrix)
        eigvals = np.clip(eigvals, 0, None)
        kernel_matrix = eigvecs @ np.diag(eigvals) @ eigvecs.T

    return kernel_matrix

In [10]:
def quantum_kernel_matrix(X1, X2, q_kernel, M=None, do_encode=True):
    """
    Evaluates the quantum kernel matrix for inputs X1 and X2.
    Applies optional encoding and a linear transformation via M.
    """
    if isinstance(X1, torch.Tensor):
        X1 = X1.cpu().numpy()
    if isinstance(X2, torch.Tensor):
        X2 = X2.cpu().numpy()

    if do_encode:
        X1 = encode_features(X1)
        X2 = encode_features(X2)

    if M is not None:
        sqrtM = np.real_if_close(np.linalg.cholesky(M))
        X1 = X1 @ sqrtM
        X2 = X2 @ sqrtM

    print(f"Input X1 shape: {X1.shape}")
    print(f"Input X2 shape: {X2.shape}")

    feature_map = q_kernel.feature_map
    fidelity = q_kernel.fidelity
    print("[DEBUG] Evaluating Quantum Kernel...")
    start_time = time.time()
    K = evaluate_kernel_matrix(x_vec=X1, y_vec=X2, feature_map=feature_map, fidelity=fidelity)
    end_time = time.time()
    print(f"[DEBUG] Kernel evaluated: shape {K.shape}, time {end_time - start_time:.2f} s")
    return K

In [11]:
def q_rfm(train_loader, test_loader,
          iters=3, name=None, batch_size=2, reg=1e-3,
          train_acc=False, loader=True, classif=True):
    print("[DEBUG] Entered q_rfm function...")
    
    # Use dummy training data to set the feature dimension
    X_train_dummy, _ = get_data(train_loader) if loader else train_loader
    feature_dim = X_train_dummy.shape[1]
    feature_map = ZZFeatureMap(feature_dimension=feature_dim, reps=1, entanglement="full")
    
    print("[DEBUG] Creating FidelityQuantumKernel...")
    q_kernel = FidelityQuantumKernel(feature_map=feature_map)
    print("[DEBUG] FidelityQuantumKernel created successfully")

    if loader:
        print("[DEBUG] Loaders provided")
        X_train, y_train = get_data(train_loader)
        X_test, y_test = get_data(test_loader)
    else:
        print("[DEBUG] Loaders not used, loading manually")
        X_train, y_train = train_loader
        X_test, y_test = test_loader
        X_train = torch.from_numpy(X_train).float()
        X_test = torch.from_numpy(X_test).float()
        y_train = torch.from_numpy(y_train).float()
        y_test = torch.from_numpy(y_test).float()
    
    print(f"[DEBUG] X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
    print(f"[DEBUG] X_test shape: {X_test.shape}, y_test shape: {y_test.shape}")
    
    feature_dim = X_train.shape[1]
    feature_map.feature_dimension = feature_dim

    for i in range(iters):
        print(f"[DEBUG] Iteration {i+1}/{iters} started...")
        print("[DEBUG] Computing K_train...")
        K_train = quantum_kernel_matrix(X_train, X_train, q_kernel)
        print(f"[DEBUG] K_train computed, shape: {K_train.shape}")
        sol = solve(K_train + reg * np.eye(len(K_train)), y_train.numpy()).T
        
        if train_acc:
            preds = (sol @ K_train).T
            y_pred = torch.from_numpy(preds)
            preds_class = torch.argmax(y_pred, dim=-1)
            labels = torch.argmax(y_train, dim=-1)
            count = torch.sum(labels == preds_class).numpy()
            print("Round " + str(i) + " Train Acc: ", count / len(labels))
        
        K_test = quantum_kernel_matrix(X_train, X_test, q_kernel)
        preds = (sol @ K_test).T
        mse = np.mean(np.square(preds - y_test.numpy()))
        print("Round " + str(i) + " MSE: ", mse)
        
        if classif:
            y_pred = torch.from_numpy(preds)
            preds_class = torch.argmax(y_pred, dim=-1)
            labels = torch.argmax(y_test, dim=-1)
            count = torch.sum(labels == preds_class).numpy()
            print("Round " + str(i) + " Acc: ", count / len(labels))

    return mse, K_train, K_test

In [None]:
# Convert to torch tensors.
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

# Create DataLoaders.
batch_size = 16
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Run q_rfm on dummy data to test MSE and accuracy.
mse_final, K_train, K_test = q_rfm(train_loader, test_loader, iters=2, loader=True, classif=True, train_acc=True)
print(f"Final MSE from q_rfm on 4x4 data: {mse_final}")
print(f"K_train shape: {K_train.shape}")
print(f"K_test shape: {K_test}")

[DEBUG] Entered q_rfm function...
[DEBUG] Creating FidelityQuantumKernel...
[DEBUG] FidelityQuantumKernel created successfully
[DEBUG] Loaders provided
[DEBUG] X_train shape: torch.Size([50, 16]), y_train shape: torch.Size([50, 10])
[DEBUG] X_test shape: torch.Size([50, 16]), y_test shape: torch.Size([50, 10])
[DEBUG] Iteration 1/2 started...
[DEBUG] Computing K_train...
Input X1 shape: (50, 16)
Input X2 shape: (50, 16)
[DEBUG] Evaluating Quantum Kernel...
kernel_matrix shape: (50, 50)


[Quantum Kernel Eval]:   0%|          | 0/5 [00:00<?, ?it/s]

[Quantum Kernel Eval]: 100%|██████████| 5/5 [16:38<00:00, 199.65s/it]


[DEBUG] Kernel evaluated: shape (50, 50), time 998.61 s
[DEBUG] K_train computed, shape: (50, 50)
Round 0 Train Acc:  1.0
Input X1 shape: (50, 16)
Input X2 shape: (50, 16)
[DEBUG] Evaluating Quantum Kernel...
kernel_matrix shape: (50, 50)


[Quantum Kernel Eval]: 100%|██████████| 10/10 [33:58<00:00, 203.89s/it]


[DEBUG] Kernel evaluated: shape (50, 50), time 2039.21 s
Round 0 MSE:  0.09851583488029791
Round 0 Acc:  0.34
[DEBUG] Iteration 2/2 started...
[DEBUG] Computing K_train...
Input X1 shape: (50, 16)
Input X2 shape: (50, 16)
[DEBUG] Evaluating Quantum Kernel...
kernel_matrix shape: (50, 50)


[Quantum Kernel Eval]: 100%|██████████| 5/5 [16:37<00:00, 199.48s/it]


[DEBUG] Kernel evaluated: shape (50, 50), time 997.78 s
[DEBUG] K_train computed, shape: (50, 50)
Round 1 Train Acc:  1.0
Input X1 shape: (50, 16)
Input X2 shape: (50, 16)
[DEBUG] Evaluating Quantum Kernel...
kernel_matrix shape: (50, 50)


[Quantum Kernel Eval]:  80%|████████  | 8/10 [28:17<07:04, 212.40s/it]

Reduce number of classes
Reduce to 20 samples (20 train, 5 test)