# Experiments for image encoding evaluation

## Setup

### Google Drive setup

In [20]:
import os
import subprocess
from pathlib import Path

# Local (non-Colab) defaults. The Colab branch of the setup code further down
# rebinds GEQIE_PATH, WORK_DIR and IMAGES_PATH.
CURRENT_PATH = Path(os.path.abspath(os.curdir))
GEQIE_PATH = Path.cwd().parent

# Directory for logs and outputs; replaced by _DRIVE_WORK_DIR when running in Colab.
WORK_DIR = Path(os.curdir)

# Google Drive locations, only meaningful in Colab.
DRIVE_MOUNT_PATH = Path("/content/drive")
_DRIVE_WORK_DIR = DRIVE_MOUNT_PATH.joinpath("MyDrive", "Colab Notebooks", "GEQIE")

# Generated test images sit next to the current directory, under experiments/.
IMAGES_PATH = CURRENT_PATH.parent.joinpath("experiments", "generated_images")

try:
    from google.colab import drive

    print("Environment is Google Colab.")
    WORK_DIR = _DRIVE_WORK_DIR

    GEQIE_PATH = Path("/content/geqie")
    print(f"--- GEQIE path set to '{GEQIE_PATH}'")

    IMAGES_PATH = GEQIE_PATH / "experiments" / "generated_images"
    print(f"--- Images path set to '{IMAGES_PATH}'")

    print(f"--- Mounting Google Drive at '{DRIVE_MOUNT_PATH}'...")
    drive.mount(DRIVE_MOUNT_PATH.as_posix())

    print("Installing dependencies...")

    ! git clone https://github.com/merQlab/geqie.git $GEQIE_PATH 2>/dev/null
    ! uv pip install $GEQIE_PATH 2>/dev/null
    ! uv pip install -r $GEQIE_PATH/experiments/requirements/requirements.in 2>/dev/null

    # Check if GPU is available
    try:
        result = subprocess.run(['nvidia-smi'])
        print("--- NVIDIA GPU detected. Installing qiskit-aer-gpu")
        ! uv pip install qiskit-aer-gpu 2>/dev/null
    except Exception:
        pass

    print("Dependencies installed successfully. (restart may be required -> 'Restart runtime and run all')")
except ModuleNotFoundError as e:
    print("Environment is not Google Colab... skipping.")
except Exception as e:
    print("Error encountered: " + str(e))

Environment is not Google Colab... skipping.


### Imports

In [None]:
from dataclasses import dataclass
from datetime import datetime
from itertools import product
from typing import List

import numpy as np
import pandas as pd

from PIL import Image, ImageOps
from scipy.stats import pearsonr
from tqdm import tqdm

import qiskit
from qiskit_aer import AerSimulator
from qiskit_aer.noise import NoiseModel
from qiskit_aer.noise.errors import depolarizing_error

In [22]:
import geqie
from geqie.encodings import frqci, frqi, ifrqi, mcqi, ncqi, neqr, qrci, qualpi

### Constants

In [23]:
# Encodings (imported from geqie.encodings) grouped by the colour mode they are run on.
GRAYSCALE_METHODS = [frqi, ifrqi, neqr, qualpi]
RGB_METHODS = [frqci, mcqi, ncqi, qrci]

# Per-mode image folders below IMAGES_PATH.
GRAYSCALE_PATH = IMAGES_PATH.joinpath("grayscale")
RGB_PATH = IMAGES_PATH.joinpath("rgb")

# Every PNG directly inside each folder, in sorted path order.
GRAYSCALE_IMAGE_PATHS = sorted(GRAYSCALE_PATH.glob("*.png"))
RGB_IMAGE_PATHS = sorted(RGB_PATH.glob("*.png"))

### Helpers

In [24]:
def append_row_to_csv(file_path: Path, row: dict) -> None:
    """Append one row to a CSV file, writing the header line when the file has no content.

    Args:
        file_path: Target CSV file. It is created if missing; its parent directory
            must already exist.
        row: Mapping of column name to value for the single row to append.

    The header is written when the file does not exist yet or exists but is empty.
    Rows appended later are written without a header, so callers must keep the
    keys (and their order) of ``row`` consistent between calls.
    """
    # A zero-byte file has no header either, so it is handled like a missing file.
    needs_header = (not file_path.exists()) or file_path.stat().st_size == 0
    pd.DataFrame([row]).to_csv(file_path, mode="a", header=needs_header, index=False)

