In [132]:
import numpy as np
from scipy.linalg import svd
import plotly.graph_objects as go
import pandas as pd

*Compression Algorithm Fig 1*

In [83]:
def randomized_compression(A, r, rOV, w):
    """
    Compute a compression matrix Q for A using a randomized algorithm.

    Parameters:
    A (numpy array): Input matrix (m x n)
    r (int): Target rank
    rOV (int): Oversampling parameter
    w (int): Exponent for the power iteration

    Returns:
    Q (numpy array): Compression matrix (m x (r + rOV))
    """
    m, n = A.shape
    d = r + rOV  # Effective reduced dimension

    # Step 1: Draw a Gaussian random matrix Omega_L
    Omega_L = np.random.randn(n, d)

    # Step 2: Compute B = (A A^T)^w A Omega_L
    B = A @ Omega_L  # Initial multiplication: A Omega_L
    for _ in range(w):
        B = (A @ A.T) @ B # Power iteration: (A A^T) B

    # Step 3: Compute the orthogonal basis Q using QR decomposition
    Q, _ = np.linalg.qr(B)

    return Q

*NMF with Structured Random Compression Fig 2* 

In [None]:
def randomized_nmf(A, r, r_ov=5, w=1, max_iter=500, tol=1e-6):
    """
    NMF with compression matrices L and R, random initialization for Y_k,
    and multiplicative updates with proper dimension handling.
    """
    m, n = A.shape
    d = r + r_ov  # Effective reduced dimension

    # --- Step 1: Compute compression matrices L and R ---
    L = randomized_compression(A, r, r_ov, w)      # m × d
    R = randomized_compression(A.T, r, r_ov, w).T  # d × n

    # --- Step 3: Initialize Y_k as random non-negative ---
    Y_k = np.abs(np.random.randn(r, n))  # r × n
    Y_k = np.maximum(Y_k, 1e-6)          # Enforce non-negativity

    # Initialize X
    X_k = np.abs(np.random.randn(m, r))   # m × r
    X_k = np.maximum(X_k, 1e-6)

    # --- Main loop ---
    errors = []
    for k in range(max_iter):
        # --- Update X_{k+1} ---
        # numerator: (m × d) @ (d × r) = m × r
        numerator = (A @ R.T) @ (R @ Y_k.T)
        # denominator: (m × r) @ (r × d) @ (d × r) = m × r
        denominator = X_k @ (Y_k @ R.T @ R @ Y_k.T)
        X_k_plus1 = X_k * numerator / (denominator + 1e-10)
        X_k_plus1 = np.maximum(X_k_plus1, 1e-6)

        # --- Update Y_{k+1} ---
        # numerator: (r × d) @ (d × n) = r × n
        numerator = (L.T @ X_k_plus1).T @ (L.T @ A)
        # denominator: (r × d) @ (d × r) @ (r × n) = r × n
        denominator = (L.T @ X_k_plus1).T @ (L.T @ X_k_plus1) @ Y_k
        Y_k_plus1 = Y_k * numerator / (denominator + 1e-10)
        Y_k_plus1 = np.maximum(Y_k_plus1, 1e-6)

        # --- Convergence check ---
        error = np.linalg.norm(A - X_k_plus1 @ Y_k_plus1, 'fro') / np.linalg.norm(A, 'fro')
        errors.append(error)

        X_k, Y_k = X_k_plus1, Y_k_plus1

    return X_k, Y_k, errors

In [111]:
# Usage
np.random.seed(42)
A = np.abs(np.random.randn(100, 50))  # Non-negative input
X, Y, errors = randomized_nmf(A, r=5,w=5)
print(f"Final normalized error: {errors[-1]:.4f}")

Final normalized error: 0.5402


*Error Plot*

In [112]:
fig = go.Figure()

fig.add_trace(
    go.Scatter(
        y = errors
    )
)

fig.show()

*Subsample Randomized Hadamard Transform*

In [87]:
import numpy as np
from scipy.linalg import hadamard

