# Solving time-dependent Ising Model using Pulser

In [None]:
import cmath
import numpy as np
import networkx as nx
from scipy.constants import hbar
from collections import Counter
from typing import Union, Literal
from networkx.drawing.nx_agraph import graphviz_layout
from qat.core import Variable, Schedule, Observable, Term, Result
from qat.core.qpu import QPUHandler
from qat.core.variables import ArithExpression, angle, cos, sin, real, imag, sqrt



## Defining an Ising Hamiltonian on MyQLM

On Pulser we can solve problems of shape 
$$ H = \hbar\sum_i \frac{\Omega(t)}{2} \sigma_i^x - \delta(t)n_i + \frac{1}{2}\sum_{i\neq j}U_{ij}n_i n_j$$
with $\sigma_i^x$ the Pauli $X$ operator applied on qubit $i$ and $n_i = \frac{1-\sigma_i^z}{2}$ with $\sigma_i^z$ the Pauli $Z$ operator applied on qubit $i$

Let's start by defining the parameters and the Hamiltonian

In [None]:
nqubits = 4
tmax = 23.0
t_variable = Variable("t")
u_variable = Variable("u")
# Omega_t = 1 - t_variable
Omega_t = 1
delta_t = u_variable
U_matrix = [[0, 1, 0, 0],
            [1, 0, 1, 0],
            [0, 1, 0, 1],
            [0, 0, 1, 0]]

In [None]:
hamiltonian = 0
for i in range(nqubits):
    hamiltonian += Omega_t / 2 * Observable(nqubits, pauli_terms=[Term(1, 'X', [i])])
    hamiltonian -= delta_t / 4 * Observable(nqubits, pauli_terms=[Term(1, 'I', [i]),
                                                                      Term(-1, 'Z', [i])])
    for j in range(i):
            hamiltonian += U_matrix[i][j] / 4 * Observable(nqubits, pauli_terms=[Term(1, 'II', [i, j]), 
                                                                                 Term(-1, 'IZ', [i, j]),
                                                                                 Term(-1, 'ZI', [i, j]),
                                                                                 Term(1, 'ZZ', [i, j])]) 
                                                 

The Hamiltonian is implemented as a `Schedule` object. You also have to define the duration of the evolution

In [None]:
schedule = Schedule(drive=hamiltonian,
                    tmax=tmax)

print(schedule)

From a schedule object we can build jobs to be submit to the QPU. An initial state can be defined in the job, as well as an Hamiltonian to be measured at the output.

In [None]:
# To simply sample the final state in the computational basis
job0 = schedule.to_job()

# To evaluate some observable at the end of the computation
H_target = Observable(nqubits, pauli_terms=[Term(1, "XX", [0, 1])])
jobobs = schedule.to_job(observable=H_target)

# Starting from |++++> state
job1 = schedule.to_job(psi_0='++++')

# Starting from |+1+1> state
job2 = schedule.to_job(psi_0='+1+1')

# Starting from a random initial state (simulator only)
vec = np.random.random(2**nqubits)
vec /= np.linalg.norm(vec)
job3 = schedule.to_job(psi_0=vec)

## Defining Pasqal's QPU

In [None]:
from pulser import Pulse, Sequence, Register
from pulser_simulation import Simulation
from pulser.waveforms import CustomWaveform, Waveform
from pulser.devices import Device, VirtualDevice, interaction_coefficients
from pulser.devices import MockDevice
from pulser.devices import Chadoq2

