## Projet Audio Signal Analysis, indexing and transformation

Lambert Fatoux  
lambert.fatoux@eleves.enpc.fr

**Determined Blind Source Separation Unifying
Independent Vector Analysis and Nonnegative
Matrix Factorization**

In [None]:
import os, sys, wave, struct
import numpy as np
import matplotlib.pyplot as plt
import IPython.display as ipd
from IPython.core.display import display
import scipy.io.wavfile as wavfile
import soundfile as sf
import scipy.signal as ss
from tqdm import tqdm

from math import ceil

In [None]:
# useful functions
def load_sound(file):
    return wave.open(file, 'rb')

def load_file(filename):
    """
    Utils function used to load an audio file and get useful constants and the Fourier Transform
    """
    wavefile = load_sound(filename)
    Fs = int(wavefile.getframerate())
    num_samples = int(wavefile.getnframes())
    data = wavefile.readframes(num_samples)
    data = struct.unpack('{n}h'.format(n=num_samples), data)
    x = np.array(data)
    
    N=ceil(0.7*Fs)       # Window size of analysed signal (only one window of signal is analysed)
    dF_min=Fs/N   # Minimal frequency resolution
    w=np.hamming(N)  # Window
    width = 4*dF_min # largeur du pic spectral (en Hz) 4*dF_min
    eps=float(1e-20)   #precision
    timestep = 1/float(Fs)
    times = np.arange(len(x))*timestep
    
    return x,Fs, times

def plot_and_hear(audio, sr):
    display(ipd.Audio(audio, rate=sr))
    if audio.shape[0] != 2:
      plt.plot(audio)
      
def plot_sound(data, rate, title=None):
    if data.ndim == 1:
        data = data[:,np.newaxis]
    times = np.arange(data.shape[0])/rate
    plt.figure(figsize=(30,6))
    for channel in data.T:
        plt.fill_between(times, channel)
    plt.xlabel('Time (s)')
    plt.ylabel('Amplitude')
    plt.xlim(times[0], times[-1])
    if title:
        plt.title(title)
    plt.show()
    
def play_sound(data, rate):
    return ipd.Audio(data.T, rate=rate)

In [None]:
fname = 'mix.wav'

rate, x = wavfile.read(fname)
print('T = {:d}, M = {:d}'.format(*x.shape))
plot_sound(x, rate)
play_sound(x, rate)

## 1. ICA or IVA

$$Q_{IVA} (\mathbf{W}_i) = \sum_m\frac{1}{J}\sum_jG(\mathbf{y}_{j,m}) - \sum_i\log |\text{det}\mathbf{W}_i|$$

$G(\mathbf{y}_{j,m})$ is a contast function

## 2. Nonnegative Matix Factorization

$$Q_{NMF} = \sum_{i,j}\left(\frac{d_{i,j}}{\sum_lt_{il}v_{lj}} + \log \sum_lt_{il}v_{lj}\right)$$

## 3. Proposed method



In [None]:
#@title
def projection_back(Y, reference):
    """
    Args:
        Y: (n_sources, n_bins, n_frames)
        reference: (n_bins, n_frames) or (n_channels, n_bins, n_frames)
    Returns:
        scale: (n_sources, n_bins) or (n_channels, n_sources, n_bins)
    """
    # TODO: Use pseudo inverse ?
    n_dims = reference.ndim
    if n_dims == 2:
        # Y: (n_sources, n_bins, n_frames)
        # X: (1, n_bins, n_frames)
        X = reference[np.newaxis,:,:].transpose(1, 0, 2) # (n_bins, n_channels, n_frames)
        Y = Y.transpose(1, 0, 2) # (n_bins, n_sources, n_frames)
        Y_Hermite = Y.transpose(0, 2, 1).conj() # (n_bins, n_frames, n_sources)
        YY_Hermite_inverse = np.linalg.inv(Y @ Y_Hermite) # (n_bins, n_sources, n_sources)
        A = X @ Y_Hermite @ YY_Hermite_inverse # (n_bins, n_channels, n_sources)
        scale = A[:,0,:].transpose(1, 0) # (n_sources, n_bins)
    elif n_dims == 3:
        # Y: (n_sources, n_bins, n_frames)
        # X: (n_channels, n_bins, n_frames)
        X = reference.transpose(1, 0, 2) # (n_bins, n_channels, n_frames)
        Y = Y.transpose(1, 0, 2) # (n_bins, n_sources, n_frames)
        Y_Hermite = Y.transpose(0, 2, 1).conj() # (n_bins, n_frames, n_sources)
        YY_Hermite_inverse = np.linalg.inv(Y @ Y_Hermite) # (n_bins, n_sources, n_sources)
        A = X @ Y_Hermite @ YY_Hermite_inverse # (n_bins, n_channels, n_sources)
        scale = A.transpose(1, 2, 0) # (n_channels, n_sources, n_bins)
    else:
        raise ValueError("reference.ndim is expected 2 or 3, but given {}.".format(n_dims))

    return scale

