In [1]:
# --------- Custom ansatzes: Hardware-Efficient and simple ADAPT-style builder ----------
import copy
import time
import warnings

import numpy as np
from qiskit import QuantumCircuit
from qiskit.circuit import Parameter
from qiskit.circuit.library import TwoLocal
from qiskit.result import QuasiDistribution
from qiskit_algorithms import NumPyMinimumEigensolver
from qiskit_algorithms import SamplingVQE
from qiskit_algorithms.optimizers import COBYLA
from qiskit_optimization.algorithms import MinimumEigenOptimizer
#from qiskit_aer.primitives import Sampler


In [2]:
def build_hardware_efficient_ansatz(num_qubits, reps=2):
    """
    Create the hardware-efficient ansatz as a TwoLocal circuit.

    The circuit uses "ry" rotation blocks and "cz" entanglement blocks, and the
    final rotation layer is kept (skip_final_rotation_layer=False).

    Args:
        num_qubits: width of the ansatz, forwarded to TwoLocal.
        reps: number of repetitions, forwarded to TwoLocal.

    Returns:
        The TwoLocal instance.
    """
    two_local_options = {
        "num_qubits": num_qubits,
        "rotation_blocks": "ry",
        "entanglement_blocks": "cz",
        "reps": reps,
        "skip_final_rotation_layer": False,
    }
    return TwoLocal(**two_local_options)


In [3]:
def _single_qubit_ry_circuit(qubit_index, param_name=None):
    """
    Build a one-qubit block holding a single parameterised Ry gate.

    The block always acts on its own local qubit 0; ``qubit_index`` is only used
    to derive the default parameter name ``ry_q<qubit_index>`` when
    ``param_name`` is empty or None.

    Returns:
        (block, theta): the one-qubit QuantumCircuit and its Parameter.
    """
    label = param_name if param_name else f"ry_q{qubit_index}"
    theta = Parameter(label)
    block = QuantumCircuit(1)
    block.ry(theta, 0)
    return block, theta


In [4]:
def _two_qubit_rxrx_circuit(q0, q1, param_name=None):
    """
    Build a two-qubit block holding a single parameterised RXX gate.

    The gate is placed on the block's local qubits 0 and 1; ``q0`` and ``q1``
    only feed the default parameter name ``rxx_<q0>_<q1>`` used when
    ``param_name`` is empty or None.

    Returns:
        (block, theta): the two-qubit QuantumCircuit and its Parameter.
    """
    label = param_name if param_name else f"rxx_{q0}_{q1}"
    theta = Parameter(label)
    block = QuantumCircuit(2)
    # XX rotation on the local pair.
    block.rxx(theta, 0, 1)
    return block, theta


In [5]:
def construct_global_circuit_from_blocks(num_qubits, blocks):
    """
    Assemble small circuit blocks into one QuantumCircuit on ``num_qubits`` qubits.

    Args:
        num_qubits: width of the circuit that is returned.
        blocks: iterable of ``(qc_block, param, targets)`` tuples. ``qc_block`` is a
            small circuit on local qubits 0..k-1 and ``targets[i]`` is the global
            qubit index that local qubit ``i`` is mapped to. ``param`` is carried
            in the tuple for the caller's bookkeeping and is not read here.

    Returns:
        A QuantumCircuit with every block appended in list order. An empty
        ``blocks`` yields an empty circuit on ``num_qubits`` qubits.
    """
    global_qc = QuantumCircuit(num_qubits)
    for qc_block, _param, targets in blocks:
        try:
            # compose() appends by default; inplace=False returns a new circuit,
            # so a failed compose leaves global_qc untouched for the fallback.
            global_qc = global_qc.compose(qc_block, qubits=targets, inplace=False)
        except Exception:
            # Best-effort fallback: append the block instruction by instruction.
            # Local qubit positions are resolved with find_bit() and the
            # instruction's named fields rather than the legacy `Qubit.index`
            # attribute / tuple unpacking, which the previous version relied on
            # and which fails on Qubit objects that expose no `index`.
            for instruction in qc_block.data:
                mapped_qubits = [
                    global_qc.qubits[targets[qc_block.find_bit(local_qubit).index]]
                    for local_qubit in instruction.qubits
                ]
                global_qc.append(instruction.operation, mapped_qubits, instruction.clbits)
    return global_qc


In [6]:
def _build_operator_pool(num_qubits, pool_type):
    """Create the candidate pool as a list of (qc_block, param, targets) tuples."""
    if pool_type != "single_ry":
        raise ValueError("Unknown pool_type")
    pool = []
    # One Ry per qubit ...
    for q in range(num_qubits):
        qc_block, param = _single_qubit_ry_circuit(q, param_name=f"ry_q{q}")
        pool.append((qc_block, param, [q]))
    # ... plus one RXX per neighbouring pair for a richer pool.
    for q in range(num_qubits - 1):
        qc_block, param = _two_qubit_rxrx_circuit(q, q + 1, param_name=f"rxx_{q}_{q+1}")
        pool.append((qc_block, param, [q, q + 1]))
    return pool


