<h1>TMA4320 - Project 1 - Independent component analysis</h1>

<h2>Abstract</h2>
This notebook is the first project in TMA4320 at the Norwegian University of Science and Technology. It describes the basic principles of independent component analysis (ICA) and uses the method to decompose mixed sound signals into separate components.

(Note: this can probably be worded better.)

<h2>1 - Introduction</h2>

Explain (fast)ICA, why/how it works etc.

A description of each function is provided below.

Approx. one page.

<h2>2 - The algorithm</h2>

<h2>2.1 - Preparations</h2>

First we need to import the data (sound files) that we will be working with and make them playable. The three cells below load the provided sound files using the provided script "wav_file_loader.py"; the files are assumed to be placed in audio/.


In [None]:
'''This cell loads the files that are to be decomposed.'''

import numpy as np
from wav_file_loader import read_wavefiles

paths = ['audio/mix_1.wav', 'audio/mix_2.wav', 'audio/mix_3.wav']
data, sampling_rate = read_wavefiles(paths)
num_signals = data.shape[0]

In [None]:
'''This cell normalizes the signals to a common volume level.'''

def normalize_audio(data):
    """Scale each row s.t. max(abs(data[i])) == 1."""
    abs_data = np.absolute(data)
    # Keep the reduced axis so that each row is divided by its own maximum,
    # regardless of the number of signals:
    maximums = np.amax(abs_data, axis=1, keepdims=True)
    data = data / maximums
    return data

data = normalize_audio(data)

<h3>The output widgets of the cell below allow playback of the provided mixed audio.</h3>

In [None]:
'''This cell makes widgets for playing the uploaded clips.'''

import IPython.display as ipd

for i in range(data.shape[0]):
    ipd.display(ipd.Audio(data[i,:], rate=sampling_rate))

<h2>2.2 - Mixing</h2>

The cells in this section are for mixing separate, clear audio into noisy clips. This is used to create some custom clips to further test the algorithm.

In [None]:
def normalize_rowsums(A):
    '''
    Divide each row in A by its sum.
    The sum of each row in the result is 1.0.
    '''
    
    # Keep the reduced axis so the division works for any number of rows.
    the_sum = np.sum(A, axis=1, keepdims=True)
    A = A / the_sum
    return A

def random_mixing_matrix(signals, observations):
    '''
    Creates a random mixing matrix whose rows sum to 1.
    Each element is a small positive number, not too close to 0
    (for three signals, each element lies between 1/11 and 5/7).
    '''
    
    A = 0.25 + np.random.rand(observations, signals)
    return normalize_rowsums(A)


In [None]:
A = random_mixing_matrix(num_signals, num_signals)
data_mixed = normalize_audio(A @ data)
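
As a quick sanity check of the mixing step, the sketch below rebuilds a row-normalised random mixing matrix and applies it to toy signals. The name `random_mixing_matrix_sketch`, the fixed seed, and the synthetic sources are assumptions made for illustration; they are not part of the project code.

```python
import numpy as np

rng = np.random.default_rng(0)  # fixed seed: an assumption, for reproducibility

def random_mixing_matrix_sketch(signals, observations):
    """Entries drawn from [0.25, 1.25), then each row scaled to sum to 1."""
    A = 0.25 + rng.random((observations, signals))
    return A / A.sum(axis=1, keepdims=True)

A = random_mixing_matrix_sketch(3, 3)

# Three toy sources stand in for the audio clips (hypothetical data).
t = np.linspace(0, 1, 1000)
sources = np.vstack([np.sin(2 * np.pi * 5 * t),
                     np.sign(np.sin(2 * np.pi * 3 * t)),
                     rng.uniform(-1, 1, t.size)])

# Each mixed row is a weighted average of the source rows.
mixed = A @ sources

print(A.sum(axis=1))     # every row sums to 1
print(A.min(), A.max())  # for 3 signals: between 1/11 and 5/7
```

Because every row of the matrix sums to 1, each observation is a convex combination of the sources, so no mixed sample can exceed the largest source amplitude.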

<h3>Again, the widgets below allow playback of the resulting mixed audio.</h3>

In [None]:
for i in range(data_mixed.shape[0]):
    ipd.display(ipd.Audio(data_mixed[i,:], rate=sampling_rate))

<h2>2.3 - Custom sound clips</h2>

The cell below combines the functions from the two sections above (2.1 and 2.2) to load and mix some audio of our own, and makes the resulting mix playable using the output widgets. This mix is decomposed together with the provided audio when the algorithm is run.

In [None]:
# This is our own mixed audio

paths1 = ['audio/clip1.wav', 'audio/clip2.wav', 'audio/clip3.wav']
data1, sampling_rate1 = read_wavefiles(paths1)
num_signals1 = data1.shape[0]

B = random_mixing_matrix(num_signals1, num_signals1)
data_mixed1 = normalize_audio(B @ data1)

for i in range(data_mixed1.shape[0]):
    ipd.display(ipd.Audio(data_mixed1[i,:], rate=sampling_rate1))

<h2>2.4 - Preprocessing</h2>

The following cell contains the functions <font color='blue'> center_rows() </font> and <font color='blue'> whiten_rows() </font>. (...)

In [None]:
def center_rows(Z):
    """
    Ensures each row has zero mean.
    Takes a matrix of arbitrary shape and subtracts from each row the mean value of that row.
    """
    
    # Returns a dxN matrix, Zc, where each row has zero mean.
    row_means = np.mean(Z, axis=1, keepdims=True)
    Zc = Z - row_means
    return Zc


def whiten_rows(Z):
    """
    Returns the whitened version of Z, Zw, and the transform matrix T, where Zw = T @ Z.
    """
    
    # Covariance matrix, C
    C = np.cov(Z)
    
    # The following two statements compute T (inverse square root of C).
    U, S, _ = np.linalg.svd(C, full_matrices=False)
    T  = U @ np.diag(1 / np.sqrt(S)) @ U.T
    
    # Finally, the whitened version of Z, Zw
    Zw = np.matmul(T,Z)
    
    return Zw, T
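
The defining property of whitening is that the covariance matrix of the output is the identity. The sketch below checks this on synthetic correlated data. The toy data and the mixing matrix are assumptions, and `np.linalg.eigh` is used here in place of the SVD; for a symmetric positive definite covariance matrix both yield the same inverse square root.

```python
import numpy as np

rng = np.random.default_rng(1)

# Correlated toy data: 3 rows (signals), 5000 columns (samples).
mixing = np.array([[1.0, 0.5, 0.2],
                   [0.3, 1.0, 0.4],
                   [0.1, 0.6, 1.0]])
Z = mixing @ rng.laplace(size=(3, 5000))

# Centre each row, then whiten with T = C^{-1/2}.
Zc = Z - Z.mean(axis=1, keepdims=True)
C = np.cov(Zc)
eigvals, eigvecs = np.linalg.eigh(C)  # C is symmetric positive definite here
T = eigvecs @ np.diag(1.0 / np.sqrt(eigvals)) @ eigvecs.T
Zw = T @ Zc

print(np.round(np.cov(Zw), 6))  # approximately the identity matrix
```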

<h2>2.5 - Main iteration - Maximization of non-Gaussianity</h2>

In [None]:
def normalize_rownorms(Z):
    """
    Divides each row in matrix Z by its Euclidean norm, 
    so that the norm of each row in the output is equal to one.
    
    It first computes the Euclidean norm of each row of the input matrix Z,
    then scales each row by this norm. 
    """
     
    e_norms = np.linalg.norm(Z, axis=1)
    Z_trans = Z.transpose()
    Z_norm_trans = Z_trans / e_norms
    Z_norm = Z_norm_trans.transpose()
    return Z_norm

In [None]:
def decorrelate_weights(W):
    """
    This is the orthogonalization step (or decorrelation step). The dxd input matrix W is projected onto an 
    orthogonal matrix by the transformation Wd = (WW^T)^{-1/2} W as described in the note. The single output 
    argument is the projected W-matrix (Wd).
    
    Uses a similar technique for computing the inverse square root as in the whitening step.
    """
    
    # Matrix product of W and its transposed
    WW_T = np.matmul(W,W.transpose())
    
    # Computes T, the inverse square root of WW_T
    U, S, _ = np.linalg.svd(WW_T, full_matrices=False)
    T = U @ np.diag(1 / np.sqrt(S)) @ U.transpose()
    
    # Finally computes the decorrelated matrix Wd
    Wd = np.matmul(T, W)

    return Wd
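
To see why this projection yields an orthogonal matrix, note that Wd Wd^T = (WW^T)^{-1/2} WW^T (WW^T)^{-1/2} = I. A minimal numerical check follows; the test matrix is an arbitrary invertible example chosen for illustration.

```python
import numpy as np

# Arbitrary invertible weight matrix (hypothetical example, determinant 18).
W = np.array([[2.0, 1.0, 0.0],
              [1.0, 3.0, 1.0],
              [0.0, 1.0, 4.0]])

# Wd = (W W^T)^{-1/2} W, computed via the SVD of the symmetric matrix W W^T.
U, S, _ = np.linalg.svd(W @ W.T)
Wd = U @ np.diag(1.0 / np.sqrt(S)) @ U.T @ W

print(np.round(Wd @ Wd.T, 6))  # approximately the identity matrix
```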


In [None]:
# Kurtosis nonlinearity and its derivative as lambda functions.
kurtosis = lambda u: 4*(u**3)
kurtosis_d = lambda u: 12*(u**2)

# Negentropy nonlinearity and its derivative as lambda functions.
negentropy = lambda u: u * np.exp(-((u**2)/2))
negentropy_d = lambda u: -np.exp(-((u**2)/2)) * (u**2 - 1)
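
Since the update step relies on each function being paired with its correct derivative, a finite-difference check is a cheap safeguard. The sketch below restates the formulas so that it is self-contained; the helper `central_difference` and the names `g_kurt`, `dg_kurt`, `g_neg` and `dg_neg` are hypothetical and not part of the project code.

```python
import numpy as np

# Same formulas as the lambdas above, restated so the block runs on its own.
def g_kurt(u):
    return 4 * u**3

def dg_kurt(u):
    return 12 * u**2

def g_neg(u):
    return u * np.exp(-u**2 / 2)

def dg_neg(u):
    return (1 - u**2) * np.exp(-u**2 / 2)

def central_difference(f, u, h=1e-6):
    """Numerical derivative of f at u (hypothetical helper)."""
    return (f(u + h) - f(u - h)) / (2 * h)

u = np.linspace(-2.0, 2.0, 9)
err_kurt = np.max(np.abs(central_difference(g_kurt, u) - dg_kurt(u)))
err_neg = np.max(np.abs(central_difference(g_neg, u) - dg_neg(u)))
print(err_kurt, err_neg)  # both should be close to zero
```

Here 4u^3 is the derivative of u^4, and u exp(-u^2/2) is the derivative of -exp(-u^2/2), so the functions named kurtosis and negentropy are the derivatives of the underlying contrast functions.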
 

In [None]:
def update_W(W, Zcw, func, func_d):
    """
    Calculates W_k+1 from W_k.
    The input is W=W_k (d x d) as well as the centered, whitened data Zcw (dxN, known as tilde{x} in the note).
    Output is the new W (W_{k+1}).
    
    This function does the two iteration steps in the note: the optimisation step and the 
    orthogonalisation (decorrelation) step. The second step, the orthogonalisation, is provided by the 
    function decorrelate_weights, which is called below.
    
    Uses kurtosis or negentropy, whichever is passed as input.
    """
    
    s_k = np.dot(W, Zcw)
        
    #Apply kurtosis or negentropy functions and derivatives.
    G = func(s_k)
    G_d = func_d(s_k)
    N = G.shape[1]
        
    # Computes W_k+1, here called W_p.
    W_p = (1/N) * np.dot(G, Zcw.transpose()) - np.matmul(np.diag(np.average(G_d, axis=1)), W)
    
    # Normalizes rows of W_p (W_k+1).
    W_pn = normalize_rownorms(W_p)
    
    # Orthogonalization step:
    W_pnd = decorrelate_weights(W_pn)

    return W_pnd

In [None]:
def measure_of_convergence(W1, W2):
    """
    This function computes an error estimate for the maximisation iteration, it computes the convergence
    criterion given in the note. 
    Input: W1 is the previous iterate, and W2 is the one just computed.
    Output: The quantity delta defined in the note.
    """
    
    a_s = np.absolute(np.sum(np.multiply(W2, W1), axis=1))
    delta = np.absolute(np.amax((1-a_s)))
    return delta
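
The absolute value around the row-wise inner products matters: ICA only determines each component up to sign, so a row that flips sign between iterations should still count as converged. The sketch below illustrates this with a restated version of the criterion; `delta_sketch` and the test matrices are assumptions for illustration.

```python
import numpy as np

def delta_sketch(W_prev, W_next):
    """max over rows of 1 - |<w_prev_i, w_next_i>| (illustrative restatement)."""
    row_dots = np.abs(np.sum(W_prev * W_next, axis=1))
    return np.abs(np.max(1.0 - row_dots))

W = np.eye(3)

# Same matrix with the second row sign-flipped.
flipped = W * np.array([[1.0], [-1.0], [1.0]])

# The first two rows rotated by a small angle in their common plane.
theta = 0.1
rotated = np.array([[np.cos(theta), -np.sin(theta), 0.0],
                    [np.sin(theta),  np.cos(theta), 0.0],
                    [0.0,            0.0,           1.0]])

print(delta_sketch(W, flipped))  # 0.0: a sign flip counts as converged
print(delta_sketch(W, rotated))  # 1 - cos(0.1), roughly 0.005
```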

In [None]:
import warnings

tol_default = 1e-10

def fast_ICA(Z, signals_to_find, func, func_d, tol=tol_default, max_iter=100):
    """ 
    This is the function that organises all the work.
    
    Input: Z is the unprocessed data
           signals_to_find: in our case, always d, the number of sources
           func, func_d: the function used to measure non-Gaussianity (kurtosis or negentropy) and its derivative
           tol is the tolerance, default value 1.0e-10
           max_iter: abort after max_iter iterations if not converged (to avoid an infinite loop)
           
    Output: Z_ica, the separated signals (dxN matrix, approximating the sources)
            W The final converged W-matrix (dxd)
            Also some other variables of interest can be returned if desired
    """
   
    # center the rows of Z
    Z_cent = center_rows(Z)

    # whiten the centered rows
    Z_cent_wit, T = whiten_rows(Z_cent)
 
    # Set W_0 = W to a random initial value and normalise the rows to length 1
    M = Z_cent.shape[0]
    W_0 = np.random.rand(M, M)
    W_0 = normalize_rownorms(W_0)
    
    # Initialise some variables to prepare for the while-loop (such as delta)
    delta = tol + 1
    number_of_iter = 0

    # while delta>tol and number_of_iterations < max_iter:
    # an iteration to get a new W-iterate 
    # the error estimate to update delta

    while delta > tol and number_of_iter < max_iter:
        W_p  = update_W(W_0, Z_cent_wit, func, func_d)
        delta = measure_of_convergence(W_0, W_p)
        W_0 = W_p
        number_of_iter += 1
        
    # Clean up, check if converged or max_iter attained
    if number_of_iter == max_iter:
        print('max_iter reached \ndelta: ', delta)
        
    else:
        print("Final delta: ", delta)
        print("Number of iterations: ", number_of_iter, '\n')
        
    Z_ica = np.matmul(W_p, Z_cent_wit)
    
    return Z_ica, W_p

In [None]:
'''
Runs the code and separates the audio!
'''
# Separates the provided mix:
print('Provided mix: \n')


print('kurtosis: \n')
Z_ica_kur, W_ica_kur = fast_ICA(data, 3, kurtosis, kurtosis_d)
print('negentropy: \n')
Z_ica_neg, W_ica_neg = fast_ICA(data, 3, negentropy, negentropy_d)


# Separates the custom mix:
print('Custom mix: \n')

print('kurtosis: \n')
Z_ica_kur1, W_ica_kur1 = fast_ICA(data_mixed1, 3, kurtosis, kurtosis_d)
print('negentropy: \n')
Z_ica_neg1, W_ica_neg1 = fast_ICA(data_mixed1, 3, negentropy, negentropy_d)

<h2>3 - Results</h2>

Below, both the original mixed files and the unmixed files can be played back and compared.

<h3>The provided mixed audio:</h3>

In [None]:
for i in range(data.shape[0]):
    ipd.display(ipd.Audio(data[i,:], rate=sampling_rate))

<h3>The provided audio unmixed (kurtosis):</h3>

In [None]:
for i in range(data.shape[0]):
    ipd.display(ipd.Audio(Z_ica_kur[i,:], rate=sampling_rate))

<h3>The provided audio unmixed (negentropy):</h3>

In [None]:
for i in range(data.shape[0]):
    ipd.display(ipd.Audio(Z_ica_neg[i,:], rate=sampling_rate))

<h3>The custom mixed audio:</h3>

In [None]:
for i in range(data_mixed1.shape[0]):
    ipd.display(ipd.Audio(data_mixed1[i,:], rate=sampling_rate1))

<h3>The custom audio unmixed (kurtosis):</h3>

In [None]:
for i in range(Z_ica_kur1.shape[0]):
    ipd.display(ipd.Audio(Z_ica_kur1[i,:], rate=sampling_rate1))

<h3>The custom audio unmixed (negentropy):</h3>

In [None]:
for i in range(Z_ica_neg1.shape[0]):
    ipd.display(ipd.Audio(Z_ica_neg1[i,:], rate=sampling_rate1))

<h2>Conclusion/discussion</h2>

Not sure what to say here yet.


It would be interesting to log the number of iterations for kurtosis and negentropy over many runs, and eventually plot the results to get comparison data.
- We could make a cell that runs the algorithm, say, 20 times and makes the plots. It will have a significant runtime, so it should be placed late (last) in the document. If done like this, one should be wary of this cell when using "run all".
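
A minimal sketch of such a comparison is given below. It is self-contained and therefore uses a compact stand-in, `fast_ica_sketch`, which returns the iteration count, together with synthetic signals in place of the audio. The function name, the toy sources, the mixing matrix and the seed are all assumptions, and the iteration counts it produces say nothing about the real sound clips.

```python
import numpy as np

def fast_ica_sketch(Z, g, dg, rng, tol=1e-10, max_iter=100):
    """Compact symmetric FastICA returning (W, iterations). Illustrative only."""
    Zc = Z - Z.mean(axis=1, keepdims=True)
    vals, vecs = np.linalg.eigh(np.cov(Zc))
    X = vecs @ np.diag(vals ** -0.5) @ vecs.T @ Zc  # centred, whitened data

    def decorrelate(W):
        U, S, _ = np.linalg.svd(W @ W.T)
        return U @ np.diag(S ** -0.5) @ U.T @ W

    d, N = X.shape
    W = decorrelate(rng.random((d, d)))  # random orthogonal starting point
    for k in range(1, max_iter + 1):
        S_k = W @ X
        W_new = g(S_k) @ X.T / N - np.diag(dg(S_k).mean(axis=1)) @ W
        W_new = decorrelate(W_new)
        delta = np.max(1 - np.abs(np.sum(W_new * W, axis=1)))
        W = W_new
        if delta < tol:
            break
    return W, k

rng = np.random.default_rng(0)
t = np.linspace(0, 1, 2000)
sources = np.vstack([np.sin(2 * np.pi * 7 * t),
                     np.sign(np.sin(2 * np.pi * 3 * t)),
                     rng.uniform(-1, 1, t.size)])
mixed = np.array([[0.5, 0.3, 0.2],
                  [0.2, 0.5, 0.3],
                  [0.3, 0.2, 0.5]]) @ sources

nonlinearities = {
    "kurtosis": (lambda u: 4 * u**3, lambda u: 12 * u**2),
    "negentropy": (lambda u: u * np.exp(-u**2 / 2),
                   lambda u: (1 - u**2) * np.exp(-u**2 / 2)),
}

# Log the iteration count of 20 runs per nonlinearity.
iterations = {name: [fast_ica_sketch(mixed, g, dg, rng)[1] for _ in range(20)]
              for name, (g, dg) in nonlinearities.items()}

for name, counts in iterations.items():
    print(name, "mean iterations:", np.mean(counts))
```

The logged lists could then be passed to a histogram or box plot to compare the two nonlinearities.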