In [None]:
#@title
EPS=1e-12
THRESHOLD=1e+12

class ILRMAbase:
    """
    Independent Low-rank Matrix Analysis
    """
    def __init__(self, n_basis=10, partitioning=False, normalize=True, callbacks=None, recordable_loss=True, eps=EPS):
        if callbacks is not None:
            if callable(callbacks):
                callbacks = [callbacks]
            self.callbacks = callbacks
        else:
            self.callbacks = None
        self.eps = eps
        
        self.n_basis = n_basis
        self.partitioning = partitioning
        self.normalize = normalize


        self.input = None
        self.recordable_loss = recordable_loss
        if self.recordable_loss:
            self.loss = []
        else:
            self.loss = None
    
    def _reset(self, **kwargs):

        for key in kwargs.keys():
            setattr(self, key, kwargs[key])

        n_basis = self.n_basis
        eps = self.eps

        X = self.input

        n_channels, n_bins, n_frames = X.shape
        n_sources = n_channels # n_channels == n_sources

        self.n_sources, self.n_channels = n_sources, n_channels
        self.n_bins, self.n_frames = n_bins, n_frames

        W = np.eye(n_sources, n_channels, dtype=np.complex128)
        self.demix_filter = np.tile(W, reps=(n_bins, 1, 1))
        
        self.estimation = self.separate(X, demix_filter=W)

        # if self.partitioning
        self.basis = np.random.rand(n_sources, n_bins, n_basis)
        self.activation = np.random.rand(n_sources, n_basis, n_frames)
      
    def separate(self, input, demix_filter):
        """
        Args:
            input (n_channels, n_bins, n_frames): 
            demix_filter (n_bins, n_sources, n_channels): 
        Returns:
            output (n_channels, n_bins, n_frames): 
        """
        input = input.transpose(1, 0, 2)
        estimation = demix_filter @ input
        output = estimation.transpose(1, 0, 2)

        return output
    
    def compute_demix_filter(self, estimation, input):
        X, Y = input, estimation
        X_Hermite = X.transpose(1, 2, 0).conj()
        XX_Hermite = X.transpose(1, 0, 2) @ X_Hermite
        demix_filter = Y.transpose(1, 0, 2) @ X_Hermite @ np.linalg.inv(XX_Hermite)

        return demix_filter

