In [1]:
import pandas as pd
import numpy as np
import torch
from torchvision import transforms, datasets
from torch.utils.data import DataLoader
from matplotlib import pyplot as plt
import os
from torch.utils.data import Subset
import math
from torch.functional import F
import pennylane as qml

# Hyperparameters
apply_augmentation = True
batch_size = 8
load_saved_model = False
training_epoch = 150
learning_rate = 0.0008
momentum = 0.9
weight_decay = 0.000005
device = "cuda"

In [2]:
fraction_size = 1

root_folder = "AIDER"
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images to 256x256
    transforms.ToTensor(),          # Convert images to Tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize with ImageNet stats
])
dataset = datasets.ImageFolder(root=root_folder, transform=transform)

# Split train and test and put in dataloader
full_dataset_size = len(dataset)
subset_size = int(fraction_size * full_dataset_size)
subset_indices = torch.randperm(full_dataset_size)[:subset_size].tolist()
subset = Subset(dataset, subset_indices)

train_size = int(0.6 * len(subset))
test_size = len(subset) - train_size

train_dataset, test_dataset = torch.utils.data.random_split(subset, [train_size, test_size])


In [3]:
if apply_augmentation:
    # Geometry augmentation
    transform1 = transforms.Compose([
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(80),
        transforms.Resize((224, 224)),  # Resize images to 256x256
        transforms.ToTensor(),          # Convert images to Tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize with ImageNet stats
    ])
    augmented1 = datasets.ImageFolder(root=root_folder, transform=transform1)
    
    # Color augmentation
    transform2 = transforms.Compose([
        transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0.5),
        transforms.Resize((224, 224)),  # Resize images to 256x256
        transforms.ToTensor(),          # Convert images to Tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize with ImageNet stats
    ])
    augmented2 = datasets.ImageFolder(root=root_folder, transform=transform2)
    
    # Noise augmentation
    transform3 = transforms.Compose([
        transforms.GaussianBlur(3),
        transforms.Resize((224, 224)),  # Resize images to 256x256
        transforms.ToTensor(),          # Convert images to Tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize with ImageNet stats
    ])
    augmented3 = datasets.ImageFolder(root=root_folder, transform=transform3)


train_dataset = torch.utils.data.ConcatDataset([train_dataset, augmented1, augmented2, augmented3])
dataloader_train = DataLoader(train_dataset, batch_size=32, shuffle=True)
dataloader_test = DataLoader(test_dataset, batch_size=32, shuffle=False)

print("Train size: ", len(train_dataset), ", Test size: ", len(test_dataset))

Train size:  23158 , Test size:  2574


In [4]:
# QUANTUM BLOCK
n_qubits = 4
dev = qml.device("default.qubit", wires=n_qubits)
n_layers = 4
weight_shapes = {"weights": (n_layers, n_qubits)}
dev_quantum = torch.device(device)

# Define the quantum node
@qml.qnode(dev)
def qnode(inputs, weights):
    qml.AmplitudeEmbedding(inputs, wires=range(n_qubits), normalize=True, pad_with=0.0)
    qml.BasicEntanglerLayers(weights, wires=range(n_qubits))
    return [qml.expval(qml.PauliZ(wires=i)) for i in range(n_qubits)]

expanded_circuit = qml.transforms.broadcast_expand(qnode)
class QNet(torch.nn.Module):
    def __init__(self, n_embd):
        super().__init__()
        self.qlayer = qml.qnn.TorchLayer(expanded_circuit, weight_shapes)

    def forward(self, x):
        
        x = self.qlayer(x.to('cpu')).to(dev_quantum)  # Ensure compatibility with quantum layer

        return x

In [9]:
# from architectures.vit import ViT
from vit_pytorch import ViT
from torch import optim
from torch import nn
from torchvision import models

#self.features = models.resnet34(pretrained =True)

class QCNN(nn.Module): 
    def __init__(self) -> None:
        super(QCNN, self).__init__()
        # self.features = models.resnet34(pretrained =True)
        self.resnet = models.resnet50(pretrained =True)
        self.seq_resnet = nn.Sequential(
            nn.Linear(1000, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.1),
            nn.Linear(512, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(0.1),
            nn.Linear(256, 64),
            nn.ReLU(inplace=True),
            nn.Dropout(0.1),
            nn.Linear(64, 16),
            # nn.ReLU(inplace=True),
            # nn.Dropout(0.1),
            # nn.Linear(16, 5)
        )
        self.seq_to_qnet = nn.Sequential(
            nn.ReLU(inplace=True),
            nn.Linear(16, 4)
        )
        self.vit = ViT(
            image_size = 256,
            patch_size = 32,
            num_classes = 2,
            dim = 1024,
            depth = 10,
            heads = 16,
            mlp_dim = 2048,
            dropout = 0.1,
            emb_dropout = 0.1
        )
        self.qnet = QNet(4) # From 2 to 16
        # Do interpolation to 16 features
        self.seq_after_qnet = nn.Sequential(
            nn.ReLU(inplace=True),
            nn.Linear(4, 16)
        )
        
        self.seq = nn.Sequential(
            nn.ReLU(inplace=True),
            nn.Linear(16, 5)
        ) # From 16 to 5
        
        

    def forward(self, x):
        x = self.resnet(x) 
        x1 = self.seq_resnet(x) # 16 features
        x2 = self.seq_to_qnet(x1)
        x2 = self.qnet(x2) 
        x2 = self.seq_after_qnet(x2) #16 features
        fusion = x1 + x2
        output = self.seq(fusion)
        return output


# model = ViT(
#     image_size = 256,
#     patch_size = 16,
#     num_classes = 5,
#     dim = 1024,
#     depth = 10,
#     heads = 16,
#     mlp_dim = 2048,
#     dropout = 0.1,
#     emb_dropout = 0.1
# ).to(device)

model = QCNN().to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
example = next(iter(dataloader_train))
print(model(example[0].to(device)).shape)

RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x32 and 16x5)

