# Constrained Orthogonal Matching Pursuit for Audio Declipping

In [168]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

## Signal Clipping

In [169]:
def clip(s, theta_clip):
    """ 
    Clips a signal s to the range [-theta_clip, theta_clip]
    """
    return np.clip(s, -theta_clip, theta_clip)

In [170]:
def get_theta_clip(y):
    """ 
    Returns the clipping threshold for a clipped signal y
    """
    return np.max(np.abs(y))

In [171]:
def get_M_r(y, theta_clip : float = None):
    """
    Returns the measurement matrix M_r for a clipped signal y
    """
    N = y.shape[0]
    M_r = np.eye(N)
    theta_clip = theta_clip if theta_clip is not None else get_theta_clip(y) 
    I_r = np.abs(y) < theta_clip           
    M_r = M_r[I_r, :]
    return M_r

In [172]:
def get_M_m_plus(y, theta_clip : float = None):
    """
    Returns the matrix M_m_plus (which gives the indices at which the signal is clipped from above) for a clipped signal y
    """
    N = y.shape[0]
    M_m_plus = np.eye(N)
    theta_clip = theta_clip if theta_clip is not None else get_theta_clip(y) 
    I_m_plus = y >= theta_clip           
    M_m_plus = M_m_plus[I_m_plus, :]
    return M_m_plus

In [174]:
def get_M_m_minus(y, theta_clip : float = None):
    """
    Returns the matrix M_m_minus (which gives the indices at which the signal is clipped from below) for a clipped signal y
    """
    N = y.shape[0]
    M_m_minus = np.eye(N)
    theta_clip = theta_clip if theta_clip is not None else get_theta_clip(y) 
    I_m_minus = y <= -theta_clip           
    M_m_minus = M_m_minus[I_m_minus, :]
    return M_m_minus

## Gabor Dictionary

In [160]:
params = {
    "sampling_rate": 16000,
    "frame_length": 1024,
    "frame_overlap": 768
}

# Number of atoms per dictionary
K_g = params["frame_length"]//2
# Length of the signal
N = params["frame_length"]

In [161]:
# Create the time-frequency grid
T = np.arange(0, N)
J = np.arange(0, K_g)
J, T = np.meshgrid(J, T)

# Dictionaries of shape (N, K_g), 2* K_g atoms in total
gabor_cosine = np.cos(np.pi * (J+1/2) * (T+1/2) / K_g)
gabor_sine = np.sin(np.pi * (J+1/2) * (T+1/2) / K_g)

In [162]:
gabor_cosine.shape, gabor_sine.shape

((1024, 512), (1024, 512))

## Orthogonal Matching Pursuit

In [None]:
def least_squares(y, D_c, D_s, theta_clip = None, theta_max = None, lambda_reg : float = 0):
    """
    Solves the following least squares problem
        min_{x_c, x_s} ||y - D_c * x_c - D_s * x_s||^2 + lambda * ||x_c||^2 + lambda * ||x_s||^2     s.t.     ...
    """
    D = np.concatenate((D_c, D_s), axis=1)
    x = np.linalg.lstsq(D, y, rcond=None)[0]
    x_c = x[:D_c.shape[1]]
    x_s = x[D_c.shape[1]:]

    ## TODO: Implement optimization in the case with linear constraints

    return x_c, x_s

