In [None]:
import numpy as np

def somp(Y: np.ndarray, Phi: np.ndarray, s: int):
    """
    Simultaneous Orthogonal Matching Pursuit (SOMP)

    Parameters
    ----------
    Y   : array, shape (m, K)
          Matrix of K measurement vectors (each column is one measurement).
    Phi : array, shape (m, n)
          Dictionary matrix whose columns are the atoms.
    s   : int
          Sparsity level: number of atoms to select.

    Returns
    -------
    S   : list of int
          Indices of the selected atoms (size s).
    X   : array, shape (s, K)
          Coefficient matrix (only the rows corresponding to S are nonzero).
    """
    # Initialize residual and support
    R = Y.copy()                    # residual: shape (m, K)
    S = []                          # support set (list of selected indices)
    
    # Precompute Phi^T for efficiency
    PhiT = Phi.T                  # shape (n, m)
    
    for _ in range(s):
        # 1) Compute (absolute) correlations for each atom j: sum over all K signals
        #    corr[j] = || (Phi[:, j].T @ R) ||_1
        # Using matrix form: (Phi^T @ R) is (n x K), then take abs and sum across columns.
        corr = np.sum(np.abs(PhiT @ R), axis=1)  # shape (n,)
        
        # 2) Mask out already-selected atoms (set their score to -inf)
        mask = np.ones_like(corr, dtype=bool)
        mask[S] = False
        corr_masked = np.where(mask, corr, -np.inf)
        
        # 3) Pick the atom with maximum correlation
        j_best = int(np.argmax(corr_masked))
        S.append(j_best)
        
        # 4) Form the sub-dictionary with selected atoms
        Phi_S = Phi[:, S]           # shape (m, |S|)
        
        # 5) Solve least-squares to get coefficients for all K signals:
        #    X_S = Phi_S^+ Y, where Phi_S^+ is the Moore–Penrose pseudoinverse
        X_S = np.linalg.pinv(Phi_S) @ Y  # shape (|S|, K)
        
        # 6) Update residuals: R = Y - Phi_S X_S
        R = Y - Phi_S @ X_S           # shape (m, K)
    
    # X: full n×K coefficient matrix, but only rows in S are nonzero.
    X = np.zeros((Phi.shape[1], Y.shape[1]))
    X[S, :] = X_S
    
    return S, X

# # Example usage:
# if __name__ == "__main__":
#     # synthetic demo
#     m, n, K, s = 50, 100, 3, 5
#     np.random.seed(0)
#     Phi = np.random.randn(m, n)
#     true_support = np.random.choice(n, s, replace=False)
#     X_true = np.zeros((n, K))
#     X_true[true_support] = np.random.randn(s, K)
#     Y = Phi @ X_true + 0.01 * np.random.randn(m, K)
    
#     S_est, X_est = somp(Y, Phi, s)
#     # print("True support:     ", sorted(true_support))
#     # print("Estimated support:", sorted(S_est))
#     print(X_true)
#     print(X_est)

# import numpy as np
# import matplotlib.pyplot as plt
# from scipy.fftpack import idct
# from sklearn.linear_model import OrthogonalMatchingPursuit

# # Set random seed for reproducibility
# np.random.seed(42)

# # Parameters
# N = 256        # signal dimension
# M = 80         # number of measurements
# s = 15         # sparsity level in DCT domain for OMP

# # Time vector
# t = np.linspace(0, 1, N, endpoint=False)

# # Generate signal: sum of three sine waves
# frequencies = [5, 20, 50]  # in cycles per interval
# x = sum(np.sin(2 * np.pi * f * t) for f in frequencies)

# # Build random Gaussian sensing matrix
# Phi = np.random.randn(M, N)

# # Take measurements
# y = Phi.dot(x)

# # Build inverse-DCT matrix and sensing dictionary
# B = idct(np.eye(N), norm='ortho')  # maps DCT coefficients to time domain
# Theta = Phi.dot(B)

# # Reconstruct sparse DCT coefficients via OMP
# omp = OrthogonalMatchingPursuit(n_nonzero_coefs=s)
# omp.fit(Theta, y)
# alpha_hat = omp.coef_

# # Recover the time-domain signal
# x_hat = idct(alpha_hat, norm='ortho')

# # Plot original vs. reconstructed signals
# plt.figure()
# plt.plot(t, x)
# plt.plot(t, x_hat)
# plt.title('Original (sum of 3 sines) vs. Reconstructed Signal')
# plt.legend(['Original', 'Reconstructed'])
# plt.show()

# # Print recovered sparsity support
# print("Recovered nonzero DCT indices:", np.nonzero(alpha_hat)[0])


# import numpy as np
# from scipy.fftpack import dct, idct

# # 1. Sample signal
# n = 1024
# x = np.sin(np.linspace(0, 4*np.pi, n))

# # 2. Measurement matrix
# m = int(0.3 * n)
# Phi = np.random.randn(m, n)
# Phi /= np.linalg.norm(Phi, axis=1, keepdims=True)

# # 3. Forward and adjoint operators
# def A_dct(s):
#     """y = Φ Ψ s  with Ψ = IDCT"""
#     x_rec = idct(s, norm='ortho')      # inverse‐DCT synthesis
#     return Phi @ x_rec

# def At_dct(r):
#     """Ψᵀ Φᵀ r  with Ψᵀ = DCT"""
#     temp = Phi.T @ r
#     return dct(temp, norm='ortho')     # forward‐DCT analysis

# # 4. Soft‐threshold
# def soft(u, thresh):
#     return np.sign(u) * np.maximum(np.abs(u) - thresh, 0)

# # 5. ISTA
# def ista_dct(y, lam, tau, max_iter=100):
#     s = np.zeros(n)
#     for _ in range(max_iter):
#         grad = At_dct(A_dct(s) - y)
#         s = soft(s - tau*grad, lam*tau)
#     return s

# # 6. Run CS + DCT
# y = Phi @ x
# L = np.linalg.norm(Phi, 2)**2    # Lipschitz approx
# tau = 1.0 / L
# lam = 0.01

# s_hat = ista_dct(y, lam, tau, max_iter=200)
# x_hat = idct(s_hat, norm='ortho')

# print("NMSE:", np.linalg.norm(x - x_hat)**2 / np.linalg.norm(x - x.mean())**2)


In [None]:
import numpy as np
s = 1
B = 2
alpha = 0.5 * np.log(0.5*(B + 1/B))

# 4 * s * B * np.sqrt(2*alpha / np.pi)
(np.sqrt(8 * np.pi * alpha) / (2 * np.pi * B)) * 128


In [None]:
"""
.. _gallery:cs:ecg:bsbl:1:

ECG Data Compressive Sensing
=====================================

.. contents::
    :depth: 2
    :local:

In this example, we demonstrate the compressive sensing of ECG data
and reconstruction using Block Sparse Bayesian Learning (BSBL).
"""

# Configure JAX to work with 64-bit floating point precision. 
import jax
jax.config.update("jax_enable_x64", True)


# %% 
# Let's import necessary libraries
import timeit
import numpy as np
import jax.numpy as jnp
# CR-Suite libraries
import cr.nimble as crn
import cr.nimble.dsp as crdsp
import cr.sparse.dict as crdict
import cr.sparse.plots as crplot
import cr.sparse.block.bsbl as bsbl

# Sample data
# from scipy.misc import electrocardiogram
import mne
# Plotting
import matplotlib.pyplot as plt
# Miscellaneous
from scipy.signal import detrend, butter, filtfilt


# # %% 
# # Test signal
# # ------------------------------
# # SciPy includes a test electrocardiogram signal
# # which is a 5 minute long electrocardiogram (ECG), 
# # a medical recording of the electrical activity of the heart, 
# # sampled at 360 Hz.
# ecg = electrocardiogram()
# # Sampling frequency in Hz
# fs = 360
# # We shall only process a part of the signal in this demo
# N = 400
# x = ecg[:N]
# t = np.arange(N) * (1/fs)

# Arquivo de exemplo da base "CHB-MIT Scalp EEG Database" 
raw = mne.io.read_raw_edf('./../files/chb01_14.edf')
fs = raw.info['sfreq']

# Captura de uma janela de 1s de um canal específico
ch_name = raw.ch_names[4]
start_time = 170
end_time = 171

x = raw.get_data(tmin=start_time, tmax=end_time, picks=(ch_name)).flatten()
N = x.size
t = np.linspace(start_time, end_time, num=N)

fig, ax = plt.subplots(figsize=(16,4))
ax.plot(t, x);




# %% 
# Preprocessing
# '''''''''''''''''''''''''''''''''''''''

# Remove the linear trend from the signal
x = detrend(x)
## bandpass filter
# lower cutoff frequency
f1 = 5
# upper cutoff frequency
f2 = 40
# passband in normalized frequency
Wn = np.array([f1, f2]) * 2 / fs
# butterworth filter
fn = 3
fb, fa = butter(fn, Wn, 'bandpass')
x = filtfilt(fb,fa,x)
fig, ax = plt.subplots(figsize=(16,4))
ax.plot(t, x);



# %% 
# Compressive Sensing at 70%
# ------------------------------
# We choose the compression ratio (M/N) to be 0.7
CR = 0.70
M = int(N * CR)
print(f'M={M}, N={N}, CR={CR}')

# %%
# Sensing matrix
Phi = crdict.gaussian_mtx(crn.KEY0, M, N)

# %%
# Measurements
# '''''''''''''''''''''''''''''''''''''''
y = Phi @ x
fig, ax = plt.subplots(figsize=(16, 4))
ax.plot(y);


# %% 
# Sparse Recovery with BSBL
# '''''''''''''''''''''''''''''''''''''''
options = bsbl.bsbl_bo_options(y, max_iters=20)
start = timeit.default_timer()
sol = bsbl.bsbl_bo_np_jit(Phi, y, 8, options=options)
stop = timeit.default_timer()
print(f'Reconstruction time: {stop - start:.2f} sec', )
print(sol)

# %%
# Recovered signal
x_hat = sol.x
print(f'SNR: {crn.signal_noise_ratio(x, x_hat):.2f} dB, PRD: {crn.percent_rms_diff(x, x_hat):.1f}%')

# %%
# Plot the original and recovered signals
ax = crplot.h_plots(2)
ax[0].plot(x)
ax[1].plot(x_hat)


# %% 
# Compressive Sensing at 50%
# ------------------------------
# Let us now increase the compression
CR = 0.50
M = int(N * CR)
print(f'M={M}, N={N}, CR={CR}')

# %%
# Sensing matrix
Phi = crdict.gaussian_mtx(crn.KEY0, M, N)

# %%
# Measurements
# '''''''''''''''''''''''''''''
y = Phi @ x
fig, ax = plt.subplots(figsize=(16, 4))
ax.plot(y);


