In [None]:
import numpy as np
from scipy.fft import fft
import pandas as pd
from scipy.linalg import orth
from typing import Tuple
from tqdm import tqdm
import time


In [None]:
# The class to generate the matrices.
class MatrixGeneration:

  """
    The MatrixGeneration class takes three arguments in the constructor.

    m: The number of rows.
    n: The number of cols.
    kappa: How "bad" or "ill-mannered" the matrix is. A lower value of kappa means the matrix is clean, meaning multiplying does not amplify errors.

  """
  def __init__(self,m,n,kappa):
    self.m=m
    self.n=n
    self.kappa=kappa

    ## TODO: Some places we return self.A, some places we return A.lets make it uniform.
    #matrices with good condition number
  def ug_matrix(self):
    U = orth(np.random.rand(self.m, self.n))   # m x n orthonormal matrix
    V = orth(np.random.rand(self.n, self.n))   # n x n orthonormal matrix
    S = np.diag(np.linspace(1, 1/self.kappa, self.n))  # Diagonal scaling matrix
    self.A = U @ S @ V   # Final matrix
    return self.A

  #nb and ng matrices are the matrices which have a bad leverage score
  def nb_matrix(self):
      # Determine dimensions for constructing the matrix
      half_num_cols = int(self.n / 2)
      top_block_rows = self.m - half_num_cols

      # Random Gaussian block
      gaussian_block = np.random.normal(0, 1, (top_block_rows, half_num_cols))

      # Tiny noise block
      tiny_noise_block = 1e-8 * np.random.rand(top_block_rows, half_num_cols)

      # Identity matrix for the stable part
      identity_block = np.identity(half_num_cols)

      # Construct matrix blocks
      top_left = self.kappa * gaussian_block
      top_right = tiny_noise_block
      bottom_left = np.zeros((half_num_cols, top_block_rows))
      bottom_right = identity_block

      # Assemble full matrix
      top_half = np.hstack([top_left, top_right])
      bottom_half = np.hstack([bottom_left[:, :half_num_cols], bottom_right])
      A = np.vstack([top_half, bottom_half])

      return A


In [None]:
class MatrixApproximationLeverage:
    """
    Performs approximate matrix multiplication using leverage score sampling techniques.
    """

    @staticmethod
    def leverage_score_sampling(A: np.ndarray, B: np.ndarray, samples: int) -> Tuple[np.ndarray, np.ndarray]:
        """
        Selects rows from A and B based on leverage scores to approximate A @ B.T.

        Args:
            A (np.ndarray): Input matrix of shape (m, n)
            B (np.ndarray): Input matrix of shape (n, p)
            samples (int): Number of rows to sample

        Returns:
            Tuple[np.ndarray, np.ndarray]: Scaled sampled matrices A' and B'
        """
        m, _ = A.shape
        _, p = B.shape

        U, _, _ = np.linalg.svd(A, full_matrices=False)
        U_b, _, _ = np.linalg.svd(B, full_matrices=False)

        leverage_scores_A = np.sum(U**2, axis=1)
        leverage_scores_B = np.sum(U_b**2, axis=1)

        top_A = np.argpartition(leverage_scores_A, -samples)[-samples:]
        top_B = np.argpartition(leverage_scores_B, -samples)[-samples:]

        sampled_A = A[top_A, :]
        sampled_B = B[top_B, :]

        scaled_A = sampled_A * np.sqrt(A.shape[0] / samples)
        scaled_B = sampled_B * np.sqrt(B.shape[1] / samples)

        return scaled_A, scaled_B

    @staticmethod
    def sqrt_leverage_score_sampling(A: np.ndarray, B: np.ndarray, samples: int) -> Tuple[np.ndarray, np.ndarray]:
        """
        Selects rows using the square root of leverage scores.

        Args:
            A (np.ndarray): Input matrix of shape (m, n)
            B (np.ndarray): Input matrix of shape (n, p)
            samples (int): Number of rows to sample

        Returns:
            Tuple[np.ndarray, np.ndarray]: Scaled sampled matrices A' and B'
        """

        m, _ = A.shape
        _, p = B.shape

        U, _, _ = np.linalg.svd(A, full_matrices=False)
        U_b, _, _ = np.linalg.svd(B, full_matrices=False)

        leverage_scores_A = np.sqrt(np.sum(U**2, axis=1))
        leverage_scores_B = np.sqrt(np.sum(U_b**2, axis=1))

        top_A = np.argpartition(leverage_scores_A, -samples)[-samples:]
        top_B = np.argpartition(leverage_scores_B, -samples)[-samples:]

        sampled_A = A[top_A, :]
        sampled_B = B[top_B, :]

        scaled_A = sampled_A * np.sqrt(A.shape[0] / samples)
        scaled_B = sampled_B * np.sqrt(B.shape[1] / samples)

        return scaled_A, scaled_B


