In [1]:
# Import Libraries
import pandas as pd
import yfinance as yf
import numpy as np
import itertools
import math
from math import pi
import random
from scipy.optimize import minimize
import matplotlib.pyplot as plt

from qiskit import QuantumCircuit, ClassicalRegister, QuantumRegister, transpile
from qiskit.providers.aer import AerSimulator
from qiskit.visualization import plot_histogram

  from qiskit.providers.aer import AerSimulator


In [2]:

class CustomQAOA:
    def __init__(self, assets, gamma_coeff=1.0, penalty=10.0, budget=2, layers=2, stepsize=0.02, max_steps=600, Nshots=1024):
        """
        Initialize the CustomQAOA optimizer for portfolio optimization.

        Parameters:
        - assets (list): List of stock tickers.
        - gamma_coeff (float): Coefficient for the risk term in the Hamiltonian.
        - penalty (float): Penalty coefficient for the budget constraint.
        - budget (int): Desired number of assets in the portfolio.
        - layers (int): Number of QAOA layers.
        - stepsize (float): Step size for the optimizer.
        - max_steps (int): Maximum number of optimization steps.
        - Nshots (int): Number of shots for the quantum simulator.
        """
        # Portfolio and QAOA Parameters
        self.assets = assets
        self.N = len(assets)
        self.gamma_coeff = gamma_coeff
        self.penalty = penalty
        self.budget = budget
        self.layers = layers
        self.stepsize = stepsize
        self.max_steps = max_steps
        self.Nshots = Nshots

        # Initialize Quantum Backend (Using Aer Simulator)
        self.backend = AerSimulator()

        # Fetch Data and Build QUBO
        self._fetch_data()
        self.Q = self._build_qubo()

        # Initialize QAOA Parameters (betas and gammas)
        self.init_params = self._initialize_parameters()

    # ----------------------- Helper Functions -----------------------

    def _num2bin(self, x, r):
        """
        Convert a number to its binary representation with r bits.
        """
        return format(x, '0{}b'.format(r))

    def _all_combinations(self, n):
        """
        Generate all binary combinations of length n.
        """
        return [''.join(seq) for seq in itertools.product('01', repeat=n)]

    def _counts_to_probs(self, counts):
        """
        Convert Qiskit counts to a probability list.
        """
        bits = len(next(iter(counts)))
        probs = []
        total = sum(counts.values())
        for comb in self._all_combinations(bits):
            probs.append(counts.get(comb, 0) / total)
        return probs

    def _max_string(self, counts):
        """
        Return the bitstring with the highest count.
        """
        return max(counts, key=counts.get)

    def _eval_solution(self, x, m):
        """
        Evaluate the objective function x^T Q x.
        """
        return np.matmul(np.matmul(x, m), x)

    def _brute_force_solver(self, m):
        """
        Brute-force solver to find the optimal bitstring adhering to the budget.
        """
        combinations = itertools.product([0, 1], repeat=self.N)
        best_cost = math.inf
        best_vector = None
        for x in combinations:
            if sum(x) != self.budget:
                continue  # Enforce the budget constraint
            v = np.array(x)
            cost = self._eval_solution(v, m)
            if cost < best_cost:
                best_cost = cost
                best_vector = x
        return best_vector, best_cost

    def _matrix_convert_inv(self, m):
        """
        Convert the QUBO matrix as per QAOA requirements.
        This function ensures that the QUBO matrix is correctly scaled.
        """
        buffer = np.zeros((self.N, self.N))
        for i in range(self.N):
            for j in range(self.N):
                if i == j:
                    buffer[i][j] += m[i][j] / 2  # Diagonal terms
                else:
                    buffer[i][j] += m[i][j] / 4  # Off-diagonal terms
        return buffer

    def _add_gates(self, qr, qc, m1, gamma):
        """
        Add cost unitary gates to the quantum circuit.
        """
        for i in range(len(m1)):
            for j in range(i+1, len(m1)):  # Ensure i < j to avoid double application
                if m1[i][j] != 0:
                    qc.cx(qr[i], qr[j])
                    qc.rz(gamma * m1[i][j], qr[j])
                    qc.cx(qr[i], qr[j])
        return qc

    def _fetch_data(self):
        """
        Fetch historical stock data and compute returns and covariance.
        """
        StockStartDate = '2022-01-01'
        StockEndDate = '2023-01-01'
        interval = '1d'

        # Download Adjusted Close prices
        df = yf.download(self.assets, start=StockStartDate,
                         end=StockEndDate, interval=interval)['Adj Close']

        # Check for NaN values and drop them
        df = df.dropna()

        # Calculate daily returns
        ret = df.pct_change().dropna()

        # Annualize mean returns and covariance matrix
        self.R = ret.mean() * 252  # Annualized mean returns
        self.Sigma = ret.cov() * 252  # Annualized covariance matrix

        # Check for negative returns
        if (self.R < 0).any():
            print("Warning: Some assets have negative mean returns.")

        print("\nAnnualized Mean Returns:\n", self.R)
        print("\nAnnualized Covariance Matrix:\n", self.Sigma)

    def _build_qubo(self):
        """
        Construct the QUBO matrix for portfolio optimization with sum(x_i) = budget constraint.
        """
        Q = np.zeros((self.N, self.N))

        # Penalty coefficient
        P = self.penalty

        # Construct the QUBO matrix
        for i in range(self.N):
            for j in range(self.N):
                if i == j:
                    # Diagonal terms: γ * Σ_ii - R_i + P * (1 - 2B)
                    Q[i][j] += self.gamma_coeff * self.Sigma.iloc[i, j] - \
                        self.R.iloc[i] + P * (1 - 2 * self.budget)
                elif j > i:
                    # Off-diagonal terms: γ * Σ_ij + 2P
                    Q[i][j] += self.gamma_coeff * self.Sigma.iloc[i, j] + 2 * P
                    Q[j][i] = Q[i][j]  # Ensure symmetry

        print("\nQUBO Matrix:\n", Q)
        return Q

    def _initialize_parameters(self):
        """
        Initialize QAOA parameters (betas and gammas) randomly within a reasonable range.
        """
        number_of_parameters = 2 * self.layers
        x0 = [random.uniform(0, pi) for _ in range(
            number_of_parameters)]  # Non-negative values
        print("\nInitial Parameters:")
        print("Betas:", x0[:self.layers])
        print("Gammas:", x0[self.layers:])
        return x0

    # ----------------------- QAOA Methods -----------------------

    def _multi_layer_qaoa_expectation(self, m, betas, gammas):
        """
        Multi-layer QAOA execution to calculate expectation value.
        """
        mPauli = self._matrix_convert_inv(m)
        qr = QuantumRegister(self.N, 'q')
        cr = ClassicalRegister(self.N, 'c')
        qc = QuantumCircuit(qr, cr)

        # Initialize with Hadamard gates
        for i in range(self.N):
            qc.h(qr[i])
        qc.barrier()

        # Apply p layers of QAOA
        for j in range(len(betas)):
            # Add cost unitary
            self._add_gates(qr, qc, mPauli, gammas[j])
            qc.barrier()

            # Add mixer unitary (RY gates)
            for k in range(self.N):
                qc.ry(betas[j], qr[k])
            qc.barrier()

        # Measurement
        qc.measure(qr, cr)

        # Transpile and run
        qc_transpiled = transpile(qc, self.backend)
        job = self.backend.run(qc_transpiled, shots=self.Nshots)
        counts = job.result().get_counts()

        # Calculate expectation value
        expectation = 0
        total_counts = sum(counts.values())
        for bitstring, count in counts.items():
            x = [int(bit) for bit in bitstring]
            obj = self._eval_solution(x, m)
            expectation += obj * count
        expectation /= total_counts

        return expectation

    def _qaoa_optimize_expectation(self, m, layer, method="Nelder-Mead"):
        """
        Optimize QAOA parameters to minimize the expectation value.
        """
        def objective(x):
            size = len(x)
            size2 = size // 2
            betas = x[:size2]
            gammas = x[size2:]
            expectation = self._multi_layer_qaoa_expectation(m, betas, gammas)
            return expectation

        print("\n--- QAOA Optimization with Expectation Value ---")
        print("Number of Layers:", layer)
        print("Initial Parameters (Betas and Gammas):", self.init_params)

        # Optimization using specified method
        res1 = minimize(objective, self.init_params,
                        method=method, options={'maxiter': self.max_steps})
        res2 = minimize(objective, res1.x, method=method,
                        options={'maxiter': self.max_steps})

        best_betas = res2.x[:layer]
        best_gammas = res2.x[layer:]
        print("Best Betas:", best_betas)
        print("Best Gammas:", best_gammas)

        # Run QAOA with optimized parameters to get expectation value
        expectation = self._multi_layer_qaoa_expectation(
            m, best_betas, best_gammas)
        print("\n--- Optimization Results ---")
        print("Expectation Value:", expectation)

        return expectation, res1, res2, best_betas, best_gammas

    # ----------------------- Main Run Function -----------------------
    def _brute_force_solver(self, m):
        """
        Brute-force solver to find the optimal bitstring adhering to the budget.
        """
        combinations = itertools.product([0, 1], repeat=self.N)
        best_cost = math.inf
        best_vector = None
        print("\nFinding the exact solution using brute force...")
        for x in combinations:
            if sum(x) != self.budget:
                continue  # Enforce the budget constraint
            v = np.array(x)
            cost = self._eval_solution(v, m)
            print(f"Option Bit String: {x} with Eigen Value = {cost:.4f}")
            if cost < best_cost:
                best_cost = cost
                best_vector = x
                print(f"New optimal found: {x} with E_g = {cost:.4f}")
        return best_vector, best_cost

    def _run_qaoa_circuit(self, m, betas, gammas):
        """
        Run QAOA circuit with given betas and gammas and return measurement counts.
        """
        mPauli = self._matrix_convert_inv(m)
        qr = QuantumRegister(self.N, 'q')
        cr = ClassicalRegister(self.N, 'c')
        qc = QuantumCircuit(qr, cr)

        # Initialize with Hadamard gates
        for i in range(self.N):
            qc.h(qr[i])
        qc.barrier()

        # Apply p layers of QAOA
        for j in range(len(betas)):
            # Add cost unitary
            self._add_gates(qr, qc, mPauli, gammas[j])
            qc.barrier()

            # Add mixer unitary (RY gates)
            for k in range(self.N):
                qc.ry(2 * betas[j], qr[k])  # Note the factor of 2
            qc.barrier()

        # Measurement
        qc.measure(qr, cr)

        # Transpile and run
        qc_transpiled = transpile(qc, self.backend)
        job = self.backend.run(qc_transpiled, shots=self.Nshots)
        counts = job.result().get_counts()

        return counts, qc

    def run(self):
        """
        Execute the full QAOA-based portfolio optimization process and present the results.
        """
        print("\n=== Starting Portfolio Optimization using QAOA ===")

        # Step 1: Expectation-Based QAOA Optimization
        print("\n--- Running Expectation-Based QAOA Optimization ---")
        expectation, res1, res2, best_betas, best_gammas = self._qaoa_optimize_expectation(
            m=self.Q,
            layer=self.layers,
            method="Nelder-Mead"
        )

        print("\n--- Optimization Results ---")
        print("Optimized objective function value:", expectation)

        # Step 2: Run QAOA Circuit with Optimized Parameters
        counts, qc = self._run_qaoa_circuit(self.Q, best_betas, best_gammas)

        # Process measurement results
        total_counts = sum(counts.values())
        probabilities = {k: v / total_counts for k, v in counts.items()}
        sorted_probs = sorted(probabilities.items(),
                              key=lambda item: item[1], reverse=True)

        # Find the most frequent bitstring that satisfies the budget constraint
        for bitstring, prob in sorted_probs:
            # Reverse due to endianness
            bit_list = [int(bit) for bit in bitstring[::-1]]
            if sum(bit_list) == self.budget:
                optimal_bitstring = bit_list
                optimal_cost = self._eval_solution(optimal_bitstring, self.Q)
                print("\n--- QAOA Bitstring Results ---")
                print("Optimal Bitstring:", optimal_bitstring)
                print("Objective Value:", optimal_cost)
                print("Probability of Optimal Bitstring:", prob)
                break
        else:
            print("\nNo valid bitstring found that satisfies the budget constraint.")

        # Visualize Measurement Outcomes
        print("\n--- Plotting Measurement Histogram ---")
        plot_histogram(counts)
        plt.show()

        print("\n=== Brute-Force Optimization ===")
        best_vector, best_cost = self._brute_force_solver(self.Q)
        classical_bitstring = best_vector  # Do not reverse here
        print(f"\nExact solution: {classical_bitstring}")
        print("Brute-Force Optimal Cost:", best_cost)

        # Step 4: Display Selected Assets from Brute-Force Solution
        asset_selection = {asset: bit for asset,
                           bit in zip(self.assets, classical_bitstring)}
        selected_assets = [asset for asset,
                           bit in asset_selection.items() if bit == 1]
        print("\nSelected Assets in Optimal Portfolio:", selected_assets)

        print("\n=== Portfolio Optimization Completed ===")

In [None]:
# Define the list of assets (tickers)
assets = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'TSLA']  # Example: 5 tech stocks
# Instantiate the CustomQAOA class with desired parameters
qaoa_optimizer = CustomQAOA(
    assets=assets,
    gamma_coeff=1.0,    # Adjust based on risk preference
    penalty=100.0,      # Increased penalty to enforce budget constraint
    budget=2,           # Desired number of assets in the portfolio
    layers=2,           # Number of QAOA layers
    stepsize=0.02,      # Optimizer step size
    max_steps=600,      # Maximum optimization steps
    Nshots=1024         # Number of shots for simulation
)
# Execute the optimization process
qaoa_optimizer.run()