In [1]:
import numpy as np
from qiskit import *
from qiskit_optimization import QuadraticProgram
from qiskit.quantum_info import SparsePauliOp
from qiskit_optimization.converters import (InequalityToEquality, IntegerToBinary, 
                                            LinearEqualityToPenalty, LinearInequalityToPenalty,
                                            MaximizeToMinimize, QuadraticProgram2Ising)
# SPSA
from qiskit_optimization.spsa import minimize_spsa

from docplex.mp.model import Model

# SciPy minimizer routine
from scipy.optimize import minimize

from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_ibm_runtime import Estimator, Sampler, Session
from qiskit.result import QuasiDistribution
from qiskit.circuit.library import QAOAAnsatz

In [2]:
service = QiskitRuntimeService()

In [4]:
class QAOASolver:
    def __init__(self, session, qaoa_reps=2):
        self.input_types = (SparsePauliOp, )
        self.output_types = (QuasiDistribution, )
        self.session = session
        self.qaoa_reps = 2

    def run(self, hamiltonian):
        ansatz = QAOAAnsatz(hamiltonian, reps=2)
        estimator = Estimator(session=session, options={"shots": int(1e4)})
        sampler = Sampler(session=session, options={"shots": int(1e4)})

        def cost_func(params, ansatz, hamiltonian, estimator):
            """Return estimate of energy from estimator
        
            Parameters:
                params (ndarray): Array of ansatz parameters
                ansatz (QuantumCircuit): Parameterized ansatz circuit
                hamiltonian (SparsePauliOp): Operator representation of Hamiltonian
                estimator (Estimator): Estimator primitive instance
        
            Returns:
                float: Energy estimate
            """
            cost = estimator.run(ansatz, hamiltonian, parameter_values=params).result().values[0]
            return cost

        x0 = 2 * np.pi * np.random.rand(ansatz.num_parameters)
        res = minimize(cost_func, x0, args=(ansatz, hamiltonian, estimator), method="COBYLA")
        # Assign solution parameters to ansatz
        qc = ansatz.assign_parameters(res.x)
        # Add measurements to our circuit
        qc.measure_all()
        # Sample ansatz at optimal parameters
        samp_dist = sampler.run(qc, shots=int(1e4)).result().quasi_dists[0]
        # Close the session since we are now done with it
        session.close()
        return samp_dist

In [3]:
import copy
import warnings

class PropertySet(dict):
    """A default dictionary-like object"""
    def __missing__(self, key):
        return None

class CompositeWorkflow:
    def __init__(self, passes, name=None, store_final_output=False, strict_validation=True):
        self.passes = passes
        self.stages = {}
        self.name = name
        self.store_final_output = store_final_output
        self.input_types = None
        self.output_types = None
        self.strict_validation = strict_validation
        self._validate_passes()
        self.property_set = PropertySet()

    def _validate_passes(self):
        self.input_types = self.passes[0].input_types
        input_types = self.passes[0].input_types
        for idx, individual_pass in enumerate(self.passes):
            # If the pass is itself a CompositeWorkflow then run its validation
            if isinstance(individual_pass, CompositeWorkflow):
                if individual_pass.name in self.stages:
                    raise Exception(f'Duplicate stage name {individual_pass.name}')
                self.stages[individual_pass.name] = individual_pass
                individual_pass._validate_passes()
                continue
            temp_input_types = individual_pass.input_types
            # Look to see if there is overlap between last passes outputs
            # and this passes inputs
            input_overlap = set(input_types).intersection(set(temp_input_types))
            # If no overlap, bad news
            if not input_overlap:
                raise Exception(f"{temp_input_types} not compatible with {input_types}")
            # If partial overlap, then can only weakly validate the workflow, raise
            # unless explicitly disabled
            if len(input_overlap) != len(temp_input_types) and self.strict_validation:
                diffs = set(input_types).difference(set(temp_input_types))
                raise Exception(f"Possible inputs {diffs} not valid for this pass")
            input_types = individual_pass.output_types
        self.output_types = individual_pass.output_types
    
    def run(self, input):
        temp = copy.deepcopy(input)
        working_props = self.property_set
        for individual_pass in self.passes:
            individual_pass.property_set = working_props
            temp = individual_pass.run(temp)
            if isinstance(individual_pass, CompositeWorkflow):
                if individual_pass.store_final_output:
                    working_props[individual_pass.name] = {"final_output": temp}
        return temp