# %% 
# Sparse Recovery with BSBL
# '''''''''''''''''''''''''''''''''''''''
options = bsbl.bsbl_bo_options(y, max_iters=20)
start = timeit.default_timer()
sol = bsbl.bsbl_bo_np_jit(Phi, y, 8, options=options)
stop = timeit.default_timer()
print(f'Reconstruction time: {stop - start:.2f} sec', )
print(sol)

# %%
# Recovered signal
x_hat = sol.x
print(f'SNR: {crn.signal_noise_ratio(x, x_hat):.2f} dB, PRD: {crn.percent_rms_diff(x, x_hat):.1f}%')

# %%
# Plot the original and recovered signals
ax = crplot.h_plots(2)
ax[0].plot(x)
ax[1].plot(x_hat)

In [None]:
from abc import ABC, abstractmethod
import mne
import matplotlib.pyplot as plt
import numpy as np
from scipy.fftpack import dct, idct
import cvxpy as cp
import cr.nimble as crn
import cr.nimble.dsp as crdsp
import cr.sparse.dict as crdict
import cr.sparse.plots as crplot
import cr.sparse.block.bsbl as bsbl

class SamplingMatrix(ABC):
    """
    Abstract class for sensing matrices.
    """

    _value: np.ndarray
    
    @property
    def value(self) -> np.ndarray:
        return self._value

class SparseBasis(ABC):
    """
    Abstract class for sparse basis.
    """

    @abstractmethod
    def apply_sampling_matrix(self, Theta: np.ndarray) -> np.ndarray:
        ...
    
    @abstractmethod
    def forward(self, x: np.ndarray) -> np.ndarray:
        ...
    
    @abstractmethod
    def backward(self, s: np.ndarray) -> np.ndarray:
        ...

class ReconstructionAlgorithm(ABC):
    """
    Abstract class for reconstruction algorithms.
    """
    
    @abstractmethod
    def reconstruct(self, y: np.ndarray, Phi: np.ndarray) -> np.ndarray:
        ...

class CompressiveSensing:
    """
    Compressive Sensing framework.
    """
    
    def __init__(self, sampling_matrix: SamplingMatrix, sparse_basis: SparseBasis, reconstruction_algorithm: ReconstructionAlgorithm):
        self.sampling_matrix = sampling_matrix
        self.sparse_basis = sparse_basis
        self.reconstruction_algorithm = reconstruction_algorithm
    
    def solve(self, x: np.ndarray) -> np.ndarray:
        y = self.sampling_matrix.value @ x
        Phi = self.sparse_basis.apply_sampling_matrix(self.sampling_matrix.value)

        s = self.reconstruction_algorithm.reconstruct(y, Phi)
        return self.sparse_basis.backward(s)

    def calc_metrics(self, x: np.ndarray, x_hat: np.ndarray) -> tuple[float, float] | tuple[np.ndarray, np.ndarray]:
        """
        Calculate metrics for the reconstruction.
        """

        nmse = self._nmse(x, x_hat)
        snr = self._snr(x, x_hat)
        
        return nmse, snr
    
    def _nmse(self, x: np.ndarray, x_hat: np.ndarray) -> float | np.ndarray:
        """
        Calculate normalized mean square error (NMSE).

        ||x - x_hat||^2 / ||x - mean(x)||^2
        """
        mu = x.mean(axis=0)

        den = np.linalg.norm(x - mu, axis=0)**2
        num = np.linalg.norm(x - x_hat, axis=0)**2

        return np.where(den == 0, np.inf, num / den)
    
    def _snr(self, x: np.ndarray, x_hat: np.ndarray) -> float | np.ndarray:
        """
        Calculate signal to noise ratio (SNR).
        """
        den = np.linalg.norm(x - x_hat, axis=0)**2
        num = np.linalg.norm(x, axis=0)**2

        return np.where(den == 0, np.inf, 10*np.log10(num / den))
    

class GaussianSamplingMatrix(SamplingMatrix):
    """
    Gaussian sensing matrix.
    """
    
    def __init__(self, m: int, n: int, random_state: int = 42):
        rng = np.random.default_rng(random_state)
        self._value = rng.standard_normal((m, n)) / np.sqrt(m)

class Undersampled(SamplingMatrix):
    """
    Undersampled identity matrix.
    """
    
    def __init__(self, m: int, n: int, random_state: int = 42):
        rng = np.random.default_rng(random_state)
        indices = np.sort(rng.choice(n, m, replace=False))
        self._value = np.eye(n)[indices, :]


class BinaryPermutedBlockDiagonal(SamplingMatrix):
    """
    Binary permuted block diagonal matrix.
    """
    
    def __init__(self, m: int, cr: int):
        I = np.eye(m, dtype=int)
        self._value = np.repeat(I, repeats=cr, axis=1)

class SparseBinary(SamplingMatrix):
    """
    Sparse binary matrix with `d` ones in each column in random rows.
    """
    
    def __init__(self, m: int, n: int, d: int, random_state: int = 42):
        if d > m:
            raise ValueError("The number of ones per column (d) cannot exceed the number of rows (m).")
        
        rng = np.random.default_rng(random_state)
        self._value = np.zeros((m, n), dtype=int)
        
        for col in range(n):
            # Randomly select `d` unique rows for each column
            row_indices = rng.choice(m, d, replace=False)
            self._value[row_indices, col] = 1

class DCTBasis(SparseBasis):
    """
    Discrete Cosine Transform (DCT) basis.
    """
    
    def __init__(self, n: int):
        self._Psi = np.eye(n)
    
    def apply_sampling_matrix(self, Theta: np.ndarray) -> np.ndarray:
        return dct(Theta, norm='ortho')
    
    def forward(self, x: np.ndarray) -> np.ndarray:
        return dct(x, norm='ortho')
    
    def backward(self, s: np.ndarray) -> np.ndarray:
        return idct(s, norm='ortho')

class GaborBasis(SparseBasis):
    """
    Gabor basis.
    """
    
    def __init__(
        self,
        N: int,
        fs: float,
        tf: int = 2,
        ff: int = 4,
        scales: list[int] | None = None,
    ):
        """Create an over‑complete *real* Gabor dictionary Dₜ₍ₜf f₍ff) matching Table II.

        Parameters
        ----------
        N         : Signal length (samples) — 512 in the paper.
        fs        : Sampling frequency (Hz) — 128 in the paper.
        tf, ff    : Time‑factor and frequency‑factor (1, 2, 4, 8).
        scales    : Sequence of *scale* values *s* (Gaussian σ in samples). If *None*, the
                    canonical set {1, 2, 4, 8, 16, 32, 64} is used.
        time_base : Empirical constant giving the time step at scale 1 & tf = 1 (samples).
        freq_base : Frequency step at scale 1 & ff = 1 (Hz).

        Returns
        -------
        D         : (N × P) NumPy array with *P* atoms (column‑wise), ℓ₂‑normalised.
        meta      : List of triplets (*scale, n0, f0*) for each atom.
        """
        if scales is None:
            scales = [1, 2, 4, 8, 16, 32, 64]

        B = 2.0
        alpha = 0.5 * np.log(0.5*(B + 1/B))

        n = np.arange(N)
        atoms: list[np.ndarray] = []
        for s in scales:
            time_base = s * B * np.sqrt(2 * alpha / np.pi)
            ts = 4 * time_base / tf
            n0_vals = np.arange(0, N, ts)

            freq_base = (np.sqrt(8.0 * np.pi * alpha) / (s * B)) * (fs / 2.0)
            fs_step = freq_base / ff  # Eq. 10 → frequency step (Hz)
            f0_vals = np.arange(fs_step, fs / 2.0, fs_step)  # avoid DC / Nyquist

            for n0 in n0_vals:
                for f0 in f0_vals:
                    atom = self._gabor_atom(n, n0, f0, s)
                    atoms.append(atom)

        D = np.stack(atoms, axis=1)  # (N × P)
        self._Psi = D.astype(np.float32)
    
    def apply_sampling_matrix(self, Theta: np.ndarray) -> np.ndarray:
        return Theta @ self._Psi
    
    def forward(self, x: np.ndarray) -> np.ndarray:
        return self._Psi @ x
    
    def backward(self, s: np.ndarray) -> np.ndarray:
        return self._Psi @ s
    
    def _gabor_atom(self, n: list[int], n0: float, f0: float, s: float, phi: float = 0.0) -> np.ndarray:
        """Generate a *real* Gabor atom (eq. 8 in the paper).

        G(x) = exp(-(x‑n0)² / (2 s²)) · cos(2π f0 (x‑n0)/fs + φ)
        The atom is ℓ₂‑normalised.
        """
        atom = np.exp(-(n - n0) ** 2 / (2.0 * s ** 2)) * np.sin(2.0 * np.pi * f0 * (n - n0) + phi)
        atom /= np.linalg.norm(atom)
        return atom.astype(np.float32)

class BasisPursuit(ReconstructionAlgorithm):
    """
    Basis Pursuit.
    """
    
    def reconstruct(self, y: np.ndarray, Phi: np.ndarray) -> np.ndarray:
        alpha = cp.Variable(Phi.shape[1])
        objective = cp.Minimize(cp.norm1(alpha))
        constraints = [y == Phi @ alpha]
        prob = cp.Problem(objective, constraints)
        prob.solve()
        if prob.status != cp.OPTIMAL:
            raise ValueError("The optimization problem is not solvable.")
        
        return np.asarray(alpha.value, dtype=np.float32)



class OrthogonalMatchingPursuit(ReconstructionAlgorithm):
    """
    Orthogonal Matching Pursuit (OMP) reconstruction algorithm.
    """
    
    def __init__(self, max_iter: int = 100, tol: float = 1e-6):
        """
        Parameters
        ----------
        max_iter : int
            Maximum number of iterations for the OMP algorithm.
        tol : float
            Tolerance for the residual norm to stop the algorithm.
        """
        self.max_iter = max_iter
        self.tol = tol

    def reconstruct(self, y: np.ndarray, Phi: np.ndarray) -> np.ndarray:
        """
        Reconstruct the sparse signal using OMP.

        Parameters
        ----------
        y : np.ndarray
            The observed measurements (m-dimensional vector).
        Phi : np.ndarray
            The sensing matrix (m x n matrix).

        Returns
        -------
        np.ndarray
            The reconstructed sparse signal (n-dimensional vector).
        """
        m, n = Phi.shape
        residual = y.copy()
        support = []

        for _ in range(self.max_iter):
            # Step 1: Find the column of Phi most correlated with the residual
            correlations = Phi.T @ residual
            best_index = np.argmax(np.abs(correlations))
            support.append(best_index)

            # Step 2: Solve least squares problem to update the solution
            Phi_support = Phi[:, support]
            s_support = np.linalg.lstsq(Phi_support, y, rcond=None)[0]

            # Step 3: Update the residual
            residual = y - Phi_support @ s_support

            # Check stopping condition
            if np.linalg.norm(residual) < self.tol:
                break

        # Step 4: Construct the full solution vector
        s = np.zeros(n)
        s[support] = s_support
    
        return s

