# Max-2-SAT using QAOA


In [None]:
# Suppress warnings
import warnings
warnings.filterwarnings("ignore")

# Set this to true to use the local Aer simulator
USE_LOCAL_SIM = True

# Give any backend name here to choose a specific one
BACKEND_NAME = ''

# Max number of retries for errors while running estimator
MAX_RETRIES = 20

In [None]:
# To run on hardware, select the backend with the fewest number of jobs in the queue

from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_aer import AerSimulator

service = QiskitRuntimeService(channel='ibm_quantum')
real_backend = service.least_busy(operational=True, simulator=False) if not BACKEND_NAME else service.backend(BACKEND_NAME)
sim = AerSimulator.from_backend(real_backend)

if USE_LOCAL_SIM:
    backend = sim
else:
    backend = real_backend
    
print(f"Backend chosen: {backend.name}")

## Step 1 - Map classical inputs to a quantum problem

### Define the problem

The boolean expression is defined using tuples where each tuple represents a clause. Each member of the tuple represent a literal and the tuple represent the disjunction of literals. Positive numbers in the tuple represent the variables and negative numbers represent the negation of that variable.


In [None]:
from IPython.display import display, Math

# Define the boolean expression
# clauses = [(1,3), (-1,2), (-2, 3)]
# clauses = [(1,2), (2,3), (1, 3)]
clauses = [(1, -2), (2, -3), (3, -1), (1, 2)]

# Get a list of variables
variables = set()
for i, j in clauses:
    variables.add(abs(i))
    variables.add(abs(j))

print("List of variables:")
display(Math(', '.join(f'x_{i}' for i in sorted(variables))))

# Print the boolean expression
latex_clauses = []
for clause in clauses:
    clause_literals = [f'\\overline{{x_{abs(lit)}}}' if lit < 0 else f'x_{abs(lit)}' for lit in clause]
    latex_clauses.append(f'({clause_literals[0]} \\lor {clause_literals[1]})')

print("Given boolean expression:", end='')
display(Math(' \\land '.join(latex_clauses)))

### Associate a quadratic penalty for each clause

For every type of clause we can associate a quadratic penalty to it as shown below. The quadratic penalties for each clause can be added together to form a composite penalty function, which can be minimized

| Clause                                 | Penalty                     |
| -------------------------------------- | --------------------------- |
| $$x_i \lor x_j$$                       | $$1 - x_i - x_j + x_i x_j$$ |
| $$x_i \lor \overline{x_j}$$            | $$x_j - x_i x_j$$           |
| $$\overline{x_i} \lor x_j$$            | $$x_i - x_i x_j$$           |
| $$\overline{x_i} \lor \overline{x_j}$$ | $$x_i x_j$$                 |


In [None]:
# Calculate the constant term, linear terms and quadratic terms in the composite penalty function

constant_term = 0
linear_terms = {}
quadratic_terms = {}

for clause in clauses:
    lit_1, lit_2 = clause if abs(clause[0]) < abs(clause[1]) else clause[::-1]
    var_1, var_2 = abs(lit_1), abs(lit_2)

    if lit_1 > 0:
        if lit_2 > 0:
            constant_term += 1
            linear_terms[var_1] = linear_terms.get(var_1, 0) - 1
            linear_terms[var_2] = linear_terms.get(var_2, 0) - 1
            quadratic_terms[(var_1, var_2)] = quadratic_terms.get((var_1, var_2), 0) + 1
        elif lit_2 < 0:
            linear_terms[var_2] = linear_terms.get(var_2, 0) + 1
            quadratic_terms[(var_1, var_2)] = quadratic_terms.get((var_1, var_2), 0) - 1
    elif lit_1 < 0:
        if lit_2 > 0:
            linear_terms[var_1] = linear_terms.get(var_1, 0) + 1
            quadratic_terms[(var_1, var_2)] = quadratic_terms.get((var_1, var_2), 0) - 1
        elif lit_2 < 0:
            quadratic_terms[(var_1, var_2)] = quadratic_terms.get((var_1, var_2), 0) + 1

print("Constant term: ", constant_term)
print("Linear terms: ", sorted(linear_terms.items()))
print("Quadratic terms: ", sorted(quadratic_terms.items()))

In [None]:
# Load the quadratic program

from qiskit_optimization import QuadraticProgram

mod = QuadraticProgram("max-2-sat")

for i in sorted(variables):
    mod.binary_var(name=f"x{i}")
    
mod.minimize(
    constant=constant_term, 
    linear={f"x{k}": v for k,v in  linear_terms.items() if v != 0}, 
    quadratic={(f"x{k[0]}", f"x{k[1]}"): v for k, v in quadratic_terms.items() if v != 0})

print(mod.prettyprint())

### Convert quadratic program into Ising Hamiltonian


In [None]:
from qiskit_optimization import translators

hamiltonian, offset = translators.to_ising(mod)
print(hamiltonian)

### Build Ansatz for the QAOA algorithm


In [None]:
# QAOA ansatz circuit
from qiskit.circuit.library import QAOAAnsatz

ansatz = QAOAAnsatz(hamiltonian, reps=2)

