In [None]:
import matplotlib.pyplot as plt
import numpy as np
import random
import torch
import glob
import os
import pandas as pd
from PIL import Image
from torchvision import transforms

class DatasetHandler:
    def __init__(self, dataset_path):
        self.dataset_path = dataset_path
        self.train_dir = os.path.join(dataset_path, "train")
        self.test_dir = os.path.join(dataset_path, "test")
        self.train_csv = os.path.join(dataset_path, "train.csv")
        self.test_csv = os.path.join(dataset_path, "test.csv")

    def load_paths_labels_from_csv(self, split="train", img_ext=".png"):
        """
        split: 'train' or 'test'
        """
        if split == "train":
            img_dir = self.train_dir
            csv_path = self.train_csv
        elif split == "test":
            img_dir = self.test_dir
            csv_path = self.test_csv
        else:
            raise ValueError("split must be 'train' or 'test'")

        df = pd.read_csv(csv_path)

        imgs_path = []
        imgs_label = []

        for _, row in df.iterrows():
            img_id = str(row.iloc[0])
            no_flood = row.iloc[1]
            flood = row.iloc[2]

            # Binary class encoding
            # 0 -> no flood, 1 -> flood
            label = 0 if no_flood == 1 else 1

            img_path = os.path.join(img_dir, img_id)

            if os.path.exists(img_path):
                imgs_path.append(img_path)
                imgs_label.append(label)
            else:
                print(f"Warning: image not found -> {img_path}")

        # Shuffle paths and labels together
        c = list(zip(imgs_path, imgs_label))
        random.shuffle(c)
        imgs_path, imgs_label = zip(*c)

        return np.array(imgs_path), np.array(imgs_label)

    def train_validation_split(self, images, labels, split_factor=0.2):
        val_size = int(len(images) * split_factor)
        train_size = len(images) - val_size

        return (
            images[:train_size],
            labels[:train_size],
            images[train_size:],
            labels[train_size:]
        )

    def cnn_data_loader(self, imgs_path, imgs_label,
                        batch_size=16,
                        img_shape=(64, 64, 3),
                        n_classes=2):

        batch_in = np.zeros((batch_size, *img_shape))
        batch_out = np.zeros((batch_size, n_classes))

        while True:
            for i in range(batch_size):
                index = random.randint(0, len(imgs_path) - 1)

#               batch_in[i] = plt.imread(imgs_path[index]) / 255.0

                # img = Image.open(imgs_path[index]).convert("RGB")
                # img = img.resize((img_shape[1], img_shape[0]))  # (64, 64)
                # img = np.array(img) / 255.0                     # (64, 64, 3)
                # batch_in[i] = np.transpose(img, (2, 0, 1))      # (3, 64, 64)

                # assert img.shape == (img_shape[0], img_shape[1], img_shape[2])

                img = plt.imread(imgs_path[index])

                # resize explicitly
                img = np.array(
                    Image.fromarray((img * 255).astype(np.uint8)).resize(
                        (img_shape[1], img_shape[0])
                    )
                ) / 255.0

                # channel-last → channel-first (explicit axes)
              #  batch_in[i] = np.transpose(img, (2, 0, 1))

                batch_in[i] = img


                label = imgs_label[index]
                one_hot = np.zeros(n_classes)
                one_hot[label] = 1
                batch_out[i] = one_hot

            yield batch_in, batch_out

    def qcnn_data_loader(self, imgs_path, imgs_label,
                         batch_size=1,
                         img_shape=(64, 64, 3),
                         returnpath=False):

        batch_in = np.zeros((batch_size, img_shape[2], img_shape[0], img_shape[1]))
        batch_out = np.zeros(batch_size)

        while True:
            for i in range(batch_size):
                index = random.randint(0, len(imgs_path) - 1)
                # batch_in[i] = np.transpose(
                #     plt.imread(imgs_path[index]) / 255.0
                # )

                img = plt.imread(imgs_path[index])

                # ensure RGB
                if img.ndim == 2:  # grayscale
                    img = np.stack([img]*3, axis=-1)

                # resize to expected spatial size
                img = np.array(
                    Image.fromarray((img * 255).astype(np.uint8))
                        .resize((img_shape[1], img_shape[0]))
                ) / 255.0

                # explicit channel-first
                batch_in[i] = np.transpose(img, (2, 0, 1))


                batch_out[i] = imgs_label[index]

            if returnpath:
                yield (
                    torch.Tensor(batch_in),
                    torch.Tensor(batch_out).long(),
                    imgs_path[index]
                )
            else:
                yield (
                    torch.Tensor(batch_in),
                    torch.Tensor(batch_out).long()
                )