class SimultaneousOrthogonalMatchingPursuit(ReconstructionAlgorithm):
    """
    Simultaneous Orthogonal Matching Pursuit (SOMP) reconstruction algorithm.
    """
    
    def __init__(self, max_iter: int = 100, tol: float = 1e-6):
        self.max_iter = max_iter
        self.tol = tol

    def reconstruct(self, Y: np.ndarray, Phi: np.ndarray):
        """
        Simultaneous Orthogonal Matching Pursuit (SOMP)

        Parameters
        ----------
        Y   : array, shape (m, K)
            Matrix of K measurement vectors (each column is one measurement).
        Phi : array, shape (m, n)
            Dictionary matrix whose columns are the atoms.
        s   : int
            Sparsity level: number of atoms to select.

        Returns
        -------
        S   : list of int
            Indices of the selected atoms (size s).
        X   : array, shape (s, K)
            Coefficient matrix (only the rows corresponding to S are nonzero).
        """
        # Initialize residual and support
        R = Y.copy()                    # residual: shape (m, K)
        S = []                          # support set (list of selected indices)
        
        # Precompute Phi^T for efficiency
        PhiT = Phi.T                  # shape (n, m)
        
        for _ in range(self.max_iter):
            # 1) Compute (absolute) correlations for each atom j: sum over all K signals
            #    corr[j] = || (Phi[:, j].T @ R) ||_1
            # Using matrix form: (Phi^T @ R) is (n x K), then take abs and sum across columns.
            corr = np.sum(np.abs(PhiT @ R), axis=1)  # shape (n,)
            
            # 2) Mask out already-selected atoms (set their score to -inf)
            mask = np.ones_like(corr, dtype=bool)
            mask[S] = False
            corr_masked = np.where(mask, corr, -np.inf)
            
            # 3) Pick the atom with maximum correlation
            j_best = int(np.argmax(corr_masked))
            S.append(j_best)
            
            # 4) Form the sub-dictionary with selected atoms
            Phi_S = Phi[:, S]           # shape (m, |S|)
            
            # 5) Solve least-squares to get coefficients for all K signals:
            #    X_S = Phi_S^+ Y, where Phi_S^+ is the Moore–Penrose pseudoinverse
            X_S = np.linalg.pinv(Phi_S) @ Y  # shape (|S|, K)
            
            # 6) Update residuals: R = Y - Phi_S X_S
            R = Y - Phi_S @ X_S           # shape (m, K)

            # 7) Check stopping condition
            if np.linalg.norm(R) < self.tol:
                break
        
        # 8) Construct the full solution vector
        # X: full n×K coefficient matrix, but only rows in S are nonzero.
        X = np.zeros((Phi.shape[1], Y.shape[1]))
        X[S, :] = X_S
        
        return X

def get_signals(start_time: int, end_time: int) -> tuple[np.ndarray, np.ndarray, float]:
    """
    Get a sample EEG signal from the CHB-MIT Scalp EEG Database.
    """
    # Load the raw EEG data
    raw = mne.io.read_raw_edf('./../files/chb01_14.edf', verbose=False)
    fs = raw.info['sfreq']
    
    # Get the data for the selected channel and time window
    X = raw.get_data(tmin=start_time, tmax=end_time).T
    t = np.linspace(start_time, end_time, num=X.shape[0])
    
    return X, t, fs


def main():
    X, t, fs = get_signals(
        start_time=1,
        end_time=2
    )
    n = X.shape[0]

    cr = 2
    m = int(n / cr)

    # ----------------------------------------------- 1° Etapa: Definir a matriz de amostragem --------------------------------------------------
    # sampling_matrix = SparseBinary(m, n, d = 8, random_state=42)
    # sampling_matrix = GaussianSamplingMatrix(m, n)
    # sampling_matrix = Undersampled(m, n)
    sampling_matrix = BinaryPermutedBlockDiagonal(m, cr)

    # ----------------------------------------------- 2° Etapa: Definir dicionário --------------------------------------------------------------
    sparse_basis = DCTBasis(n)
    # sparse_basis = GaborBasis(n, fs=fs, tf=2, ff=4)

    # ----------------------------------------------- 3° Etapa: Definir algoritmo de reconstrução -----------------------------------------------
    reconstruction_algorithm = SimultaneousOrthogonalMatchingPursuit(max_iter=250, tol=1e-8)
    
    # ----------------------------------------------- 4° Etapa: Aplicar compressed sensing ------------------------------------------------------
    cs = CompressiveSensing(sampling_matrix, sparse_basis, reconstruction_algorithm)
    X_hat = cs.solve(X)

    # ----------------------------------------------- 5° Etapa: Avaliar resultado ---------------------------------------------------------------
    # nmse, snr = cs.calc_metrics(X[:, 18], X_hat[:, 18])
    # print(f"NMSE: {nmse:.4f}, SNR: {snr:.4f} dB")
    nmse, snr = cs.calc_metrics(X, X_hat)
    print(f"nmse = {nmse}", f"snr = {snr}")
    # print(f"NMSE: {nmse:.4f}, SNR: {snr:.4f} dB")

    # ----------------------------------------------- 6° Etapa: Plotar curvas -------------------------------------------------------------------
    # fig, ax = plt.subplots(figsize=[15, 5])
    # ax.plot(t, X, 'k', label='Sinal original')
    # ax.plot(t, X_hat, 'r', linestyle='dotted', label='Sinal reconstruído')
    # ax.set_xlabel('Tempo (s)')
    # ax.set_ylabel('Amplitude')
    # ax.set_title('Sinal original x Sinal reconstruído')
    # ax.legend(loc='upper right', framealpha=1)
    # ax.grid()

    # plt.show()

def main2():
    X, t, fs = get_signals(
        start_time=1,
        end_time=2
    )
    
    x = X[:, 0]
    n = x.size

    cr = 2
    m = int(n / cr)

    # ----------------------------------------------- 1° Etapa: Definir a matriz de amostragem --------------------------------------------------
    # sampling_matrix = SparseBinary(m, n, d = 8, random_state=42)
    # sampling_matrix = GaussianSamplingMatrix(m, n)
    # sampling_matrix = Undersampled(m, n)
    sampling_matrix = BinaryPermutedBlockDiagonal(m, cr)

    # ----------------------------------------------- 2° Etapa: Definir dicionário --------------------------------------------------------------
    sparse_basis = DCTBasis(n)
    # sparse_basis = GaborBasis(n, fs=fs, tf=2, ff=4)

    # ----------------------------------------------- 3° Etapa: Definir algoritmo de reconstrução -----------------------------------------------
    reconstruction_algorithm = BasisPursuit()
    # reconstruction_algorithm = OrthogonalMatchingPursuit(max_iter=250, tol=1e-8)

    # ----------------------------------------------- 4° Etapa: Aplicar compressed sensing ------------------------------------------------------
    cs = CompressiveSensing(sampling_matrix, sparse_basis, reconstruction_algorithm)
    x_hat = cs.solve(x)

    # ----------------------------------------------- 5° Etapa: Avaliar resultado ---------------------------------------------------------------
    nmse, snr = cs.calc_metrics(x, x_hat)
    print(f"NMSE: {nmse:.4f}, SNR: {snr:.4f} dB")

    # ----------------------------------------------- 6° Etapa: Plotar curvas -------------------------------------------------------------------
    fig, ax = plt.subplots(figsize=[15, 5])
    ax.plot(t, x, 'k', label='Sinal original')
    ax.plot(t, x_hat, 'r', linestyle='dotted', label='Sinal reconstruído')
    ax.set_xlabel('Tempo (s)')
    ax.set_ylabel('Amplitude')
    ax.set_title('Sinal original x Sinal reconstruído')
    ax.legend(loc='upper right', framealpha=1)
    ax.grid()

    plt.show()


if __name__ == "__main__":
    # main()
    main2()

In [None]:
import math as mt
import numpy as np
import cvxpy as cp
import matplotlib.pyplot as plt

""" (Robust) Compressed Sensing (CS) and harmonic signals (sum of sinusoids)


    Parameters
    ----------
    S : int
        Number of non-zeros coefficients of the S-sparse signal representation.
    pts : int
        Number of points of the original signal (sinusoid sum).
    n : int
        Number of FFT points and, consequently, of the sparse signal points.
    N : int
        Number of columns of the sensing matrix.
    M : int
        Number of rows of the sensing matrix.
    t : (pts,) ndarray
        Range (in rads) of values used by the reference sinusoid.
    f : (S,) ndarray
        Random frequency values with uniform distribution.
    A : (S,) list
        Random amplitude values with uniform distribution.
    Phi : (M,N) ndarray
        Sensing (aquisition) matrix generated from i.i.d Gaussian samples.
    wNoise : (M,) ndarray
        Sensing noise with (Normal) Gaussian distribution ~ N(0,1).
    s : (pts,) ndarray
        Superposition of S sinusoids with random frequencies/amplitudes.
    fft : (n,) ndarray
        DFT of the sum of sinusoids using FFT algorithm.
    x : (n,) ndarray
        S-sparse (compressible) signal.
    idx : (n,) ndarray
        Sorted index values of largest coefficients (in magnitude) in 'x'.
    y : (M,) ndarray
        Compressed signal.
    eps : float
        Second Order Cone Program (SOCP) constraint constant for robust CS.
    xOpt
        CVXPY optimization problem variable and sparsest solution.
    SOC_constraints : list
        CVXPY problem second order cone constraints for robust CS.
    prob
        CVXPY optimization probem formulation.
    sCs : (pts,) ndarray
        Signal (sum of sinusoids) reconstruction with CS.
    fs : int
        Sampling frequency for linear (Nyquist-Shannon) reconstruction.
    Psi : (pts,pts) ndarray
        Sensing (aquisition) identity matrix generated for linear sampling.
    uniIdx : (fs,) ndarray
        Indexes of the positions sampled from the original signal (Dirac comb).
    yNs : (fs,) ndarray
        Signal after linear sampling.
    sNs : (pts,) ndarray
        Signal (sum of sinusoids) reconstruction with sinc interpolation.


    Notes
    -----
    This code implements a (robust) CS scheme in which a sum of sinusoids with
    random amplitudes/frequencies is recovered using l_1 minimization. For
    comparison purposes, this simulation also includes the Nyquist-Shannon
    linear sampling scheme.


    References
    ----------

    .. [1] E. J. Candes and M. B. Wakin, "An Introduction To Compressive
           Sampling," in IEEE Signal Processing Magazine, vol. 25, no. 2,
           pp. 21-30, March 2008.

    .. [2] Han, Z., Li, H., & Yin, W. (2013). Compressive Sensing for Wireless
           Networks. Cambridge: Cambridge University.

    .. [3] Eldar, Y., & Kutyniok, G. (Eds.). (2012). Compressed Sensing:
           Theory and Applications. Cambridge: Cambridge University Press.

    .. [4] S. Boyd and L. Vandenberghe, Convex Optimization. New York, NY, USA:
           Cambridge University Press, 2004.


    © 2020 Pedro H. C. de Souza
"""
S = 16
pts = 2047
n = (pts + 1)//2
N = n  # the 'n'-point FFT sparsifies the signal, thus dim(Range('Phi')) = 'n'
M = 91  # m >= C*S*log(N/S) for i.i.d Gaussian sensing matrix (to obey RIP)
t = np.linspace(0, 2*np.pi, pts)  # ref. sinusoid period spans 'pts' samples
np.random.seed(1)  # repeat a run with the same aplitudes/frequecies values
f = np.random.randint(1, 100, S)  # location of the S non-zero coefficients
A = np.random.randint(5, 10, S)  # magnitude of the S non-zero coefficients
np.random.seed()  # a new realization of the sensing matrix for each run
Phi = np.random.randn(M, N)/mt.sqrt(M)  # ~ N(0,1/M) (variance is normalized)
wNoise = (np.random.randn(M) + 1j*np.random.randn(M))/mt.sqrt(2)  # ~ N(0,1)