def _evaluate_ansatz_energy(qp, ansatz_qc, sampler, optimizer):
    """Solve ``qp`` with SamplingVQE on ``ansatz_qc``; return (fval, result) or (None, None) on failure."""
    try:
        vqe = SamplingVQE(sampler=sampler, ansatz=ansatz_qc, optimizer=optimizer)
        result = MinimumEigenOptimizer(vqe).solve(qp)
        return result.fval, result
    except Exception as exc:
        # A failed trial must not abort the whole greedy search; report and move on.
        warnings.warn(f"[ADAPT] VQE evaluation failed: {exc}")
        return None, None


def build_adaptive_ansatz(qp, num_qubits, sampler, optimizer,
                          pool_type="single_ry",
                          max_iter=6,
                          tol=1e-4,
                          verbose=True,
                          time_limit_per_candidate=10.0):
    """
    Simple greedy ADAPT-like ansatz builder.

    Procedure:
      - Start from an empty circuit and a pool of candidate operators (one Ry per
        qubit plus one RXX per neighbouring qubit pair).
      - In each iteration, append every remaining candidate in turn to the current
        ansatz, solve ``qp`` with SamplingVQE wrapped in MinimumEigenOptimizer, and
        record the resulting ``fval``.
      - Permanently add the candidate with the lowest energy, provided it lowers
        the best energy so far by more than ``tol``.
      - Stop when the improvement is at most ``tol``, no candidate could be
        evaluated, or ``max_iter`` iterations have run.

    If the empty-circuit baseline cannot be evaluated (the evaluation returns no
    energy, e.g. because the solver rejects a circuit without parameters), there
    is nothing to compare against and the best candidate of the first iteration
    is accepted unconditionally; later iterations compare against it as usual.

    Notes:
      - Demonstration-level algorithm: one full VQE run per candidate per iteration.
      - Every VQE run starts from the solver's own initial point; parameters are
        not warm-started from earlier runs.

    Args:
        qp: problem passed to ``MinimumEigenOptimizer.solve``.
        num_qubits: width of the ansatz.
        sampler: sampler primitive handed to SamplingVQE; must not be None.
        optimizer: classical optimizer handed to SamplingVQE.
        pool_type: only "single_ry" is supported.
        max_iter: maximum number of growth iterations.
        tol: minimum energy drop required to accept an operator.
        verbose: print progress when True.
        time_limit_per_candidate: accepted for backward compatibility; currently
            not read, so no time limit is enforced.

    Returns:
        (final_ansatz, best_result): the assembled QuantumCircuit and the solver
        result belonging to the last accepted energy (the baseline result if no
        operator was accepted; None if nothing was evaluated successfully).

    Raises:
        RuntimeError: if ``sampler`` is None.
        ValueError: if ``pool_type`` is not recognised.
    """
    if sampler is None:
        raise RuntimeError("ADAPT builder requires a working Sampler backend (qiskit-aer).")

    pool = _build_operator_pool(num_qubits, pool_type)
    selected_blocks = []  # accepted (qc_block, param, targets) tuples, in order

    # Classical reference value; informational only, it does not steer the search.
    try:
        classical_res = MinimumEigenOptimizer(NumPyMinimumEigensolver()).solve(qp)
        if verbose:
            print(f"[ADAPT] Classical baseline energy: {classical_res.fval:.6f}")
    except Exception as exc:
        if verbose:
            print(f"[ADAPT] Classical baseline unavailable: {exc}")

    best_energy = None
    best_result = None

    if verbose:
        print("[ADAPT] Starting adaptive ansatz construction...")

    for it in range(max_iter):
        if verbose:
            print(f"\n[ADAPT] Iteration {it+1}/{max_iter}. Current selected operators: {len(selected_blocks)}")

        # Nothing accepted yet: try to establish a baseline with the empty circuit.
        if not selected_blocks:
            empty_ansatz = construct_global_circuit_from_blocks(num_qubits, [])
            best_energy, best_result = _evaluate_ansatz_energy(qp, empty_ansatz, sampler, optimizer)
            if verbose:
                print(f"[ADAPT] Base energy with empty ansatz: {best_energy}")

        # Trial every remaining operator on top of the current ansatz.
        candidates = []  # (energy, result, block)
        for block in pool:
            param = block[1]
            # The blocks are never mutated by composition, so a shallow list is enough.
            trial_ansatz = construct_global_circuit_from_blocks(num_qubits, selected_blocks + [block])

            start_time = time.time()
            energy, res = _evaluate_ansatz_energy(qp, trial_ansatz, sampler, optimizer)
            elapsed = time.time() - start_time
            if energy is None:
                if verbose:
                    print(f"[ADAPT] Candidate {param} evaluation failed; skipping.")
                continue
            candidates.append((energy, res, block))
            if verbose:
                print(f"[ADAPT] Candidate {param} -> energy {energy:.6f} (eval time {elapsed:.1f}s)")

        if not candidates:
            if verbose:
                print("[ADAPT] No valid candidate evaluations; stopping.")
            break

        best_idx = int(np.argmin([energy for energy, _, _ in candidates]))
        chosen_energy, chosen_res, chosen_block = candidates[best_idx]

        if best_energy is None:
            # No baseline exists, so there is no drop to measure: take the best
            # candidate as the new reference instead of aborting the search.
            if verbose:
                print(f"[ADAPT] Best candidate reaches energy {chosen_energy:.6f} "
                      "(no baseline energy available; accepting it as reference)")
        else:
            energy_drop = best_energy - chosen_energy
            if verbose:
                print(f"[ADAPT] Best candidate reduces energy to {chosen_energy:.6f} (drop: {energy_drop:.6f})")
            if energy_drop <= tol:
                if verbose:
                    print("[ADAPT] Energy improvement below tolerance -> stopping adaptive growth.")
                break

        # Accept the operator and take it out of the pool so it is not picked again.
        selected_blocks.append(chosen_block)
        best_energy = chosen_energy
        best_result = chosen_res
        pool = [entry for entry in pool if entry is not chosen_block]

    final_ansatz = construct_global_circuit_from_blocks(num_qubits, selected_blocks)
    if verbose:
        print(f"\n[ADAPT] Finished. Selected {len(selected_blocks)} operators. Final energy estimate: {best_energy}")
    return final_ansatz, best_result

