In [1]:
# Quantum Autoencoder for Enhanced Fraud Detection in Imbalanced Credit Card Dataset
# Implementation based on the IEEE Access 2024 paper by Huot et al.
# DOI: 10.1109/ACCESS.2024.3496901

# Core libraries for data processing and machine learning
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import (confusion_matrix, accuracy_score, precision_score,
                             recall_score, f1_score, roc_auc_score)
from sklearn.decomposition import PCA
from tqdm import tqdm

# Quantum machine learning framework
import pennylane as qml
import pennylane.numpy as pnp  # PennyLane-specific numpy for automatic differentiation

# IBM quantum
from qiskit_ibm_runtime import QiskitRuntimeService

# dotenv for environment variables
from dotenv import load_dotenv
import os
# Load environment variables
load_dotenv()


True

In [2]:
# ==========================================
# Data Loading and Preprocessing Pipeline
# ==========================================

# Read the preprocessed credit-card table and separate the binary 'Class'
# label (0: normal, 1: fraud) from the remaining feature columns.
df = pd.read_csv("preprocessed-creditcard.csv")
y = df["Class"].to_numpy()               # Target labels
X = df.drop(columns="Class").to_numpy()  # Feature matrix

# Hold out 20% of the rows for testing; stratifying on y keeps the
# fraud/normal ratio the same in both splits, and the fixed seed makes the
# split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)

# Z-score standardization. The mean/variance are estimated on the training
# split only and then reused for the test split, so no test statistics leak
# into preprocessing.
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)

# Project the standardized features onto 4 principal components so that each
# component can be angle-encoded on one of the 4 qubits used by the circuit.
# The projection is fitted on the training split and applied to both splits.
pca = PCA(n_components=4, random_state=42)
X_train_4d = pca.fit_transform(X_train)
X_test_4d = pca.transform(X_test)

In [3]:
# Open an IBM Quantum Runtime session handle for the given instance.
# The API token is read from the IBM_QUANTUM_TOKEN environment variable
# (the first cell calls load_dotenv(), so it may come from a local .env file)
# and is never hard-coded in the notebook.
# NOTE(review): os.getenv returns None when the variable is unset; presumably
# the service then falls back to credentials saved on disk — confirm.
# NOTE(review): the "ibm_quantum" channel name may be deprecated in newer
# qiskit-ibm-runtime releases — verify against the installed version.
service = QiskitRuntimeService(
    channel="ibm_quantum",
    instance="yonsei-dedicated/internal/ybscandid",
    token=os.getenv("IBM_QUANTUM_TOKEN"),
)
# Select the least-busy operational real device (simulator=False) that offers
# at least 4 qubits, matching the 4-qubit circuit defined later.
backend = service.least_busy(operational=True, simulator=False, min_num_qubits=4)
print(f"Using backend: {backend.name}")

Using backend: ibm_yonsei


In [4]:
# ==========================================
# Quantum Autoencoder Architecture
# ==========================================

# Quantum system configuration
n_qubits = 4  # Register width; equals the PCA output dimension (n_components=4)
L = 4         # Number of variational layers stacked by qae_circuit
# PennyLane device that forwards circuit executions to the IBM backend chosen
# in the previous cell.
# NOTE(review): every qae_circuit call is executed on this remote backend, and
# cost_fn calls it once per sample. The recorded training output shows the
# first mini-batch still unfinished after roughly two hours, so training on a
# local simulator and using the hardware only for evaluation may be the
# practical option — confirm the intended workflow.
dev = qml.device("qiskit.remote", wires=n_qubits, backend=backend)

def qae_layer(theta):
    """
    Queue one variational layer of the Quantum Autoencoder.

    The layer consists of two stages applied to the module-level register of
    ``n_qubits`` wires:

    1. For every wire ``w``: RX, RY, RZ rotations (in that order) with angles
       ``theta[w, 0]``, ``theta[w, 1]``, ``theta[w, 2]``.
    2. A ring of CNOT gates, each wire controlling its successor, with the
       last wire wrapping around to wire 0.

    Args:
        theta: Rotation angles indexed as ``theta[wire, axis]``, i.e. an
            array of shape (n_qubits, 3).
    """
    # Column index in theta -> rotation gate applied with that angle
    rotation_gates = (qml.RX, qml.RY, qml.RZ)

    # Stage 1: trainable single-qubit rotations, X then Y then Z per wire
    for wire in range(n_qubits):
        for axis, gate in enumerate(rotation_gates):
            gate(theta[wire, axis], wires=wire)

    # Stage 2: circular entanglement; the modulo closes the ring
    for control in range(n_qubits):
        target = (control + 1) % n_qubits
        qml.CNOT(wires=[control, target])