s = np.zeros(pts)
for i in range(S):
    s += A[i]*np.sin(f[i]*t)  # generate the sum of sinusoids
fft = np.fft.rfft(s, norm="ortho")
# x = fft  # uncomment (comment below) to add signal noise

# Optimum thresholding strategy (see [3, pg.11])
x = np.zeros(N) + 1j*np.zeros(N)
idx = np.argsort(-np.abs(fft)**2)  # sort coefficients in descend order
x[idx[0:S]] = fft[idx[0:S]]  # oracle that selects the largest coefficients

# Universal encoding
y = Phi@x.T  # sensing (aquisition) of the sparse (compressible) signal
# y = Phi@x.T + wNoise  # uncomment (comment above) to add sensing noise

# l_1 minimization reconstruction
xOpt = cp.Variable(N, complex=True)
# eps = np.linalg.norm(wNoise)  # see [2, pg.64] Theorem 4
# SOC_constraints = [cp.SOC(cp.real(eps), Phi@xOpt.T - y)]  # uncomment
# prob = cp.Problem(cp.Minimize(cp.norm(xOpt, p=1)),        # (comment below)
#                   SOC_constraints)                        # for robust CS
prob = cp.Problem(cp.Minimize(cp.norm(xOpt, p=1)),
                  [Phi@xOpt.T == y])  # l_1 minization problem in CVXPY
prob.solve()
sCs = np.fft.irfft(xOpt.value, pts, norm="ortho")  # IFFT performed on "x*"

# Nyquist-Shannon linear sampling
fs = 161
Psi = np.identity(pts)
uniIdx = np.round(np.arange(0, pts - 1, pts/fs)).astype(int)
yNs = Psi[uniIdx, :]@s.T
sNs = np.zeros(pts)
for i in range(pts):
    sNs[i] = yNs @ np.sinc((t[i]/(2*np.pi) - np.arange(fs)*(1/fs))/(1/fs))

# Plot results
fig, ax = plt.subplots()
plt.subplot(211)
line1, = plt.plot(t, s, 'k', label='Original')
line2, = plt.plot(t, sNs, '--g', label='Nyquist-Shannon')
line3, = plt.plot(t, sCs, ':r', label='CS')
plt.axis([np.pi, np.pi + 0.5, np.min(s), np.max(s)])
plt.xlabel('samples')
plt.ylabel('Amplitude')
plt.xticks(visible=False)
plt.yticks(visible=False)
plt.legend(fontsize='small')
props = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
textstr = '\n'.join((
    'Nyquist-Shannon \n M = %.2i samples \n $|| \mathbf{s^*} - \mathbf{s^o} ||^2_2 /N$ = %.2E' %
    (fs, np.linalg.norm(sNs - s)**2 / s.size / (sum(A**2) / 2)),
    'CS \n M = %.2i samples \n $|| \mathbf{s^*} - \mathbf{s^o} ||^2_2 /N$ = %.2E' %
    (M, np.linalg.norm(sCs - s)**2 / s.size / (sum(A**2) / 2))))
plt.text(0.02, 0.98, textstr, transform=ax.transAxes, fontsize='xx-small',
         verticalalignment='top', bbox=props)

plt.subplot(212)
normCf = (np.linalg.norm(s)**2)/2
line1, = plt.plot(np.arange(n), (np.abs(x)**2)/normCf,
                  'xb', markersize=4, label='Original (FFT)')
line2, = plt.plot(np.arange(n), (np.abs(xOpt.value)**2)/normCf, 'or', fillstyle='none',
                  markersize=4, label='CS')
plt.axis([0, np.max(f) + 20, 0, 0.15])
plt.xlabel('Frequency')
plt.ylabel('Amplitude')
plt.yticks(visible=False)
plt.legend(fontsize='small')
textstr = r'$S = %i$' % (S, )
plt.text(0.9, 0.05, textstr, transform=ax.transAxes, fontsize='small',
         verticalalignment='bottom', bbox=props)

In [None]:
import numpy as np
from scipy.linalg import solve # For solving linear systems

# --- Helper Functions ---

def frob_norm_sq(X):
    """Calculates the squared Frobenius norm of a matrix X."""
    return np.linalg.norm(X, 'fro')**2

def l21_norm(X):
    """Calculates the l2,1-norm of a matrix X (sum of L2 norms of rows)."""
    # Calculate L2 norm for each row
    row_l2_norms = np.linalg.norm(X, axis=1)
    # Sum the L2 norms
    return np.sum(row_l2_norms)

def soft_threshold_row_sparse(X, lambda_val, mu_val):
    """
    Implements the row-sparse soft-thresholding operation for updating Q.
    Based on Equation (12d) and the definition of Lambda in the paper.

    Q <- signum(X) * max(0, |X| - (lambda/mu) * Lambda)
    where Lambda = diag(||X_j||_2^-1) * abs(X)
    """
    # X corresponds to (Z + B2) in the paper's notation
    
    # Calculate L2 norm for each row of X
    row_l2_norms = np.linalg.norm(X, axis=1, keepdims=True) # Keepdims for broadcasting
    
    # Handle rows with zero L2 norm to avoid division by zero.
    # If a row's L2 norm is zero, its inverse is effectively zero for scaling.
    # This also ensures that if a row is all zeros, it remains all zeros after thresholding.
    inv_row_l2_norms = np.zeros_like(row_l2_norms)
    non_zero_rows_idx = row_l2_norms.flatten() != 0
    inv_row_l2_norms[non_zero_rows_idx] = 1.0 / row_l2_norms[non_zero_rows_idx]

    # Calculate Lambda matrix as defined in the paper
    # Lambda_jk = (1 / ||X_j||_2) * |X_jk|
    Lambda_matrix = inv_row_l2_norms * np.abs(X)

    # Calculate the threshold term
    threshold_term = (lambda_val / mu_val) * Lambda_matrix

    # Apply the soft-thresholding
    # |X| - threshold_term
    arg_max = np.abs(X) - threshold_term
    
    # max(0, arg_max)
    max_val = np.maximum(0, arg_max)
    
    # signum(X) * max_val
    Q_new = np.sign(X) * max_val
    
    return Q_new

def calculate_objective_function(Y, A, D, Z, P, Q, B1, B2, lambda1, lambda2, mu1, mu2):
    """
    Calculates the Augmented Lagrangian objective function (Equation 11).
    """
    term1 = frob_norm_sq(Y - A @ D @ Z)
    term2 = lambda1 * frob_norm_sq(P)
    term3 = lambda2 * l21_norm(Q) # Using lambda2 for l21_norm as per (9) and (12d)
    term4 = mu1 * frob_norm_sq(P - D - B1)
    term5 = mu2 * frob_norm_sq(Q - Z - B2)
    
    return term1 + term2 + term3 + term4 + term5

# --- Main Algorithm Implementation ---