In [None]:
import numpy as np
from typing import Tuple, Dict, Any

class MatrixApproximationSampling:
    def __init__(self, hash_size: int = 10000, seed: int = 42):
        """
        Initializes the sampler with a random hash vector used for priority and threshold sampling.
        """
        np.random.seed(seed)
        self.hash = np.random.uniform(0, 1, size=hash_size)

    def uniform_sampling(self, A: np.ndarray, B: np.ndarray, num_samples: int) -> Tuple[np.ndarray, np.ndarray]:
        """
        Uniformly samples columns from A and corresponding rows from B.T.

        Returns two matrices C and R such that C @ R ≈ A @ B.T
        """
        m, n_A = A.shape
        n_B, p = B.T.shape

        if n_A != n_B:
            raise ValueError("Incompatible shapes: A.columns must match B.rows for multiplication")

        C = np.zeros((m, num_samples))
        R = np.zeros((num_samples, p))

        for t in range(num_samples):
            idx = np.random.choice(n_A)
            scaling = np.sqrt(n_A / num_samples)
            C[:, t] = A[:, idx] * scaling
            R[t, :] = B.T[idx, :] * scaling

        return C, R

    def priority_sampling(self, A: np.ndarray, num_samples: int) -> Dict[str, Any]:
        """
        Performs priority sampling on the rows of matrix A.

        Returns a sketch with selected row indices, row values, and a threshold.
        """
        n, d = A.shape
        r = np.full(n, np.inf)

        for i in range(n):
            norm_sq = np.linalg.norm(A[i]) ** 2
            if norm_sq > 0:
                r[i] = self.hash[i] / norm_sq

        threshold = np.partition(r, num_samples)[num_samples] if num_samples < n else np.inf

        selected_indices = np.where(r < threshold)[0]
        selected_rows = A[selected_indices]

        return {
            'indices': selected_indices,
            'values': selected_rows,
            'threshold': threshold
        }

    def threshold_sampling(self, A: np.ndarray, num_samples: int) -> Dict[str, Any]:
        """
        Performs threshold sampling based on row norms and a hash function.

        Returns a sketch with selected row indices, row values, and a threshold value.
        """
        n, d = A.shape
        A_norm_sq = np.linalg.norm(A, 'fro') ** 2
        tau = num_samples / A_norm_sq if A_norm_sq > 0 else float('inf')

        selected_indices = []
        selected_rows = []

        for i in range(n):
            row_norm_sq = np.linalg.norm(A[i]) ** 2
            if self.hash[i] <= tau * row_norm_sq:
                selected_indices.append(i)
                selected_rows.append(A[i])

        return {
            'indices': np.array(selected_indices),
            'values': np.array(selected_rows),
            'threshold': tau
        }

    @staticmethod
    def approximate_matrix_multiplication(sketch_A: Dict[str, Any], sketch_B: Dict[str, Any]) -> np.ndarray:
        """
        Approximates matrix multiplication using row sketches.

        Only uses rows that are common to both A and B sketches.
        """
        indices_A = sketch_A['indices']
        indices_B = sketch_B['indices']
        common_indices = np.intersect1d(indices_A, indices_B)

        if common_indices.size == 0:
            return np.zeros((sketch_A['values'].shape[1], sketch_B['values'].shape[1]))

        result = np.zeros((sketch_A['values'].shape[1], sketch_B['values'].shape[1]))

        for idx in common_indices:
            i_A = np.where(indices_A == idx)[0][0]
            i_B = np.where(indices_B == idx)[0][0]

            row_A = sketch_A['values'][i_A]
            row_B = sketch_B['values'][i_B]

            norm_A = np.linalg.norm(row_A) ** 2
            norm_B = np.linalg.norm(row_B) ** 2
            denom = min(1.0, norm_A / sketch_A['threshold'], norm_B / sketch_B['threshold'])

            if denom > 0:
                result += np.outer(row_A, row_B) / denom

        return result