In [None]:
!pip install 'qiskit>=1.0'

In [None]:
!pip install qiskit-aer

In [None]:
!pip install pylatexenc

In [None]:
!pip install matplotlib

In [None]:
from qiskit import QuantumRegister, ClassicalRegister, QuantumCircuit
from qiskit_aer import AerSimulator, Aer

In [None]:
from qiskit import QuantumRegister, ClassicalRegister, QuantumCircuit

In [None]:
from qiskit_aer import AerSimulator, Aer

In [None]:
import qiskit
from qiskit.circuit import Parameter,ControlledGate
import numpy as np
from tqdm import tqdm
from matplotlib import pyplot as plt
%matplotlib inline
import torch
from torch.autograd import Function
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F

In [None]:
if torch.cuda.is_available():
  device = torch.device("cuda:0")
  print("Running on the GPU")
else:
  device = torch.device("cpu")
  print("Running on the CPU")

In [None]:
#np.random.seed = 314

NUM_QUBITS = 4
NUM_SHOTS = 800 #3000
SHIFT = np.pi/4
LEARNING_RATE = 0.0002
MOMENTUM = 0.5

SIMULATOR = Aer.get_backend('qasm_simulator')
#SIMULATOR = AerSimulator(method='automatic')

In [None]:
# create list of all possible outputs of quantum circuit (2**NUM_QUBITS possible)
import itertools
def create_QC_OUTPUTS():
    measurements = list(itertools.product([0, 1], repeat=NUM_QUBITS))
    return [''.join([str(bit) for bit in measurement]) for measurement in measurements]

QC_OUTPUTS = create_QC_OUTPUTS()
print(QC_OUTPUTS)

In [None]:
class QiskitCircuit():


    def __init__(self, n_qubits, backend, shots):
        # --- Circuit definition ---
        n_qubits = 4
        self.circuit = qiskit.QuantumCircuit(n_qubits)
        self.n_qubits = n_qubits
        self.thetas = [Parameter(f"theta_{i}") for i in range(4)]
        self.circuit.h(0)
        self.circuit.cx(0, 1)
        self.circuit.cx(1, 2)
        self.circuit.cx(3, 2)

        self.circuit.barrier()

        for k in range(0, 4):
            self.circuit.ry(self.thetas[k], k)

        self.circuit.barrier()

        self.circuit.cx(1, 0)
        self.circuit.cx(2, 1)
        self.circuit.cx(1, 0)

        self.circuit.measure_all()
        # ---------------------------

        self.backend = backend
        self.shots = shots


    def N_qubit_expectation_Z(self,counts, shots, nr_qubits):
        expects = np.zeros(len(QC_OUTPUTS))
        for k in range(len(QC_OUTPUTS)):
            key = QC_OUTPUTS[k]
            perc = counts.get(key, 0)/shots
            expects[k] = perc
        return expects


    def run(self, params_tensor):
        # params_tensor: torch.Tensor of shape (NUM_QUBITS,)

        # 1. Convert to Python floats
        param_values = params_tensor.detach().cpu().numpy().tolist()

        # 2. Build a TRUE Parameter → value dict
        param_bind = {
            param: val
            for param, val in zip(self.circuit.parameters, param_values)
        }

        # 3. Bind parameters to circuit
        bound_circuit = self.circuit.assign_parameters(param_bind)
        
        # 4. Run the bound circuit
        job_sim = SIMULATOR.run(bound_circuit, shots=self.shots)

        result_sim = job_sim.result()
        counts = result_sim.get_counts(bound_circuit)

        return self.N_qubit_expectation_Z(
            counts, self.shots, self.n_qubits
        )

In [None]:
circuit = QiskitCircuit(NUM_QUBITS, SIMULATOR, NUM_SHOTS)
#print('Expected value for rotation [pi/4]: {}'.format(circuit.run(torch.Tensor([np.pi/4]*NUM_QUBITS))))
circuit.circuit.draw(output='mpl')#, filename='Figures/{}-qubit circuit ryN.jpg'.format(NUM_QUBITS)