def row_sparse_bcs(Y, A, lambda1, lambda2, mu1, mu2,
                   n_features_dict, max_iterations=100, tolerance=1e-4):
    """
    Implements the Row-Sparse Blind Compressed Sensing (BCS) algorithm
    using Split Bregman as described in Shukla & Majumdar (2015).

    Args:
        Y (np.ndarray): Compressed multi-channel EEG signal (m x num_channels).
        A (np.ndarray): Random projection matrix (m x n).
        lambda1 (float): Regularization parameter for D (Frobenius norm).
        lambda2 (float): Regularization parameter for Z (l2,1-norm).
        mu1 (float): Bregman parameter for P-D constraint.
        mu2 (float): Bregman parameter for Q-Z constraint.
        n_features_dict (int): Number of atoms in the dictionary D.
        max_iterations (int): Maximum number of outer iterations.
        tolerance (float): Tolerance for objective function change to stop.

    Returns:
        tuple: (D_hat, Z_hat, P_hat, Q_hat) - estimated matrices.
    """
    
    m, num_channels = Y.shape
    n = A.shape[1] # Original signal dimension

    # Initialize D, Z, P, Q, B1, B2
    # D: (n x n_features_dict)
    # Z: (n_features_dict x num_channels)
    # P: (n x n_features_dict) - proxy for D
    # Q: (n_features_dict x num_channels) - proxy for Z
    # B1: (n x n_features_dict) - Bregman variable for P-D
    # B2: (n_features_dict x num_channels) - Bregman variable for Q-Z

    # Paper suggests D initialized to random values, number of columns > 30 (e.g., 40)
    D = np.random.randn(n, n_features_dict)
    
    # Paper suggests B1, B2 initialized to all ones.
    B1 = np.ones((n, n_features_dict))
    B2 = np.ones((n_features_dict, num_channels))

    # Initialize Z and P. Often zeros or random small values.
    # For P, it's a proxy for D, so it should have same shape as D.
    P = np.zeros((n, n_features_dict))
    # For Q, it's a proxy for Z, so it should have same shape as Z.
    Q = np.zeros((n_features_dict, num_channels))
    # Z is the sparse coefficient matrix, so it should have shape (n_features_dict, num_channels).
    Z = np.zeros((n_features_dict, num_channels))


    prev_objective_value = float('inf')

    print("Starting Row-Sparse BCS Algorithm...")
    print(f"Initial objective value (approx): {calculate_objective_function(Y, A, D, Z, P, Q, B1, B2, lambda1, lambda2, mu1, mu2):.4f}")

    for iteration in range(max_iterations):
        # --- 1. Solve for Z (Sub-problem 12b) ---
        # min_Z ||Y - ADZ||_F^2 + mu2 ||Q - Z - B2||_F^2
        # Derivative w.r.t Z set to zero:
        # (D^T A^T A D + mu2 * I) Z = D^T A^T Y + mu2 * (Q - B2)
        
        Left_Z = D.T @ A.T @ A @ D + mu2 * np.eye(n_features_dict)
        Right_Z = D.T @ A.T @ Y + mu2 * (Q - B2)
        
        # Solve the linear system for Z
        Z = solve(Left_Z, Right_Z)

        # --- 2. Solve for D (Sub-problem 12a) ---
        # min_D ||Y - ADZ||_F^2 + mu1 ||P - D - B1||_F^2
        # Derivative w.r.t D set to zero:
        # A^T A D Z Z^T + mu1 D = A^T Y Z^T + mu1 * (P - B1)
        # This is a generalized Sylvester equation of the form M D N + Q D = S.
        # Solved by vectorizing D and forming a large linear system.
        
        # Coefficients for vec(D)
        C1 = A.T @ A # (n x n)
        C2 = Z @ Z.T # (n_features_dict x n_features_dict)

        # Matrix for vec(D) in the linear system (K_D @ vec(D) = RHS_D_flat)
        # K_D = kron(C2.T, C1) + mu1 * I_vec(D)
        # The Kronecker product order is important: kron(B, A) for AXB form.
        # Here it's A D Z, so the matrix for vec(D) is (Z.T @ Z) kron (A.T @ A)
        # Let's verify the Kronecker product for M D N + Q D = S.
        # Vectorizing: (N.T kron M + kron(I, Q)) vec(D) = vec(S)
        # In our case: M=A.T@A, N=Z@Z.T, Q=mu1*I, S=A.T@Y@Z.T + mu1*(P-B1)
        
        K_D = np.kron(C2.T, C1) + mu1 * np.eye(D.size) # D.size = n * n_features_dict
        
        # RHS of the linear system, flattened
        RHS_D = A.T @ Y @ Z.T + mu1 * (P - B1)
        RHS_D_flat = RHS_D.flatten() # Flatten to a 1D array
        
        # Solve for the vectorized D and reshape back
        vec_D = solve(K_D, RHS_D_flat)
        D = vec_D.reshape(D.shape)

        # --- 3. Solve for P (Sub-problem 12c) ---
        # min_P lambda1 ||P||_F^2 + mu1 ||P - D - B1||_F^2
        # Closed-form solution: P = (mu1 / (lambda1 + mu1)) * (D + B1)
        
        P = (mu1 / (lambda1 + mu1)) * (D + B1)

        # --- 4. Update Q (Sub-problem 12d) ---
        # Q <- signum(Z+B2) * max(0, |Z+B2| - (lambda2/mu2) * Lambda)
        # Lambda = diag(||(Z+B2)_j||_2^-1) * abs(Z+B2)
        
        Q = soft_threshold_row_sparse(Z + B2, lambda2, mu2)

        # --- 5. Update Bregman Variables ---
        # As per the paper's explicit formulas:
        B1 = P - D - B1
        B2 = Q - Z - B2
        
        # --- Check for Convergence ---
        current_objective_value = calculate_objective_function(Y, A, D, Z, P, Q, B1, B2, lambda1, lambda2, mu1, mu2)
        
        change_in_objective = abs(prev_objective_value - current_objective_value)
        
        print(f"Iteration {iteration+1}: Objective = {current_objective_value:.4f}, Change = {change_in_objective:.6f}")
        
        if change_in_objective < tolerance:
            print(f"Converged at iteration {iteration+1}.")
            break
            
        prev_objective_value = current_objective_value
    else:
        print(f"Maximum iterations ({max_iterations}) reached.")

    return D, Z, P, Q

# --- Synthetic Data Generation ---
# Dimensions (adjust as needed for your experiments)
m = 128  # Number of measurements (rows of Y, A)
n = 256  # Original signal dimension (cols of A, rows of X=DZ)
num_channels = 4 # Number of EEG channels (cols of Y, Z)
n_features_dict = 40 # Number of atoms in dictionary D (cols of D, rows of Z) - as per paper

# Generate random projection matrix A (e.g., Bernoulli or Gaussian)
# Using Bernoulli matrix as it's common in Compressed Sensing
A = np.random.choice([-1, 1], size=(m, n)) / np.sqrt(m)

# Generate true dictionary D_true (n x n_features_dict)
D_true = np.random.randn(n, n_features_dict)

# Generate true sparse coefficients Z_true (n_features_dict x num_channels)
# Make it row-sparse by setting some rows to zero
Z_true = np.random.randn(n_features_dict, num_channels)
sparsity_ratio = 0.5 # Percentage of rows to keep non-zero
num_sparse_rows = int(n_features_dict * sparsity_ratio)
zero_rows_indices = np.random.choice(n_features_dict, n_features_dict - num_sparse_rows, replace=False)
Z_true[zero_rows_indices, :] = 0

# Generate Y_true = A @ D_true @ Z_true
Y_true = A @ D_true @ Z_true

# Add Gaussian noise to Y
noise_level = 0.05 # Adjust noise level as desired
noise = noise_level * np.random.randn(*Y_true.shape)
Y = Y_true + noise

# --- Algorithm Parameters (from the paper) ---
lambda1 = 0.01 # Example value, paper uses lambda1 for D, but doesn't specify a value.
               # I'll use a small value to encourage D to be somewhat structured.
lambda2 = 0.01 # Example value, paper uses lambda2 for Z, but doesn't specify a value.
               # This controls the sparsity of Z.
mu1 = 0.1      # Bregman parameter for P-D constraint (from paper)
mu2 = 0.001    # Bregman parameter for Q-Z constraint (from paper)

# --- Run the Algorithm ---
D_hat, Z_hat, P_hat, Q_hat = row_sparse_bcs(
    Y, A, lambda1, lambda2, mu1, mu2, n_features_dict
)

# --- Evaluate Results ---
print("\n--- Results ---")
print("Original signal dimension (n):", n)
print("Number of measurements (m):", m)
print("Number of channels:", num_channels)
print("Number of dictionary atoms (n_features_dict):", n_features_dict)

# Calculate Reconstruction Error (Normalized Mean Squared Error - NMSE)
# NMSE = ||original - reconstructed||_F / ||original||_F
# Here, original signal X = D_true @ Z_true
# Reconstructed signal X_hat = D_hat @ Z_hat

X_true = D_true @ Z_true
X_hat = D_hat @ Z_hat

nmse = np.linalg.norm(X_true - X_hat, 'fro') / np.linalg.norm(X_true, 'fro')
print(f"\nNormalized Mean Squared Error (NMSE) for X: {nmse:.4f}")

# Check sparsity of Z_hat (number of non-zero rows)
# A row is considered non-zero if its L2 norm is above a small threshold
threshold_for_zero_row = 1e-6
num_non_zero_rows_Z_true = np.sum(np.linalg.norm(Z_true, axis=1) > threshold_for_zero_row)
num_non_zero_rows_Z_hat = np.sum(np.linalg.norm(Z_hat, axis=1) > threshold_for_zero_row)

print(f"Number of non-zero rows in Z_true: {num_non_zero_rows_Z_true} / {n_features_dict}")
print(f"Number of non-zero rows in Z_hat: {num_non_zero_rows_Z_hat} / {n_features_dict}")

print("\nShape of estimated D (Dictionary):", D_hat.shape)
print("Shape of estimated Z (Sparse Coefficients):", Z_hat.shape)
print("Shape of estimated P (Proxy for D):", P_hat.shape)
print("Shape of estimated Q (Proxy for Z):", Q_hat.shape)

# Optional: Visualize some results (requires matplotlib)
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.imshow(np.abs(Z_true), aspect='auto', cmap='viridis')
plt.title('Absolute Z_true (Row-Sparse)')
plt.colorbar()

plt.subplot(1, 2, 2)
plt.imshow(np.abs(Z_hat), aspect='auto', cmap='viridis')
plt.title('Absolute Z_hat (Estimated)')
plt.colorbar()
plt.tight_layout()
plt.show()


In [None]:
import scipy.io as sio
import numpy as np
import pandas as pd

def get_mat_data_from_bciiv(file_path: str):
    mat_data = sio.loadmat(file_path, spmatrix=False)

    # INT16 to uV
    fs = int(mat_data["nfo"]["fs"][0][0][0][0])
    chs = np.array([arr[0] for arr in  mat_data["nfo"]["clab"][0][0][0]])
    X = 0.1 * np.array(mat_data["cnt"], dtype=np.float64)

    return chs, fs, X

file_path = './../BCICIV_1calib_1000Hz_mat/BCICIV_calib_ds1a_1000Hz.mat'
chs, fs, raw_signals = get_mat_data_from_bciiv(file_path)

X = raw_signals

chs, fs

In [None]:
import scipy.io as sio
import numpy as np

class BCIIVLoader:
    """
    Loader for BCI Competition IV .mat EEG files.
    Allows channel selection and time slicing.
    """
    def __init__(self, file_path: str):
        self.file_path = file_path
        self._load_mat()
    
    def _load_mat(self):
        mat_data = sio.loadmat(self.file_path, spmatrix=False)
        self.fs = int(mat_data["nfo"]["fs"][0][0][0][0])
        self.ch_names = np.array([arr[0] for arr in mat_data["nfo"]["clab"][0][0][0]])
        self.data = 0.1 * np.array(mat_data["cnt"], dtype=np.float64)  # shape: (samples, channels)
        self.n_samples, self.n_channels = self.data.shape

    def get_channel_data(self, channel):
        """
        Returns the data for a specific channel (by name or index).
        """
        if isinstance(channel, str):
            idx = np.where(self.ch_names == channel)[0]
            if len(idx) == 0:
                raise ValueError(f"Channel '{channel}' not found.")
            idx = idx[0]
        else:
            idx = int(channel)
        return self.data[:, idx]

    def get_data(self, channels=None, tmin=None, tmax=None):
        """
        Returns data for selected channels and time range.
        channels: list of channel names or indices (default: all)
        tmin, tmax: time in seconds (default: full range)
        """
        # Channel selection
        if channels is None:
            ch_idx = np.arange(self.n_channels)
        else:
            ch_idx = []
            for ch in channels:
                if isinstance(ch, str):
                    idx = np.where(self.ch_names == ch)[0]
                    if len(idx) == 0:
                        raise ValueError(f"Channel '{ch}' not found.")
                    ch_idx.append(idx[0])
                else:
                    ch_idx.append(int(ch))
            ch_idx = np.array(ch_idx)
        
        # Time selection
        start = 0 if tmin is None else int(self.fs * tmin)
        end = self.n_samples if tmax is None else int(self.fs * tmax)

        # Clamp to valid range
        start = max(0, start)
        end = min(self.n_samples, end)
        
        t = np.arange(start, end) / self.fs
        X = self.data[start:end, ch_idx]
    
        # Return data [samples, channels], time vector and sampling frequency
        return X, t, self.fs