In [None]:
def OMP(y, M_r, K, eps, D_c = gabor_cosine, D_s = gabor_sine, theta_clip = None, theta_max = None):
    """ 
    Runs the Orthogonal Matching Pursuit algorithm, using Gabor Dictionaries.

    Inputs:
    --------
    y: np.array
        Input signal of size N
    M_r: np.array
        Measurement matrix of size (N_r, N)
    D_c: np.array
        Dictionary for the cosine atoms of size (N, K_g)
    D_s: np.array
        Dictionary for the sine atoms of size (N, K_g)
    K: int
        Maximal number of atoms to select
    eps: float
        Stopping criterion
    theta_clip: float
        (Optional) Clipping value of the signal, used as an additional constraint in the least squares problem. If None, no clipping constraint is applied.
    theta_max: float
        (Optional) Maximum value of the signal, used as an additional constraint in the least squares problem. If None, no maximum constraint is applied.

    Outputs:
    --------
    y_reconstructed: np.array_str
        Reconstruciton of the original signal y
    x_c: np.array
        Sparse activations of the cosine atoms 
    x_s: np.array
        Sparse activations of the sine atoms
    residual_norms : list
        List of the squared norms of the residuals at each iteration
    """

    K_g = D_c.shape[1]

    # Reliable samples of the signal
    y_r = M_r @ y                                                          # Of shape (N_r)

    # Dictionaries
    W_c = np.diag(1 / np.linalg.norm(M_r @ D_c, axis=0))        # W_j,j = 1/||M_r * d_j||, j = 0, ..., K_g-1, of shape (K_g, K_g)
    W_s = np.diag(1 / np.linalg.norm(M_r @ D_s, axis=0))
    d_c_norm = M_r @ D_c @ W_c                                             # Of shape (N_r, K_g)
    d_s_norm = M_r @ D_s @ W_s

    # TODO: Cette ligne est teubé, faut la changer
    d_cs_dot = np.diag(np.dot(d_c_norm.T, d_s_norm), k = 0)                # Array containing <d_norm_j^c|d_norm_j^s>, j = 0, ..., K_g-1, of shape (K_g)

    # Residual and support
    r = y_r
    Omega = []
    residual_norms = [np.linalg.norm(y_r)**2]


    for k in tqdm(range(K)):

        # Atom selection
        x_c = (np.dot(r, d_c_norm) - d_cs_dot * np.dot(r, d_s_norm)) / (1 - d_cs_dot**2)
        x_s = (np.dot(r, d_s_norm) - d_cs_dot * np.dot(r, d_c_norm)) / (1 - d_cs_dot**2)
        proj = np.zeros(K_g)
        for j in range(K_g):
            proj[j] = np.linalg.norm(r - x_c[j] * d_c_norm[:,j] - x_s[j] * d_s_norm[:,j])**2
        i = np.argmin(np.abs(proj))

        # Update support and residual
        Omega.append(i)
        x_c2, x_s2 = least_squares(y_r, d_c_norm[:,Omega], d_s_norm[:,Omega], theta_clip, theta_max)
        x_c, x_s = np.zeros(K_g), np.zeros(K_g)
        x_c[Omega] = x_c2
        x_s[Omega] = x_s2

        r = y_r - np.dot(d_c_norm[:,Omega], x_c2) - np.dot(d_s_norm[:,Omega], x_s2)
        r_norm = np.linalg.norm(r)**2
        residual_norms.append(r_norm)

        # Stopping criterion
        if r_norm < eps:
            break
    
    # Output
    x_c = W_c @ x_c
    x_s = W_s @ x_s
    y_reconstructed = D_c @ x_c + D_s @ x_s
    return y_reconstructed, x_c, x_s, residual_norms

In [164]:
## TODO : Function to do OMP on multiple frames, and then overlap them
def inpainting(y, frame_length : int = 1024, frame_overlap : int = 768, K : int = 50, eps : float = 1e-6, D_c = gabor_cosine, D_s = gabor_sine, theta_clip = None, theta_max = None):
    pass

## Data

### Dataset

In [157]:
# Synthetic data generation
def generate_synthetic_dataset(M, N, K, theta_clip : float = .8, D_c = gabor_cosine, D_s = gabor_sine, sigma : float = 0.1):
    """
    Generates M waveforms of length N. Each waveform is a sum of K Gabor atoms and some noise. Both the original signal and the signal clipped at theta_clip are returned, along with the grounD-truth vector x.
    """
    K_g = D_c.shape[1]

    X = np.zeros((M, 2*K_g))
    Y = np.zeros((M, N))
    Y_clipped = np.zeros((M, N))

    for i in range(M):
        y = np.zeros(N)
        for k in range(K):
            j = np.random.randint(0, K_g)
            a = .2 * np.random.randn()
            b = .2 * np.random.randn()
            y += a * D_c[:,j] + b * D_s[:,j]
            X[i, j] += a
            X[i, K_g+j] += b
        Y[i] = y + sigma * np.random.randn(N)
        Y_clipped[i] = clip(Y[i], theta_clip)
    
    return X, Y, Y_clipped


In [175]:
## TODO: find some real data, and add a function to load some real data

### Exploratory Data Analysis

## Experiments