In [1]:
import numpy as np
from scipy.stats import entropy
from scipy.sparse import issparse
import scanpy as sc
from joblib import Parallel, delayed
from scipy.sparse import csr_matrix
import time

In [2]:
import numpy as np

def binning_method(data, method="rice"):
    """
    Calculates the optimal binning for the given data based on the selected method.
    
    Parameters:
        data (array-like): The data for which to calculate the bins.
        method (str): The binning method. Options:
                      "auto", "square_root", "rice", "logarithmic", "freedman-diaconis", "scott".
    
    Returns:
        bins (ndarray or int): The bin edges (for "auto" method) or the number of bins (for other methods).
    """
    # Ensure data is a numpy array
    data = np.array(data)
    
    # Get the number of data points
    n = len(data)
    
    # If method is 'rice'
    if method == "rice":
        # Rice Rule: Number of bins = 2 * n^(1/3)
        return int(np.ceil(2 * n**(1/3)))

    # If method is 'logarithmic'
    elif method == "logarithmic":
        # Logarithmic Binning: Number of bins = log2(n) + 1
        return int(np.ceil(np.log2(n) + 1))
    
    # If method is 'square_root'
    elif method == "square_root":
        # Square Root Rule: Number of bins = sqrt(n)
        return int(np.ceil(np.sqrt(n)))
    
    # If method is 'freedman-diaconis'
    elif method == "freedman-diaconis":
        # Freedman-Diaconis Rule: Bin width = (2 * IQR) / n^(1/3)
        q1, q3 = np.percentile(data, [25, 75])
        iqr = q3 - q1
        bin_width = 2 * iqr / (n ** (1/3))
        return int(np.ceil((np.max(data) - np.min(data)) / bin_width))
    
    # If method is 'scott'
    elif method == "scott":
        # Scott's Rule: Bin width = (3.5 * std(data)) / (n^(1/3))
        bin_width_scott = (3.5 * np.std(data)) / (n ** (1/3))
        return int(np.ceil((np.max(data) - np.min(data)) / bin_width_scott))
    
    else:
        raise ValueError("Invalid method. Choose from: 'auto', 'square_root', 'rice', 'logarithmic', 'freedman-diaconis', 'scott'.")


In [3]:
def mutual_information_matrix(matrix, nbins=20, n_jobs=-1):
    """
    Computes the mutual information matrix in parallel, working directly with sparse matrices,
    and computes the full matrix (including the diagonal elements).
    """
    if not issparse(matrix):
        matrix = csr_matrix(matrix)

    n_features = matrix.shape[0]
    mi_matrix = np.zeros((n_features, n_features))

    def compute_pairwise_mi(i, j, matrix, nbins=20):
        """
        Computes mutual information between row i and row j of the sparse matrix.
        """
        vi = matrix[i, :].toarray().flatten()
        vj = matrix[j, :].toarray().flatten()

        # Select binning method
        x_bins = binning_method(vi)
        y_bins = binning_method(vj)
        # Print the bins
        print(f"Bins for feature {i} (x):", x_bins)
        print(f"Bins for feature {j} (y):", y_bins)
                
        joint_counts, _, _ = np.histogram2d(vi, vj, bins=[x_bins, y_bins])
        ncounts = joint_counts.sum()
        if ncounts == 0:
            return 0  # No mutual information if no overlap
        joint_prob = joint_counts / ncounts + 1e-10

        marginal_i = joint_prob.sum(axis=1) + 1e-10
        marginal_j = joint_prob.sum(axis=0) + 1e-10
        
        joint_prob = joint_prob.flatten()
        h_xy =  -np.sum(joint_prob * np.log2(joint_prob))
        h_x =  -np.sum(marginal_i * np.log2(marginal_i))
        h_y =  -np.sum(marginal_j * np.log2(marginal_j))

        return float(h_x + h_y - h_xy)

    # Parallelizing the pairwise mutual information computation
    jobs = [(i, j) for i in range(n_features) for j in range(i+1, n_features)]  # Includes diagonal
    results = Parallel(n_jobs=n_jobs)(
        delayed(compute_pairwise_mi)(i, j, matrix, nbins) for i, j in jobs
    )

    # Fill the matrix with the results
    for idx, (i, j) in enumerate(jobs):
        mi_matrix[i, j] = results[idx]
        mi_matrix[j, i] = results[idx]  # Exploit symmetry to avoid duplicate computation

    return mi_matrix

