In [1]:
import os
import random
import json
import math
from pathlib import Path
from datetime import datetime

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.nn.utils as utils
import torch.autograd
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from torchvision.transforms import v2
from torchvision.models import ResNet50_Weights
from torchvision.io import read_image

from sklearn.metrics import (accuracy_score, confusion_matrix, f1_score,
                             precision_score, recall_score)

import qiskit
from qiskit import transpile
from qiskit.circuit import QuantumCircuit, Parameter
from qiskit_aer import Aer

import torchxrayvision as xrv

  from tqdm.autonotebook import tqdm


**1** Display current device and versions of Qiskit, PyTorch and CUDA

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if torch.cuda.is_available():
    print(f"Using: {torch.cuda.get_device_name(0)}")
    print(f"CUDA: {torch.version.cuda}")
else:
    print("CUDA is not available. Using CPU.")

print(f"Qiskit: {qiskit.__version__}")
torch_version = torch.__version__.split('+')[0]
print(f"Torch: {torch_version}")


Using: NVIDIA GeForce RTX 4070 Ti SUPER
CUDA: 12.4
Qiskit: 1.2.4
Torch: 2.4.1


**2** Mammography dataset for model training

In [3]:
class MammoDataset(Dataset):
    def __init__(self, root_dir, train=True, data_augmentation=True):
        self.image_paths = []
        self.labels = []

        # Base transformation: resizing, tensor conversion, and normalization
        base_transform = [transforms.Resize((250, 250)),
                          transforms.ToTensor(),
                          transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])]

        # Apply data augmentation if in training mode
        if train and data_augmentation:
            augmentation_transform = [transforms.RandomVerticalFlip(),
                                      v2.GaussianNoise(0.3),
                                      transforms.RandomRotation(45)]
            self.transform = transforms.Compose(base_transform + augmentation_transform)
        else:
            self.transform = transforms.Compose(base_transform)

        # Load image paths and corresponding labels
        for label in ['0', '1']:
            folder_path = os.path.join(root_dir, label)
            self.image_paths.extend([os.path.join(folder_path, img_name) for img_name in os.listdir(folder_path)])
            self.labels.extend([int(label)] * len(os.listdir(folder_path)))

    def __len__(self):
        return len(self.labels)

    # Fetch an image and its corresponding label
    def __getitem__(self, idx):
        image = Image.open(self.image_paths[idx]).convert('RGB')
        if self.transform:
            image = self.transform(image)
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        return image, label

**3** Hybrid

In [4]:
class QuantumCircuit:    
    def __init__(self, n_qubits, backend, shots):
        # Initialize the quantum circuit with the specified number of qubits (number of outputs of the model)
        self._circuit = qiskit.QuantumCircuit(n_qubits)

        self.backend = backend
        self.shots = shots
        
        all_qubits = [i for i in range(n_qubits)]
        self.theta_0 = qiskit.circuit.Parameter('theta0')
        
        # Apply Hadamard gate to all qubits to create superposition
        self._circuit.h(all_qubits)

        self._circuit.barrier()
        
        # Apply parameterized Ry rotation to all qubits
        self._circuit.ry(self.theta_0, all_qubits)
        
        # Add a CNOT gate to create entanglement
        if n_qubits > 1:
            self._circuit.cx(0, 1)

        self._circuit.barrier()
        
        # Measure all qubits to retrieve their states
        self._circuit.measure_all()
        
        
    
    def run(self, thetas):
        # Assign parameter values for the circuit
        param_values = {
            self.theta_0: thetas[0],
        }
        
        # Bind the parameters to the circuit
        bound_circuit = self._circuit.assign_parameters(param_values)
        
        # Transpile the circuit for the specified backend
        transpiled_circuit = transpile(bound_circuit, self.backend)
        
        # Execute the circuit and obtain results
        job = self.backend.run(transpiled_circuit, shots=self.shots)
        
        # Get the result counts from the job
        result = job.result().get_counts()
        
        expectations = []

        # Check if the result is a list (multiple circuits)
        if isinstance(result, list):
            for i in result:
                counts = np.array(list(i.values()))
                states = np.array(list(i.keys())).astype(float)
                
                # Calculate probabilities for each state
                probabilities = counts / self.shots
                
                # Compute the expectation value for the current result
                expectation = np.sum(states * probabilities)
                expectations.append(expectation)
        else:
            # Handle single result scenario
            counts = np.array(list(result.values()))
            states = np.array(list(result.keys())).astype(float)
            
            probabilities = counts / self.shots
            
            # Compute the expectation value for the single result
            expectation = np.sum(states * probabilities)
            expectations.append(expectation)
        
        # Return the computed expectation values as a numpy array
        return np.array(expectations)

**3.1** Hybrid quantum-classical function