def randomized_compression(A, r, rOV, w = 10):
    """
    Compute a compression matrix Q for A using a subsampled randomized Hadamard transform.

    Parameters:
    A (numpy array): Input matrix (m x n)
    r (int): Target rank
    rOV (int): Oversampling parameter
    w (int): Exponent for the power iteration

    Returns:
    Q (numpy array): Compression matrix (m x (r + rOV))
    """
    m, n = A.shape
    d = r + rOV  # Effective reduced dimension

    # Step 1: Generate subsampled randomized Hadamard matrix Omega_L
    # Find smallest power of 2 greater than or equal to n
    n_power = 2**int(np.ceil(np.log2(n)))
    
    # Generate Hadamard matrix of size n_power x n_power
    H = hadamard(n_power)
    
    # Generate random diagonal matrix D with ±1 entries
    D = np.diag(np.random.choice([-1, 1], size=n_power))
    
    # Compute randomized Hadamard transform
    Omega_full = (1/np.sqrt(n_power)) * D @ H
    
    # If n < n_power, truncate the matrix to size n
    if n < n_power:
        Omega_full = Omega_full[:n, :]
    
    # Randomly subsample d columns
    subsample_indices = np.random.choice(n_power, size=d, replace=False)
    Omega_L = Omega_full[:, subsample_indices]

    # Step 2: Compute B = (A A^T)^w A Omega_L
    B = A @ Omega_L  # Initial multiplication: A Omega_L
    for _ in range(w):
        B = (A @ A.T) @ B  # Power iteration: (A A^T) B

    # Step 3: Compute the orthogonal basis Q using QR decomposition
    Q, _ = np.linalg.qr(B)

    return Q

*FJLT*

In [98]:
import numpy as np
from numpy.fft import fft

def randomized_compression(A, r, rOV, w):
    """
    Compute a compression matrix Q for A using an FJLT test matrix with FFT-based Hadamard transform.

    Parameters:
    A (numpy array): Input matrix (m x n)
    r (int): Target rank
    rOV (int): Oversampling parameter
    w (int): Exponent for the power iteration

    Returns:
    Q (numpy array): Compression matrix (m x (r + rOV))
    """
    m, n = A.shape
    d = r + rOV  # Effective reduced dimension

    # Find smallest power of 2 >= n
    n_power = 2**int(np.ceil(np.log2(n)))

    # P: Random diagonal matrix with ±1 entries
    P = np.random.choice([-1, 1], size=n_power)
    
    # D: Sparse random projection matrix
    q = 0.1
    D = (1/np.sqrt(q)) * np.random.choice([0, 1, -1], size=(n_power, d), 
                                        p=[1-q, q/2, q/2])
    
    # H: Fast Walsh-Hadamard Transform via FFT
    def apply_hadamard(x):
        n_pow = len(x)
        # Pad x to n_power if necessary
        if len(x) < n_pow:
            x = np.pad(x, (0, n_pow - len(x)), mode='constant')
        return fft(x) * (1/np.sqrt(n_pow))
    
    # Construct Omega_L = D.T @ H @ P (applied to columns of A)
    # Step 1: Apply P (element-wise multiplication) to each column of A
    if n < n_power:
        A_padded = np.pad(A, ((0, 0), (0, n_power - n)), mode='constant')
    else:
        A_padded = A
    
    PA = A_padded * P[np.newaxis, :]  # (m × n_power)
    
    # Step 2: Apply Hadamard transform to each row of PA
    HPA = np.apply_along_axis(apply_hadamard, 1, PA)  # (m × n_power)
    
    # Step 3: Apply sparse projection D.T
    Omega_L = HPA @ D  # (m × n_power) @ (n_power × d) = (m × d)
    
    # Step 2: Compute B = (A A^T)^w A Omega_L
    B = Omega_L  # Since Omega_L is already m × d, no need for A @ Omega_L here
    for _ in range(w):
        B = (A @ A.T) @ B
    
    # Step 3: QR decomposition
    Q, _ = np.linalg.qr(B)
    return Q

# Test it
np.random.seed(42)
A = np.abs(np.random.randn(100, 50))
Q = randomized_compression(A, r=5, rOV=5, w=1)
print(f"Shape of Q: {Q.shape}")  # Should be (100, 10)

Shape of Q: (100, 10)


*Clarkson-Woodruff Transform*

In [107]:
import numpy as np