In [None]:
class MatrixApproximationSampling:
    def __init__(self, hash_size: int = 10000, seed: int = 42):
        """
        Initializes the sampler with a random hash vector used for priority and threshold sampling.
        """
        np.random.seed(seed)
        self.hash = np.random.uniform(0, 1, size=hash_size)
         def uniform_sampling(self, A: np.ndarray, B: np.ndarray, num_samples: int) -> Tuple[np.ndarray, np.ndarray]:
        """
        Uniformly samples columns from A and corresponding rows from B.T.

        Returns two matrices C and R such that C @ R ≈ A @ B.T
        """
        m, n_A = A.shape
        n_B, p = B.T.shape

        if n_A != n_B:
            raise ValueError("Incompatible shapes: A.columns must match B.rows for multiplication")

        C = np.zeros((m, num_samples))
        R = np.zeros((num_samples, p))

        for t in range(num_samples):
            idx = np.random.choice(n_A)
            scaling = np.sqrt(n_A / num_samples)
            C[:, t] = A[:, idx] * scaling
            R[t, :] = B.T[idx, :] * scaling

        return C, R


    def priority_sampling(self, A: np.ndarray, num_samples: int) -> Dict[str, Any]:
        """
        Performs priority sampling on the rows of matrix A.

        Returns a sketch with selected row indices, row values, and a threshold.
        """
        n, d = A.shape
        r = np.full(n, np.inf)

        for i in range(n):
            norm_sq = np.linalg.norm(A[i]) ** 2
            if norm_sq > 0:
                r[i] = self.hash[i] / norm_sq

        threshold = np.partition(r, num_samples)[num_samples] if num_samples < n else np.inf

        selected_indices = np.where(r < threshold)[0]
        selected_rows = A[selected_indices]

        return {
            'indices': selected_indices,
            'values': selected_rows,
            'threshold': threshold
        }

    def threshold_sampling(self, A: np.ndarray, num_samples: int) -> Dict[str, Any]:
        """
        Performs threshold sampling based on row norms and a hash function.

        Returns a sketch with selected row indices, row values, and a threshold value.
        """
        n, d = A.shape
        A_norm_sq = np.linalg.norm(A, 'fro') ** 2
        tau = num_samples / A_norm_sq if A_norm_sq > 0 else float('inf')

        selected_indices = []
        selected_rows = []

        for i in range(n):
            row_norm_sq = np.linalg.norm(A[i]) ** 2
            if self.hash[i] <= tau * row_norm_sq:
                selected_indices.append(i)
                selected_rows.append(A[i])

        return {
            'indices': np.array(selected_indices),
            'values': np.array(selected_rows),
            'threshold': tau
        }
    @staticmethod
    def approximate_matrix_multiplication(sketch_A: Dict[str, Any], sketch_B: Dict[str, Any]) -> np.ndarray:
      """
      Approximates matrix multiplication using row sketches.
      Only uses rows that are common to both A and B sketches.
      """
      indices_A = sketch_A['indices']
      indices_B = sketch_B['indices']
      common_indices = np.intersect1d(indices_A, indices_B)

      if common_indices.size == 0:
          return np.zeros((sketch_A['values'].shape[1], sketch_B['values'].shape[1]))

      result = np.zeros((sketch_A['values'].shape[1], sketch_B['values'].shape[1]))

      for idx in common_indices:
          i_A = np.where(indices_A == idx)[0][0]
          i_B = np.where(indices_B == idx)[0][0]

          row_A = sketch_A['values'][i_A]
          row_B = sketch_B['values'][i_B]

          norm_A = np.linalg.norm(row_A) ** 2
          norm_B = np.linalg.norm(row_B) ** 2
          # CORRECTED: Multiply by thresholds instead of dividing
          denom = min(1.0, norm_A * sketch_A['threshold'], norm_B * sketch_B['threshold'])

          if denom > 0:
              result += np.outer(row_A, row_B) / denom

      return result