In [4]:
import numpy as np
from scipy.sparse import csr_matrix, issparse

def compute_pearson_residual(matrix, theta=100):
    """
    Compute clipped Pearson residuals for a given sparse matrix, where rows are features and columns are cells.
    Reference: https://doi.org/10.1101/2020.12.01.405886

    Args:
        matrix (scipy.sparse.csr_matrix or numpy.ndarray): 2D sparse or dense array with rows as features and columns as cells.
    
    Returns:
        scipy.sparse.csr_matrix: Clipped Pearson residuals matrix in sparse format.
    """
    # Ensure the input matrix is sparse
    if not issparse(matrix):
        matrix = csr_matrix(matrix)
    
    # Compute the row and column sums
    row_sums = matrix.sum(axis=1)  # Sum along rows (features)
    col_sums = matrix.sum(axis=0)  # Sum along columns (cells)
    total_sum = matrix.sum()       # Total sum of all elements

    # Compute the expected values under independence assumption
    row_sums_dense = np.array(row_sums).flatten()  # Convert to dense
    col_sums_dense = np.array(col_sums).flatten()  # Convert to dense
    expected = (row_sums_dense[:, None] * col_sums_dense[None, :]) / total_sum

    # Ensure expected values are sparse for consistency
    expected_sparse = csr_matrix(expected)

    # Standard deviation for Pearson residuals
    std_dev = np.sqrt(expected_sparse + (expected_sparse.power(2)) / theta)

    # Compute Pearson residuals
    residuals = (matrix - expected_sparse).multiply(std_dev.power(-1))  # Element-wise division

    # Replace NaN and Inf values (caused by division by zero in std_dev) with 0
    residuals.data[np.isnan(residuals.data)] = 0
    residuals.data[np.isinf(residuals.data)] = 0

    # Clip residuals to ±sqrt(n), where n is the number of columns (cells)
    num_cells = matrix.shape[1]
    clip_value = np.sqrt(num_cells)
    residuals.data = np.clip(residuals.data, -clip_value, clip_value)

    return residuals


In [None]:
import os
import scanpy as sc

# Download h5ad here: https://drive.google.com/drive/folders/1-2og2FAM0_6e3L2_9C7HnOm9sgfrJIRe?usp=sharing
# Load data
base_path = r"C:\Users\ssromerogon\Documents\vscode_working_dir\QUBO_Feature_Selection\qubo_fs_dwave\efficient_differentiation\5000_HVGs"
fname = r"Data_hESC_EC_day1_5000g_filtered_feature_bc_matrix_h5.h5ad"

# Use os.path.join for cross-platform compatibility
file_dir = os.path.join(base_path, fname)

# Read the AnnData object
adata = sc.read_h5ad(file_dir)

adata.layers['counts'] = adata.X.copy() 
# Print the AnnData object summary
print(adata)

AnnData object with n_obs × n_vars = 4697 × 5000
    obs: 'CellID', 'BatchID', 'ClusterID', 'CellType', 'CellCycle', 'cell_potency', 'monocle3_pseudotime', 'splinefit_pseudotime', 'Tmonocleout'
    layers: 'counts'


In [6]:
# Apply pearson residuals
#sc.experimental.pp.normalize_pearson_residuals(adata, theta=100)
#X = np.array(adata.X.T)
X = adata.X.T
X = compute_pearson_residual(X)
X = X.toarray()
y = np.array(adata.obs['monocle3_pseudotime'].values)
y_reshaped = y.reshape(1, -1) 
g = adata.var_names
print(X.shape)
print(g.shape)
print(y_reshaped.shape)
Xy = np.vstack((X, y_reshaped)) 
Xy = csr_matrix(Xy)
X = csr_matrix(X)

print(Xy.shape)

(5000, 4697)
(5000,)
(1, 4697)
(5001, 4697)


In [7]:
start_time = time.time()

MI_mat = mutual_information_matrix(Xy, nbins=20, n_jobs=-1)

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time for MI construction: {elapsed_time:.4f} seconds") 

Elapsed time for MI construction: 1249.0782 seconds