In [6]:
def evaluate_quadratic_program(bitstring, program):
    constant = program.objective.constant
    linear_elements = program.objective.linear.to_dict()
    quadratic_elements = program.objective._quadratic.to_dict()

    # Flip string so 0th bit is 0th array element for easy math
    x = np.fromiter(list(bitstring[::-1]), dtype=int)
    
    sum = constant
    for element, val in linear_elements.items():
        sum += x[element] * val
    for element, val in quadratic_elements.items():
        sum += x[element[0]] * val * x[element[1]]
    return sum

In [7]:
from functools import wraps

def validate_output_type(func):     
    @wraps(func)
    def wrapper(*args, **kwargs):
        output_types = args[0].output_types
        out = func(*args, **kwargs)
        if not isinstance(out, output_types):
            raise TypeError(f'Output type not in {output_types}')
        return out
    return wrapper  

In [8]:
class EvaluateProgramSolution:
    """Evaluate solutions
    """
    def __init__(self, program=None):
        self.program = program
        self.input_types = (QuasiDistribution, )
        self.output_types = (tuple, )
        self.property_set = PropertySet()

    @validate_output_type
    def run(self, dist):
        if self.program is None:
            program = self.property_set['qubo-transformer']['final_output']
        best_val = np.inf
        best_bits = None
        for bitstring in dist.binary_probabilities():
            temp = evaluate_quadratic_program(bitstring, program)
            if temp < best_val:
                best_val = temp
                best_bits = bitstring
        best_bits = best_bits[::-1]
        out = (best_val, np.fromiter(best_bits, int))
        return out

In [9]:
mdl = Model("docplex model")
x = mdl.binary_var("x")
y = mdl.integer_var(lb=-2, ub=2, name="y")
mdl.minimize(2 * y - x)
# Min-result should be x = 1, y = -2 => -5
#print(mdl.export_as_lp_string())

In [140]:
backend = service.get_backend('ibm_nazca')

In [11]:
qp = QuadraticProgram.from_docplex_mp(mdl)

In [12]:
transformer = CompositeWorkflow([InequalityToEquality(), # Transformation
                                 IntegerToBinary(), # Transformation
                                 LinearEqualityToPenalty(), # Transformation
                                 LinearInequalityToPenalty(), # Transformation
                                 MaximizeToMinimize(), # Transformation
                                ], name='qubo-transformer', store_final_output=True)

In [15]:
#session = Session(backend=backend)
#cw = CompositeWorkflow([transformer,
#                        QuadraticProgram2Ising(), # Quantum part - to Hamiltonian
                        #QAOASolver(session), # Quantum part - execution on QPU
                        #EvaluateProgramSolution() # Quantum part - bit-string to program variables
                        ])

In [13]:
qubo = transformer.run(qp)
hamiltonian = QuadraticProgram2Ising().run(qubo)

In [14]:
hamiltonian

SparsePauliOp(['IIZI', 'IZII', 'ZIII', 'IIIZ', 'IIII'],
              coeffs=[-1. +0.j, -2. +0.j, -1. +0.j,  0.5+0.j, -0.5+0.j])

In [16]:
np.linalg.eig(hamiltonian.to_matrix())[1][:,1]

array([0.+0.j, 1.+0.j, 0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j,
       0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j, 0.+0.j])