In [5]:
class HybridFunction(torch.autograd.Function):
    @staticmethod
    def forward(ctx, input, quantum_circuit, shift):
        ctx.shift = shift  # Save the shift value
        ctx.quantum_circuit = quantum_circuit  # Save the quantum circuit

        # Run the quantum circuit and get the expectation value
        expectation_z = ctx.quantum_circuit.run(input.tolist()[0])
        
        # Create a result tensor on the same device as the input
        result = torch.tensor([expectation_z], device=input.device)
        
        ctx.save_for_backward(input, result)  # Save tensors for backward pass

        return result
        
    @staticmethod
    def backward(ctx, grad_output):
        input, expectation_z = ctx.saved_tensors
        
        device = grad_output.device 
        input = input.to(device)

        input_list = input.tolist()[0]
        input_tensor = torch.tensor(input_list, device=device) 

        shift_value = torch.tensor(ctx.shift, device=device) * torch.ones_like(input_tensor) 

        # Calculate shifted values
        shift_right = input_tensor + shift_value
        shift_left = input_tensor - shift_value

        gradients = [] 
        for i in range(len(input_tensor)):
            # Run the quantum circuit for shifted values
            expectation_right = ctx.quantum_circuit.run([shift_right[i].item()])
            expectation_left = ctx.quantum_circuit.run([shift_left[i].item()])

            # Create tensors for results
            expectation_right = torch.tensor([expectation_right], device=device)
            expectation_left = torch.tensor([expectation_left], device=device)

            # Calculate the gradient
            gradient = expectation_right - expectation_left
            gradients.append(gradient) 

        # Convert gradients list to tensor
        gradients = torch.cat(gradients).to(device).float()

        return gradients * grad_output.float(), None, None 


**3.2** Hybrid quantum-classical layer

In [6]:
class Hybrid(nn.Module):
    def __init__(self, n_qubits, backend, shots, shift):
        super(Hybrid, self).__init__()
        self.quantum_circuit = QuantumCircuit(n_qubits, backend, shots) 
        self.shift = shift  
        
    def forward(self, input):
        batch_results = []
        for i in range(input.shape[0]):
            # Apply the hybrid function for each input
            result = HybridFunction.apply(input[i].unsqueeze(0), self.quantum_circuit, self.shift)
            batch_results.append(result)
        return torch.cat(batch_results) 

**4** Models

In [7]:
class Resnet(torch.nn.Module):
    def __init__(self, out_features = 2):
        super(Resnet, self).__init__()
        self.resnet = models.resnet50(weights=ResNet50_Weights.DEFAULT)
        self.resnet.fc = torch.nn.Linear(self.resnet.fc.in_features, out_features)
    def forward(self, x):
        return self.resnet(x)

In [8]:
class qcNet(nn.Module):
    def __init__(self):
        super(qcNet, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, kernel_size=5, stride=2, padding=1)
        self.conv2 = nn.Conv2d(6, 15, kernel_size=3, stride=2, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=1)
        self.drop1 = nn.Dropout2d(p=0.2)
        self.drop2 = nn.Dropout2d(p=0.5)
        self.fc1 = nn.Linear(55815, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 1)
        sim = Aer.get_backend('aer_simulator_statevector')
        self.theta_qubit = nn.Parameter(torch.FloatTensor(np.random.uniform(0, 2 * np.pi, size=(self.fc1.out_features,))))
        theta_qubit = torch.clamp(theta_qubit, min=0, max=2 * np.pi)
        self.hybrid = Hybrid(self.fc1.out_features, sim, 1000, self.theta_qubit)
        

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        x = self.drop1(x)
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        x = self.drop1(x)
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.drop2(x)
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        x = self.hybrid(x)
        x = torch.cat((x, 1 - x), -1)
        return x

In [9]:
class Net(nn.Module):
    def __init__(self, theta_min = 0, theta_max = 2 * np.pi):
        super(Net, self).__init__()
        
        # Load the pre-trained ResNet50 model
        self.resnet = models.resnet50(weights=ResNet50_Weights.DEFAULT)
        
        # Modify the fully connected layer to have 84 outputs
        self.resnet.fc = nn.Linear(self.resnet.fc.in_features, 84)
    
        self.drop1 = nn.Dropout(p=0.2)  # Dropout layer to prevent overfitting
        self.fc1 = nn.Linear(84, 1)  # Output layer for one output
        
        # Initialize the quantum simulator backend
        sim = Aer.get_backend('qasm_simulator')
        self.theta_qubit = nn.Parameter(torch.FloatTensor(np.random.uniform(theta_min, theta_max, size=(self.fc1.out_features,))))

        self.hybrid = Hybrid(self.fc1.out_features, sim, 10000, self.theta_qubit)

    def forward(self, x):
        x = self.resnet(x)
        x = self.drop1(x) 
        #x = F.leaky_relu(self.fc1(x))
        x = F.tanh(self.fc1(x))
        x = self.hybrid(x)
    
        return x

**5** Dataset and dataloader

In [10]:
train_dir = '/mnt/c/Users/Win10/Desktop/researchData/data/png/train'
test_dir = '/mnt/c/Users/Win10/Desktop/researchData/data/png/test'

train_dataset = MammoDataset(root_dir=train_dir, data_augmentation=True)
test_dataset = MammoDataset(root_dir=test_dir, train=False)

batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

**6** Saving models