@qml.qnode(dev)
def qae_circuit(x, weights):
    """
    Full Quantum Autoencoder circuit, executed on the module-level device.

    Steps:
    1. Angle encoding: each entry of ``x`` becomes the angle of an RY
       rotation on its own wire (features are encoded as rotation angles,
       not as state amplitudes).
    2. ``L`` variational layers, layer ``k`` parameterized by ``weights[k]``.
    3. Measurement of Pauli-Z on the last wire, which serves as the trash
       qubit.

    Args:
        x: Input feature vector with one entry per qubit (4D after PCA).
        weights: Variational parameters of shape (L, n_qubits, 3).

    Returns:
        Expectation value <Z> of the trash qubit (wire ``n_qubits - 1``).
        Callers convert it to the |0> probability via (1 + <Z>) / 2.
    """
    trash_wire = n_qubits - 1

    # Step 1: one RY rotation per wire, angle taken from the matching feature
    qml.AngleEmbedding(features=x, wires=range(n_qubits), rotation="Y")

    # Step 2: stack the trainable layers
    for layer_idx in range(L):
        qae_layer(weights[layer_idx])

    # Step 3: <Z> = +1 means the trash qubit is in |0>; values near 0 mean
    # it is found in |0> and |1> with roughly equal probability
    return qml.expval(qml.PauliZ(trash_wire))

In [5]:
def cost_fn(weights, X_batch):
    """
    Batch cost of the Quantum Autoencoder.

    For each sample the trash-qubit expectation value <Z> returned by
    ``qae_circuit`` is mapped to a fidelity p0 = (1 + <Z>) / 2, i.e. the
    probability of finding the trash qubit in |0>. The cost is the mean
    *squared* infidelity over the batch:

        Cost = 1/N * sum_i (1 - p0_i)^2

    Args:
        weights: Variational parameters for the quantum circuit.
        X_batch: Iterable of feature vectors (one mini-batch).

    Returns:
        Mean squared infidelity of the batch (0 when every sample leaves the
        trash qubit exactly in |0>).
    """

    def squared_infidelity(sample):
        # Inputs are data, not trainable parameters, so exclude them from
        # differentiation explicitly.
        features = pnp.array(sample, requires_grad=False)
        # <Z> in [-1, 1]  ->  p0 in [0, 1]
        p0 = (1 + qae_circuit(features, weights)) / 2
        return (1 - p0) ** 2

    per_sample = [squared_infidelity(row) for row in X_batch]
    return pnp.mean(pnp.stack(per_sample))

In [6]:
# ==========================================
# Quantum Autoencoder Training Pipeline
# ==========================================

# Optimization configuration
opt = qml.AdamOptimizer(stepsize=0.001)  # Adam optimizer with learning rate 0.001
epochs = 50                              # Number of training epochs
batch_size = 16                          # Mini-batch size for stochastic optimization

# Dedicated, seeded generator so that weight initialization and batch order
# are identical on every Restart & Run All (same seed as the data split).
train_rng = np.random.default_rng(42)

# Initialize variational parameters with small random values
# Shape: (L layers, n_qubits, 3 rotation parameters per qubit)
# The small standard deviation keeps all initial rotation angles close to zero.
weights = pnp.array(
    train_rng.normal(0, 0.01, (L, n_qubits, 3)), requires_grad=True
)

# Training loop with batch-wise gradient descent
print("Starting Quantum Autoencoder Training...")
print(f"Dataset: {len(X_train_4d)} samples, {n_qubits} qubits, {L} layers")
print("-" * 60)

for epoch in range(epochs):
    # Visit the samples in a fresh random order each epoch so that the
    # mini-batches differ between epochs instead of repeating the same
    # fixed partition of the training set.
    order = train_rng.permutation(len(X_train_4d))

    # Start offsets of the mini-batches (the last batch may be shorter)
    batch_iter = range(0, len(X_train_4d), batch_size)

    # Process each mini-batch
    for idx in tqdm(batch_iter, desc=f"Epoch {epoch+1}/{epochs}", ncols=80):
        # Extract current mini-batch
        X_batch = X_train_4d[order[idx : idx + batch_size]]

        # Perform one optimization step.
        # The lambda is called immediately inside step(), so capturing the
        # loop variable X_batch by reference is safe here.
        weights = opt.step(lambda w: cost_fn(w, X_batch), weights)

    # Monitor the loss on a fixed subset (the first batch_size samples) so
    # the reported value is comparable across epochs.
    train_loss = cost_fn(weights, X_train_4d[:batch_size])
    print(f"Epoch {epoch+1:>2d}  Reconstruction Loss = {float(train_loss):.4f}")

print("\nTraining completed!")

Starting Quantum Autoencoder Training...
Dataset: 756 samples, 4 qubits, 4 layers
------------------------------------------------------------


Epoch 1/50:   0%|                                        | 0/48 [00:00<?, ?it/s]

  import pkg_resources
Epoch 1/50:   0%|                                      | 0/48 [2:10:51<?, ?it/s]


IBMRuntimeError: 'Error closing session: \'HTTPSConnectionPool(host=\\\'api.quantum.ibm.com\\\', port=443): Max retries exceeded with url: /runtime/sessions/d13gmha3grvg008j0bn0 (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x13c966420>: Failed to resolve \\\'api.quantum.ibm.com\\\' ([Errno 8] nodename nor servname provided, or not known)"))\''

In [None]:
# Retrieve a previously submitted job by its ID and fetch its result
job = service.job('d13gmj2mya70008eja5g')
job_result = job.result()