In [6]:
from tqdm import tqdm
from sklearn.metrics import accuracy_score, roc_auc_score, roc_curve, auc
import torch.nn.functional as F
from sklearn.metrics import confusion_matrix
import seaborn as sns

class Engine(object):
    def __init__(self, model, optimizer, device, ema=None):
        # Initialize the Engine with the model, optimizer, and the device it's running on.
        self.model = model
        self.optimizer = optimizer
        self.device = device
        # Current epoch of training.
        self.cur_epoch = 0
        # Number of iterations the training has run.
        self.cur_iter = 0
        # The best validation epoch, used to track the epoch with the best validation performance.
        self.bestval_epoch = 0
        # Lists to track the training and validation losses.
        self.train_loss = []
        self.val_loss = []
        # Criterion for calculating loss. Here, it's Mean Squared Error Loss for regression tasks.
        self.criterion = torch.nn.CrossEntropyLoss()

    """ Block to begin training """
    def train(self, dataloader_train):
        loss_epoch = 0.
        num_batches = 0
        # Set the model to training mode.
        self.model.train()
        
        # Train loop
        # tqdm is used to display the training progress for each epoch.
        pbar = tqdm(dataloader_train, desc='Train Epoch {}'.format(self.cur_epoch))
        for data in pbar:
            # efficiently zero gradients
            # Zero the gradients before running the backward pass.
            self.optimizer.zero_grad(set_to_none=True)
            images = data[0].to(self.device, dtype=torch.float32)   # Image that will be fed into network
            gt_label = data[1].to(self.device, dtype=torch.long)

            # Pass the images through the model to get predictions.
            pred_label = self.model(images)
            
            # Calculate the loss, backpropagation, and optimization
            loss = self.criterion(pred_label, gt_label)
            loss.backward()
            
            # Perform a single optimization step (parameter update).
            self.optimizer.step()

            # Aggregate the loss for the epoch
            loss_epoch += float(loss.item())
            num_batches += 1
            pbar.set_description("Loss: {:.4f}".format(loss.item()))
            
        pbar.close()
        avg_loss = loss_epoch / num_batches
        self.train_loss.append(avg_loss)
        
        self.cur_epoch += 1
        pbar.set_description("Epoch: {}, Average Loss: {:.4f}".format(self.cur_epoch, avg_loss))

    def test(self, dataloader_test):
        # self.model.eval()  # Set the model to evaluation mode
        loss_epoch = 0.
        num_batches = 0
        
        # Prepare to collect predictions and ground truth
        predictions = []
        ground_truths = []
        
        with torch.no_grad():  # No need to calculate gradients
            pbar = tqdm(dataloader_test, desc='Test Epoch {}'.format(self.cur_epoch))
            for data in pbar:
                images = data[0].to(self.device, dtype=torch.float32)   # Image that will be fed into network
                gt_label = data[1].to(self.device, dtype=torch.long)  # GT_label

                # Pass the images through the model to get predictions.
                pred_label = self.model(images)

                # Calculate the loss, backpropagation, and optimization
                loss = self.criterion(pred_label, gt_label)
                loss_epoch += float(loss.item())
                num_batches += 1
                
                # We want to put this back on the CPU to calculate the metrics
                predictions.extend(pred_label.argmax(dim=1).cpu().numpy().flatten())
                ground_truths.extend(gt_label.cpu().numpy().flatten())
                pbar.set_description("Test Loss: {:.4f}".format(loss.item()))

        avg_loss = loss_epoch / num_batches
        self.val_loss.append(avg_loss)

        # Print the accuracy here
        accuracy = accuracy_score(ground_truths, predictions)
        
        # Generate the confusion matrix
        cm = confusion_matrix(ground_truths, predictions)
        
        # Plot the heatmap using Seaborn
        plt.figure(figsize=(4,4))
        sns.heatmap(cm, annot=True, fmt="d", cmap='Blues')
        plt.title(f'Confusion Matrix for Epoch {self.cur_epoch}')
        plt.ylabel('Actual Labels')
        plt.xlabel('Predicted Labels')
        plt.show()
        
        
        print(f"Test Epoch: {self.cur_epoch}, Average Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}")
        
        return avg_loss


In [7]:
trainer = Engine(model, optimizer, device, ema=None)

# Load the saved model if load_saved_model is set to True
if load_saved_model:
	model.load_state_dict(torch.load('logs/final_model.pth'))
 
# Count the total number of trainable parameters
model_parameters = filter(lambda p: p.requires_grad, model.parameters())
params = sum([np.prod(p.size()) for p in model_parameters])
print ('======Total trainable parameters: ', params)

for epoch in range(trainer.cur_epoch, training_epoch):
	trainer.train(dataloader_train)

	# Test the model every 20 epochs and save it to logs folder
	if (epoch) % 10 == 0:
		trainer.test(dataloader_test)
		torch.save(model.state_dict(), os.path.join('logs', 'final_model.pth'))




Train Epoch 0:   0%|          | 0/724 [00:00<?, ?it/s]


RuntimeError: size mismatch (got input: [5], target: [32])