In [None]:
class GaussILRMA(ILRMAbase):
    def __init__(self, n_basis=10, partitioning=False, normalize='power', reference_id=0, callbacks=None, recordable_loss=True, eps=EPS, threshold=THRESHOLD):
        super().__init__(n_basis=n_basis, partitioning=partitioning, normalize=normalize, callbacks=callbacks, recordable_loss=recordable_loss, eps=eps)

        self.reference_id = reference_id
        self.threshold = threshold

    def __call__(self, input, iteration=100, **kwargs):
        """
        Args:
            input (n_channels, n_bins, n_frames)
        Returns:
            output (n_channels, n_bins, n_frames)
        """
        self.input = input

        self._reset(**kwargs)

        if self.recordable_loss:
            loss = self.compute_negative_loglikelihood()
            self.loss.append(loss)
        
        if self.callbacks is not None:
        
            for callback in self.callbacks:
                callback(self)

        for idx in tqdm(range(iteration)):

            self.update_once()

            if self.recordable_loss:
                loss = self.compute_negative_loglikelihood()
                self.loss.append(loss)

            if self.callbacks is not None:
                for callback in self.callbacks:
                    callback(self)
        

        X, W = input, self.demix_filter
        Y = self.separate(X, demix_filter=W)
        
        reference_id = self.reference_id
        
        scale = projection_back(Y, reference=X[reference_id])
        print(scale.shape)
        output = Y * scale[..., np.newaxis] # (n_sources, n_bins, n_frames)
        #output = Y
        self.estimation = output

        return output

    def __repr__(self):
        s = "Gauss-ILRMA("
        s += "n_basis={n_basis}"
        s += ", partitioning={partitioning}"
        s += ", normalize={normalize}"
        s += ")"

        return s.format(**self.__dict__)

    def update_once(self):
        eps = self.eps

        self.update_source_model_basic()
        self.update_spatial_model_ip()
        
        if self.normalize:
            X, W = self.input, self.demix_filter
            Y = self.separate(X, demix_filter=W)
            self.estimation = Y
            
            T = self.basis

            if self.normalize == 'power':
                P = np.abs(Y)**2
                aux = np.sqrt(P.mean(axis=(1, 2))) # (n_sources,)
                aux[aux < eps] = eps

                # Normalize
                W = W / aux[np.newaxis, :, np.newaxis]
                Y = Y / aux[:, np.newaxis, np.newaxis]

                # if self.partitioning
                T = T / (aux[:, np.newaxis, np.newaxis]**2)
            else:
                raise ValueError("Not support normalization based on {}. Choose 'power' or 'projection-back'".format(self.normalize))

            self.estimation = Y
            self.basis = T

            if self.demix_filter is not None:
                self.demix_filter = W
    
    def update_source_model_basic(self):
        eps = self.eps

        if self.demix_filter is None:
            Y = self.estimation
        else:
            X, W = self.input, self.demix_filter
            Y = self.separate(X, demix_filter=W)
        
        P = np.abs(Y)**2
        

        T, V = self.basis, self.activation

        # Update basis
        V_transpose = V.transpose(0, 2, 1)
        TV = T @ V
        TV[TV < eps] = eps
        division, TV_inverse = P / (TV**2), 1 / TV
        TVV = TV_inverse @ V_transpose
        TVV[TVV < eps] = eps
        T = T * (division @ V_transpose / TVV)**(1/2)
        
        # Update activations
        T_transpose = T.transpose(0, 2, 1)
        TV = T @ V
        TV[TV < eps] = eps
        division, TV_inverse = P / (TV**2), 1 / TV
        TTV = T_transpose @ TV_inverse
        TTV[TTV < eps] = eps
        V = V * (T_transpose @ division / TTV)**(1/2)

        self.basis, self.activation = T, V
    
    def update_spatial_model_ip(self):
        n_sources, n_channels = self.n_sources, self.n_channels
        n_bins = self.n_bins

        eps, threshold = self.eps, self.threshold

        # if self.partitioning
        T, V = self.basis, self.activation
        TV = T @ V
        R = TV # (n_sources, n_bins, n_frames)

        X = self.input
        W = self.demix_filter
        R = R[..., np.newaxis, np.newaxis] # (n_sources, n_bins, n_frames, 1, 1)
        
        X = X.transpose(1, 2, 0) # (n_bins, n_frames, n_channels)
        X = X[..., np.newaxis]
        X_Hermite = X.transpose(0, 1, 3, 2).conj()
        XX = X @ X_Hermite # (n_bins, n_frames, n_channels, n_channels)
        R[R < eps] = eps
        U = XX / R
        U = U.mean(axis=2) # (n_sources, n_bins, n_channels, n_channels)
        E = np.eye(n_sources, n_channels)
        E = np.tile(E, reps=(n_bins, 1, 1)) # (n_bins, n_sources, n_channels)

        for source_idx in range(n_sources):
            # W: (n_bins, n_sources, n_channels), U: (n_sources, n_bins, n_channels, n_channels)
            w_n_Hermite = W[:, source_idx, :] # (n_bins, n_channels)
            U_n = U[source_idx] # (n_bins, n_channels, n_channels)
            WU = W @ U_n # (n_bins, n_sources, n_channels)
            condition = np.linalg.cond(WU) < threshold # (n_bins,)
            condition = condition[:, np.newaxis] # (n_bins, 1)
            e_n = E[:, source_idx, :]
            w_n = np.linalg.solve(WU, e_n)
            wUw = w_n[:, np.newaxis, :].conj() @ U_n @ w_n[:, :, np.newaxis]
            denominator = np.sqrt(wUw.squeeze(axis=-1))
            w_n_Hermite = np.where(condition, w_n.conj()/ denominator, w_n_Hermite)
            # if condition number is too big, `denominator[denominator < eps] = eps` may diverge of cost function.
            W[:, source_idx, :] = w_n_Hermite

        self.demix_filter = W

        X = self.input
        Y = self.separate(X, demix_filter=W)
        
        self.estimation = Y
        
    def compute_negative_loglikelihood(self):
        n_frames = self.n_frames
        eps = self.eps

        X = self.input

        if self.demix_filter is None:
            Y = self.estimation
            W = self.compute_demix_filter(Y, X)
        else:
            W = self.demix_filter
            Y = self.separate(X, demix_filter=W)
        
        P = np.abs(Y)**2 # (n_sources, n_bins, n_frames)
        # if self.partitioning
        T, V = self.basis, self.activation
        TV = T @ V # (n_sources, n_bins, n_frames)
        R = TV
        
        R[R < eps] = eps
        loss = np.sum(P / R + np.log(R)) - 2 * n_frames * np.sum(np.log(np.abs(np.linalg.det(W))))

        return loss