In [25]:
def plot_images_side_by_side(original: np.ndarray, retrieved: np.ndarray, scale: int = 30) -> None:
    """Display the original and the retrieved image, each enlarged ``scale`` times.

    Args:
        original: Pixel array of the source image.
        retrieved: Pixel array of the image retrieved from the simulation.
        scale: Factor applied to both width and height before display
            (defaults to 30, the value previously hard-coded).

    Both arrays are converted with ``Image.fromarray`` and resized with
    nearest-neighbour resampling. The images are emitted through two consecutive
    ``display`` calls, original first.
    """
    # Convert both inputs up front so a bad array fails before anything is shown.
    pictures = [Image.fromarray(pixels) for pixels in (original, retrieved)]
    for picture in pictures:
        enlarged_size = (picture.width * scale, picture.height * scale)
        display(picture.resize(enlarged_size, resample=Image.NEAREST))

In [26]:
def PSNR(original: np.ndarray, retrieved: np.ndarray) -> float:
    """Peak signal-to-noise ratio (in dB) between two images, with a peak value of 255.

    Args:
        original: Reference pixel values.
        retrieved: Pixel values to compare against the reference.

    Returns:
        ``20 * log10(255 / sqrt(MSE))``, or ``inf`` when the images are identical.
    """
    # Cast before subtracting: with unsigned integer inputs (e.g. uint8) the
    # difference and its square would wrap around and corrupt the error.
    reference = np.asarray(original, dtype=np.float64)
    candidate = np.asarray(retrieved, dtype=np.float64)

    mse = np.mean((reference - candidate) ** 2)
    if mse == 0:
        return float("inf")
    max_pixel = 255.0
    return float(20 * np.log10(max_pixel / np.sqrt(mse)))

## Experiments

### Configuration

In [None]:
# Shot count passed to every simulation.
N_SHOTS = 1024

# File name pattern for result CSVs; filled in per experiment.
RESULTS_CSV_FILENAME_TEMPLATE = "{time_of_execution}_{colormode}.csv"

# Result CSVs are written below WORK_DIR/results; make sure the folder exists.
CSV_RESULTS_PATH = WORK_DIR.joinpath("results")
CSV_RESULTS_PATH.mkdir(parents=True, exist_ok=True)

#### Noise configuration

In [None]:
# Label stored in the `noise_model` column of the results.
NOISE_MODEL_NAME = "depolarizing_1q"

# Values passed to `depolarizing_error`, one noisy simulation per value.
NOISE_LEVELS = [
    0.01,
    0.1,
    0.2,
    0.5,
    0.9,
    1.0,
]

In [None]:
def add_local_noise_layer(circuit: qiskit.QuantumCircuit) -> None:
    """Apply an ``id`` gate to every qubit of ``circuit``, modifying it in place.

    The experiment's noise model attaches its error to ``id`` instructions, so
    these gates are the places where that error can act.
    """
    every_qubit = range(circuit.num_qubits)
    circuit.id(every_qubit)

#### Transpilation

In [None]:
# Gate set the circuits are rewritten into before counting CNOTs.
TRANSPILATION_BASIS = ["cx", "u"]


def get_cnot_count(circuit):
    """Return the number of ``cx`` gates in ``circuit`` after transpilation.

    The circuit is transpiled to ``TRANSPILATION_BASIS`` with
    ``optimization_level=0``; 0 is returned when the result has no ``cx`` gate.
    """
    in_basis = qiskit.transpile(
        circuit,
        basis_gates=TRANSPILATION_BASIS,
        optimization_level=0,
    )
    gate_tally = in_basis.count_ops()
    return gate_tally.get("cx", 0)

#### Results definition

In [29]:
@dataclass
class ResultsRow:
    """Metrics collected for one (method, image) pair; becomes one CSV row.

    The per-noise-level lists are stored as given and are only expanded into
    individual columns by ``to_dict``.
    """

    # Identification and noise-free metrics
    method_name: str
    image_name: str
    pcc_clean: float
    psnr_clean: float

    # Noisy runs: pcc_noised[i] and psnr_noised[i] belong to noise_levels[i]
    noise_model: str
    noise_levels: List[float]
    pcc_noised: List[float]
    psnr_noised: List[float]

    # Circuit statistics and simulation settings
    circuit_depth: int
    circuit_size: int
    cnot_count: int
    n_qubits: int
    n_shots: int

    def to_dict(self) -> dict:
        """Return the row as a flat dict with one column pair per noise level.

        ``noise_levels``, ``pcc_noised`` and ``psnr_noised`` are replaced by
        ``pcc_noised_<level>`` / ``psnr_noised_<level>`` entries appended after
        the scalar fields. The instance itself is left unchanged, so the method
        can be called repeatedly.

        Raises:
            IndexError: if a metric list is shorter than ``noise_levels``.
        """
        # Work on a shallow copy: popping from self.__dict__ directly would strip
        # the list attributes from the instance and break any later call.
        result = dict(self.__dict__)
        noise_levels = result.pop("noise_levels")
        pcc_noised = result.pop("pcc_noised")
        psnr_noised = result.pop("psnr_noised")

        for i, level in enumerate(noise_levels):
            result[f"pcc_noised_{level}"] = pcc_noised[i]
            result[f"psnr_noised_{level}"] = psnr_noised[i]

        return result