In [None]:
class TorchCircuit(Function):

    @staticmethod
    def forward(ctx, i):
        if not hasattr(ctx, 'QiskitCirc'):
            ctx.QiskitCirc = QiskitCircuit(NUM_QUBITS, SIMULATOR, shots=NUM_SHOTS)

        exp_value = ctx.QiskitCirc.run(i)

        result = torch.tensor([exp_value])

        ctx.save_for_backward(result, i)

        return result

    @staticmethod
    def backward(ctx, grad_output):

        forward_tensor, i = ctx.saved_tensors
#         print('forward_tensor = {}'.format(forward_tensor))
        input_numbers = i
#         print('input_numbers = {}'.format(input_numbers))
        gradients = torch.Tensor()

        for k in range(1*NUM_QUBITS):
            shift_right = input_numbers.detach().clone()
            shift_right[k] = shift_right[k] + SHIFT
            shift_left = input_numbers.detach().clone()
            shift_left[k] = shift_left[k] - SHIFT

#             print('shift_right = {}, shift_left = {}'.format(shift_right, shift_left))

            expectation_right = ctx.QiskitCirc.run(shift_right)
            expectation_left  = ctx.QiskitCirc.run(shift_left)
#             print('expectation_right = {}, \nexpectation_left = {}'.format(expectation_right, expectation_left))

            gradient = torch.tensor([expectation_right]) - torch.tensor([expectation_left])
            # rescale gradient
#             gradient = gradient / torch.norm(gradient)
#             print('gradient for k={}: {}'.format(k, gradient))
            gradients = torch.cat((gradients, gradient.float()))

        result = torch.Tensor(gradients)
#         print('gradients = {}'.format(result))
#         print('grad_output = {}'.format(grad_output))

        return (result.float() * grad_output.float()).T


In [None]:
dataset_root = '/Users/shaheer/NUST/QCHack/archive (2)'
handler = DatasetHandler(dataset_root)

In [None]:
classes = ['flood','no flood']

In [None]:
handler = DatasetHandler(dataset_root)

imgs_path, imgs_label = handler.load_paths_labels_from_csv(split="train")
print('Dataset images:', len(imgs_path))
print('Dataset labels:', len(imgs_label))
print('Dataset sample ->', imgs_path[0], imgs_label[0])

In [None]:
train_imgs, train_labels, val_imgs, val_labels = handler.train_validation_split(
    imgs_path, imgs_label, split_factor=0.2
)

print('X_train shape:', train_imgs.shape, 'Y_train shape:', train_labels.shape)
print('  X_val shape:', val_imgs.shape, '  Y_val shape:', val_labels.shape)


In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3)

        self.fc4 = nn.Linear(2304, 1*NUM_QUBITS)

        self.qc = QiskitCircuit(NUM_QUBITS, SIMULATOR, shots=NUM_SHOTS)

        self.fc5 = nn.Linear(16, 2)

    def forward(self, x):
        # CNN feature extraction
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2(x), 2))
        x = F.relu(F.max_pool2d(self.conv3(x), 2))

        x = x.view(-1, 2304)
        
        # Reduce to quantum circuit parameters
        x = self.fc4(x)
        x = np.pi*torch.tanh(x)

        # Run through quantum circuit
        x_for_qc = x[0, :len(self.qc.circuit.parameters)]
        x = self.qc.run(x_for_qc)
        
        # Convert to tensor and add batch dimension
        x = torch.tensor([x], dtype=torch.float32)

        # Final classification layer
        x = self.fc5(x)
        x = F.softmax(x, dim=1)

        return x


    def predict(self, x):
        # apply softmax
        pred = self.forward(x)
        ans = torch.argmax(pred[0]).item()
        return torch.tensor(ans)

network = Net()#.to(device)
optimizer = optim.Adam(network.parameters(), lr=0.0002)

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report

#from torchsummary import summary
#summary(network, (3, 64, 64))

In [None]:
train_loader = iter(handler.qcnn_data_loader(train_imgs, train_labels, batch_size = 1, img_shape = (64,64,3)))
test_loader = iter(handler.qcnn_data_loader(val_imgs, val_labels, batch_size = 1, img_shape = (64,64,3)))

In [None]:
# Uncomment to load a pre-trained model
# checkpoint = torch.load('/Users/shaheer/NUST/QCHack/model-bell-2.pt')
# network.load_state_dict(checkpoint['model_state_dict'])
# optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
# epoch = checkpoint['epoch']
# loss = checkpoint['loss']