def randomized_compression(A, r, rOV, w):
    """
    Compute a compression matrix Q for A using the Clarkson-Woodruff Transform.

    Parameters:
    A (numpy array): Input matrix (m x n)
    r (int): Target rank
    rOV (int): Oversampling parameter
    w (int): Exponent for the power iteration

    Returns:
    Q (numpy array): Compression matrix (m x (r + rOV))
    """
    m, n = A.shape
    d = r + rOV  # Effective reduced dimension

    # Step 1: Construct Clarkson-Woodruff Transform (CountSketch matrix)
    # Parameters for sparsity (s controls the number of non-zero entries per column)
    s = max(1, int(np.ceil(np.log(n) / np.log(d))))  # Adjust sparsity based on problem size
    
    # Hash functions for column indices and signs
    hash_indices = np.random.randint(0, d, size=n)  # h: [n] -> [d]
    signs = np.random.choice([-1, 1], size=n)       # σ: [n] -> {-1, 1}
    
    # Build sparse sketching matrix S (d × n)
    S = np.zeros((d, n))
    for j in range(n):
        S[hash_indices[j], j] = signs[j]
    
    # Step 2: Compute sketch B = A S^T
    B = A @ S.T  # (m × n) @ (n × d) = (m × d)
    
    # Step 3: Power iteration: B = (A A^T)^w B
    for _ in range(w):
        B = (A @ A.T) @ B
    
    # Step 4: QR decomposition to get orthonormal basis
    Q, _ = np.linalg.qr(B)
    
    return Q

# Test it
np.random.seed(42)
A = np.abs(np.random.randn(100, 50))
Q = randomized_compression(A, r=5, rOV=5, w=10)
print(f"Shape of Q: {Q.shape}")  # Should be (100, 10)

Shape of Q: (100, 10)


*Grok NMF*

In [113]:
import numpy as np

def randomized_compression(A, r, rOV, w):
    """
    Compute a compression matrix Q for A using the Clarkson-Woodruff Transform.

    Parameters:
    A (numpy array): Input matrix (m x n)
    r (int): Target rank
    rOV (int): Oversampling parameter
    w (int): Exponent for the power iteration

    Returns:
    Q (numpy array): Compression matrix (m x (r + rOV))
    """
    m, n = A.shape
    d = r + rOV  # Effective reduced dimension

    # Step 1: Construct Clarkson-Woodruff Transform (CountSketch matrix)
    s = max(1, int(np.ceil(np.log(n) / np.log(d))))  # Sparsity parameter
    
    # Hash functions for column indices and signs
    hash_indices = np.random.randint(0, d, size=n)  # h: [n] -> [d]
    signs = np.random.choice([-1, 1], size=n)       # σ: [n] -> {-1, 1}
    
    # Build sparse sketching matrix S (d × n)
    S = np.zeros((d, n))
    for j in range(n):
        S[hash_indices[j], j] = signs[j]
    
    # Step 2: Compute sketch B = A S^T
    B = A @ S.T  # (m × n) @ (n × d) = (m × d)
    
    # Step 3: Power iteration: B = (A A^T)^w B
    for _ in range(w):
        B = (A @ A.T) @ B
    
    # Step 4: QR decomposition to get orthonormal basis
    Q, _ = np.linalg.qr(B)
    
    return Q