### Grayscale experiment

In [None]:
# Each run writes to its own CSV, named after the start time.
time_of_execution = datetime.now().strftime("%Y_%m_%d__%H_%M_%S")
GRAYSCALE_RESULTS_PATH = CSV_RESULTS_PATH / RESULTS_CSV_FILENAME_TEMPLATE.format(
    time_of_execution=time_of_execution,
    colormode="grayscale",
)

print(f"Saving results to '{GRAYSCALE_RESULTS_PATH}'")

# The available simulation devices do not depend on the method or image,
# so query them once instead of on every iteration.
device = "GPU" if "GPU" in AerSimulator().available_devices() else "CPU"

# NOTE(review): only GRAYSCALE_METHODS[2] (neqr) and the images from index 8
# onwards are processed -- this looks like a resumed partial run; confirm
# before relying on the output as a full sweep.
selected_methods = [GRAYSCALE_METHODS[2]]
selected_image_paths = GRAYSCALE_IMAGE_PATHS[8:]

for (method, image_path) in tqdm(list(product(selected_methods, selected_image_paths))):
    try:
        method_name = method.__name__.split('.')[-1]
        image_name = image_path.stem

        # Load the image as a grayscale pixel array
        image = Image.open(image_path)
        image = ImageOps.grayscale(image)
        image = np.asarray(image)

        # Noise-free run: encode, simulate, retrieve
        circuit = geqie.encode(method.init_function, method.data_function, method.map_function, image)
        result = geqie.simulate(circuit, N_SHOTS, device=device, return_padded_counts=False)
        retrieved_image = method.retrieve_function(result)

        # Circuit statistics are taken from the noise-free circuit,
        # before `circuit` is rebound in the noise loop below
        circuit_size = circuit.size()
        circuit_depth = circuit.depth()
        cnot_count = get_cnot_count(circuit)
        num_qubits = circuit.num_qubits

        # Only the first value returned by pearsonr is kept
        pcc_clean_image, _ = pearsonr(image.flatten(), retrieved_image.flatten())
        psnr_clean_image = PSNR(image, retrieved_image)

        # Noisy runs: one simulation per noise level, results kept in NOISE_LEVELS order
        psnr_noised = []
        pcc_noised = []
        for noise_level in NOISE_LEVELS:
            # The depolarizing error is attached to "id" instructions only
            noise_model = NoiseModel()
            depol_error = depolarizing_error(noise_level, 1)
            noise_model.add_all_qubit_quantum_error(depol_error, ["id"])

            # Re-encode for every level because add_local_noise_layer modifies the circuit in place
            circuit = geqie.encode(method.init_function, method.data_function, method.map_function, image)
            add_local_noise_layer(circuit)
            result_noised = geqie.simulate(
                circuit=circuit,
                n_shots=N_SHOTS,
                noise_model=noise_model,
                device=device,
                return_padded_counts=False,
            )
            retrieved_noised_image = method.retrieve_function(result_noised)
            # Drop the references to the large objects before the next level
            del circuit, noise_model, result_noised

            pcc_noised_image, _ = pearsonr(image.flatten(), retrieved_noised_image.flatten())
            pcc_noised.append(pcc_noised_image)

            psnr_noised_image = PSNR(image, retrieved_noised_image)
            psnr_noised.append(psnr_noised_image)

        results_row = ResultsRow(
            method_name=method_name,
            image_name=image_name,
            pcc_clean=pcc_clean_image,
            psnr_clean=psnr_clean_image,

            noise_model=NOISE_MODEL_NAME,
            noise_levels=NOISE_LEVELS,
            pcc_noised=pcc_noised,
            psnr_noised=psnr_noised,

            cnot_count=cnot_count,
            circuit_size=circuit_size,
            circuit_depth=circuit_depth,
            n_qubits=num_qubits,
            n_shots=N_SHOTS,
        )

        # Append immediately so finished rows survive a later failure
        append_row_to_csv(GRAYSCALE_RESULTS_PATH, results_row.to_dict())
    except MemoryError:
        # Only out-of-memory is tolerated; any other error stops the run
        print(f"MemoryError encountered. Skipping '{method}': '{image_path}'.")
        continue
print("Done.")