print("Ansatz in basic gates: ")
ansatz.decompose(reps=3).draw(output="mpl", style="iqp")

In [None]:
print("QAOA structure with reps=1:")
ansatz.decompose().draw(output="mpl", style="iqp")

## Step 2: Optimize problem for quantum execution.


### ISA Circuit

Schedule a series of [`qiskit.transpiler`](https://docs.quantum-computing.ibm.com/api/qiskit/transpiler) passes to optimize the circuit for the selected backend and make it compatible with the instruction set architecture (ISA) of the backend.


In [None]:
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager

target = backend.target
pm = generate_preset_pass_manager(target=target, optimization_level=3)

ansatz_isa = pm.run(ansatz)
ansatz_isa.draw(output="mpl", idle_wires=False, style="iqp")

### ISA Observables

Transform the Hamiltonian to make it backend compatible before running jobs.


In [None]:
hamiltonian_isa = hamiltonian.apply_layout(ansatz_isa.layout)
hamiltonian_isa

## Step 3: Execute using Qiskit Primitives


Use a [`Session`](https://docs.quantum-computing.ibm.com/api/qiskit-ibm-runtime/qiskit_ibm_runtime.Session) to execute all calls within a single block.

In [None]:
from qiskit_ibm_runtime import Session
from qiskit_ibm_runtime import EstimatorV2 as Estimator

session = Session(backend=backend)

estimator = Estimator(session=session)
estimator.options.dynamical_decoupling.enable = True
estimator.options.default_shots = 10_000

Define the cost function over which to minimize. This is done by computing the expectation value of the Hamiltonian with respect to the parameterized ansatz circuit.


In [None]:
# Cost function returing estimate of energy from estimator
num_retries = 0

def cost_func(params, ansatz, hamiltonian, estimator):
    global num_retries
    try:
        pub = (ansatz, [hamiltonian], [params])
        result = estimator.run(pubs=[pub]).result()
        cost = result[0].data.evs[0]
    except Exception as e:
        print("Error while running estimator: ", str(e))
        if num_retries < MAX_RETRIES:
            global session
            session = Session(backend=backend)
            estimator = Estimator(session=session)
            estimator.options.dynamical_decoupling.enable = True
            estimator.options.default_shots = 10_000
            num_retries += 1
            return cost_func(params, ansatz, hamiltonian, estimator)
        raise e

    return cost

Use the [COBYLA routine from SciPy through the minimize function](https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html):


In [None]:
import numpy as np
from scipy.optimize import minimize

# Set an initial set of random parameters:
x0 = 2 * np.pi * np.random.rand(ansatz_isa.num_parameters)

res = minimize(cost_func, x0, args=(ansatz_isa, hamiltonian_isa, estimator), method="COBYLA")

The solution will be encoded in the output distribution of the ansatz circuit bound with the optimal parameters from the minimization. Therefore, a [`Sampler`](https://docs.quantum-computing.ibm.com/api/qiskit-ibm-runtime/qiskit_ibm_runtime.SamplerV2) primitive is instantiated with the same `Session`.


In [None]:
from qiskit_ibm_runtime import SamplerV2 as Sampler

sampler = Sampler(session=session)
sampler.options.dynamical_decoupling.enable = True
sampler.options.default_shots = 10_000

In [None]:
print(res)

## Step 4: Post-process, return result in classical format


Plug in the solution vector of parameter angles (`x`) into the ansatz circuit to get the result


In [None]:
# Assign solution parameters to ansatz
qc = ansatz.assign_parameters(res.x)

# Add measurements to the circuit
qc.measure_all()
qc_isa = pm.run(qc)
qc_isa.draw(output="mpl", idle_wires=False, style="iqp")

In [None]:
result = sampler.run([qc_isa]).result()
samp_dist = result[0].data.meas.get_counts()
session.close()

Visualize the solution


In [None]:
from IPython.display import HTML
from qiskit.visualization import plot_distribution

# Calculate the satisfied clauses for each case
data = []
for case, probability in samp_dist.items():
    values = dict(zip(sorted(variables), map(int, reversed(list(case)))))
    row = list(reversed(list(case)))
    sat = constant_term
    for var, coefficient in linear_terms.items():
        sat += values[var] * coefficient
    for vars, coefficient in quadratic_terms.items():
        sat += values[vars[0]] * values[vars[1]] * coefficient
    sat = len(clauses) - sat
    row.extend([sat, probability/10000])
    data.append(row)

# Sort the data according to probability
data.sort(key=lambda x: x[len(variables) + 1], reverse=True)

# Add header for the table
data.insert(0, [f'x{i}' for i in sorted(variables)] + ["Satisfied Clauses", "Probability" ])

# Display the equation and solutions
display(Math(' \\land '.join(latex_clauses)))
display(HTML(
    '<table><tr>{}</tr></table>'.format(
        '</tr><tr>'.join(
            '<td>{}</td>'.format('</td><td>'.join(str(_) for _ in row)) for row in data)
        )
))

# Show histogram
plot_distribution(samp_dist, figsize=(15, 5))