def randomized_nmf(A, r, rOV, w, max_iter=100, tol=1e-5):
    """
    Randomized NMF algorithm using compression matrices L and R.

    Parameters:
    A (numpy array): Input matrix (m x n)
    r (int): Target rank
    rOV (int): Oversampling parameter
    w (int): Exponent for the power iteration
    max_iter (int): Maximum number of iterations
    tol (float): Convergence tolerance

    Returns:
    X (numpy array): Non-negative matrix (m x r)
    Y (numpy array): Non-negative matrix (r x n)
    errors (list): List of normalized Frobenius norm errors
    """
    m, n = A.shape
    d = r + rOV  # Effective reduced dimension

    # Step 1: Compute compression matrices L and R
    L = randomized_compression(A, r, rOV, w)      # m × (r + rOV)
    R = randomized_compression(A.T, r, rOV, w).T  # (r + rOV) × n

    # Compute Aˇ = A R^T and Aˆ = L^T A
    A_breve = A @ R.T  # (m × (r + rOV))
    A_hat = L.T @ A    # ((r + rOV) × n)

    # Step 2: Initialize Y_k
    Y_k = np.abs(np.random.randn(r, n))  # (r × n)

    # Step 3: Iterative updates
    k = 1
    errors = []
    while True:
        # Step 5: Y_k ← Y_k R^T
        Y_k = Y_k @ R.T  # (r × (r + rOV))

        # Step 6: Find X_{k+1} such that ||Aˇ - X_{k+1} Y_k||_F^2 is minimized
        # Solve: X_{k+1} Y_k = Aˇ (least squares)
        X_k1 = np.linalg.lstsq(Y_k.T, A_breve.T, rcond=None)[0].T  # (m × r)
        X_k1 = np.maximum(X_k1, 0)  # Enforce non-negativity

        # Step 7: X_{k+1} ← L^T X_{k+1}
        X_k1 = L.T @ X_k1  # ((r + rOV) × r)

        # Step 8: Find Y_{k+1} such that ||Aˆ - X_{k+1} Y_{k+1}||_F^2 is minimized
        # Solve: X_{k+1} Y_{k+1} = Aˆ (least squares)
        Y_k1 = np.linalg.lstsq(X_k1, A_hat, rcond=None)[0]  # (r × n)
        Y_k1 = np.maximum(Y_k1, 0)  # Enforce non-negativity

        # Compute error: ||A - L X_{k+1} Y_{k+1}||_F
        A_approx = L @ X_k1 @ Y_k1
        error = np.linalg.norm(A - A_approx, 'fro') / np.linalg.norm(A, 'fro')
        errors.append(error)

        # Step 9: Update k
        k += 1
        Y_k = Y_k1

        # Step 10: Check convergence
        if k > max_iter or (len(errors) > 1 and abs(errors[-1] - errors[-2]) < tol):
            break

    # Final X and Y
    X = L @ X_k1  # (m × r)
    Y = Y_k1      # (r × n)

    return X, Y, errors

# Test the implementation
np.random.seed(42)
A = np.abs(np.random.randn(100, 50))  # Non-negative input
r = 5
rOV = 5
w = 1
X, Y, errors = randomized_nmf(A, r, rOV, w)
print(f"Final normalized error: {errors[-1]:.4f}")
print(f"Shape of X: {X.shape}")  # Should be (100, 5)
print(f"Shape of Y: {Y.shape}")  # Should be (5, 50)

Final normalized error: 0.5459
Shape of X: (100, 5)
Shape of Y: (5, 50)


In [116]:
fig = go.Figure()

fig.add_trace(
    go.Scatter(
        y = errors
    )
)

fig.show()

In [117]:
import numpy as np
from scipy.optimize import nnls

def randomized_compression(A, r, rOV, w):
    """
    Compute a compression matrix Q for A using the Clarkson-Woodruff Transform.

    Parameters:
    A (numpy array): Input matrix (m x n)
    r (int): Target rank
    rOV (int): Oversampling parameter
    w (int): Exponent for the power iteration

    Returns:
    Q (numpy array): Compression matrix (m x (r + rOV))
    """
    m, n = A.shape
    d = r + rOV  # Effective reduced dimension

    # Step 1: Construct Clarkson-Woodruff Transform (CountSketch matrix)
    s = max(1, int(np.ceil(np.log(n) / np.log(d))))  # Sparsity parameter
    
    # Hash functions for column indices and signs
    hash_indices = np.random.randint(0, d, size=n)  # h: [n] -> [d]
    signs = np.random.choice([-1, 1], size=n)       # σ: [n] -> {-1, 1}
    
    # Build sparse sketching matrix S (d × n)
    S = np.zeros((d, n))
    for j in range(n):
        S[hash_indices[j], j] = signs[j]
    
    # Step 2: Compute sketch B = A S^T
    B = A @ S.T  # (m × n) @ (n × d) = (m × d)
    
    # Step 3: Power iteration: B = (A A^T)^w B
    for _ in range(w):
        B = (A @ A.T) @ B
    
    # Step 4: QR decomposition to get orthonormal basis
    Q, _ = np.linalg.qr(B)
    
    return Q