In [11]:
def create_output_directory():
    now = datetime.now()
    folder_name = now.strftime("%Y-%m-%d_%H-%M") 
    output_dir = os.path.join("checkpoints", folder_name)

    os.makedirs(output_dir, exist_ok=True)
    print(f"Created: {output_dir}")
    return output_dir

output_dir = create_output_directory()

def save_model(model, epoch, output_dir):
    model_path = os.path.join(output_dir, f"model_epoch_{epoch}.pth")
    torch.save(model.state_dict(), model_path)
    print(f"Saved model: {model_path}")

Created: checkpoints/2024-12-04_22-34


In [12]:
model = Net().to(device)

class_weights = torch.tensor([1, 2])
quantum_loss_func = nn.BCELoss(reduction='none').to(device)
optimizer = optim.SGD(model.parameters(), lr=1e-6) 
optimizer_quantum = optim.Adam([model.theta_qubit], lr=1e-2)
scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, 2)

In [13]:
total_losses = {}
for theta in np.arange(np.pi / 12, 2 * np.pi, np.pi / 12):
    print(theta)
    
    model = Net(theta - np.pi /12, theta).to(device)
    
    model.train()
    for epoch in range(1):
        total_loss = []

        for batch_idx, (image, label) in enumerate(train_loader):
            
            image = image.to(device)
            label = label.double().unsqueeze(1).to(device)

            optimizer.zero_grad()
            optimizer_quantum.zero_grad()

            # Forward pass through the model
            prediction = model(image)

            # Calculate quantum loss
            loss_per_sample = quantum_loss_func(prediction, label)

            sample_weights = torch.where(label == 1, class_weights[1], class_weights[0])

            quantum_loss = (loss_per_sample * sample_weights).mean()
        
            # Backward pass and compute gradients
            quantum_loss.backward()

            # Clipping gradients
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            nn.utils.clip_grad_norm_([model.theta_qubit], max_norm=1.0)

            # Update classical weights
            optimizer.step()
            # Update quantum parameters
            optimizer_quantum.step()

            total_loss.append(quantum_loss.item())
            print(f"Batch {batch_idx + 1} completed with total loss: {quantum_loss.item():.4f}")

    # Average loss per epoch
    avg_loss = np.mean(total_loss)
    print(f"Epoch {epoch + 1}/{1}, Average Loss: {avg_loss:.4f}")
    
    total_losses[str(theta)] = avg_loss

print(total_losses)

min_theta = min(total_losses, key=total_losses.get)
min_result = total_losses[min_theta]


0.2617993877991494


KeyboardInterrupt: 

In [None]:
optimal_theta = min_theta  
model = Net(optimal_theta).to(device)

TypeError: must be real number, not str

**7** Training

In [None]:
model.train()
epochs = 10
for epoch in range(epochs):
    total_loss = []

    for batch_idx, (image, label) in enumerate(train_loader):
        
        image = image.to(device)
        label = label.double().unsqueeze(1).to(device)

        optimizer.zero_grad()
        optimizer_quantum.zero_grad()

        # Forward pass through the model
        prediction = model(image)

        # Calculate quantum loss
        loss_per_sample = quantum_loss_func(prediction, label)

        sample_weights = torch.where(label == 1, class_weights[1], class_weights[0])

        quantum_loss = (loss_per_sample * sample_weights).mean()
    
        # Backward pass and compute gradients
        quantum_loss.backward()

        # Clipping gradients
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        nn.utils.clip_grad_norm_([model.theta_qubit], max_norm=1.0)

        # Update classical weights
        optimizer.step()
        # Update quantum parameters
        optimizer_quantum.step()

        total_loss.append(quantum_loss.item())
        print(f"Batch {batch_idx + 1} completed with total loss: {quantum_loss.item():.4f}")

    # Average loss per epoch
    avg_loss = np.mean(total_loss)
    print(f"Epoch {epoch + 1}/{epochs}, Average Loss: {avg_loss:.4f}")
    
    save_model(model, epoch + 1, output_dir)

**8** Test

In [None]:
#model_path = ''
#model.load_state_dict(torch.load(model_path))

model.eval()
all_labels = [] 
all_predictions = []  

with torch.no_grad():  # Disable gradient calculation
    for batch_idx, (image, label) in enumerate(test_loader):
        label = label.unsqueeze(1) 
        image = image.to(device)
        label = label.to(device)
        
        try:
            prediction = model(image)
            predicted_class = prediction.argmax(dim=1)
            all_labels.extend(label.cpu().numpy())
            all_predictions.extend(predicted_class.cpu().numpy())
        except:
            continue

**9** Metrics

In [None]:
precision = precision_score(all_labels, all_predictions)
recall = recall_score(all_labels, all_predictions)
f1 = f1_score(all_labels, all_predictions)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

accuracy = accuracy_score(all_labels, all_predictions)
print(f"Accuracy: {accuracy:.4f}")

**10** Confusion matrix

In [None]:
# Generate and plot confusion matrix
conf_matrix = confusion_matrix(all_labels, all_predictions)

plt.figure(figsize=(6, 4))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=['Negative', 'Positive'],
            yticklabels=['Negative', 'Positive'])
plt.xlabel('Predict')
plt.ylabel('Real')
plt.title('Confusion Matrix')
plt.show()