In [None]:
from __future__ import annotations

try:
    import cudaq
    import cudaq_solvers as solvers
    import matplotlib.pyplot as plt
except ImportError:
    print("Installing required packages...")
    %pip install --quiet 'cudaq-solvers' 'matplotlib'
    print("Installed `cudaq`, `cudaq-solvers`, and `matplotlib` packages.")
    print("You may need to restart the kernel to import newly installed packages.")
    import cudaq
    import cudaq_solvers as solvers
    import matplotlib.pyplot as plt

import os
from collections.abc import Mapping, Sequence

import numpy as np
from scipy.optimize import minimize

In [2]:
cudaq.reset_target()

if cudaq.num_available_gpus() == 0:
    cudaq.set_target("qpp-cpu", option="fp64")
else:
    cudaq.set_target("nvidia", option="fp64")

In [3]:
def ansatz(n_qubits: int) -> cudaq.Kernel:
    # Create a CUDA-Q parameterized kernel
    paramterized_ansatz, variational_angles = cudaq.make_kernel(list)
    qubits = paramterized_ansatz.qalloc(n_qubits)

    # Using |+> as the initial state:
    paramterized_ansatz.h(qubits[0])
    paramterized_ansatz.cx(qubits[0], qubits[1])

    paramterized_ansatz.rx(variational_angles[0], qubits[0])
    paramterized_ansatz.cx(qubits[0], qubits[1])
    paramterized_ansatz.rz(variational_angles[1], qubits[1])
    paramterized_ansatz.cx(qubits[0], qubits[1])
    return paramterized_ansatz


def run_logical_vqe(cudaq_hamiltonian: cudaq.SpinOperator) -> tuple[float, list[float]]:
    # Set seed for easier reproduction
    rng = np.random.default_rng(42)

    # Initial angles for the optimizer
    init_angles = rng.random(2) * 1e-1

    # Obtain CUDA-Q Ansatz
    num_qubits = cudaq_hamiltonian.get_qubit_count()
    variational_kernel = ansatz(num_qubits)

    # Perform VQE optimization
    energy, params, _ = solvers.vqe(
        variational_kernel,
        cudaq_hamiltonian,
        init_angles,
        optimizer=minimize,
        method="SLSQP",
        tol=1e-10,
    )
    return energy, params

In [4]:
cudaq.register_operation("meas_id", np.identity(2))

In [5]:
def aim_physical_circuit(
    angles: list[float], basis: str, *, ignore_meas_id: bool = False
) -> cudaq.Kernel:
    kernel = cudaq.make_kernel()
    qubits = kernel.qalloc(2)

    # Bell state prep
    kernel.h(qubits[0])
    kernel.cx(qubits[0], qubits[1])

    # Rx Gate
    kernel.rx(angles[0], qubits[0])

    # ZZ rotation
    kernel.cx(qubits[0], qubits[1])
    kernel.rz(angles[1], qubits[1])
    kernel.cx(qubits[0], qubits[1])

    if basis == "z_basis":
        if not ignore_meas_id:
            kernel.for_loop(
                start=0,
                stop=2,
                function=lambda q_idx: getattr(kernel, "meas_id")(qubits[q_idx]),  # noqa: B009
            )
        kernel.mz(qubits)
    elif basis == "x_basis":
        kernel.h(qubits)
        if not ignore_meas_id:
            kernel.for_loop(
                start=0,
                stop=2,
                function=lambda q_idx: getattr(kernel, "meas_id")(qubits[q_idx]),  # noqa: B009
            )
        kernel.mz(qubits)
    else:
        raise ValueError("Unsupported basis provided:", basis)
    return kernel

In [6]:
def aim_logical_circuit(
    angles: list[float], basis: str, *, ignore_meas_id: bool = False
) -> cudaq.Kernel:
    kernel = cudaq.make_kernel()
    qubits = kernel.qalloc(6)

    kernel.for_loop(start=0, stop=3, function=lambda idx: kernel.h(qubits[idx]))
    kernel.cx(qubits[1], qubits[4])
    kernel.cx(qubits[2], qubits[3])
    kernel.cx(qubits[0], qubits[1])
    kernel.cx(qubits[0], qubits[3])

    # Rx teleportation
    kernel.rx(angles[0], qubits[0])

    kernel.cx(qubits[0], qubits[1])
    kernel.cx(qubits[0], qubits[3])
    kernel.h(qubits[0])

    if basis == "z_basis":
        if not ignore_meas_id:
            kernel.for_loop(
                start=0,
                stop=5,
                function=lambda idx: getattr(kernel, "meas_id")(qubits[idx]),  # noqa: B009
            )
        kernel.mz(qubits)
    elif basis == "x_basis":
        # ZZ rotation and teleportation
        kernel.cx(qubits[3], qubits[5])
        kernel.cx(qubits[2], qubits[5])
        kernel.rz(angles[1], qubits[5])
        kernel.cx(qubits[1], qubits[5])
        kernel.cx(qubits[4], qubits[5])
        kernel.for_loop(start=1, stop=5, function=lambda idx: kernel.h(qubits[idx]))
        if not ignore_meas_id:
            kernel.for_loop(
                start=0,
                stop=6,
                function=lambda idx: getattr(kernel, "meas_id")(qubits[idx]),  # noqa: B009
            )
        kernel.mz(qubits)
    else:
        raise ValueError("Unsupported basis provided:", basis)

    print(cudaq.draw(kernel))
    return kernel

