# Solving $l_0$ compressed sensing on random linear system with various classical and quantum solvers

## Using classical solvers

In [1]:
!uv add "cvxpy[CVXOPT,GLPK,GUROBI,SCIP]"

[2K[2mResolved [1m91 packages[0m [2min 381ms[0m[0m                                        [0m
[2K[2mPrepared [1m6 packages[0m [2min 1.20s[0m[0m                                             
[2mUninstalled [1m1 package[0m [2min 198ms[0m[0m
[2K[2mInstalled [1m12 packages[0m [2min 51ms[0m[0m                               [0m
 [32m+[39m [1mcffi[0m[2m==1.17.1[0m
 [32m+[39m [1mclarabel[0m[2m==0.11.0[0m
 [32m+[39m [1mcvxopt[0m[2m==1.3.2[0m
 [32m+[39m [1mcvxpy[0m[2m==1.6.5[0m
 [32m+[39m [1mgurobipy[0m[2m==12.0.2[0m
 [32m+[39m [1mjinja2[0m[2m==3.1.6[0m
 [32m+[39m [1mmarkupsafe[0m[2m==3.0.2[0m
 [32m+[39m [1mosqp[0m[2m==1.0.4[0m
 [31m-[39m [1mpandas[0m[2m==2.3.0[0m
 [32m+[39m [1mpandas[0m[2m==2.2.3[0m
 [32m+[39m [1mpycparser[0m[2m==2.22[0m
 [32m+[39m [1mpyscipopt[0m[2m==5.5.0[0m
 [32m+[39m [1mscs[0m[2m==3.2.7.post2[0m


In [None]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [1]:
import numpy as np
import time
import cvxpy as cp

def solve_l0_cvxpy_miqp(
        A_matrix,       # Sensing matrix (M, N)
        y_vector,       # Measurement vector (M,)
        lambda_val,     # Sparsity regularization parameter for L0 norm
        N_signal,       # Number of signal components
        M_measure,      # Number of measurements
        big_M_value=None, # Big-M for constraints s_i vs x_i
        solver=None,    # CVXPY solver (e.g., cp.GUROBI, cp.MOSEK, cp.ECOS_BB)
        verbose_solver=False,
        verbose_script=True):
    """
    Solves L0-regularized problem: min 0.5 ||A s - y||^2 + lambda ||s||_0
    as a Mixed-Integer Quadratic Program (MIQP) using CVXPY.

    Returns s_recovered (signal values), x_recovered (binary support).`
    """

    # --- Define CVXPY Variables ---
    s_vars = cp.Variable(N_signal, name="s")  # Continuous signal values
    x_vars = cp.Variable(N_signal, boolean=True, name="x") # Binary support variables

    # --- Estimate Big-M if not provided ---
    # Should be an upper bound on any |s_i|
    if big_M_value is None:
        try:
            # A rough estimate based on a simple least squares solution
            s_lstsq_temp = np.linalg.lstsq(A_matrix, y_vector, rcond=None)[0]
            big_M_value = 5 * np.max(np.abs(s_lstsq_temp)) if np.any(s_lstsq_temp) else 100.0
        except np.linalg.LinAlgError: # Fallback if lstsq fails (e.g. M < N, underdetermined)
             big_M_value = 100.0 # Default if lstsq fails
        if big_M_value < 1.0 : big_M_value = 10.0 # Ensure it's reasonably large
        if verbose_script: print(f"  CVXPY MIQP: Estimated Big-M = {big_M_value:.2f}")

    if np.isinf(big_M_value) or np.isnan(big_M_value) or big_M_value <=0 :
        if verbose_script: print(f"  CVXPY MIQP: Invalid Big-M {big_M_value}, defaulting to 100.0")
        big_M_value = 100.0


    # --- Define Objective Function ---
    # 0.5 * ||A s - y||^2_2 + lambda * sum(x_i)
    # CVXPY's sum_squares is ||.||^2, so we need 0.5 factor.
    l2_term = 0.5 * cp.sum_squares(A_matrix @ s_vars - y_vector)
    l0_term = lambda_val * cp.sum(x_vars)
    objective = cp.Minimize(l2_term + l0_term)

    # --- Define Constraints (Big-M formulation) ---
    # -M x_i <= s_i <= M x_i
    constraints = []
    for i in range(N_signal):
        constraints.append(s_vars[i] <= big_M_value * x_vars[i])
        constraints.append(s_vars[i] >= -big_M_value * x_vars[i])
        # constraints.append(cp.abs(s_vars[i]) <= big_M_value * x_vars[i]) # Alternative for some solvers

    # --- Create and Solve Problem ---
    problem = cp.Problem(objective, constraints)

    s_recovered = np.zeros(N_signal)
    x_recovered = np.zeros(N_signal)

    if verbose_script:
        print(f"  CVXPY MIQP: Solving with N={N_signal}, M={M_measure}, lambda={lambda_val}, BigM={big_M_value:.1f}")
        if solver: print(f"  Using solver: {solver}")
        else: print("  Using default CVXPY MIQP solver (ensure one is installed: GUROBI, MOSEK, CPLEX, CBC, GLPK_MI, ECOS_BB etc.)")

    try:
        # Solve the problem
        # ECOS_BB is a common open-source mixed-integer SOCP solver that CVXPY can use.
        # GLPK_MI can handle MIQPs if the quadratic part is convex (which ours is).
        # If commercial solvers like GUROBI or MOSEK are installed, CVXPY will find them.
        if solver:
            problem.solve(solver=solver, verbose=verbose_solver)
        else:
            problem.solve(verbose=verbose_solver) # Let CVXPY pick

        if problem.status in [cp.OPTIMAL, cp.OPTIMAL_INACCURATE]:
            if verbose_script: print(f"  CVXPY MIQP: Optimization successful (Status: {problem.status}). Objective value: {problem.value:.4e}")
            s_recovered = s_vars.value
            x_recovered = (x_vars.value > 0.5).astype(float) # Ensure binary from boolean var

            # It's possible s_vars.value might have small non-zeros where x_vars.value is ~0.
            # Enforce s_i = 0 if x_i = 0.
            if s_recovered is not None and x_recovered is not None:
                 s_recovered[x_recovered < 0.5] = 0.0
            else: # Handle cases where solution is None despite OPTIMAL status (rare)
                 if verbose_script: print("  CVXPY MIQP: Warning - s_vars or x_vars value is None despite optimal status.")
                 s_recovered = np.zeros(N_signal) # Fallback
                 x_recovered = np.zeros(N_signal) # Fallback


        else:
            if verbose_script: print(f"  CVXPY MIQP: Optimization FAILED. Status: {problem.status}")
            s_recovered = np.zeros(N_signal) # Fallback
            x_recovered = np.zeros(N_signal) # Fallback
            if s_vars.value is not None : s_recovered = s_vars.value # Use partial if available
            if x_vars.value is not None : x_recovered = (x_vars.value > 0.5).astype(float)
            s_recovered[x_recovered < 0.5] = 0.0


    except Exception as e:
        if verbose_script: print(f"  CVXPY MIQP: An error occurred during solve: {e}")
        s_recovered = np.zeros(N_signal) # Fallback
        x_recovered = np.zeros(N_signal) # Fallback

    return s_recovered, x_recovered


In [2]:
# --- Main Test Script ---
if __name__ == '__main__':
    N_signal = 25
    M_measurements = 15
    sparsity_true = 10
    lambda_l0_param = 0.01 # Lambda for the L0 objective

    np.random.seed(42)

    # 1. Generate a true sparse signal s_true
    s_true = np.zeros(N_signal)
    non_zero_indices = np.random.choice(N_signal, sparsity_true, replace=False)
    # Make true signal values distinct and somewhat large to test Big-M
    s_true[non_zero_indices] = (np.random.randn(sparsity_true) * 3) + np.copysign(5, np.random.randn(sparsity_true))
    print(f"True sparse signal (s_true) [{np.sum(s_true!=0)} non-zeros]:\n{s_true}")

    # 2. Generate a random sensing matrix A
    A_sensing_matrix = np.random.randn(M_measurements, N_signal)
    # A_sensing_matrix = A_sensing_matrix / np.linalg.norm(A_sensing_matrix, axis=0, keepdims=True)
    print(f"\nSensing matrix A shape: {A_sensing_matrix.shape}")

    # 3. Compute measurements y
    y_measurements = A_sensing_matrix @ s_true
    print(f"Measurement vector y shape: {y_measurements.shape}")

    # --- Call the CVXPY MIQP Solver ---
    print("\n--- Calling L0 Solver (CVXPY MIQP) ---")

    # Estimate Big-M based on the true signal's max possible value, or use a generous fixed one
    # M_val_heuristic = 1.1 * np.max(np.abs(s_true)) if np.any(s_true) else 100.0
    M_val_heuristic = None # Let the solver estimate

    # Try to specify a solver if you have one installed that's good for MIQP
    # Examples: cp.GUROBI, cp.MOSEK, cp.CPLEX, cp.XPRESS
    # Open-source options for MIQP (may need to be installed separately & CVXPY bindings):
    # cp.SCIP, cp.HIGHS (HiGHS can do MIQP)
    # For general mixed-integer convex programs (incl. MIQP if Q is PSD):
    # cp.ECOS_BB (often default for small mixed-integer SOCP/QP)
    # cp.GLPK_MI (can sometimes handle MIQP)
    # cp.CBC (via cylp - primarily MILP, but can be tried)

    # Let's try ECOS_BB which is often bundled with CVXPY's default install
    # Or GLPK_MI if available. If you have Gurobi/Mosek, that's best.
    # If `solver=None`, CVXPY will try to pick one.

    # Check available solvers:
    # print("Installed MIQP-capable solvers:", cp.installed_solvers())
    # Pick one based on availability. ECOS_BB is a good general one if others aren't present.
    # GUROBI, MOSEK, CPLEX are best for MIQP.
    # SCIP is a strong open-source option.
    # ECOS_BB can handle MIQPs.

    available_solvers = cp.installed_solvers()
    print(f"Installed MIQP-capable solvers: {available_solvers}")
    # miqp_solver_to_use = None
    # if cp.GUROBI in available_solvers: miqp_solver_to_use = cp.GUROBI
    # elif cp.MOSEK in available_solvers: miqp_solver_to_use = cp.MOSEK
    # elif cp.CPLEX in available_solvers: miqp_solver_to_use = cp.CPLEX
    # elif cp.XPRESS in available_solvers: miqp_solver_to_use = cp.XPRESS
    # elif cp.SCIP in available_solvers: miqp_solver_to_use = cp.SCIP # May need to ensure SCIP supports MIQP via CVXPY well
    # elif cp.ECOS_BB in available_solvers: miqp_solver_to_use = cp.ECOS_BB
    # elif cp.GLPK_MI in available_solvers: miqp_solver_to_use = cp.GLPK_MI

    # miqp_solver_to_use = cp.ECOS_BB # Defaulting to ECOS_BB for broader compatibility
    # miqp_solver_to_use = cp.GLPK_MI # Try if ECOS_BB fails or is slow

    # --- WORKING SOLVERS--------------
    # miqp_solver_to_use = cp.SCIP # To let CVXPY choose
    miqp_solver_to_use = cp.GUROBI # To let CVXPY choose

    start_time = time.time()
    s_recovered_cvxpy, x_recovered_cvxpy = solve_l0_cvxpy_miqp(
        A_matrix=A_sensing_matrix,
        y_vector=y_measurements,
        lambda_val=lambda_l0_param,
        N_signal=N_signal,
        M_measure=M_measurements,
        big_M_value=M_val_heuristic,
        solver=miqp_solver_to_use,
        verbose_solver=False, # Set to True for detailed solver logs
        verbose_script=True
    )
    end_time = time.time()
    print(f"\nCVXPY MIQP Solver finished in {end_time - start_time:.2f} seconds.")

    # Reconstruct signal s = R*x where R is s_recovered
    # s_recovered_cvxpy already has 0s where x_recovered_cvxpy is 0 if solver worked.
    s_reconstructed_cvxpy = s_recovered_cvxpy

    print(f"\nTrue s:              {np.array2string(s_true, precision=3, floatmode='fixed')}")
    # s_recovered_cvxpy contains the actual signal values
    print(f"Recovered s (CVXPY): {np.array2string(s_recovered_cvxpy, precision=3, floatmode='fixed')}")
    print(f"Recovered x (CVXPY): {np.array2string(x_recovered_cvxpy, precision=0, floatmode='fixed')}")

    mse_s = np.mean((s_true - s_reconstructed_cvxpy)**2)
    mse_y = np.mean((y_measurements - A_sensing_matrix @ s_reconstructed_cvxpy)**2)
    recovered_sparsity = np.sum(x_recovered_cvxpy > 0.5)

    l0_obj_val = 0.5 * np.sum((A_sensing_matrix @ s_reconstructed_cvxpy - y_measurements)**2) + \
                 lambda_l0_param * recovered_sparsity

    print(f"\nMSE (s_true vs s_reconstructed): {mse_s:.4e}")
    print(f"MSE (y_true vs A@s_reconstructed): {mse_y:.4e}")
    print(f"True sparsity: {sparsity_true}, Recovered sparsity: {recovered_sparsity}")
    print(f"CVXPY L0 Objective Value (lambda={lambda_l0_param}): {l0_obj_val:.4e}")

    true_support = set(np.where(np.abs(s_true) > 1e-6)[0])
    recovered_support_cvxpy = set(np.where(x_recovered_cvxpy > 0.5)[0])
    print(f"True support: {true_support}")
    print(f"Recovered support (CVXPY): {recovered_support_cvxpy}")
    if true_support == recovered_support_cvxpy:
        if np.allclose(s_true[list(true_support)], s_reconstructed_cvxpy[list(true_support)], atol=1e-3): # Check values too
            print("SUCCESS: Exact support and close values recovery by CVXPY MIQP!")
        else:
            print("SUCCESS: Exact support recovery by CVXPY MIQP (values might differ slightly).")
    else:
        print("INFO: Support not exactly recovered by CVXPY MIQP.")

True sparse signal (s_true) [10 non-zeros]:
[-10.1747535   -9.2369111    0.           0.           0.
   4.3226711    0.           0.           5.72588681   5.942742
   0.          -8.03849336   0.          -7.72407223   0.
   0.         -10.73984073   0.           0.           0.
   0.           0.          -0.60305369   3.31313741   0.        ]

Sensing matrix A shape: (15, 25)
Measurement vector y shape: (15,)

--- Calling L0 Solver (CVXPY MIQP) ---
Installed MIQP-capable solvers: ['CLARABEL', 'CVXOPT', 'GLPK', 'GLPK_MI', 'GUROBI', 'OSQP', 'SCIP', 'SCIPY', 'SCS']
  CVXPY MIQP: Estimated Big-M = 31.03
  CVXPY MIQP: Solving with N=25, M=15, lambda=0.01, BigM=31.0
  Using solver: GUROBI
Restricted license - for non-production use only - expires 2026-11-23
  CVXPY MIQP: Optimization successful (Status: optimal). Objective value: 1.0000e-01

CVXPY MIQP Solver finished in 1.31 seconds.

True s:              [-10.175  -9.237   0.000   0.000   0.000   4.323   0.000   0.000   5.726
   5.943 

## Dirac-3 Solver

### Optimization Problem Summary

The goal is to solve the L0-regularized sparse recovery problem, which seeks a sparse signal `R` that best explains a measurement `y`:

$$\min_{\sigma \in \{0,1\}^N, R \in \mathbb{R}^N} \left( \frac{1}{2} \| y - A(\sigma \circ R) \|_2^2 + \lambda \|\sigma\|_0 \right)$$

To adapt this for a specialized solver that only accepts non-negative variables and a polynomial objective, we perform several transformations:

1.  **Relaxation of Sparsity**: The binary sparsity-inducing variable $\sigma_r \in \{0,1\}$ is relaxed to a continuous variable $x_r \in [0,1]$. To enforce this, a penalty term is added to encourage $x_r$ to be close to 0 or 1.
2.  **Decomposition of `R`**: The real-valued signal $R_r$ is decomposed into two non-negative variables, $R_r = R_{+,r} - R_{-,r}$, where $R_{+,r}, R_{-,r} \ge 0$. Two penalties are added: one to ensure their product $R_{+,r}R_{-,r}$ is near zero, and an optional second penalty to keep their values below a specified maximum, $R_{max}$.

This results in the following unconstrained polynomial objective function over non-negative variables $(x, R_+, R_-)$:

$$
\begin{aligned}
\min_{x, R_+, R_- \ge 0} \Bigg( & \underbrace{\frac{1}{2} \| y - A(x \circ (R_+ - R_-)) \|_2^2}_{\text{Original Objective}} + \underbrace{\lambda \sum_{r=1}^N x_r}_{\text{L0 Approx.}} \\
& + \underbrace{\mu \sum_{r=1}^N x_r(1-x_r)}_{\text{Binary Penalty}} + \underbrace{P \sum_{r=1}^N R_{+,r}R_{-,r}}_{\text{Exclusivity Penalty}} \\
& + \underbrace{P_{bound} \sum_{r=1}^N \left( R_{+,r}(R_{max} - R_{+,r}) + R_{-,r}(R_{max} - R_{-,r}) \right)}_{\text{Optional Bounding Penalty}} \Bigg)
\end{aligned}
$$

The Python script `convert_problem_to_quartic_poly` transforms this final objective into a list of coefficients and variable indices suitable for the hardware solver.

Below we add slack variables for $R^+$ and $R^-$ in order to bound them

In [None]:
def convert_problem_to_quartic_poly(
    A_matrix, y_vector, 
    lambda_penalty, mu_relax_penalty, P_positivity_penalty,
    add_slack_for_sum_constraint=False,
    R_max_bound=None, P_bound_penalty=0.0
):
    """
    Converts the L0-sparse recovery problem to a quartic polynomial objective
    for a specialized solver that takes positive variables.

    This version includes an optional quadratic penalty to bound R+ and R- variables.

    Args:
        A_matrix (np.ndarray): The M x N measurement matrix A.
        y_vector (np.ndarray): The M x 1 measurement vector y.
        lambda_penalty (float): Coefficient for the original L0 norm penalty.
        mu_relax_penalty (float): Coefficient for the relaxation penalty sum_r x_r(1-x_r).
        P_positivity_penalty (float): Coefficient for the penalty sum_r R_{+r}R_{-r}.
        add_slack_for_sum_constraint (bool): If True, adds a slack variable to
            nullify a mandatory sum constraint in the solver.
        R_max_bound (float, optional): The target upper bound for R+ and R- variables.
            If provided, P_bound_penalty must also be non-zero.
        P_bound_penalty (float, optional): The coefficient for the bounding penalty
            P_bound * R(R_max_bound - R). Defaults to 0.0 (no penalty).

    Returns:
        tuple: (poly_coeffs, poly_indices, total_num_vars)
    """
    A = np.asarray(A_matrix)
    y = np.asarray(y_vector)

    if A.ndim != 2 or y.ndim != 1 or A.shape[0] != y.shape[0]:
        raise ValueError("Input dimensions are incorrect.")

    if P_bound_penalty != 0 and R_max_bound is None:
        raise ValueError("If P_bound_penalty is set, R_max_bound must also be provided.")

    M, N = A.shape

    # Determine the total number of variables for the solver
    num_base_vars = 3 * N
    total_num_vars = num_base_vars + 1 if add_slack_for_sum_constraint else num_base_vars

    # Variable mapping (1-based indexing for solver)
    # x_r       -> r + 1
    # R_{+,r}   -> N + r + 1
    # R_{-,r}   -> 2*N + r + 1
    # S1 (slack) -> 3*N + 1

    b_vec = A.T @ y
    Q_mat = A.T @ A

    term_map = {}
    
    def add_term_to_map(coeff, var_indices_list):
        if coeff == 0:
            return
        
        sorted_actual_indices = sorted([v_idx for v_idx in var_indices_list if v_idx != 0])
        padded_list = [0] * 4
        current_len = len(sorted_actual_indices)
        for i in range(current_len):
            padded_list[4 - 1 - i] = sorted_actual_indices[current_len - 1 - i]
        
        final_indices_tuple = tuple(sorted(padded_list))
        term_map[final_indices_tuple] = term_map.get(final_indices_tuple, 0) + coeff

    # --- Populate the term map ---
    for r_idx in range(N):
        var_x_r = r_idx + 1
        var_R_plus_r = N + r_idx + 1
        var_R_minus_r = 2 * N + r_idx + 1

        # Original penalties
        add_term_to_map(lambda_penalty + mu_relax_penalty, [var_x_r])
        add_term_to_map(-mu_relax_penalty, [var_x_r, var_x_r])
        add_term_to_map(P_positivity_penalty, [var_R_plus_r, var_R_minus_r])

        # NEW: Add bounding penalties for R+ and R-
        # The penalty is -P_bound * R(R_max - R) to encourage R to be in [0, R_max].
        # We add P_bound * (R*R_max - R^2) to the objective.
        if P_bound_penalty != 0 and R_max_bound is not None:
            # Penalty for R+_r
            add_term_to_map(P_bound_penalty * R_max_bound, [var_R_plus_r]) # Linear term
            add_term_to_map(-P_bound_penalty, [var_R_plus_r, var_R_plus_r]) # Quadratic term
            # Penalty for R-_r
            add_term_to_map(P_bound_penalty * R_max_bound, [var_R_minus_r]) # Linear term
            add_term_to_map(-P_bound_penalty, [var_R_minus_r, var_R_minus_r]) # Quadratic term

        # Main objective terms
        br_val = b_vec[r_idx]
        if br_val != 0:
            add_term_to_map(-br_val, [var_x_r, var_R_plus_r])
            add_term_to_map(br_val, [var_x_r, var_R_minus_r])

        for s_idx in range(N):
            var_x_s = s_idx + 1
            var_R_plus_s = N + s_idx + 1
            var_R_minus_s = 2 * N + s_idx + 1
            
            Q_rs_val = Q_mat[r_idx, s_idx]
            if Q_rs_val == 0: continue
            
            coeff_val_rs = 0.5 * Q_rs_val
            add_term_to_map(coeff_val_rs, [var_x_r, var_R_plus_r, var_x_s, var_R_plus_s])
            add_term_to_map(-coeff_val_rs, [var_x_r, var_R_plus_r, var_x_s, var_R_minus_s])
            add_term_to_map(-coeff_val_rs, [var_x_r, var_R_minus_r, var_x_s, var_R_plus_s])
            add_term_to_map(coeff_val_rs, [var_x_r, var_R_minus_r, var_x_s, var_R_minus_s])

    final_poly_coeffs = []
    final_poly_indices = []
    for indices_tuple, summed_coeff in term_map.items():
        if summed_coeff != 0:
            final_poly_indices.append(list(indices_tuple))
            final_poly_coeffs.append(summed_coeff)
            
    return final_poly_coeffs, final_poly_indices, total_num_vars

def decode_solution(solver_solution, N, threshold=0.5):
    """
    Decodes the solution from the specialized solver back to the original
    problem's sigma and R vectors.
    """
    solution_vec = np.asarray(solver_solution)
    if solution_vec.size < 3 * N:
        raise ValueError(f"solver_solution has size {solution_vec.size}, but expected at least {3*N}.")

    x_vars = solution_vec[0:N]
    R_plus_vars = solution_vec[N:2*N]
    R_minus_vars = solution_vec[2*N:3*N]

    sigma_decoded = (x_vars >= threshold).astype(int)
    R_decoded = R_plus_vars - R_minus_vars
    R_decoded[sigma_decoded == 0] = 0.0

    return sigma_decoded, R_decoded


## Example

In [8]:
# if __name__ == '__main__':
# Example Usage (as per the problem description structure)
# N: number of (sigma_r, R_r) pairs
# M: dimension of y

N_vars = 3 # Number of original (sigma, R) variables
M_meas = 5 # Number of measurements

# Random example data
np.random.seed(1)
A_matrix = np.random.rand(M_meas, N_vars)
y_vector = np.random.rand(M_meas)

lambda_p = 0.1  # L0 penalty
eps = 1.0     # x(1-x) penalty
gamma = 10.0     # R+R- penalty

print(f"A_matrix (M x N = {M_meas} x {N_vars}):\n", A_matrix)
print(f"y_vector (M = {M_meas}):\n", y_vector)
print(f"lambda: {lambda_p}, eps: {eps}, gamma: {gamma}\n")

coeffsG, indicesG = convert_problem_to_quartic_poly(A_matrix, y_vector, lambda_p, eps, gamma)

print("Generated Polynomial Terms:")
print(f"Number of terms: {len(coeffsG)}")
# for i in range(len(coeffs)):
#     print(f"Coeff: {coeffs[i]:.4f}, Indices: {indices[i]}")

# Small test case from the problem description for index formatting
# E = 3.14 - 2*x1 + 3*x2 - 4*x1^2 + 5*x1*x2 - 6*x1*x2 + 7*x1^2*x2 - 8*x2^3
# Assuming N=2 (for x1, x2). Max degree 3 for this example.
# Our code is specific to the L0 problem and generates up to quartic.
# Test the padding and sorting of indices in `add_term_temp` logic:

print("\n--- Index Formatting Test (using internal logic similar to add_term_temp) ---")
test_poly_coeffs = []
test_poly_indices = []

# Test add_term_temp's core logic for formatting:
def test_format_indices(var_indices_list_in):
    sorted_actual_indices = sorted([v_idx for v_idx in var_indices_list_in if v_idx != 0])
    padded_list = [0] * 4 # Quartic
    current_len = len(sorted_actual_indices)
    for i in range(current_len):
        padded_list[4 - 1 - i] = sorted_actual_indices[current_len - 1 - i]
    return sorted(padded_list)

# Test cases for formatting (assuming quartic, so length 4)
# x1 (var index 1)
print(f"x1 ([1]): {test_format_indices([1])}") # Expected: [0,0,0,1]
# x1^2 ([1,1])
print(f"x1^2 ([1,1]): {test_format_indices([1,1])}") # Expected: [0,0,1,1]
# x1*x2 ([1,2])
print(f"x1*x2 ([1,2]): {test_format_indices([1,2])}") # Expected: [0,0,1,2]
# x1^2*x2 ([1,1,2])
print(f"x1^2*x2 ([1,1,2]): {test_format_indices([1,1,2])}") # Expected: [0,1,1,2]
# x1*x2*x3*x4 ([1,2,3,4])
print(f"x1*x2*x3*x4 ([1,2,3,4]): {test_format_indices([1,2,3,4])}") # Expected: [1,2,3,4]
# Constant term (empty list)
print(f"Constant ([]): {test_format_indices([])}") # Expected: [0,0,0,0]

# Example of a more complex output from the main function for a small N
if N_vars <= 3 and M_meas <=5: # Print details for small problems
      print("\nDetailed Polynomial Terms (small example):")
      for i in range(len(coeffsG)):
          print(f"  Term {i+1}: Coefficient = {coeffsG[i]:.4f}, Indices = {indicesG[i]}")

A_matrix (M x N = 5 x 3):
 [[4.17022005e-01 7.20324493e-01 1.14374817e-04]
 [3.02332573e-01 1.46755891e-01 9.23385948e-02]
 [1.86260211e-01 3.45560727e-01 3.96767474e-01]
 [5.38816734e-01 4.19194514e-01 6.85219500e-01]
 [2.04452250e-01 8.78117436e-01 2.73875932e-02]]
y_vector (M = 5):
 [0.67046751 0.4173048  0.55868983 0.14038694 0.19810149]
lambda: 0.1, eps: 1.0, gamma: 10.0

Generated Polynomial Terms:
Number of terms: 36

--- Index Formatting Test (using internal logic similar to add_term_temp) ---
x1 ([1]): [0, 0, 0, 1]
x1^2 ([1,1]): [0, 0, 1, 1]
x1*x2 ([1,2]): [0, 0, 1, 2]
x1^2*x2 ([1,1,2]): [0, 1, 1, 2]
x1*x2*x3*x4 ([1,2,3,4]): [1, 2, 3, 4]
Constant ([]): [0, 0, 0, 0]

Detailed Polynomial Terms (small example):
  Term 1: Coefficient = 1.1000, Indices = [0, 0, 0, 1]
  Term 2: Coefficient = -1.0000, Indices = [0, 0, 1, 1]
  Term 3: Coefficient = 10.0000, Indices = [0, 0, 4, 7]
  Term 4: Coefficient = -0.6260, Indices = [0, 0, 1, 4]
  Term 5: Coefficient = 0.6260, Indices = [0, 0, 1

## Run on Dirac-3

In [17]:
import qci_client as qc

In [18]:
client = qc.QciClient()
print(client.get_allocations()["allocations"]["dirac"])

{'metered': True, 'seconds': 8912, 'paid': True}


In [62]:
N_signal = 10
M_measurements = 5
sparsity_true = 3
lambda_p = 0.2 # Lambda for the L0 objective

np.random.seed(42)

# 1. Generate a true sparse signal s_true
s_true = np.zeros(N_signal)
non_zero_indices = np.random.choice(N_signal, sparsity_true, replace=False)
# Make true signal values distinct and somewhat large to test Big-M
s_true[non_zero_indices] = (np.random.randn(sparsity_true) * 3) + np.copysign(5, np.random.randn(sparsity_true))
print(f"True sparse signal (s_true) [{np.sum(s_true!=0)} non-zeros]:\n{s_true}")

# 2. Generate a random sensing matrix A
A_sensing_matrix = np.random.randn(M_measurements, N_signal)
# A_sensing_matrix = A_sensing_matrix / np.linalg.norm(A_sensing_matrix, axis=0, keepdims=True)
print(f"\nSensing matrix A shape: {A_sensing_matrix.shape}")

# 3. Compute measurements y
y_measurements = A_sensing_matrix @ s_true
print(f"Measurement vector y shape: {y_measurements.shape}")

True sparse signal (s_true) [3 non-zeros]:
[ 0.         -1.96845415  0.          0.          0.         -6.7426344
  0.          0.         -4.16287612  0.        ]

Sensing matrix A shape: (5, 10)
Measurement vector y shape: (5,)


In [217]:
eps = 0.5     # x(1-x) penalty
gamma = 0.5     # R+R- penalty

print(f"A_matrix (M x N = {M_meas} x {N_vars}):\n", A_sensing_matrix)
print(f"y_vector (M = {M_meas}):\n", y_measurements)
print(f"lambda: {lambda_p}, eps: {eps}, gamma: {gamma}\n")

coeffsG, indicesG, terms = convert_problem_to_quartic_poly(A_sensing_matrix, y_measurements, lambda_p, eps, gamma, add_slack_for_sum_constraint=True,
    R_max_bound=7, P_bound_penalty=0.01)

print("Generated Polynomial Terms:")
print(f"Number of terms: {len(coeffsG)}")
print(f"terms: {terms}")

A_matrix (M x N = 5 x 3):
 [[-2.61254901  0.95036968  0.81644508 -1.523876   -0.42804606 -0.74240684
  -0.7033438  -2.13962066 -0.62947496  0.59772047]
 [ 2.55948803  0.39423302  0.12221917 -0.51543566 -0.60025385  0.94743982
   0.291034   -0.63555974 -1.02155219 -0.16175539]
 [-0.5336488  -0.00552786 -0.22945045  0.38934891 -1.26511911  1.09199226
   2.77831304  1.19363972  0.21863832  0.88176104]
 [-1.00908534 -1.58329421  0.77370042 -0.53814166 -1.3466781  -0.88059127
  -1.1305523   0.13442888  0.58212279  0.88774846]
 [ 0.89433233  0.7549978  -0.20716589 -0.62347739 -1.50815329  1.09964698
  -0.17773212 -0.41038331  1.17971634 -0.89820794]]
y_vector (M = 5):
 [  5.75544502  -2.91167472  -8.26218749   6.63084194 -13.81170908]
lambda: 0.1, eps: 0.5, gamma: 0.5

Generated Polynomial Terms:
Number of terms: 300
terms: 31


In [218]:
from collections import Counter

poly_indices = indicesG
poly_coefficients = coeffsG

ind_dict = Counter(np.array(poly_indices).flatten().tolist())
num_vars = len(ind_dict)
max_deg = len(poly_indices[0])

print(f"num variables:{ num_vars}, max degree: {max_deg}")
print(f"num of zero in poly_indices: {ind_dict[0]} (if nonzero, calculate min_degree)")

num variables:31, max degree: 4
num of zero in poly_indices: 210 (if nonzero, calculate min_degree)


In [219]:
data = [{"idx": idx, "val": val} for idx, val in zip(poly_indices, poly_coefficients)]
file = {
    "file_name": "hello-world",
    "file_config": {
        "polynomial": {
            "num_variables": 31,
            "min_degree": 1,
            "max_degree": 4,
            "data": data,
        }
    }
}

In [220]:
file_response = client.upload_file(file=file)

In [221]:
# Build the job body to be submitted to the API.
# This is where the job type and the Dirac-3 device and its configuration are specified.
job_body = client.build_job_body(
    job_type='sample-hamiltonian',
    job_name='l0_run5', # user-defined string, optional
    job_tags=['eqcintern', 'l0'],  # user-defined list of string identifiers, optional
    job_params={'device_type': 'dirac-3', 'relaxation_schedule': 2, 'sum_constraint': 30, 'num_samples':100},
    polynomial_file_id=file_response['file_id'],
)

In [222]:
# Submit the job and await the result.
# response = client.process_job(job_body=job_body, wait=True)
job_response = client.process_job(job_body=job_body)
assert job_response["status"] == qc.JobStatus.COMPLETED.value

2025-06-10 09:45:52 - Dirac allocation balance = 4254 s
2025-06-10 09:45:52 - Job submitted: job_id='68483710b8b42e8edbee8050'
2025-06-10 09:45:52 - QUEUED
2025-06-10 09:45:55 - RUNNING
2025-06-10 09:59:50 - COMPLETED
2025-06-10 09:59:53 - Dirac allocation balance = 3428 s


In [223]:
import json

with open('l0_run5.json', 'w') as tfile:
    json.dump(job_response, tfile)

In [224]:
print(
    f"Result [x_1, x_2] = {job_response['results']['solutions'][0]} is " +
    ("optimal" if job_response["results"]["energies"][0] == -1 else "suboptimal")
)

Result [x_1, x_2] = [0, 0, 29.6756248, 0, 7e-07, 0, 0, 0, 1e-07, 0, 0, 0, 0.3243728, 1e-07, 2.5e-06, 0, 0, 0, 0, 0, 1e-07, 0, 0, 0, 0, 0, 1e-07, 0, 2e-07, 0, 0] is suboptimal


In [71]:
def decode_solution(solver_solution, N, threshold=0.5):
    """
    Decodes the solution from the specialized solver back to the original
    problem's sigma and R vectors.

    Args:
        solver_solution (list or np.ndarray): The vector of variable values
            returned by the solver. Length should be 3*N or 3*N+1.
        N (int): The number of original (sigma, R) variable pairs.
        threshold (float): The cutoff value (between 0 and 1) to determine
            if a relaxed x_r variable should be considered 1 or 0.

    Returns:
        tuple: (sigma_decoded, R_decoded)
            sigma_decoded (np.ndarray): The decoded binary vector representing the mask.
            R_decoded (np.ndarray): The decoded real-valued signal vector.
    """
    solution_vec = np.asarray(solver_solution)
    if solution_vec.size < 3 * N:
        raise ValueError(f"solver_solution has size {solution_vec.size}, but expected at least {3*N}.")

    # Parse the solver's solution vector, ignoring any slack variables at the end
    x_vars = solution_vec[0:N]
    R_plus_vars = solution_vec[N:2*N]
    R_minus_vars = solution_vec[2*N:3*N]

    # Decode sigma by applying the threshold to the relaxed x variables
    sigma_decoded = (x_vars >= threshold).astype(int)

    # Reconstruct the real-valued R vector from its positive and negative parts
    R_decoded = R_plus_vars - R_minus_vars
    
    # Enforce sparsity: where sigma is 0, R should also be 0.
    R_decoded[sigma_decoded == 0] = 0.0

    return sigma_decoded, R_decoded

In [225]:
decode_solution(job_response['results']['solutions'][0], 10, threshold=0.5)

(array([0, 0, 1, 0, 0, 0, 0, 0, 0, 0]),
 array([0.       , 0.       , 0.3243728, 0.       , 0.       , 0.       ,
        0.       , 0.       , 0.       , 0.       ]))

## Solve with Gurobi, SCIP

In [155]:
# miqp_solver_to_use = cp.GUROBI # To let CVXPY choose
miqp_solver_to_use = cp.SCIP

start_time = time.time()
s_recovered_cvxpy, x_recovered_cvxpy = solve_l0_cvxpy_miqp(
    A_matrix=A_sensing_matrix,
    y_vector=y_measurements,
    lambda_val=lambda_l0_param,
    N_signal=N_signal,
    M_measure=M_measurements,
    big_M_value=M_val_heuristic,
    solver=miqp_solver_to_use,
    verbose_solver=False, # Set to True for detailed solver logs
    verbose_script=True
)
end_time = time.time()
print(f"\nCVXPY MIQP Solver finished in {end_time - start_time:.2f} seconds.")

# Reconstruct signal s = R*x where R is s_recovered
# s_recovered_cvxpy already has 0s where x_recovered_cvxpy is 0 if solver worked.
s_reconstructed_cvxpy = s_recovered_cvxpy

print(f"\nTrue s:              {np.array2string(s_true, precision=3, floatmode='fixed')}")
# s_recovered_cvxpy contains the actual signal values
print(f"Recovered s (CVXPY): {np.array2string(s_recovered_cvxpy, precision=3, floatmode='fixed')}")
print(f"Recovered x (CVXPY): {np.array2string(x_recovered_cvxpy, precision=0, floatmode='fixed')}")

mse_s = np.mean((s_true - s_reconstructed_cvxpy)**2)
mse_y = np.mean((y_measurements - A_sensing_matrix @ s_reconstructed_cvxpy)**2)
recovered_sparsity = np.sum(x_recovered_cvxpy > 0.5)

l0_obj_val = 0.5 * np.sum((A_sensing_matrix @ s_reconstructed_cvxpy - y_measurements)**2) + \
             lambda_l0_param * recovered_sparsity

print(f"\nMSE (s_true vs s_reconstructed): {mse_s:.4e}")
print(f"MSE (y_true vs A@s_reconstructed): {mse_y:.4e}")
print(f"True sparsity: {sparsity_true}, Recovered sparsity: {recovered_sparsity}")
print(f"CVXPY L0 Objective Value (lambda={lambda_l0_param}): {l0_obj_val:.4e}")

true_support = set(np.where(np.abs(s_true) > 1e-6)[0])
recovered_support_cvxpy = set(np.where(x_recovered_cvxpy > 0.5)[0])
print(f"True support: {true_support}")
print(f"Recovered support (CVXPY): {recovered_support_cvxpy}")
if true_support == recovered_support_cvxpy:
    if np.allclose(s_true[list(true_support)], s_reconstructed_cvxpy[list(true_support)], atol=1e-3): # Check values too
        print("SUCCESS: Exact support and close values recovery by CVXPY MIQP!")
    else:
        print("SUCCESS: Exact support recovery by CVXPY MIQP (values might differ slightly).")
else:
    print("INFO: Support not exactly recovered by CVXPY MIQP.")

  CVXPY MIQP: Estimated Big-M = 15.83
  CVXPY MIQP: Solving with N=10, M=5, lambda=0.01, BigM=15.8
  Using solver: SCIP
  CVXPY MIQP: Optimization successful (Status: optimal). Objective value: 3.0000e-02

CVXPY MIQP Solver finished in 0.55 seconds.

True s:              [ 0.000 -1.968  0.000  0.000  0.000 -6.743  0.000  0.000 -4.163  0.000]
Recovered s (CVXPY): [ 0.000 -1.968  0.000  0.000  0.000 -6.743  0.000  0.000 -4.163  0.000]
Recovered x (CVXPY): [0. 1. 0. 0. 0. 1. 0. 0. 1. 0.]

MSE (s_true vs s_reconstructed): 1.2800e-21
MSE (y_true vs A@s_reconstructed): 4.8558e-21
True sparsity: 3, Recovered sparsity: 3
CVXPY L0 Objective Value (lambda=0.01): 3.0000e-02
True support: {8, 1, 5}
Recovered support (CVXPY): {8, 1, 5}
SUCCESS: Exact support and close values recovery by CVXPY MIQP!


## Matching Pursuits Methods

In [114]:
import l0core

In [199]:
import l0core.solvers as solvers
print(dir(solvers))


['__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'calculate_h_prime_relaxed', 'calculate_user_hamiltonian', 'compute_stochastic_gradient', 'grad_loss_fn_original_h', 'grad_loss_fn_single_var', 'hamiltonian_h_prime', 'hamiltonian_h_prime_single_var', 'itertools', 'jax', 'jnp', 'np', 'penalty_terms_original', 'penalty_terms_single_var', 'solve_alternating_mip', 'solve_alternating_mip_B', 'solve_alternating_relaxed_h_prime', 'solve_continuous_relaxation_original_hamiltonian', 'solve_continuous_relaxation_sgd', 'solve_continuous_relaxation_single_var', 'solve_matching_pursuit', 'solve_omp', 'solve_sigma_exact', 'total_loss_function_original_hamiltonian', 'total_loss_function_single_var', 'update_step_original_h', 'update_step_sgd', 'update_step_single_var']


In [206]:
from l0core.solvers import solve_alternating_mip_B, solve_matching_pursuit, solve_omp, calculate_user_hamiltonian, solve_cosamp

In [211]:
R_mip, sigma_mip, history_H_mip = solve_cosamp(
    A_cs_matrix=A_sensing_matrix, 
    y_cs_vec=y_measurements, 
    lambda_mip=0.1,
    target_sparsity=3,
    N_signal=N_signal, 
    M_measure=M_measurements,
    num_outer_iter=150, 
    num_R_iter=10, 
    num_sigma_iter=15, # Fewer iterations for quicker test
    initial_R=None, 
    initial_sigma=None,
    verbose=True
)

Starting CoSaMP with target sparsity s = 3
CoSaMP iter 1: |res|=5.2222e+00, support=[9, 5, 4], H=0.300000
CoSaMP iter 2: |res|=2.9678e+00, support=[5, 9, 8], H=0.300000
CoSaMP iter 3: |res|=8.7475e-15, support=[5, 1, 8], H=0.300000
CoSaMP converged: residual norm below threshold
Finished CoSaMP solver


In [210]:
print('sigma: ', sigma_mip, '\n R: ', R_mip)

sigma:  [0. 1. 0. 0. 0. 1. 0. 0. 1. 1.] 
 R:  [ 0.00000000e+00 -1.96845415e+00  0.00000000e+00  0.00000000e+00
  0.00000000e+00 -6.74263440e+00  0.00000000e+00  0.00000000e+00
 -4.16287612e+00  1.58470727e-15]


In [203]:
R_mip, sigma_mip, history_H_mip = solve_omp(
    A_cs_matrix=A_sensing_matrix, 
    y_cs_vec=y_measurements, 
    lambda_mip=0.1,
    N_signal=N_signal, 
    M_measure=M_measurements,
    num_outer_iter=50, 
    num_R_iter=10, 
    num_sigma_iter=15, # Fewer iterations for quicker test
    initial_R=None, 
    initial_sigma=None,
    verbose=True
)

Starting Orthogonal Matching Pursuit solver
OMP iter 1: support=[0], H=0.100000
OMP iter 2: support=[0, 6], H=0.200000
OMP iter 3: support=[0, 6, 4], H=0.300000
OMP iter 4: support=[0, 6, 4, 1], H=0.400000
OMP iter 5: support=[0, 6, 4, 1, 8], H=0.500000
OMP stop at iter 5: max|corr|=0.0000 < lambda=0.1
Finished OMP solver


In [167]:
print('sigma: ', sigma_mip, '\n R: ', R_mip)

sigma:  [1. 1. 0. 0. 1. 0. 1. 0. 1. 0.] 
 R:  [-2.10898243 -4.54278     0.          0.          1.4516005   0.
 -2.27923557  0.         -5.68920346  0.        ]


In [187]:
from sklearn.linear_model import OrthogonalMatchingPursuit

def solve_omp_sklearn(A_cs_matrix,
              y_cs_vec,
              lambda_mip,
              N_signal,
              M_measure,
              num_outer_iter=50,
              num_R_iter=None,
              num_sigma_iter=None,
              initial_R=None,
              initial_sigma=None,
              verbose=True,
              tol=1e-6):
    """
    OMP via scikit-learn's API.
    
    Uses lambda_mip as the stopping tolerance on residual norm.
    Signature matches solve_alternating_mip.
    
    Returns:
      R_current, sigma_current, history_H
    """
    # Instantiate scikit-learn OMP with tol = lambda_mip
    omp = OrthogonalMatchingPursuit(n_nonzero_coefs=3, 
                                    # tol=tol,
                                    fit_intercept=True)
    # Fit model
    omp.fit(A_cs_matrix, y_cs_vec)
    coef = omp.coef_
    
    # Build outputs
    R_current     = coef
    sigma_current = (coef != 0).astype(float)
    
    # Compute final Hamiltonian
    H_final = calculate_user_hamiltonian(A_cs_matrix, y_cs_vec,
                                         R_current, sigma_current,
                                         lambda_mip)
    history_H = [H_final]
    
    if verbose:
        sparsity = int(sigma_current.sum())
        print(f"scikit-learn OMP finished: sparsity = {sparsity}, H = {H_final:.6f}")
    
    return R_current, sigma_current, history_H

In [188]:
R_mip, sigma_mip, history_H_mip = solve_omp_sklearn(
    A_cs_matrix=A_sensing_matrix, 
    y_cs_vec=y_measurements, 
    lambda_mip=0.1,
    N_signal=N_signal, 
    M_measure=M_measurements,
    num_outer_iter=50, 
    num_R_iter=10, 
    num_sigma_iter=15, # Fewer iterations for quicker test
    initial_R=None, 
    initial_sigma=None,
    verbose=True
)

scikit-learn OMP finished: sparsity = 3, H = -5.386960


In [189]:
print('sigma: ', sigma_mip, '\n R: ', R_mip)

sigma:  [1. 0. 0. 0. 0. 0. 1. 0. 1. 0.] 
 R:  [-2.31221237  0.          0.          0.          0.          0.
 -2.76932826  0.         -4.97251688  0.        ]