In [11]:
import numpy as np
from scipy.optimize import root_scalar, minimize
import time
from dwave.samplers import SteepestDescentSolver

K = 100
# Redundancy matrix and importance vector from MI_mat and K
R = MI_mat[:-1, :-1] / (K - 1)
J = MI_mat[-1, :-1]

# Define the optimize_qubo function (using SteepestDescentSolver to solve QUBO)
def optimize_qubo(alpha, R, J):
    """
    Computes the QUBO matrix and solves it using the Steepest Descent Solver.
    
    Parameters:
    - alpha (float): Mixing optimal parameter.
    - R (numpy.ndarray): Redundancy matrix (MI of feature-feature).
    - J (numpy.ndarray): Importance vector (MI of target-features).
    
    Returns:
    - n (int): Number of selected features (non-zero in solution vector).
    - result (dimod.SampleSet): QUBO solver results.
    """
    # Compute Q matrix
    Qmat = (1 - alpha) * R - alpha * np.diag(J)
    
    # Convert QUBO matrix to dictionary format
    def qubo_matrix_to_dict(Q):
        qubo = {}
        for i in range(Q.shape[0]):
            for j in range(Q.shape[1]):
                if Q[i, j] != 0:
                    qubo[(i, j)] = Q[i, j]
        return qubo
    
    qubo = qubo_matrix_to_dict(Qmat)
    
    # Initialize the Steepest Descent Solver
    sampler = SteepestDescentSolver()
    
    # Solve the QUBO problem
    result = sampler.sample_qubo(qubo, num_reads=100)
    
    print(f"Energy from the solution: {result.first.energy}")  # Print energy of the solution

    # Number of selected features (non-zero elements in the best solution vector)
    n = sum(result.first.sample.values())

    print(f"n solution: {n}")  # Print energy of the solution

    return n, result

In [12]:
import numpy as np
from scipy.optimize import root_scalar, minimize
import time
from dwave.samplers import SteepestDescentSolver

# Start timing
start_time = time.time()

# Objective function to optimize
def objective(alpha):
    print(f"Testing alpha: {alpha}")  # Print tested alpha values
    return optimize_qubo(alpha, R, J)[0] - K

# Try to find the optimal alpha using root_scalar (similar to fzero)
try:
    result = root_scalar(objective, bracket=[0, 1], method='bisect')
    alphasol = result.root
    print(f"Optimal alpha value from root_scalar: {alphasol}")

    _, xsol = optimize_qubo(alphasol, R, J)
    print(f"Energy from the solution: {xsol.first.energy}")  # Print energy of the solution

except:
    # If root_scalar fails, fall back to minimize (similar to fminsearch)
    result = minimize(objective, x0=1, method='Nelder-Mead')
    alphasol = result.x[0]
    print(f"Optimal alpha value from minimize: {alphasol}")

    _, xsol = optimize_qubo(alphasol, R, J)
    print(f"Energy from the solution: {xsol.first.energy}")  # Print energy of the solution

# End timing
time_zerof = time.time() - start_time

# Print computation time
print(f"Time taken: {time_zerof:.2f} seconds")

Testing alpha: 0.0
Energy from the solution: 9.000359568744898e-09
n solution: 1
Testing alpha: 1.0
Energy from the solution: -605.7357724436584
n solution: 5000
Testing alpha: 0.5
Energy from the solution: -4.726462112615991
n solution: 82
Testing alpha: 0.75
Energy from the solution: -14.293549381241974
n solution: 214
Testing alpha: 0.625
Energy from the solution: -8.232391475532495
n solution: 128
Testing alpha: 0.5625
Energy from the solution: -6.270376354626933
n solution: 103
Testing alpha: 0.53125
Energy from the solution: -5.455345323567599
n solution: 93
Testing alpha: 0.546875
Energy from the solution: -5.85104980406868
n solution: 97
Testing alpha: 0.5546875
Energy from the solution: -6.057486188678013
n solution: 101
Testing alpha: 0.55078125
Energy from the solution: -5.953716814117797
n solution: 99
Testing alpha: 0.552734375
Energy from the solution: -6.0054158741531865
n solution: 99
Testing alpha: 0.5537109375
Energy from the solution: -6.031409832396093
n solution: 1