# Example usage:
loader = BCIIVLoader('./../BCICIV_1calib_1000Hz_mat/BCICIV_calib_ds1a_1000Hz.mat')
X, t, fs = loader.get_data(tmin=10, tmax=12)

X

In [None]:
import numpy as np
from numpy.linalg import svd
from sklearn.linear_model import OrthogonalMatchingPursuit

class KSVD:
    """
    K-SVD dictionary learning algorithm.

    Parameters
    ----------
    n_components : int
        Number of dictionary atoms.
    max_iter : int
        Number of K-SVD iterations.
    sparsity : int
        Target number of non-zero coefficients in sparse coding.
    """
    def __init__(self, n_components=40, max_iter=10, sparsity=5):
        self.n_components = n_components
        self.max_iter = max_iter
        self.sparsity = sparsity
        self.dictionary_ = None

    def _initialize_dictionary(self, X):
        # Initialize dictionary with random normalized columns from data
        n_features, n_samples = X.shape
        indices = np.random.choice(n_samples, self.n_components, replace=False)
        D = X[:, indices].copy()
        D /= np.linalg.norm(D, axis=0, keepdims=True) + 1e-8
        return D

    def _sparse_coding(self, X, D):
        # Compute sparse codes using OMP
        omp = OrthogonalMatchingPursuit(n_nonzero_coefs=self.sparsity)
        Gamma = np.zeros((self.n_components, X.shape[1]))
        for i in range(X.shape[1]):
            omp.fit(D, X[:, i])
            Gamma[:, i] = omp.coef_
        return Gamma

    def _dictionary_update(self, X, D, Gamma):
        # Update each atom
        for k in range(self.n_components):
            # Find signals that use atom k
            omega = np.nonzero(Gamma[k, :])[0]
            if len(omega) == 0:
                continue
            # Compute the residual without atom k
            E = X[:, omega] - D @ Gamma[:, omega] + np.outer(D[:, k], Gamma[k, omega])
            # SVD on the residual
            U, S, Vt = svd(E, full_matrices=False)
            # Update atom k and coefficients
            D[:, k] = U[:, 0]
            Gamma[k, omega] = S[0] * Vt[0, :]
        return D, Gamma

    def fit(self, X):
        """
        Learn dictionary from data.

        Parameters
        ----------
        X : array-like, shape (n_features, n_samples)
            Training data matrix.

        Returns
        -------
        self
        """
        X = np.array(X, dtype=float)
        # Initialize
        self.dictionary_ = self._initialize_dictionary(X)

        for it in range(self.max_iter):
            # Sparse coding step
            Gamma = self._sparse_coding(X, self.dictionary_)
            # Dictionary update step
            self.dictionary_, Gamma = self._dictionary_update(X, self.dictionary_, Gamma)
            # (Optional) compute reconstruction error
            error = np.linalg.norm(X - self.dictionary_ @ Gamma)**2
            print(f"Iteration {it+1}/{self.max_iter}, reconstruction error: {error:.4f}")

        self.code_ = Gamma
        return self

    def transform(self, X):
        """
        Encode data using the learned dictionary.
        """
        return self._sparse_coding(np.array(X, dtype=float), self.dictionary_)

    def reconstruct(self):
        """
        Reconstruct signals from codes and dictionary.
        """
        return self.dictionary_ @ self.code_


# Generate synthetic data: signals composed of random combinations of sinusoids
t = np.linspace(0, 1, 256)
S = np.vstack([np.sin(2*np.pi*f*t) for f in [5, 15, 30]])
X = (np.random.randn(3, 100).T @ S).T  # shape (256,100)

ksvd = KSVD(max_iter=15, sparsity=3)
ksvd.fit(X)
codes = ksvd.transform(X)
X_hat = ksvd.reconstruct()
print("Reconstruction done.")


In [None]:
import numpy as np
from sklearn.decomposition import MiniBatchDictionaryLearning
from sklearn.linear_model import Lasso # For reconstruction (alternative to SPGL1 BPDN)
from scipy.fft import dct, idct # For potential Gabor-like dictionary comparison
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import os
# import mne # Uncomment if you have mne-python installed for real EEG data loading

# --- 1. Configuration Parameters (Based on Paper) ---
# EEG Data Parameters
SAMPLING_FREQUENCY = 256 # Hz [cite: 59]
EPOCH_LENGTH_SECONDS = 4 # seconds [cite: 69]
N_SAMPLES_PER_EPOCH = EPOCH_LENGTH_SECONDS * SAMPLING_FREQUENCY # 1024 samples

# K-SVD Parameters
TRAINING_SET_SIZE = 8000 # EEG segments [cite: 64]
TEST_SET_SIZE = 3200 # EEG segments [cite: 64calculate_nmse]
DICTIONARY_SIZE = 3000 # Number of atoms [cite: 81]
MAX_COEFFICIENTS_T0 = 60 # Number of non-zero coefficients per atom [cite: 86]
K_SVD_MAX_ITERATIONS = 80 # [cite: 72]

# Compressed Sensing Parameters
COMPRESSION_RATIOS = [2, 4, 8] # [cite: 72]
# Reconstruction algorithm: Paper uses SPGL1 BPDN. We'll use sklearn.linear_model.Lasso as an approximation.
# For exact BPDN, you'd need to integrate SPGL1 or a similar convex optimization solver (e.g., cvxpy).

# --- 2. Helper Functions ---

def generate_eeg_data_placeholder(num_segments, segment_length):
    """
    Generates placeholder EEG-like data for demonstration.
    In a real scenario, this would load and preprocess actual EDF files.
    """
    print(f"Generating {num_segments} placeholder EEG segments of length {segment_length}...")
    # Simulate some oscillatory patterns with noise
    data = np.zeros((num_segments, segment_length))
    for i in range(num_segments):
        t = np.linspace(0, EPOCH_LENGTH_SECONDS, segment_length, endpoint=False)
        # Combine a few sinusoids and noise
        signal = 5 * np.sin(2 * np.pi * 5 * t) + \
                 2 * np.sin(2 * np.pi * 12 * t + np.pi/4) + \
                 1 * np.sin(2 * np.pi * 30 * t) + \
                 np.random.normal(0, 0.5, segment_length)
        data[i, :] = signal
    print("Placeholder data generation complete.")
    return data

def create_random_sensing_matrix(M, N):
    """
    Creates a white-noise Gaussian random matrix (Phi).
    M: compressed signal length
    N: original signal length
    """
    print(f"Creating random sensing matrix Phi of shape ({M}, {N})...")
    # Paper uses white-noise Gaussian random matrix [cite: 72]
    return np.random.randn(M, N)

def calculate_nmse(original_signal, reconstructed_signal):
    """
    Calculates the Normalized Mean Square Error (NMSE) between two signals.
    NMSE(x, x_hat) = ||x - x_hat||_2 / ||x - mean(x)||_2 [cite: 74]
    """
    original_signal = np.asarray(original_signal)
    reconstructed_signal = np.asarray(reconstructed_signal)

    # Ensure signals are 1D arrays for norm calculation
    original_signal_flat = original_signal.flatten()
    reconstructed_signal_flat = reconstructed_signal.flatten()

    # Calculate numerator: ||x - x_hat||_2
    numerator = np.linalg.norm(original_signal_flat - reconstructed_signal_flat)

    # Calculate denominator: ||x - mean(x)||_2
    denominator = np.linalg.norm(original_signal_flat - np.mean(original_signal_flat))

    if denominator == 0:
        return float('inf') # Avoid division by zero if original signal is constant
    return numerator / denominator # [cite: 74]

# --- 3. Main K-SVD Dictionary Learning Process ---

def learn_ksvd_dictionary(training_data, n_atoms, max_coeffs, n_iterations):
    """
    Learns a K-SVD dictionary using MiniBatchDictionaryLearning from sklearn.
    """
    print(f"\nStarting K-SVD dictionary learning with {n_atoms} atoms and {max_coeffs} max coefficients...")
    print(f"Training data shape: {training_data.shape}")

    # MiniBatchDictionaryLearning is an efficient variant of K-SVD
    # n_components: number of atoms in the dictionary
    # transform_n_nonzero_coeffs: T0, max number of non-zero coefficients [cite: 46]
    # n_iter: max number of iterations [cite: 72]
    # random_state for reproducibility
    # batch_size: can be tuned for performance, smaller batches can lead to faster convergence
    # alpha: L1 regularization strength for sparse coding (implicitly in OMP, but sklearn uses it)
    ksvd_learner = MiniBatchDictionaryLearning(
        n_components=n_atoms,
        transform_n_nonzero_coefs=max_coeffs,
        # n_iter=n_iterations,
        random_state=42, # For reproducibility
        batch_size=256, # A common batch size, can be adjusted
        alpha=1.0 # Default alpha for sparse coding (L1 regularization)
    )

    # Fit the model to the training data to learn the dictionary
    ksvd_learner.fit(training_data)
    learned_dictionary = ksvd_learner.components_
    print(f"K-SVD dictionary learning complete. Dictionary shape: {learned_dictionary.shape}")
    return learned_dictionary

# --- 4. Compressed Sensing and Reconstruction ---