In [7]:
def generate_circuit_set(ignore_meas_id: bool = False) -> object:
    u_vals = [1, 5, 9]
    v_vals = [-9, -1, 7]
    circuit_dict = {}
    for u in u_vals:
        for v in v_vals:
            qubit_hamiltonian = (
                0.25 * u * cudaq.spin.z(0) * cudaq.spin.z(1)
                - 0.25 * u
                + v * cudaq.spin.x(0)
                + v * cudaq.spin.x(1)
            )
            angles = [float(angle) for angle in opt_params]
            print(f"Computed optimal angles={angles} for U={u}, V={v}")

            tmp_physical_dict = {}
            tmp_logical_dict = {}
            for basis in ("z_basis", "x_basis"):
                tmp_physical_dict[basis] = aim_physical_circuit(
                    angles, basis, ignore_meas_id=ignore_meas_id
                )
                tmp_logical_dict[basis] = aim_logical_circuit(
                    angles, basis, ignore_meas_id=ignore_meas_id
                )

            circuit_dict[f"{u}:{v}"] = {
                "physical": tmp_physical_dict,
                "logical": tmp_logical_dict,
            }
    print("\nFinished building optimized circuits!")
    return circuit_dict

In [1]:
sim_circuit_dict = generate_circuit_set()
circuit_layers = sim_circuit_dict.keys()

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



NameError: name 'opt_params' is not defined

During handling of the above exception, another exception occurred:

AttributeError: 'NameError' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

AssertionError
NameError: name 'opt_params' is not defined

During handling of the above exception, another exception occurred:

AttributeError: 'NameError' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

TypeError: object of type 'NoneType' has no len()

During handling of the above exception, another exception occurred:

AttributeError: 'TypeError' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

AssertionError
NameError: name 'opt_params' is not defined

During handling of the above exception, another exception occurred:

AttributeError: 'NameError' object has no attribute '_render_traceback_'

Durin

In [None]:
def _num_qubits(counts: Mapping[str, float]) -> int:
    for key in counts:
        if key.isdecimal():
            return len(key)
    return 0


def process_counts(
    counts: Mapping[str, float],
    data_qubits: Sequence[int],
    flag_qubits: Sequence[int] = (),
) -> dict[str, float]:
    new_data: dict[str, float] = {}
    for key, val in counts.items():
        if not all(key[i] == "0" for i in flag_qubits):
            continue

        new_key = "".join(key[i] for i in data_qubits)

        if not set("01").issuperset(new_key):
            continue

        new_data.setdefault(new_key, 0)
        new_data[new_key] += val

    return new_data


def decode(counts: Mapping[str, float]) -> dict[str, float]:
    """Decode physical counts into logical counts. Should be called after `process_counts`."""
    if not counts:
        return {}

    num_qubits = _num_qubits(counts)
    assert num_qubits % 4 == 0

    physical_to_logical = {
        "0000": "00",
        "1111": "00",
        "0011": "01",
        "1100": "01",
        "0101": "10",
        "1010": "10",
        "0110": "11",
        "1001": "11",
    }

    new_data: dict[str, float] = {}
    for key, val in counts.items():
        physical_keys = [key[i : i + 4] for i in range(0, num_qubits, 4)]
        logical_keys = [physical_to_logical.get(physical_key) for physical_key in physical_keys]
        if None not in logical_keys:
            new_key = "".join(logical_keys)
            new_data.setdefault(new_key, 0)
            new_data[new_key] += val

    return new_data


def ev_x(counts: Mapping[str, float]) -> float:
    ev = 0.0

    for k, val in counts.items():
        ev += val * ((-1) ** int(k[0]) + (-1) ** int(k[1]))

    total = sum(counts.values())
    ev /= total
    return ev


def ev_xx(counts: Mapping[str, float]) -> float:
    ev = 0.0

    for k, val in counts.items():
        ev += val * (-1) ** k.count("1")

    total = sum(counts.values())
    ev /= total
    return ev


def ev_zz(counts: Mapping[str, float]) -> float:
    ev = 0.0

    for k, val in counts.items():
        ev += val * (-1) ** k.count("1")

    total = sum(counts.values())
    ev /= total
    return ev


def aim_logical_energies(
    data_ordering: object, counts_list: Sequence[dict[str, float]]
) -> tuple[dict[tuple[int, int], float], dict[tuple[int, int], float]]:
    counts_data = {
        data_ordering[i]: decode(
            process_counts(
                counts,
                data_qubits=[1, 2, 3, 4],
                flag_qubits=[0, 5],
            )
        )
        for i, counts in enumerate(counts_list)
    }
    return _aim_energies(counts_data)