In [None]:
class JLemma:

  def gaussian_JL(self,A,B,samples):
    m, n = A.shape
    _, p = B.shape
    R = np.random.normal(0, 1/np.sqrt(samples), size=(samples, n))
    A_proj = A @ R.T
    B_proj = R @ B.T
    approx_product = A_proj @ B_proj
    return approx_product
  def PHD_JL(self,A,B,samples):
        m, n = A.shape
        _, p = B.shape
        D = np.diag(np.random.choice([-1, 1], size=n))
        H = fft(np.eye(n), norm='ortho')
        P = np.zeros((samples, n))
        selected_rows = np.random.choice(n,samples)
        P[np.arange(samples), selected_rows] = 1
        Phi = P @ H @ D
        A_proj = A @ Phi.T
        B_proj = Phi @ B.T
        approx_product = A_proj @ B_proj
        return approx_product


In [None]:
import numpy as np
from scipy.fftpack import fft

class JLemma:
    def gaussian_JL(self, A, B, samples):
        """Gaussian JL: Dense random projection with i.i.d. normal entries."""
        m, n = A.shape
        _, p = B.shape
        R = np.random.normal(0, 1/np.sqrt(samples), size=(samples, n))
        A_proj = A @ R.T
        B_proj = R @ B.T
        approx_product = A_proj @ B_proj
        return approx_product

    def PHD_JL(self, A, B, samples):
        """PHD JL: Fast Johnson-Lindenstrauss using FFT and subsampling."""
        m, n = A.shape
        _, p = B.shape
        D = np.diag(np.random.choice([-1, 1], size=n))
        H = fft(np.eye(n), norm='ortho')
        P = np.zeros((samples, n))
        selected_rows = np.random.choice(n, samples, replace=False)
        P[np.arange(samples), selected_rows] = 1
        Phi = P @ H @ D
        A_proj = A @ Phi.T
        B_proj = Phi @ B.T
        approx_product = A_proj @ B_proj
        return approx_product

    def countsketch_JL(self, A, B, s):
        """
        Count Sketch JL: Sparse projection using hashing and sign flipping.
        Args:
            A: (m x n) matrix
            B: (p x n) matrix (must match A's columns)
            s: sketch size (target dimension)
        Returns:
            Approximate product (A @ B.T) of shape (m x p)
        """
        m, n = A.shape
        p, _ = B.shape

        # Generate hash functions and signs (shared for A and B)
        h = np.random.randint(0, s, size=n)  # Hash buckets
        sigma = np.random.choice([-1, 1], size=n)  # Random signs

        # Compute A's sketch (m x s)
        A_sketch = np.zeros((m, s))
        for j in range(n):
            A_sketch[:, h[j]] += sigma[j] * A[:, j]

        # Compute B's sketch (s x p)
        B_sketch = np.zeros((s, p))
        for j in range(n):
            B_sketch[h[j], :] += sigma[j] * B[:, j]

        return A_sketch @ B_sketch

In [None]:
#all the four matrices
"""
There are two key properties being considered here:

1. Leverage Score Distribution
Uniform leverage scores → all rows are equally “important”

Non-uniform leverage scores → some rows are more “important” than others

2. Condition Number (κ / kappa)
Good condition number (low κ) → matrix is stable for computation

Bad condition number (high κ) → matrix is numerically unstable, more sensitive to error

"""