#print(final_ansatz)

#print(best_result)

In [7]:
# Global configuration used everywhere in VQE
num_assets = 4  # number of assets; later cells pass this as the ansatz qubit count
seed = 123  # NOTE(review): not read by any visible cell — presumably meant to seed data generation; confirm
risk_factor = 0.5  # forwarded to PortfolioOptimization below
budget = num_assets // 2  # integer half of num_assets

ansatz_reps = 2  # `reps` handed to build_hardware_efficient_ansatz in the hardware-efficient cell
cobyla_maxiter = 200  # COBYLA `maxiter` for the hardware-efficient VQE run
cvar_alpha = 0.2  # NOTE(review): not read by any visible cell — presumably a CVaR level; confirm


# Build portfolio optimization QP
# NOTE(review): `PortfolioOptimization`, `mu` and `sigma` are neither imported nor
# defined in the visible cells, and the recorded output of this cell is a NameError
# for `PortfolioOptimization`. They need to be imported/defined before this cell
# (presumably from qiskit_finance plus a returns/covariance data step — confirm).
portfolio = PortfolioOptimization(mu, sigma, risk_factor, budget)
qp = portfolio.to_quadratic_program()


NameError: name 'PortfolioOptimization' is not defined

In [None]:
# ADAPT-style greedy run. Expect it to be slow: the builder launches one short VQE
# per candidate operator in every growth iteration.
# NOTE(review): `sampler` and `print_result` are not defined in the visible cells
# (the qiskit_aer Sampler import is commented out) — confirm they are created elsewhere.

# Short inner optimizations keep the candidate screening affordable.
optimizer = COBYLA(maxiter=100)
ansatz_adapt, adapt_result = build_adaptive_ansatz(
    qp,
    num_assets,
    sampler,
    optimizer,
    pool_type="single_ry",
    max_iter=6,
    tol=1e-4,
    verbose=True,
)

# Run a longer VQE on the finished ansatz; no initial point is passed, so this
# optimization does not reuse the parameters found while the ansatz was grown.
final_optimizer = COBYLA(maxiter=300)
final_vqe = SamplingVQE(
    sampler=sampler,
    ansatz=ansatz_adapt,
    optimizer=final_optimizer,
)
final_meo = MinimumEigenOptimizer(final_vqe)
final_result = final_meo.solve(qp)
print_result(final_result, portfolio, "Sampling VQE (ADAPT-style final)")


In [None]:
# Hardware-efficient baseline: a fixed TwoLocal ansatz solved with one SamplingVQE run
# (the fast path). This rebinds `optimizer`, which the ADAPT cell also assigns.
# NOTE(review): `sampler` and `print_result` are not defined in the visible cells — confirm.

ansatz = build_hardware_efficient_ansatz(num_qubits=num_assets, reps=ansatz_reps)
optimizer = COBYLA(maxiter=cobyla_maxiter)
vqe = SamplingVQE(
    sampler=sampler,
    ansatz=ansatz,
    optimizer=optimizer,
)
meo_vqe = MinimumEigenOptimizer(vqe)
result_vqe = meo_vqe.solve(qp)
print_result(result_vqe, portfolio, "Sampling VQE (Hardware-Efficient)")