# Collect the expectation values of every pub result in the job.
# `pub.data.evs` is normalized to a flat 1-D array first: iterating a
# 0-dimensional array raises TypeError, so extending the list with the raw
# value would fail whenever a pub carries a single scalar expectation value.
expvals = []
for idx, pub in enumerate(job_result):
    evs = np.atleast_1d(pub.data.evs).ravel()
    print(f"Expectation values for publication {idx}: {evs}")
    expvals.extend(evs)

# Convert <Z> in [-1, 1] to the |0> probability p0 = (<Z> + 1) / 2.
# NOTE: `p0_test` holds only the values of this single job here; the
# evaluation cell below reassigns it with the fidelities of the full test set.
p0_test = (np.asarray(expvals, dtype=float) + 1.0) / 2.0
print("Reconstructed fidelities:", p0_test)

Expectation values for publication 0: [-0.22784967]
Reconstructed fidelities: [0.38607517]


In [None]:
# ==========================================
# Model Evaluation and Fraud Detection
# ==========================================

# Run every test sample through the trained circuit and map the trash-qubit
# expectation value <Z> to the fidelity p0 = (<Z> + 1) / 2.
# A lower fidelity marks a transaction as more anomalous (potential fraud).
print("Computing reconstruction fidelities for test set...")
p0_test = np.array(
    [(qae_circuit(x, weights) + 1) / 2 for x in X_test_4d]
)

# Summary statistics of the test fidelities; labels are left-padded to a
# common width so the numbers line up in one column.
print("Fidelity statistics:")
for stat_label, stat_fn in (
    ("Mean:", np.mean),
    ("Std:", np.std),
    ("Min:", np.min),
    ("Max:", np.max),
):
    print(f"  {stat_label:<5} {stat_fn(p0_test):.4f}")

# ==========================================
# Performance Metrics Evaluation
# ==========================================

def metrics(y_true, y_pred):
    """
    Compute evaluation metrics for binary classification (positive class = 1).

    Besides the standard scores this includes specificity and G-Mean, the
    geometric mean of sensitivity (recall) and specificity, which is useful
    for imbalanced datasets because it is high only when both classes are
    recognized well.

    Args:
        y_true: Ground-truth labels (0 or 1).
        y_pred: Predicted labels (0 or 1).

    Returns:
        dict with keys TN, FP, FN, TP (built-in ints) and Accuracy,
        Precision, Recall, F1, Specificity, Gmean (built-in floats).
        Precision, recall and F1 are 0 when undefined (zero_division=0);
        specificity is 0 when there are no negative samples.
    """
    # labels=[0, 1] forces a 2x2 matrix even if one class is absent, so the
    # four-way unpacking is always valid. The counts are cast to built-in
    # ints: NumPy scalars would otherwise show up as `np.int64(...)` when the
    # dict is printed under NumPy >= 2 and cannot be JSON-serialized.
    tn, fp, fn, tp = (
        int(count)
        for count in confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
    )

    # Standard classification metrics
    acc = float(accuracy_score(y_true, y_pred))
    prec = float(precision_score(y_true, y_pred, zero_division=0))
    rec = float(recall_score(y_true, y_pred, zero_division=0))  # Sensitivity
    f1 = float(f1_score(y_true, y_pred, zero_division=0))

    # Specificity (True Negative Rate); guarded against an empty negative class
    negatives = tn + fp
    spec = tn / negatives if negatives else 0.0

    # Geometric Mean of Sensitivity and Specificity
    gmean = (rec * spec) ** 0.5

    return dict(TN=tn, FP=fp, FN=fn, TP=tp,
                Accuracy=acc, Precision=prec,
                Recall=rec, F1=f1, Specificity=spec, Gmean=gmean)

# Report the classification metrics for several fidelity cut-offs
print("\n" + "=" * 70)
print("FRAUD DETECTION PERFORMANCE EVALUATION")
print("=" * 70)

for T in (0.5, 0.6, 0.7):
    # Decision rule: a fidelity below the threshold is labelled fraud (1),
    # everything else normal (0). This relies on normal transactions being
    # reconstructed with high fidelity.
    y_pred = np.where(p0_test < T, 1, 0)

    m = metrics(y_test, y_pred)

    print(f"\nThreshold: {T} (Fidelity < {T} → Fraud)")
    print("-" * 40)
    print("Confusion Matrix:", dict((k, m[k]) for k in ("TN", "FP", "FN", "TP")))
    print(
        (
            "Accuracy={Accuracy:.4f} Precision={Precision:.3f} Recall={Recall:.3f}"
            " F1={F1:.3f} Specificity={Specificity:.3f} G-Mean={Gmean:.3f}"
        ).format_map(m)
    )

# Threshold-independent summary: area under the ROC curve.
# The score passed to roc_auc_score must grow with the likelihood of the
# positive (fraud) class, hence the infidelity 1 - p0 is used.
fraud_score = 1 - p0_test
auc = roc_auc_score(y_test, fraud_score)
print(f"\nAUC-ROC Score: {auc:.4f}")
print("\nNote: AUC-ROC > 0.5 indicates better than random performance")
print("      Values closer to 1.0 indicate better fraud detection capability")