In [None]:
mixture, sr = sf.read("sample2.wav")
x = mixture.T
n_channels, T = x.shape
n_sources = n_channels

In [None]:
display(ipd.Audio(mixture.T, rate=sr))

In [None]:
for idx in range(n_channels):
    display(ipd.Audio(x[idx], rate=sr))

In [None]:
fft_size, hop_size = 4096, 2048
_, _, X = ss.stft(x, nperseg=fft_size, noverlap=fft_size-hop_size)
np.random.seed(111)
ilrma = GaussILRMA(n_basis=2)
print(ilrma)
Y = ilrma(X, iteration=100)
_, y = ss.istft(Y, nperseg=fft_size, noverlap=fft_size-hop_size)
y = y[:, :T]

In [None]:
for idx in range(n_sources):
    display(ipd.Audio(y[idx], rate=sr))

In [None]:
plt.figure()
plt.title(f'Cost function with {ilrma.n_basis} basis')
plt.plot(ilrma.loss, color='black')
plt.hlines(0.95*ilrma.loss[-1],0,100, linestyle='--', color='r')
plt.xlabel('Iteration')
plt.ylabel('Loss')
plt.savefig('loss2.png')
plt.show()

In [None]:
mic_l, sr = sf.read("rsm2_mA.wav")
mic_r, sr = sf.read("rsm2_mB.wav")
x = np.vstack([mic_l, mic_r])
n_channels, T = x.shape
n_sources = n_channels

for idx in range(n_channels):
    display(ipd.Audio(x[idx], rate=sr))

In [None]:
fft_size, hop_size = 4096, 2048
_, _, X = ss.stft(x, nperseg=fft_size, noverlap=fft_size-hop_size)
np.random.seed(111)
ilrma = GaussILRMA(n_basis=2)
print(ilrma)
Y = ilrma(X, iteration=100)
_, y = ss.istft(Y, nperseg=fft_size, noverlap=fft_size-hop_size)
y = y[:, :T]

In [None]:
for idx in range(n_sources):
    display(ipd.Audio(y[idx], rate=sr))

In [None]:
plt.plot(y[0])
plt.show()

In [None]:
from scipy.io.wavfile import write
write("example.wav", sr, y[0])

In [None]:
plt.figure()
plt.title('')
plt.plot(ilrma.loss, color='black')
plt.xlabel('Iteration')
plt.ylabel('Loss')
plt.show()

In [None]:
mic_l, sr = sf.read("rssd_A.wav")
mic_r, sr = sf.read("rssd_B.wav")
x = np.vstack([mic_l, mic_r])
n_channels, T = x.shape
n_sources = n_channels

for idx in range(n_channels):
    display(ipd.Audio(x[idx], rate=sr))

In [None]:
fft_size, hop_size = 4096, 2048
_, _, X = ss.stft(x, nperseg=fft_size, noverlap=fft_size-hop_size)
np.random.seed(111)
ilrma = GaussILRMA(n_basis=5)
print(ilrma)
Y = ilrma(X, iteration=10)
_, y = ss.istft(Y, nperseg=fft_size, noverlap=fft_size-hop_size)
y = y[:, :T]

for idx in range(n_sources):
    display(ipd.Audio(y[idx], rate=sr))

In [None]:
mixture, sr = sf.read("mix.wav")
x = mixture.T
n_channels, T = x.shape
n_sources = n_channels

for idx in range(n_channels):
    display(ipd.Audio(x[idx], rate=sr))

In [None]:
fft_size, hop_size = 4096, 2048
_, _, X = ss.stft(x, nperseg=fft_size, noverlap=fft_size-hop_size)
np.random.seed(111)
ilrma = GaussILRMA(n_basis=5)
print(ilrma)
Y = ilrma(X, iteration=100)
_, y = ss.istft(Y, nperseg=fft_size, noverlap=fft_size-hop_size)
y = y[:, :T]