def randomized_nmf(A, r, rOV, w, max_iter=100, tol=1e-5):
    """
    Randomized NMF algorithm using compression matrices L and R, with NNLS for updates.

    Parameters:
    A (numpy array): Input matrix (m x n)
    r (int): Target rank
    rOV (int): Oversampling parameter
    w (int): Exponent for the power iteration
    max_iter (int): Maximum number of iterations
    tol (float): Convergence tolerance

    Returns:
    X (numpy array): Non-negative matrix (m x r)
    Y (numpy array): Non-negative matrix (r x n)
    errors (list): List of normalized Frobenius norm errors
    """
    m, n = A.shape
    d = r + rOV  # Effective reduced dimension

    # Step 1: Compute compression matrices L and R
    L = randomized_compression(A, r, rOV, w)      # m × (r + rOV)
    R = randomized_compression(A.T, r, rOV, w).T  # (r + rOV) × n

    # Compute Aˇ = A R^T and Aˆ = L^T A
    A_breve = A @ R.T  # (m × (r + rOV))
    A_hat = L.T @ A    # ((r + rOV) × n)

    # Step 2: Initialize Y_k
    Y_k = np.abs(np.random.randn(r, n))  # (r × n)

    # Step 3: Iterative updates
    k = 1
    errors = []
    while True:
        # Step 5: Y_k ← Y_k R^T
        Y_k = Y_k @ R.T  # (r × (r + rOV))

        # Step 6: Find X_{k+1} such that ||Aˇ - X_{k+1} Y_k||_F^2 is minimized
        # Solve: X_{k+1} Y_k = Aˇ using NNLS for each row of X_{k+1}
        X_k1 = np.zeros((m, r))
        for i in range(m):
            X_k1[i, :] = nnls(Y_k.T, A_breve[i, :])[0]  # Solve for each row

        # Step 7: X_{k+1} ← L^T X_{k+1}
        X_k1 = L.T @ X_k1  # ((r + rOV) × r)

        # Step 8: Find Y_{k+1} such that ||Aˆ - X_{k+1} Y_{k+1}||_F^2 is minimized
        # Solve: X_{k+1} Y_{k+1} = Aˆ using NNLS for each column of Y_{k+1}
        Y_k1 = np.zeros((r, n))
        for j in range(n):
            Y_k1[:, j] = nnls(X_k1, A_hat[:, j])[0]  # Solve for each column

        # Compute error: ||A - L X_{k+1} Y_{k+1}||_F
        A_approx = L @ X_k1 @ Y_k1
        error = np.linalg.norm(A - A_approx, 'fro') / np.linalg.norm(A, 'fro')
        errors.append(error)

        # Step 9: Update k
        k += 1
        Y_k = Y_k1

        # Step 10: Check convergence
        if k > max_iter or (len(errors) > 1 and abs(errors[-1] - errors[-2]) < tol):
            break

    # Final X and Y
    X = L @ X_k1  # (m × r)
    Y = Y_k1      # (r × n)

    return X, Y, errors

# Test the implementation
np.random.seed(42)
A = np.abs(np.random.randn(100, 50))  # Non-negative input
r = 5
rOV = 5
w = 1
X, Y, errors = randomized_nmf(A, r, rOV, w)
print(f"Final normalized error: {errors[-1]:.4f}")
print(f"Shape of X: {X.shape}")  # Should be (100, 5)
print(f"Shape of Y: {Y.shape}")  # Should be (5, 50)

Final normalized error: 0.5444
Shape of X: (100, 5)
Shape of Y: (5, 50)


#### *Benchmark Methods*

In [121]:
def generate_ground_truth(m, r, delta):
    """Generate ground truth matrices X_GT and Y_GT"""
    n = int(0.75 * m)
    
    # Generate X_GT (m × r)
    mask_x = np.random.rand(m, r) < delta
    X_GT = np.random.uniform(0, 1, (m, r)) * mask_x
    
    # Generate Y_GT (r × n)
    mask_y = np.random.rand(r, n) < delta
    Y_GT = np.random.uniform(0, 1, (r, n)) * mask_y
    
    return X_GT, Y_GT, n