# Case 1: Good Condition Number (kappa = 1e-8)
good_condition = MatrixGeneration(10000, 100, 1e-8)
A_uniform_good = good_condition.ug_matrix()
B_uniform_good = good_condition.ug_matrix()
A_nonuniform_good = good_condition.nb_matrix()
B_nonuniform_good = good_condition.nb_matrix()

# Case 2: Bad Condition Number (kappa = 10)
bad_condition = MatrixGeneration(10000, 100, 10)
A_uniform_bad = bad_condition.ug_matrix()
B_uniform_bad = bad_condition.ug_matrix()
A_nonuniform_bad = bad_condition.nb_matrix()
B_nonuniform_bad = bad_condition.nb_matrix()


In [None]:
sample_sizes = [250, 500, 750, 1000, 1250, 1500, 1750, 2000]
iterations = 5

matrix_types = [
    ("Uniform Good", 1e-8, "ug_matrix"),
    ("Uniform Bad", 10, "ug_matrix"),
    ("Non-uniform Good", 1e-8, "nb_matrix"),
    ("Non-uniform Bad", 10, "nb_matrix"),
]


In [None]:
results = []

for matrix_name, kappa, gen_method in tqdm(matrix_types, desc="Matrix Types"):
    generator = MatrixGeneration(10000, 100, kappa)
    A = getattr(generator, gen_method)()
    B = getattr(generator, gen_method)()

    original_product = A @ B.T
    original_norm = np.linalg.norm(original_product, 'fro')

    for size in sample_sizes:
        for i in range(iterations):
            # Each method result will be stored in its own row
            def add_row(method_name, approx_matrix, duration):
                approx_norm = np.linalg.norm(approx_matrix, 'fro')
                rel_error = abs(original_norm - approx_norm) / original_norm
                results.append({
                    "matrix": matrix_name,
                    "sample_size": size,
                    "iteration": i + 1,
                    "method": method_name,
                    "time": duration,
                    "frobenius_norm": approx_norm,
                    "relative_error": rel_error
                })

            # Leverage Sampling
            start = time.time()
            S_a, S_b = MatrixApproximationLeverage.leverage_score_sampling(A, B, size)
            end = time.time()
            add_row("Leverage Sampling", S_a @ S_b.T, end - start)

            # Sqrt Leverage
            start = time.time()
            S_a, S_b = MatrixApproximationLeverage.sqrt_leverage_score_sampling(A, B, size)
            end = time.time()
            add_row("Sqrt Leverage Sampling", S_a @ S_b.T, end - start)

            # Priority Sampling
            sampler = MatrixApproximationSampling()
            start = time.time()
            S_a = sampler.priority_sampling(A, size)
            S_b = sampler.priority_sampling(B, size)
            end = time.time()
            add_row("Priority Sampling", sampler.approximate_matrix_multiplication(S_a, S_b), end - start)

            # Threshold Sampling
            start = time.time()
            S_a = sampler.threshold_sampling(A, size)
            S_b = sampler.threshold_sampling(B, size)
            end = time.time()
            add_row("Threshold Sampling", sampler.approximate_matrix_multiplication(S_a, S_b), end - start)

            # Uniform Sampling
            start = time.time()
            S_a, S_b = sampler.uniform_sampling(A, B, size)
            end = time.time()
            add_row("Uniform Sampling", S_a @ S_b, end - start)

            # Gaussian JL
            lemma = JLemma()
            start = time.time()
            G = lemma.gaussian_JL(A, B, size)
            end = time.time()
            add_row("Gaussian JL", G, end - start)

            # PHD JL
            start = time.time()
            P = lemma.PHD_JL(A, B, size)
            end = time.time()
            add_row("PHD JL", P, end - start)

            #count-sketch JL
            start = time.time()
            C = lemma.countsketch_JL(A, B, size)
            end = time.time()
            add_row("countsketch JL", C, end - start)


# Save final results
df = pd.DataFrame(results)
df.to_csv("matrix_experiment_results.csv", index=False)
print("CSV saved as matrix_experiment_results.csv")


Matrix Types: 100%|██████████| 4/4 [1:40:24<00:00, 1506.18s/it]

CSV saved as matrix_experiment_results.csv