def perform_compressed_sensing_and_reconstruction(
    test_data,
    learned_dictionary,
    compression_ratio,
    reconstruction_method='lasso' # 'lasso' for sklearn.Lasso
):
    """
    Performs compressed sensing and reconstruction on test data using the learned dictionary.
    """
    N = test_data.shape[1] # Original signal length [cite: 74]
    M = int(N / compression_ratio) # Compressed signal length [cite: 74]

    if M == 0:
        raise ValueError(f"Compression ratio {compression_ratio} is too high for N={N}. M cannot be zero.")

    # Generate sensing matrix Phi [cite: 72]
    Phi = create_random_sensing_matrix(M, N)

    reconstructed_signals = []
    nmse_values = []
    computation_times = []

    print(f"\nStarting CS reconstruction for CR={compression_ratio}:1 (M={M})...")
    for i, original_signal in enumerate(test_data):
        if i % (TEST_SET_SIZE // 10) == 0 and i > 0:
            print(f"  Processing test signal {i}/{TEST_SET_SIZE}...")

        # Step 1: Compress the signal y = Phi * x [cite: 25, 26]
        y_compressed = np.dot(Phi, original_signal)

        # Step 2: Reconstruction (solving y = Phi * D * theta for theta)
        # This is the core of the BPDN problem.
        # Paper uses SPGL1. Here, we approximate with Lasso (L1 regularization).
        # We need to solve: min ||theta||_1 subject to ||y - (Phi @ D) @ theta||_2 <= epsilon
        effective_sensing_matrix_sparse_domain = np.dot(Phi, learned_dictionary.T) # (M, P)

        # Use Lasso for sparse coefficient recovery
        # The 'alpha' parameter in Lasso controls the strength of L1 regularization.
        # A smaller alpha will favor less sparsity, larger alpha more sparsity.
        # This needs careful tuning to approximate BPDN behavior.
        start_time = plt.rcParams['figure.subplot.right'] if 'figure.subplot.right' in plt.rcParams else 0 # Just a placeholder for time measurement start
        reconstruction_lasso = Lasso(alpha=0.001, # Needs tuning! Corresponds to epsilon/noise level in BPDN
                                     fit_intercept=False,
                                     max_iter=10000, # Increase if convergence issues
                                     tol=1e-4) # Tolerance for convergence
        try:
            reconstruction_lasso.fit(effective_sensing_matrix_sparse_domain, y_compressed)
            sparse_coefficients_reconstructed = reconstruction_lasso.coef_
        except Exception as e:
            print(f"Lasso reconstruction failed for signal {i}: {e}. Skipping this signal.")
            continue
        end_time = plt.rcParams['figure.subplot.left'] if 'figure.subplot.left' in plt.rcParams else 0 # Just a placeholder for time measurement end
        computation_times.append(end_time - start_time) # Placeholder for actual time measurement

        # Step 3: Reconstruct the original signal: x_hat = D * theta_hat [cite: 22]
        x_reconstructed = np.dot(learned_dictionary.T, sparse_coefficients_reconstructed) # Transpose dictionary if atoms are rows

        reconstructed_signals.append(x_reconstructed)
        nmse_values.append(calculate_nmse(original_signal, x_reconstructed))

    mean_nmse = np.mean(nmse_values) if nmse_values else float('nan')
    mean_comp_time = np.mean(computation_times) if computation_times else float('nan')
    print(f"  CS Reconstruction complete for CR={compression_ratio}:1. Mean NMSE: {mean_nmse:.4f}")
    print(f"  Mean Computation Time (Approx): {mean_comp_time:.4f} seconds.")
    return mean_nmse, mean_comp_time

# --- 5. Gabor Dictionary (for comparison, as mentioned in paper) ---

def create_gabor_dictionary(N, num_freqs, num_shifts):
    """
    Creates a Gabor dictionary for a given signal length N.
    Simplified Gabor atom generation. A more rigorous implementation would use
    specific Gabor wavelets parameters (center frequency, bandwidth, time shift).
    """
    print(f"Creating Gabor dictionary with N={N}...")
    gabor_atoms = []
    t = np.linspace(0, 1, N, endpoint=False) # Time vector (normalized)

    for freq_idx in range(num_freqs):
        freq = 1 + freq_idx * (SAMPLING_FREQUENCY / 2 / num_freqs) # Example: covering up to Nyquist
        for shift_idx in range(num_shifts):
            shift = shift_idx * (N / num_shifts)
            # Simple Gabor atom: sine wave modulated by Gaussian
            # A more accurate Gabor uses complex exponentials and includes bandwidth/sigma
            gaussian_window = np.exp(-0.5 * ((t * N - shift) / (N / 10))**2) # Example Gaussian spread
            gabor_atom = np.sin(2 * np.pi * freq * t) * gaussian_window
            gabor_atom /= np.linalg.norm(gabor_atom) # Normalize
            gabor_atoms.append(gabor_atom)

    gabor_dict = np.array(gabor_atoms)
    print(f"Gabor dictionary created. Shape: {gabor_dict.shape}")
    return gabor_dict

# --- 6. Main Execution ---

if __name__ == "__main__":
    np.random.seed(42) # Set random seed for reproducibility

    # --- Data Generation/Loading ---
    # In a real scenario, you would load your EEG data here:
    # Example using MNE-Python (requires installation and EDF files)
    # raw = mne.io.read_raw_edf('path/to/chb01_01.edf', preload=True)
    # raw.resample(sfreq=SAMPLING_FREQUENCY)
    # full_eeg_data = raw.get_data() # (n_channels, n_samples)
    #
    # # Assuming a single channel or flattening multiple channels for simplicity
    # # For multi-channel EEG, you might learn a dictionary for each channel or a joint dictionary.
    # single_channel_data = full_eeg_data[0, :]
    #
    # # Segment the data
    # all_segments = []
    # for i in range(0, single_channel_data.shape[0] - N_SAMPLES_PER_EPOCH + 1, N_SAMPLES_PER_EPOCH):
    #     all_segments.append(single_channel_data[i:i+N_SAMPLES_PER_EPOCH])
    # all_segments = np.array(all_segments)
    #
    # # Split into training and testing (ensure no overlap as per paper [cite: 63])
    # np.random.shuffle(all_segments) # Shuffle to ensure random selection if not pre-shuffled
    # training_data_all_channels = all_segments[:TRAINING_SET_SIZE]
    # test_data_all_channels = all_segments[TRAINING_SET_SIZE : TRAINING_SET_SIZE + TEST_SET_SIZE]

    # For demonstration, generate placeholder data:
    training_data = generate_eeg_data_placeholder(TRAINING_SET_SIZE, N_SAMPLES_PER_EPOCH)
    test_data = generate_eeg_data_placeholder(TEST_SET_SIZE, N_SAMPLES_PER_EPOCH)

    print(training_data.shape)

    # Normalize data (important for many dictionary learning algorithms)
    scaler = StandardScaler()
    training_data_scaled = scaler.fit_transform(training_data)
    test_data_scaled = scaler.transform(test_data) # Use same scaler as training

    # --- K-SVD Dictionary Learning ---
    learned_ksvd_dictionary = learn_ksvd_dictionary(
        training_data_scaled,
        100, # DICTIONARY_SIZE, # 3000 atoms [cite: 81]
        MAX_COEFFICIENTS_T0, # 60 coeffs [cite: 86]
        K_SVD_MAX_ITERATIONS # 80 iterations [cite: 72]
    )

    # # --- Evaluate K-SVD Performance ---
    # print("\n--- Evaluating K-SVD Dictionary Performance ---")
    # ksvd_nmse_results = []
    # ksvd_comp_time_results = []
    # for cr in COMPRESSION_RATIOS:
    #     nmse, comp_time = perform_compressed_sensing_and_reconstruction(
    #         test_data_scaled,
    #         learned_ksvd_dictionary,
    #         cr,
    #         reconstruction_method='lasso'
    #     )
    #     ksvd_nmse_results.append(nmse)
    #     ksvd_comp_time_results.append(comp_time)

    # print("\nK-SVD NMSE Results per CR:", ksvd_nmse_results)
    # print("K-SVD Comp Time Results per CR:", ksvd_comp_time_results)

    # # --- Gabor Dictionary Comparison (as mentioned in paper) ---
    # # The paper uses a Gabor dictionary of 15938 atoms [cite: 85]
    # # Creating a Gabor dictionary of this size requires careful parameterization.
    # # Here's a conceptual example.
    # print("\n--- Evaluating Gabor Dictionary Performance (for comparison) ---")
    # # For a real Gabor, you need specific time/frequency resolutions.
    # # N=1024, if 15938 atoms is the goal, need to set num_freqs and num_shifts accordingly.
    # # Example: 128 freqs * 128 shifts ~ 16000 atoms
    # # A more precise Gabor implementation would be needed for direct comparison.
    # try:
    #     gabor_dictionary = create_gabor_dictionary(
    #         N_SAMPLES_PER_EPOCH,
    #         num_freqs=128, # Example: for ~16000 atoms
    #         num_shifts=128  # Example: for ~16000 atoms
    #     )

    #     gabor_nmse_results = []
    #     gabor_comp_time_results = []
    #     for cr in COMPRESSION_RATIOS:
    #         nmse, comp_time = perform_compressed_sensing_and_reconstruction(
    #             test_data_scaled,
    #             gabor_dictionary,
    #             cr,
    #             reconstruction_method='lasso'
    #         )
    #         gabor_nmse_results.append(nmse)
    #         gabor_comp_time_results.append(comp_time)

    #     print("\nGabor NMSE Results per CR:", gabor_nmse_results)
    #     print("Gabor Comp Time Results per CR:", gabor_comp_time_results)

    #     # --- Plotting Results (Similar to Fig. 3 & 4 in paper) ---
    #     plt.figure(figsize=(12, 5))

    #     # NMSE Plot
    #     plt.subplot(1, 2, 1)
    #     plt.plot([f"{cr}:1" for cr in COMPRESSION_RATIOS], ksvd_nmse_results, '-*', label='K-SVD dic', color='red')
    #     plt.plot([f"{cr}:1" for cr in COMPRESSION_RATIOS], gabor_nmse_results, '-o', label='Gabor dic', color='blue')
    #     plt.xlabel('Compression Ratio')
    #     plt.ylabel('NMSE')
    #     plt.title('NMSE K-SVD vs. Gabor Dictionary')
    #     plt.legend()
    #     plt.grid(True)

    #     # Computation Cost Plot
    #     plt.subplot(1, 2, 2)
    #     plt.plot([f"{cr}:1" for cr in COMPRESSION_RATIOS], ksvd_comp_time_results, '-*', label='K-SVD dic', color='red')
    #     plt.plot([f"{cr}:1" for cr in COMPRESSION_RATIOS], gabor_comp_time_results, '-o', label='Gabor dic', color='blue')
    #     plt.xlabel('Compression Ratio')
    #     plt.ylabel('Seconds')
    #     plt.title('Computation Cost K-SVD vs. Gabor Dictionary')
    #     plt.legend()
    #     plt.grid(True)

    #     plt.tight_layout()
    #     plt.show()

    # except Exception as e:
    #     print(f"\nSkipping Gabor dictionary comparison due to error: {e}")
    #     print("Ensure 'num_freqs' and 'num_shifts' are set appropriately for a Gabor dictionary of comparable size.")
    #     print("The provided Gabor dictionary generation is a simplified conceptual example.")

In [45]:
import numpy as np
from sklearn.decomposition import DictionaryLearning
from sklearn.linear_model import OrthogonalMatchingPursuit

# ------------------------------------------------------------------------------
# 1. Load or prepare your EEG data:
#    X_train: array of shape (n_train_segments, segment_length)
#    X_test : array of shape (n_test_segments,  segment_length)
#    e.g. from CHB-MIT: segment_length = 4 s × 256 Hz = 1024 samples
# ------------------------------------------------------------------------------
# For demo, let’s make synthetic data:
n_train, n_test = 100, 3200
segment_length = 4 * 256  # 4 s @256 Hz
# e.g. X_train = load_eeg_segments(...)
# Here random:
X_train = np.random.randn(n_train, segment_length)
X_test  = np.random.randn(n_test,  segment_length)

# ------------------------------------------------------------------------------
# 2. Learn a dictionary with scikit-learn
# ------------------------------------------------------------------------------
n_atoms    = 100      # number of dictionary atoms
sparsity   = 30        # target nonzeros per segment
max_iter   = 20        # K-SVD/online iterations

dl = DictionaryLearning(
    n_components=n_atoms,
    transform_n_nonzero_coefs=sparsity,
    fit_algorithm='lars',      # MOD-style update; very similar to K-SVD in practice
    transform_algorithm='omp', # Orthogonal Matching Pursuit for sparse coding
    max_iter=max_iter,
    verbose=True,
    random_state=0
)
# Note: sklearn expects X of shape (n_samples, n_features)
# so here n_samples = n_train, n_features = segment_length
D_components = dl.fit(X_train).components_   # shape (n_atoms, segment_length)
D = D_components.T                          # shape (segment_length, n_atoms)

# ------------------------------------------------------------------------------
# 3. Build a random sensing matrix Φ and effective dictionary A = Φ·D
# ------------------------------------------------------------------------------
CR = 4  # compression ratio N/M
N  = segment_length
M  = N // CR
Φ  = np.random.randn(M, N) / np.sqrt(M)      # normalized Gaussian

A = Φ @ D  # shape (M, n_atoms)

# ------------------------------------------------------------------------------
# 4. Reconstruct test segments via OMP on (A, y)
# ------------------------------------------------------------------------------
omp = OrthogonalMatchingPursuit(n_nonzero_coefs=sparsity)
reconstruction_errors = []

X_recon = np.zeros_like(X_test)
for i, x_true in enumerate(X_test):
    y = Φ @ x_true                           # compressed measurements
    omp.fit(A, y)
    α = omp.coef_                            # sparse code (length n_atoms)
    x_hat = D @ α                            # reconstruct in signal space
    X_recon[i] = x_hat
    reconstruction_errors.append(np.linalg.norm(x_true - x_hat)**2)

mean_nmse = np.mean(reconstruction_errors) / np.mean(np.linalg.norm(X_test, axis=1)**2)
print(f"Mean NMSE on test set: {mean_nmse:.4f}")


[dict_learning] ....................Mean NMSE on test set: 1.2889


In [73]:
from abc import ABC, abstractmethod
import os
import glob
import pickle
import mne
import numpy as np
from pathlib import Path
import time
from scipy.signal import resample

cwd = Path.cwd()

class Loader(ABC):
    """
    Abstract class for data loaders.
    This class defines the interface for loading datasets.
    """
    _id: int = time.time_ns()  # Unique identifier for the loader instance
    _dataset: str
    _fs: float
    _ch_names: list[str]
    _data: np.ndarray # (n_blocks, n_samples, n_channels)
    _n_samples: int
    _n_channels: int
    # Number of blocks of segments in the dataset
    _n_blocks: int
    _segment_length_sec: float

    def __init__(self,  n_blocks: int, segment_length_sec: float, random_state: int = 42):
        self._n_blocks = n_blocks
        self._segment_length_sec = segment_length_sec

        self._load_data(n_blocks, segment_length_sec, random_state)

    
    @abstractmethod
    def _load_data(self, n_blocks: int, segment_length_sec: float, random_state: int):
        ...

    def get_random_segments(self, n_segments: int, random_state: int = 42) -> np.ndarray:
        """
        Returns a list of individual segments randomly selected from the dataset.
        """
        rng = np.random.default_rng(random_state)

        total_n_segments = self.n_blocks * self.n_channels

        if n_segments > total_n_segments:
            raise ValueError(f"Requested {n_segments} segments, but only {total_n_segments} available.")

        # Generate random indices for blocks and channels
        indices = rng.choice(total_n_segments, size=n_segments, replace=False)

        # Map indices to block and channel
        segments = []
        for idx in indices:
            block_idx = idx // self.n_channels
            channel_idx = idx % self.n_channels
            segments.append(self._data[block_idx, :, channel_idx])

        return np.array(segments)

    def resample(self, new_fs: float):
        """
        Resample the data to a new sampling frequency.
        Assumes self._data has shape (n_blocks, n_samples, n_channels).
        """
        if self._fs == new_fs:
            return
        if new_fs <= 0:
            raise ValueError("New sampling frequency must be positive.")

        new_n_samples = int(np.round(self.n_samples * new_fs / self._fs))
        # Resample each block independently along the samples axis
        self._data = resample(self._data, new_n_samples, axis=1)
        self._n_samples = new_n_samples
        self._fs = new_fs

    @classmethod
    def load(cls, file_path: str):
        """
        Load data from a file.
        This method should be implemented by subclasses.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File {file_path} does not exist.")

        with open(file_path, "rb") as f:
            obj = pickle.load(f)
            if not isinstance(obj, cls):
                raise TypeError(f"Expected object of type {cls.__name__}, got {type(obj).__name__}.")
            
            return obj

    def save(self, folder_path: str):
        file_path = os.path.join(folder_path, f"{self.id}_{self.dataset}_fs_{self.fs}_blocks_{self.n_blocks}.pkl")
        with open(file_path, "wb") as f:
            pickle.dump(self, f)

    @property
    def id(self) -> int:
        return self._id

    @property
    def dataset(self) -> str:
        return self._dataset

    @property
    def fs(self) -> float:
        return self._fs

    @property
    def ch_names(self) -> list[str]:
        return self._ch_names

    @property
    def data(self) -> np.ndarray:
        return self._data
    
    @property
    def n_samples(self) -> int:
        return self._n_samples
    
    @property
    def n_channels(self) -> int:
        return self._n_channels
    
    @property
    def n_blocks(self) -> int:
        return self._n_blocks
    
    @property
    def segment_length_sec(self) -> float:
        return self._segment_length_sec


class CHBMITLoader(Loader):
    """
    Loader for CHB-MIT Scalp EEG Database.
    """

    _dataset = "chbmit"
    _fs = 256.0  # Default sampling frequency for CHB-MIT
    _ch_names = [
        'FP1-F7', 'F7-T7', 'T7-P7', 'P7-O1',
        'FP1-F3', 'F3-C3', 'C3-P3', 'P3-O1',
        'FP2-F4', 'F4-C4', 'C4-P4', 'P4-O2',
        'FP2-F8', 'F8-T8', 'T8-P8', 'P8-O2',
        'FZ-CZ', 'CZ-PZ', 'P7-T7', 'T7-FT9',
        'FT9-FT10', 'FT10-T8'
    ]
    _n_channels = len(_ch_names)


    def _load_data(self, n_blocks: int, segment_length_sec: float, random_state: int):
        """
        Load EEG data from the CHB-MIT dataset.

        Parameters
        ----------
        n_blocks : int
            Number of blocks to load.
        segment_length_sec : float
            Length of each segment in seconds.
        random_state : int
            Random seed for reproducibility.
        """
        # Set random seed
        rng = np.random.default_rng(random_state)

        # Define the root directory for CHB-MIT dataset
        root_dir = f"{cwd}/../../files/CHBMIT"

        # Get all EDF files from all subject folders
        edf_files = sorted(glob.glob(os.path.join(root_dir, "chb*/", "*.edf")))
        if not edf_files:
            raise FileNotFoundError(f"No EDF files found in {root_dir}.")

        # Shuffle the list of EDF files
        rng.shuffle(edf_files)

        # Initialize data storage
        blocks = []
        n_samples_per_segment = int(segment_length_sec * self._fs)

        for edf_file in edf_files:
            edf_file_name = os.path.basename(edf_file)
            # Read the EDF file using MNE
            raw = mne.io.read_raw_edf(edf_file, include=self.ch_names, verbose=False, )
            # Exclude last channel (duplicated T8-P8) 
            data = raw.get_data().T[:, :-1] # (n_samples, n_channels)
            if data.shape[1] != self.n_channels:
                continue  # Skip files with unexpected channel count

            max_start_idx = data.shape[0] - n_samples_per_segment
            if max_start_idx <= 0:
                continue

            possible_starts = np.arange(0, max_start_idx + 1, n_samples_per_segment)
            initial_idx = rng.choice(possible_starts.size)
            for start in possible_starts[initial_idx:]:
                end = start + n_samples_per_segment
                block = data[start:end, :]
                blocks.append(block)
    
                if len(blocks) >= n_blocks:
                    break

            if len(blocks) >= n_blocks:
                break
        
        if len(blocks) < n_blocks:
            # TODO: Handle case where not enough blocks are loaded. Save the initial_idx of each edf_file and if there is not enough blocks, shuffle the list of edf_files again, iterate over each of them and get new initial_idx before the one used earlier.
            raise ValueError(f"Requested {n_blocks} blocks, but only {len(blocks)} were loaded.")

        self._data = np.array(blocks)  # Shape (n_blocks, n_samples_per_segment, n_channels)
        self._n_samples = n_samples_per_segment

# chbmitLoader = CHBMITLoader(n_blocks=11200, segment_length_sec=4.0)
chbmitLoader = CHBMITLoader.load(f"{cwd}/../../files/processed/1748879687102909305_chbmit_fs_128_blocks_11200.pkl")
# chbmitLoader.resample(new_fs=128) 
chbmitLoader.data
chbmitLoader.get_random_segments(n_segments=10)
# chbmitLoader.save(folder_path=f"{cwd}/../../files/processed")

array([[-3.34115605e-05, -1.31800852e-05, -2.29188701e-05, ...,
        -1.25283447e-04, -1.12009883e-04, -1.15086609e-04],
       [-3.45223495e-05, -5.56788801e-05, -4.49851122e-05, ...,
         1.52313879e-05,  1.05455216e-05,  2.25462109e-05],
       [-2.23602284e-05, -3.42847510e-05, -2.72598235e-05, ...,
        -2.04800907e-05, -2.32271284e-05, -2.01397373e-05],
       ...,
       [ 4.25726923e-06,  1.07125274e-05,  2.51657876e-05, ...,
         3.76446703e-05,  1.91701990e-05, -1.85331468e-05],
       [ 1.82054427e-05,  6.53294523e-06,  1.46664068e-05, ...,
         2.53890719e-05,  3.81875524e-05,  4.82226537e-05],
       [ 4.96500858e-05,  4.62652006e-05,  3.56947840e-05, ...,
        -1.31065050e-05, -5.82600441e-06, -2.16787790e-05]],
      shape=(10, 512))

In [74]:
np.arange(0, 5)

array([0, 1, 2, 3, 4])