In [63]:
#session = Session(backend=backend)
from qiskit.primitives import Estimator as QiskitEstimator
from qiskit.primitives import Sampler as QiskitSampler

#estimator = Estimator(session=session, options={"shots": int(1e4)})
#sampler = Sampler(session=session, options={"shots": int(1e4)})
estimator = QiskitEstimator(options={"shots": int(1e4)})
sampler = QiskitSampler(options={"shots": int(1e4)})

ansatz = QAOAAnsatz(hamiltonian, reps=5)
def cost_func(params, ansatz, hamiltonian, estimator):
    """Return estimate of energy from estimator

    Parameters:
        params (ndarray): Array of ansatz parameters
        ansatz (QuantumCircuit): Parameterized ansatz circuit
        hamiltonian (SparsePauliOp): Operator representation of Hamiltonian
        estimator (Estimator): Estimator primitive instance

    Returns:
        float: Energy estimate
    """
    cost = estimator.run(ansatz, hamiltonian, parameter_values=params).result().values[0]
    return cost

x0 = 2*np.pi*np.random.random(size=ansatz.num_parameters)
res = minimize_spsa(cost_func, x0, args=(ansatz, hamiltonian, estimator), maxiter=250)
# Assign solution parameters to ansatz
qc = ansatz.assign_parameters(res.x)
# Add measurements to our circuit
qc.measure_all()
# Sample ansatz at optimal parameters
samp_dist = sampler.run(qc, shots=int(1e4)).result().quasi_dists[0]
# Close the session since we are now done with it
#session.close()

In [64]:
sorted_dict = dict(sorted(samp_dist.binary_probabilities().items(), key=lambda item: -item[1]))
sorted_dict

{'0001': 0.3692,
 '0000': 0.2224,
 '0011': 0.0839,
 '1001': 0.0763,
 '0010': 0.0523,
 '1000': 0.0507,
 '0101': 0.0474,
 '0100': 0.0303,
 '1011': 0.0166,
 '0111': 0.0119,
 '1010': 0.0111,
 '1101': 0.0108,
 '0110': 0.0075,
 '1100': 0.0059,
 '1111': 0.0025,
 '1110': 0.0012}

In [99]:
from qiskit.circuit.library import EfficientSU2, TwoLocal

In [130]:
#ansatz = EfficientSU2(hamiltonian.num_qubits)
ansatz = TwoLocal(hamiltonian.num_qubits, 'ry', 'cx', 'linear', reps=1)
def cost_func(params, ansatz, hamiltonian, estimator):
    """Return estimate of energy from estimator

    Parameters:
        params (ndarray): Array of ansatz parameters
        ansatz (QuantumCircuit): Parameterized ansatz circuit
        hamiltonian (SparsePauliOp): Operator representation of Hamiltonian
        estimator (Estimator): Estimator primitive instance

    Returns:
        float: Energy estimate
    """
    cost = estimator.run(ansatz, hamiltonian, parameter_values=params).result().values[0]
    return cost

x0 = 2*np.pi*np.random.random(size=ansatz.num_parameters)
res = minimize_spsa(cost_func, x0, args=(ansatz, hamiltonian, estimator), maxiter=100)

In [131]:
qc = ansatz.assign_parameters(res.x)
# Add measurements to our circuit
qc.measure_all()
# Sample ansatz at optimal parameters
samp_dist = sampler.run(qc, shots=int(1e4)).result().quasi_dists[0]

In [132]:
samp_dist

{0: 0.0174,
 1: 0.9763,
 2: 0.0033,
 3: 0.0007,
 5: 0.0006,
 6: 0.0009,
 9: 0.0001,
 13: 0.0007}

In [141]:
est = Estimator(backend, options={"shots": int(1e4)})

In [142]:
est.run(QuantumCircuit(4), SparsePauliOp.from_list([('IIII', 1.0)]))

<RuntimeJob('cjjjn376hmsgmh10m1jg', 'estimator')>