This runs with in Qiskit 1.2.0 environment provided by qbraid.

In [88]:
import os
import numpy as np
import pandas as pd
from qiskit.quantum_info import SparsePauliOp
from qiskit_algorithms import QAOA, SamplingVQE
from qiskit_algorithms.optimizers import COBYLA

Sampler setup

In [None]:
USE_BACKEND = "aer"

from qiskit_aer.primitives import Sampler as AerSampler
sampler = AerSampler(run_options={"shots": 4096})

Loading data

In [None]:
tickers_path = "data/prices.csv"

if not os.path.exists(tickers_path):
    import yfinance as yf
    os.makedirs("data", exist_ok=True)
    tickers = ['GOOG', 'XOM', 'AAPL', 'AMZN', 'GLD', 'DUK', 'SO', 'AEP']
    df = yf.download(tickers, start="2017-01-01", end="2017-03-01",
                     group_by="ticker", auto_adjust=True)
    df = df.stack(level=0).rename_axis(['Date', 'Ticker']).reset_index()
    df.to_csv(tickers_path, index=False)
else:
    df = pd.read_csv(tickers_path)

df["Date"] = pd.to_datetime(df["Date"])
df = df.sort_values(["Ticker", "Date"])
df["Return"] = df.groupby("Ticker")["Close"].pct_change().fillna(0)
df["LogReturn"] = np.log1p(df["Return"])

log_return_vectors = df.groupby("Ticker")["LogReturn"].apply(np.array)
log_return_vectors = log_return_vectors.apply(lambda x: x[x != 0])

tickers = log_return_vectors.index.tolist()
stacked_data = np.vstack(log_return_vectors.values)
X = stacked_data.T
J = stacked_data @ stacked_data.T
print("X shape:", X.shape, " J shape:", J.shape)

X shape: (38, 8)  J shape: (8, 8)


Helper functions

In [None]:
def Phi(T):
    U, _, Vt = np.linalg.svd(T, full_matrices=False)
    return U @ Vt

def deflate_J(J, b):
    r = float(b.T @ J @ b)
    bbT = np.outer(b, b)
    return J - (2/r) * (J @ bbT @ J) + (J @ bbT @ J @ bbT @ J) / (r**2)

def ising_op_from_J(J):
    D = J.shape[0]
    paulis, coeffs = [], []
    for i in range(D):
        for j in range(i+1, D):
            coeff = -2.0 * J[i, j]
            if abs(coeff) > 1e-12:
                z = ['I'] * D
                z[i] = z[j] = 'Z'
                paulis.append(''.join(reversed(z)))
                coeffs.append(coeff)
    if not paulis:
        return SparsePauliOp.from_list([('I'*D, 0.0)])
    return SparsePauliOp.from_list(list(zip(paulis, coeffs)))

QAOA solver for each component

In [None]:
def solve_component_qaoa(J, sampler, reps=2, shots=4096):
    """
    Solve for spin vector b ∈ {-1,+1}^D that maximizes bᵀ J b
    using QAOA (Qiskit 1.2+).
    """
    H = ising_op_from_J(J)
    opt = COBYLA(maxiter=200)

    qaoa = QAOA(sampler=sampler, optimizer=opt, reps=reps)
    result = qaoa.compute_minimum_eigenvalue(H)

    qdist = result.eigenstate  # quasi-distribution
    best_key, best_prob = max(qdist.items(), key=lambda kv: kv[1])

    # Handle both string and integer key types
    if isinstance(best_key, str):
        # Already a bitstring like "0101"
        bits = np.array([int(c) for c in best_key[::-1]])
    else:
        # Convert integer index to bitstring of correct length
        num_qubits = H.num_qubits
        bitstring = format(best_key, f"0{num_qubits}b")
        bits = np.array([int(c) for c in bitstring[::-1]])

    b = 1 - 2 * bits  # map 0→+1, 1→−1

    print(f"[QAOA] best bitstring: {best_key}, as bits={bits[::-1]}")
    return b.astype(float)


def do_l1_pca(J_init, X, K, sampler):
    J = J_init.copy()
    components = []
    for _ in range(K):
        b = solve_component_qaoa(J, sampler)
        components.append(b)
        J = deflate_J(J, b)
    Bopt = np.vstack(components).T
    X_Bopt = X @ Bopt
    R_L1 = Phi(X_Bopt)
    emb = R_L1.T @ X
    return Bopt, R_L1, emb

In [None]:
K = 1
Bopt, R_L1, emb = do_l1_pca(J, X, K, sampler)
print("Bopt:", Bopt[:, 0])

[QAOA] best bitstring: 218, as bits=[1 1 0 1 1 0 1 0]
Bopt: [ 1. -1.  1. -1. -1.  1. -1. -1.]


Partition tickers by sign

In [None]:
group_pos = [tickers[i] for i, v in enumerate(Bopt[:,0]) if v >= 0]
group_neg = [tickers[i] for i, v in enumerate(Bopt[:,0]) if v < 0]

print("\nPartitioned Tickers based on L1 PCA (QAOA):")
print("Group 1 (≥ 0):", group_pos)
print("Group 2 (< 0):", group_neg)


Partitioned Tickers based on L1 PCA (QAOA):
Group 1 (≥ 0): ['AAPL', 'AMZN', 'GOOG']
Group 2 (< 0): ['AEP', 'DUK', 'GLD', 'SO', 'XOM']


For K=2

In [92]:
K = 2
Bopt, R_L1, emb = do_l1_pca(J, X, K, sampler)
print("Bopt:", Bopt[:, 0])

[QAOA] best bitstring: 218, as bits=[1 1 0 1 1 0 1 0]
[QAOA] best bitstring: 0, as bits=[0 0 0 0 0 0 0 0]
Bopt: [ 1. -1.  1. -1. -1.  1. -1. -1.]


In [93]:
group_pos = [tickers[i] for i, v in enumerate(Bopt[:,0]) if v >= 0]
group_neg = [tickers[i] for i, v in enumerate(Bopt[:,0]) if v < 0]

print("\nPartitioned Tickers based on L1 PCA (QAOA):")
print("Group 1 (≥ 0):", group_pos)
print("Group 2 (< 0):", group_neg)


Partitioned Tickers based on L1 PCA (QAOA):
Group 1 (≥ 0): ['AAPL', 'AMZN', 'GOOG']
Group 2 (< 0): ['AEP', 'DUK', 'GLD', 'SO', 'XOM']