for idx in range(n_sources):
    display(ipd.Audio(y[idx], rate=sr))

In [None]:
from separation import bss_eval_sources
from itertools import permutations


def SDR(est, egs, mix):
    '''
        calculate SDR
        est: Network generated audio
        egs: Ground Truth
    '''
    length = egs.shape[0]
    sdr, _, _, _ = bss_eval_sources(egs[:length], est[:length])
    mix_sdr, _, _, _ = bss_eval_sources(egs[:length], mix[:length])
    return sdr-mix_sdr


def permutation_sdr(est_list, egs_list, mix, per):
    n = len(est_list)
    result = sum([SDR(est_list[a], egs_list[b], mix)
                      for a, b in enumerate(per)])/n
    return result

In [None]:
a, sr = sf.read("piano.wav")
b, sr = sf.read("violon.wav")

In [None]:
from itertools import permutations


In [None]:
SDR(mixture.T[0], a, a+b)

In [None]:
for p in permutations(range(2)):
  print(permutation_sdr(y, np.stack([a,b]),a+b,p))

In [None]:
x_l, sr = sf.read("mix_hf1.wav")
x_r, sr = sf.read("mix_hf2.wav")
a, sr = sf.read("homme.wav")
b, sr = sf.read("femme.wav")
x = np.stack([x_l, x_r])
n_channels, T = x.shape
n_sources = n_channels

fft_size, hop_size = 4096, 2048
_, _, X = ss.stft(x, nperseg=fft_size, noverlap=fft_size-hop_size)
sdrs = []
for n_bases in range(2,10):
  print(n_bases)
  sdrs_base = []
  for _ in range(10):
    ilrma = GaussILRMA(n_basis=n_bases)
    Y = ilrma(X, iteration=100)
    _, y = ss.istft(Y, nperseg=fft_size, noverlap=fft_size-hop_size)
    y = y[:, :T]
    sdrs_base.append(np.max(
    [permutation_sdr(y, np.stack([a,b]),a+b,p) for p in permutations(range(2))] 
    ))
  sdrs.append(sdrs_base)


In [None]:
sdrs = np.array(sdrs)

In [None]:
sdrs.mean(axis=1)

In [None]:
plt.bar(np.arange(2,10),sdrs.mean(axis=1))

In [None]:
sdrs.shape

In [None]:
plt.boxplot(sdrs.T, positions=range(2,10))
plt.xlabel('Number of basis')
plt.ylabel('SDR')
plt.title('Boxplot of SDR as a function of basis number')
plt.savefig('SDR.png')
plt.show()

In [None]:
display(ipd.Audio(y[0], rate=sr))
display(ipd.Audio(y[1], rate=sr))

In [None]:
SDR(mixture.T[1], a, a+b)

In [None]:
mixture, sr = sf.read("test.wav")
x = mixture.T
n_channels, T = x.shape
n_sources = n_channels

for idx in range(n_channels):
    display(ipd.Audio(x[idx], rate=sr))

In [None]:
display(ipd.Audio(mixture.T, rate=sr))

In [None]:
fft_size, hop_size = 11000, 2048
_, _, X = ss.stft(x, nperseg=fft_size, noverlap=fft_size-hop_size)
np.random.seed(111)
ilrma = GaussILRMA(n_basis=5)
print(ilrma)
Y = ilrma(X, iteration=100)
_, y = ss.istft(Y, nperseg=fft_size, noverlap=fft_size-hop_size)
y = y[:, :T]

In [None]:
display(ipd.Audio(y[0], rate=sr))
display(ipd.Audio(y[1], rate=sr))

In [None]:
mixture, sr = sf.read("test2.wav")
x = mixture.T
n_channels, T = x.shape
n_sources = n_channels

for idx in range(n_channels):
    display(ipd.Audio(x[idx], rate=sr))


In [None]:
fft_size, hop_size = 11000, 2048
_, _, X = ss.stft(x, nperseg=fft_size, noverlap=fft_size-hop_size)
np.random.seed(111)
ilrma = GaussILRMA(n_basis=10)
print(ilrma)
Y = ilrma(X, iteration=100)
_, y = ss.istft(Y, nperseg=fft_size, noverlap=fft_size-hop_size)
y = y[:, :T]

In [None]:
display(ipd.Audio(y[0], rate=sr))
display(ipd.Audio(y[1], rate=sr))