In [None]:
TYPE_REG = Literal["automatic", "from_coord", "square", "rectangle", "triangular_lattice", "hexagon", "max_connectivity"]
class PasqalAQPU(QPUHandler):

    def __init__(self, *, device: Union[Device, VirtualDevice] = MockDevice, register_type: str = "automatic") -> None:
        super().__init__()
        self.device = device
        self.register_type = register_type
        self.register = None

    
    def ising_hamiltonian(self, nqubits, Omega_t, delta_t, phi, U_matrix):
        hamiltonian = 0
        for i in range(nqubits):
            hamiltonian += Omega_t / 2 * (cos(phi) * Observable(nqubits, pauli_terms=[Term(1, 'X', [i])]) - sin(phi) * Observable(nqubits, pauli_terms=[Term(1, 'Y', [i])]))
            hamiltonian -= delta_t / 4 * Observable(nqubits, pauli_terms=[Term(1, 'I', [i]),
                                                                      Term(-1, 'Z', [i])])
            for j in range(i):
                    hamiltonian += U_matrix[i][j] / 4 * Observable(nqubits, pauli_terms=[Term(1, 'II', [i, j]), 
                                                                                         Term(-1, 'IZ', [i, j]),
                                                                                         Term(-1, 'ZI', [i, j]),
                                                                                         Term(1, 'ZZ', [i, j])])
        return hamiltonian


    def ising_hamiltonian_from_register(self, nqubits, Omega_t, delta_t, phi, register=None):
        if register is None and self.register is None:
                raise ValueError("""Either a register should be provided or a register should be defined before.""")
        elif not register is None:
            self.register = register
        U_matrix = self.interactions_from_register(self.register)
        hamiltonian = 0
        for i in range(nqubits):
            hamiltonian += Omega_t / 2 * (cos(phi) * Observable(nqubits, pauli_terms=[Term(1, 'X', [i])]) - sin(phi) * Observable(nqubits, pauli_terms=[Term(1, 'Y', [i])]))
            hamiltonian -= delta_t / 4 * Observable(nqubits, pauli_terms=[Term(1, 'I', [i]),
                                                                      Term(-1, 'Z', [i])])
            for j in range(i):
                    hamiltonian += U_matrix[i][j] / 4 * Observable(nqubits, pauli_terms=[Term(1, 'II', [i, j]), 
                                                                                         Term(-1, 'IZ', [i, j]),
                                                                                         Term(-1, 'ZI', [i, j]),
                                                                                         Term(1, 'ZZ', [i, j])])
        return hamiltonian
    

    def distances_from_register(self, register):
        positions = register._to_abstract_repr()
        n = len(positions)
        distance_matrix = np.zeros((n, n))
        for i in range(n):
            for j in range(n):
                distance_matrix[i][j] += np.sqrt((positions[i]["x"] - positions[j]["x"])**2 + (positions[i]["y"] - positions[j]["y"])**2) 
        return distance_matrix


    def interactions_from_distances(self, distances):
        n, m = np.size(distances)
        interactions = np.zeros((n, m))
        for i in range(n):
            for j in range(m):
                if distances[i][j]==0:
                    interactions[i][j] = None
                else:
                    interactions[i][j] += interaction_coefficients.c6_dict[self.device.rydberg_level]/distances[i][j]**6
                
        return interactions


    def interactions_from_register(self, register):
        distances = self.distances_from_register(register)
        return self.interactions_from_distances(distances)

    
    def register_from_coordinates(self, pos, prefix=None):
        self.register_type = "from_coord"
        self.nbqubits = len(pos.keys())
        self.register = Register.from_coordinates(pos, prefix)
        return self.register


    def register_from_distances(self, distances, prefix=None, side=None, rows=None, columns=None, atoms_per_row=None, layers=None, nqubits=None):
        if self.register_type == "from_coord":
            raise ValueError("""Cannot define a "from_coord" register from distances. Should be declared using "register_from_coordinates".""")
        
        if isinstance(distances, np.ndarray):
            nbqubitsLines, nbqubitsColumns = np.shape(distances)
            if nbqubitsLines != nbqubitsColumns:
                raise ValueError("Distances should be a square matrix or a float")
            self.nbqubits = nbqubitsLines
            distances[np.isnan(distances)] = 0
        elif isinstance(distances, float):
            if self.register_type == "automatic":
                raise ValueError("Cannot define a register arbitrarily only with distances")
        else:
            raise ValueError("distances should be a matrix or a float")

        if self.register_type == "automatic":
            print(distances)
            G = nx.from_numpy_array(distances)
            # Get the positions of the atoms using the spring method
            pos = nx.spring_layout(G)
            print(pos)
            # The positions are initially not scaled using this method
            interactions_indices = np.nonzero(distances)
            first_interaction = distances[interactions_indices[0][0]][interactions_indices[1][0]]
            scale_graph = np.linalg.norm(pos[0] - pos[1])
            for node in pos:
                pos[node] *= first_interaction / scale_graph
            print(pos)
            self.register = Register(pos)
        else:
            if isinstance(distances, np.ndarray):
                non_zero_distances = distances[distances != 0]
                if len(non_zero_distances) == 0:
                    raise ValueError("At least one distance should be non zero")
                if len(non_zero_distances[non_zero_distances != non_zero_distances[0]]):
                    raise ValueError("The distances between each atoms should be the same")
                spacing = non_zero_distances[0]

            if self.register_type == "square":
                if isinstance(distances, float):
                    if side is not None:
                        self.nbqubits = side**2
                    else:
                        raise ValueError("side must be defined for a square register")
                side = int(np.sqrt(self.nbqubits))
                if side**2 != self.nbqubits:
                    raise ValueError("For a square register number of qubits should be a square")
                self.register = Register.square(side, spacing, prefix)
            
            elif self.register_type == "rectangle":    
                if not columns is None and not rows is None:
                    self.nbqubits = columns*rows
                else:
                    raise ValueError("rows and columns must be defined for a rectangle register")
                self.register = Register.rectangle(rows, columns, spacing, prefix)

            elif self.register_type == "triangular_lattice":
                if not rows is None and atoms_per_row is None:
                    self.nbqubits = rows*atoms_per_row
                else:
                    raise ValueError("rows and atoms_per_row must be defined for a triangular lattice register")
                self.register = Register.triangular_lattice(rows, atoms_per_row, spacing, prefix)
            
            elif self.register_type == "hexagon":
                if not layers is None:
                    nbqubits = 1
                    for layerindex in range(layers):
                        nbqubits += (layerindex + 1) * 6
                    self.nbqubits = nbqubits
                else:
                    raise ValueError("layers must be defined for a haxegon register")
                self.register = Register.hexagon(layers, spacing, prefix)

            elif self.register_type == "max_connectivity":
                if isinstance(distances, float):
                    if not nqubits is None:
                        self.nbqubits = nqubits
                    else:
                        raise ValueError("nqubits must be defined for a register with max connectivity")
                self.register = Register.max_connectivity(self.nbqubits, self.device, spacing, prefix)
        return self.register


    def distances_from_interaction_matrix(self, interaction_matrix:np.ndarray, symetrized = False):
        c6coeff = interaction_coefficients.c6_dict[self.device.rydberg_level]
        (nbqubitsLine, nbqubitsColumn) = np.shape(interaction_matrix)
        if nbqubitsLine != nbqubitsColumn:
            raise ValueError("Interaction matrix should be a square matrix")
        if not symetrized:
            symetrized_int_matrix = (interaction_matrix + np.transpose(interaction_matrix))/2
        distances = np.zeros((nbqubitsLine, nbqubitsColumn))
        for i in range(nbqubitsLine):
            for j in range(nbqubitsColumn):
                interaction_value = interaction_matrix[i][j]
                if not interaction_value:
                    distances[i][j] = None
                else:
                    distances[i][j] = (c6coeff / interaction_matrix[i][j])**(1/6)

        return distances


    def register_from_interaction_matrix(self, interaction_matrix:np.ndarray):
        distances = self.distances_from_interaction_matrix(interaction_matrix)
        print(distances)
        return self.register_from_distances(distances)
       

    def extract_job_param(self, job):
        self.circuit = job.circuit
        self.schedule = job.schedule
        self.jobtype = job.type
        self.measObservable = job.observable
        self.qubits = job.qubits
        self.nbshots = job.nbshots
        self.aggregate_data = job.aggregate_data
        self.amp_threshoold = job.amp_threshold
        self.parameter_map = job.parameter_map
        self.psi_0 = job.psi_0_str
        self.variables = job.get_variables()
    

    def extract_schedule_param(self):
        # Extract the relevant parameters
        # In the shape of [{operator: {qbit_id: coeff}} for each sequence]
        self.nbqbits = self.schedule.nbqbits
        self.tmax = self.schedule._tmax
        self.drive_coeffs = self.schedule.drive_coeffs
        self.nsequences = len(schedule.drive_obs)
        self.scheduleVariables = self.schedule.get_variables()
        self.scheduleTimeVariable = self.schedule.tname

        sequences = []
        for i in range(self.nsequences):
            # extract the Hamiltonian
            obs = self.schedule.drive_obs[i]
            parameters = {"identity":obs._constant_coeff.get_value(), "X":{}, "Y": {}, "Z":{}, "ZZ":{}}
            for term in obs._terms:
                parameters[term.op][tuple(term.qbits)] = term._coeff.get_value()
            # extract ti, tf
            drive_coeff = schedule.drive_coeffs[i].get_value()
            if schedule.drive_coeffs[i].is_abstract:
                heavisideIndex = drive_coeff.to_thrift().find("heaviside")
                if heavisideIndex == -1:
                    parameters["ti"] = 0
                    parameters["tf"] = int(self.schedule._tmax.get_value())
                    parameters["drive_coeff"] = drive_coeff
                else:
                    extract_times = drive_coeff.to_thrift()[heavisideIndex:].split()
                    parameters["ti"] = int(extract_times[2])  # in ns
                    parameters["tf"] = int(extract_times[3])  # in ns
                    parameters["drive_coeff"] = ArithExpression.from_string(drive_coeff.to_thrift()[:heavisideIndex]+"1 "+" ".join(extract_times[4:]))
            else:
                parameters["ti"] = 0
                parameters["tf"] = int(self.schedule._tmax.get_value())
                parameters["drive_coeff"] = drive_coeff
            sequences.append(parameters)

        return sequences


    def extract_U_matrix(self, sequences):
        # U_matrix
        U_matrix = np.zeros((self.nbqbits, self.nbqbits))
        # Extract the qubits on which to apply the ZZ gates and the coefficients
        # Should be the same for each sequence
        for sequence in sequences:
            sequence["ZZ"]["id"]=0
        ZZ_list = list({sequence["ZZ"]["id"]: sequence["ZZ"] for sequence in sequences}.values())
        del ZZ_list[0]["id"]
        if len(ZZ_list)>1:
            raise ValueError("ZZ coefficients cannot be changed during a computation")
        # Build an adjacency matrix out of it
        ZZ_listitem = ZZ_list[0].items()
        for ZZ_index, ZZ_coeff in ZZ_listitem:
            if isinstance(ZZ_coeff, ArithExpression):
                raise ValueError("ZZ coefficients cannot be an expression")
            else:
                if ZZ_coeff < 0:
                    raise ValueError("ZZ coefficients should be positive")
                U_matrix[ZZ_index] += ZZ_coeff
        # U*ni*nj = U/4(II+IZ+ZI+ZZ)
        return 4. * U_matrix


    def extract_omegas_phases(self, sequences):
        omegas_list = []
        phases_list = []
        for sequence in sequences:
            X_coeff = list(set(sequence["X"].values()))
            Y_coeff = list(set(sequence["X"].values()))
            nxcoeff = len(X_coeff)
            nycoeff = len(Y_coeff)
            if nxcoeff > 1 or nycoeff > 1:
                raise ValueError("Omega should be the same in each sequence")
            if nxcoeff == 0:
                X_coeff = 0
            if nycoeff == 0:
                Y_coeff = 0
            R_coeff = sequence["drive_coeff"] * (X_coeff[0] + 1j* Y_coeff[0])
            omegas_list.append(sqrt(real(R_coeff)**2 + imag(R_coeff)**2))
            phases_list.append(-1*angle(R_coeff))

        return omegas_list, phases_list


    def extract_deltas(self, sequences, U_matrix):
        deltas_list = []
        constant_from_U = np.sum(U_matrix) / 4  # U*ni*nj = U/4(II+IZ+ZI+ZZ) 
        for sequence in sequences:
            identity_coeff = sequence["identity"]  # delta*ni = delta*(I+Z)/2
            deltas_list.append(-4*sequence["drive_coeff"]*(identity_coeff - constant_from_U))
        return deltas_list


    def evaluateExpression(self, expression, myqlmSequences, ti, tf):
        if isinstance(expression, ArithExpression):
            expression = expression(**self.parameter_map)
            if self.scheduleTimeVariable in expression.get_variables():
                wvfOmega = CustomWaveform(np.array([expression(**{self.scheduleTimeVariable:t_value}) for t_value in range(ti, tf, 1)]))
                return wvfOmega
        return expression


    def build_pulser_sequence(self, myqlmSequences, omegas, deltas, phases):
        for i in range(self.nsequences):
            duration = myqlmSequences[i]["tf"] - myqlmSequences[i]["ti"]

            omega = self.evaluateExpression(omegas[i], myqlmSequences, myqlmSequences[i]["ti"], myqlmSequences[i]["tf"])
            isTimeDependentOmega = isinstance(omega, Waveform)
    
            delta = self.evaluateExpression(deltas[i], myqlmSequences, myqlmSequences[i]["ti"], myqlmSequences[i]["tf"])
            isTimeDependentDelta = isinstance(delta, Waveform)
            phase = self.evaluateExpression(phases[i], myqlmSequences, myqlmSequences[i]["ti"], myqlmSequences[i]["tf"])
            isTimeDependentPhase = isinstance(phase, Waveform)
            if isTimeDependentPhase:
                phase = phase.first_value
            if not(isTimeDependentOmega or isTimeDependentDelta or isTimeDependentPhase):
                # If all parameters are constant
                # Add a constant pulse
                # In this case we could like to send an eom pulse
                self.sequence.add(Pulse.ConstantPulse(duration, omega, delta, phase), channel="ch")
            elif not isTimeDependentOmega:
                # In this case we could like to send an eom pulse
                self.sequence.add(Pulse.ConstantAmplitude(duration, omega, delta, phase), channel="ch")
            elif not isTimeDependentDelta:
                self.sequence.add(Pulse.ConstantDetuning(duration, omega, delta, phase, channel="ch"))  # Does not work
            else:
                self.sequence.add(Pulse(duration, omega, delta, phase, channel="ch"))


    def convertPulserResult(self, pulserResult, N_samples):
        myqlmResult = Result()
        for i in range(N_samples):
            final_state = pulserResult.get_final_state()
            # state (int) – the index of the state in comput. basis
            Result.add_sample(final_state, amplitude=final_state.type, probability=None, err=None, intermediate_measurements=None)
        if self.job_type=="SAMPLE":
            return Result
        # elif self.job_type=="OBS":
        #     return Result.
        else:
            raise ValueError("""Job type can only be "OBS" or "SAMPLE".""")


    def submit_job(self, job, sampling_rate=1.0, config=None, evaluation_times='Full', with_modulation=False, N_samples=1000):
        self.extract_job_param(job)
        myqlmSequences = self.extract_schedule_param()
        # Extract U matrix
        U_matrix = self.extract_U_matrix(myqlmSequences)
        # Extract Omega(t), delta(t) for each sequence
        omegas, phases = self.extract_omegas_phases(myqlmSequences)
        deltas = self.extract_deltas(myqlmSequences, U_matrix)
        # From the U_matrix we can build the register
        if self.register_type == "from_coord":
            if self.register is None:
                raise ValueError(""""from_coord" register type: register should be defined beforehand.""")
        else:
            self.register = self.register_from_interaction_matrix(U_matrix)
        
        if self.register_type != "automatic":
            U_matrix = self.interactions_from_register(self.register)
            # Should we update deltas ?

        # From these parameters compose a Pulser Sequence
        print(self.register)
        self.sequence = Sequence(self.register, self.device)
        print(self.sequence.available_channels)
        self.sequence.declare_channel("ch", "rydberg_global")
        self.build_pulser_sequence(myqlmSequences, omegas, deltas, phases)
        # Configure slm for self.psi_0 initialization
        # Configure measurements from self.measObservable
        self.sim = Simulation(self.sequence, sampling_rate, config, evaluation_times, with_modulation)
        pulserResult = self.sim.run(progress_bar = True)
        # myqlmResult = self.convertPulserResult(pulserResult, N_samples)
        return pulserResult

In [None]:
qpu = PasqalAQPU()

In [None]:
qpu.submit_job(job0(u=1))