def aim_physical_energies(
    data_ordering: object, counts_list: Sequence[dict[str, float]]
) -> tuple[dict[tuple[int, int], float], dict[tuple[int, int], float]]:
    counts_data = {
        data_ordering[i]: process_counts(
            counts,
            data_qubits=[0, 1],
        )
        for i, counts in enumerate(counts_list)
    }
    return _aim_energies(counts_data)


def _aim_energies(
    counts_data: Mapping[tuple[int, int, str], dict[str, float]],
) -> tuple[dict[tuple[int, int], float], dict[tuple[int, int], float]]:
    evxs: dict[tuple[int, int], float] = {}
    evxxs: dict[tuple[int, int], float] = {}
    evzzs: dict[tuple[int, int], float] = {}
    totals: dict[tuple[int, int], float] = {}

    for key, counts in counts_data.items():
        h_params, basis = key
        key_a, key_b = h_params.split(":")
        u, v = int(key_a), int(key_b)
        if basis.startswith("x"):
            evxs[u, v] = ev_x(counts)
            evxxs[u, v] = ev_xx(counts)
        else:
            evzzs[u, v] = ev_zz(counts)

        totals.setdefault((u, v), 0)
        totals[u, v] += sum(counts.values())

    energies = {}
    uncertainties = {}
    for u, v in evxs.keys() & evzzs.keys():
        string_key = f"{u}:{v}"
        energies[string_key] = u * (evzzs[u, v] - 1) / 4 + v * evxs[u, v]

        uncertainty_xx = 2 * v**2 * (1 + evxxs[u, v]) - u * v * evxs[u, v] / 2
        uncertainty_zz = u**2 * (1 - evzzs[u, v]) / 2

        uncertainties[string_key] = np.sqrt(
            (uncertainty_zz + uncertainty_xx - energies[string_key] ** 2) / (totals[u, v] / 2)
        )

    return energies, uncertainties


def _get_energy_diff(
    bf_energies: dict[str, float],
    physical_energies: dict[str, float],
    logical_energies: dict[str, float],
) -> tuple[list[float], list[float]]:
    physical_energy_diff = []
    logical_energy_diff = []

    # Data ordering following `bf_energies` keys
    for layer in bf_energies.keys():
        physical_sim_energy = physical_energies[layer]
        logical_sim_energy = logical_energies[layer]
        true_energy = bf_energies[layer]
        u, v = layer.split(":")
        print(f"Layer=({u}, {v}) has brute-force energy of: {true_energy}")
        print(f"Physical circuit of layer=({u}, {v}) got an energy of: {physical_sim_energy}")
        print(f"Logical circuit of layer=({u}, {v}) got an energy of: {logical_sim_energy}")
        print("-" * 72)

        if logical_sim_energy < physical_sim_energy:
            print("Logical circuit achieved the lower energy!")
        else:
            print("Physical circuit achieved the lower energy")
        print("-" * 72, "\n")

        physical_energy_diff.append(
            -1 * (true_energy - physical_sim_energy)
        )  # Multiply by -1 since negative energies
        logical_energy_diff.append(-1 * (true_energy - logical_sim_energy))
    return physical_energy_diff, logical_energy_diff

In [None]:
def submit_aim_circuits(
    circuit_dict: object,
    *,
    folder_path: str = "future_aim_results",
    shots_count: int = 1000,
    noise_model: cudaq.mlir._mlir_libs._quakeDialects.cudaq_runtime.NoiseModel | None = None,
    run_async: bool = False,
) -> dict[str, list[dict[str, int]]] | None:
    if run_async:
        os.makedirs(folder_path, exist_ok=True)
    else:
        aim_results = {"physical": [], "logical": []}

    for layer in circuit_dict.keys():
        if run_async:
            print(f"Posting circuits associated with layer=('{layer}')")
        else:
            print(f"Running circuits associated with layer=('{layer}')")

        for basis in ("z_basis", "x_basis"):
            if run_async:
                u, v = layer.split(":")

                tmp_physical_results = cudaq.sample_async(
                    circuit_dict[layer]["physical"][basis], shots_count=shots_count
                )
                with open(
                    f"{folder_path}/physical_{basis}_job_u={u}_v={v}_result.txt", "w"
                ) as file:
                    file.write(str(tmp_physical_results))

                tmp_logical_results = cudaq.sample_async(
                    circuit_dict[layer]["logical"][basis], shots_count=shots_count
                )
                with open(f"{folder_path}/logical_{basis}_job_u={u}_v={v}_result.txt", "w") as file:
                    file.write(str(tmp_logical_results))
            else:
                tmp_physical_results = cudaq.sample(
                    circuit_dict[layer]["physical"][basis],
                    shots_count=shots_count,
                    noise_model=noise_model,
                )
                tmp_logical_results = cudaq.sample(
                    circuit_dict[layer]["logical"][basis],
                    shots_count=shots_count,
                    noise_model=noise_model,
                )
                aim_results["physical"].append({k: v for k, v in tmp_physical_results.items()})
                aim_results["logical"].append({k: v for k, v in tmp_logical_results.items()})
    if not run_async:
        print("\nCompleted all circuit sampling!")
        return aim_results
    else:
        print("\nAll circuits submitted for async sampling!")