def generate_noise_matrix(m, n, delta):
    """Generate noise matrix N"""
    noise_density = delta**2
    mask_n = np.random.rand(m, n) < noise_density
    N = np.random.randn(m, n) * mask_n
    return N

def generate_dense_matrix(m, r, delta=1.0):
    """Generate dense synthetic matrix A = X_GT Y_GT + N"""
    X_GT, Y_GT, n = generate_ground_truth(m, r, delta)
    N = generate_noise_matrix(m, n, delta)
    A = X_GT @ Y_GT + N
    return A, X_GT, Y_GT

def generate_sparse_matrix(m, r, delta=0.01):
    """Generate sparse synthetic matrix A = X_GT Y_GT + N (without CSR)"""
    X_GT, Y_GT, n = generate_ground_truth(m, r, delta)
    N = generate_noise_matrix(m, n, delta)
    
    # Compute product while maintaining sparsity
    A = np.zeros((m, n))
    for i in range(m):
        for j in range(n):
            # Only compute non-zero elements
            if np.random.rand() < delta**2:  # Probability of non-zero
                A[i,j] = np.sum(X_GT[i,:] * Y_GT[:,j]) + N[i,j]
    return A, X_GT, Y_GT

In [141]:
from collections import defaultdict

sizes = np.arange(1_000,5_000 + 1_000,1000)
delta_dense = 1.0
delta_sparse = 0.01
r = 20
rOV = 5
w = 4

results = []

np.random.seed(123)
for m in sizes:
    n = int(0.75*m)

    # sparse and dense matrix
    for delta,density in [(delta_dense,'dense'),(delta_sparse,'sparse')]:
        if density == 'dense':
            A,_,_ = generate_dense_matrix(m,r,delta) 
        elif density == 'sparse':
            A,_,_ = generate_sparse_matrix(m,r,delta)

        X, Y, errors = randomized_nmf(np.abs(A),r,rOV,w)
        
        results.append({
            'Size': m,
            'Density': density,
            'Errors':errors,
            'Final Error': errors[-1]
        })


invalid value encountered in scalar divide



In [143]:
results_df = pd.DataFrame(results)

In [144]:
results_df

Unnamed: 0,Size,Density,Errors,Final Error
0,1000,dense,"[0.19134024393882426, 0.1912312922346707, 0.19...",0.1912307
1,1000,sparse,"[nan, nan, nan, nan, nan, nan, nan, nan, nan, ...",
2,2000,dense,"[0.1942293629630983, 0.19415962816581686, 0.19...",0.1941552
3,2000,sparse,"[0.0, 0.0]",0.0
4,3000,dense,"[0.19444680636262185, 0.19431461637387612, 0.1...",0.1943243
5,3000,sparse,"[1.2083198100473787e-16, 3.752981227801805e-18]",3.752981e-18
6,4000,dense,"[0.1969823517300768, 0.1968275765296103, 0.196...",0.1968134
7,4000,sparse,"[1.6781329234381388e-16, 0.0]",0.0
8,5000,dense,"[0.19607426836195416, 0.19577816011353463, 0.1...",0.1957584
9,5000,sparse,"[0.0, 1.0387947505092097e-16]",1.038795e-16


In [145]:
dense_results = results_df[results_df['Density'] == 'dense']

In [146]:
dense_results

Unnamed: 0,Size,Density,Errors,Final Error
0,1000,dense,"[0.19134024393882426, 0.1912312922346707, 0.19...",0.191231
2,2000,dense,"[0.1942293629630983, 0.19415962816581686, 0.19...",0.194155
4,3000,dense,"[0.19444680636262185, 0.19431461637387612, 0.1...",0.194324
6,4000,dense,"[0.1969823517300768, 0.1968275765296103, 0.196...",0.196813
8,5000,dense,"[0.19607426836195416, 0.19577816011353463, 0.1...",0.195758


In [152]:
fig = go.Figure()

fig.add_trace(
    go.Scatter(
    x = dense_results['Size'],
    y = dense_results['Final Error']
)
)



fig.update_layout(
    yaxis_range = [10**-2,1],
    template = 'plotly_white',
    title = 'Dense Matrix Error Rates'
)