In [None]:
train_loss_list = []
val_loss_list = []
epochs = 50

loss_func = nn.CrossEntropyLoss()

for epoch in range(epochs):
  train_loader = iter(handler.qcnn_data_loader(train_imgs, train_labels, batch_size = 1, img_shape = (64,64,3)))
  test_loader = iter(handler.qcnn_data_loader(val_imgs, val_labels, batch_size = 1, img_shape = (64,64,3)))
  total_loss = []


  for batch_idx in range(len(train_labels)):
    data, target = next(train_loader)
    # print(batch_idx)
    optimizer.zero_grad()
    # Forward pass
    output = network(data)
    # Calculating loss
    loss = loss_func(output, target)
    # Backward pass
    loss.backward()
    # Optimize the weights
    optimizer.step()

    total_loss.append(loss.item())

    print('\r Epoch %d ~ Batch %d (%d) ~ Loss %f ' % (epoch, batch_idx, len(train_imgs)-1, loss.item()), end='\t\t')

  with torch.no_grad():
    val_loss = []
    targets = []
    predictions = []
    for batch_idx in range(len(val_imgs)):
      data, target = next(test_loader)
      output = network(data)
      loss = loss_func(output, target)
      val_loss.append(loss.item())

      targets.append(target.item())

      predictions.append(network.predict(data).item())


  train_loss_list.append(sum(total_loss)/len(total_loss))
  val_loss_list.append(sum(val_loss)/len(val_loss))

  print('Training [{:.0f}%]\t Training Loss: {:.4f} Validation Loss: {:.4f}'.format(
      100. * (epoch + 1) / epochs, train_loss_list[-1], val_loss_list[-1]))

  if epoch % 3 == 1:
    print(confusion_matrix(targets, predictions,normalize='true'))
    print(classification_report(targets, predictions, target_names=classes, digits=4))
    torch.save({
            'epoch': epoch,
            'model_state_dict': network.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': train_loss_list[-1],
            }, '/Users/shaheer/NUST/QCHack/model-bell-2.pt')
    #torch.save(network.state_dict(), '/Users/shaheer/NUST/QCHack/model-bell.pt')

In [None]:
plt.plot(train_loss_list)
plt.plot(val_loss_list)
plt.title('Hybrid NN Training Convergence for {}-qubit'.format(NUM_QUBITS))
plt.xlabel('Training Iterations')
plt.ylabel('Cross Entropy Loss')
plt.legend(['Training', 'Validation'])
plt.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit

def func(x, a, b, c):
  return a * np.exp(-b * x) + c

x = np.linspace(0,len(train_loss_list),len(train_loss_list))
y = func(x, 2.5, 1.3, 0.5)
yn = np.array(val_loss_list)

popt, pcov = curve_fit(func, x, yn)

plt.figure()
x1 = np.linspace(0,100+len(train_loss_list),100+len(train_loss_list))
plt.plot(x, yn, 'ko', label="Loss")
plt.plot(x1, func(x1, *popt), 'r-', label="Fitted Curve")
plt.legend()
plt.show()

In [None]:
test_loader = iter(handler.qcnn_data_loader(val_imgs, val_labels, batch_size = 1, img_shape = (64,64,3)))
accuracy = 0
number = 0

predictions = []
targets = []

for ct in range(len(val_imgs)):

  data, target = next(test_loader)
  number +=1
  output = network.predict(data).item()

  predictions.append(output)
  targets.append(target.item())

  accuracy += (output == target[0].item())*1
  print('\r ' + str(ct), end='')

In [None]:
plt.hist(targets, bins = 10)

In [None]:
print("Performance on test data is : {}/{} = {}%".format(accuracy,number,100*accuracy/number))

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report
cm = confusion_matrix(targets, predictions,normalize='true')

In [None]:
fig, axes = plt.subplots(nrows = 1, ncols = 1, figsize = (12,10))

cmd = ConfusionMatrixDisplay(cm, display_labels=classes)
cmd.plot(ax=axes, cmap='Blues', xticks_rotation='vertical')
print('S2')
print('Accuracy:', cm.diagonal(), 'mean: ', cm.diagonal().mean())
print(classification_report(targets, predictions, target_names=classes, digits=4))
axes.get_images()[0].set_clim(0, 1)
plt.show()
plt.close()