# TMA4320 - Mal for kode til Prosjekt 1
Dette er et forslag til hvordan du kan strukturere python-kode for å gjøre uavhengig komponentanalyse på lydsignaler.

## Forberedelser
Først må vi importere data. Vi antar i koden nedenfor at lydfilene med miksede signaler ligger på audio/mix_1.wav,
audio/mix_2.wav, audio/mix_3.wav. Du skal i arkivet Prosjekt1.zip kunne finne disse filene samt wav_file_loader.py som blir importert i koden nedenfor.


In [1]:
"""I denne cella laster vi opp tre miksede lydklipp som vi skal bruke å teste ut algoritmen på"""
import numpy as np
from wav_file_loader import read_wavefiles

paths = ['audio/mix_1.wav', 'audio/mix_2.wav', 'audio/mix_3.wav']
data, sampling_rate = read_wavefiles(paths)
num_signals = data.shape[0]


In [2]:
"""I denne cellen fins en funksjon som normaliserer lydsignalenes volum, slik at de får omtrent samme lydstyrke"""
def normalize_audio(data):
    """Scale amplitude s.t. max(data[i]) == 1."""
    abs_data = np.absolute(data)
    maximums = np.amax(abs_data,1)
    # Divide each row by a different vector element:
    data = data / maximums.reshape((3,1))
    return data

data = normalize_audio(data)

In [3]:
"""Her kan du spille av de tre opplastede lydklippene"""
import IPython.display as ipd

ipd.display(ipd.Audio(data[0,:], rate=sampling_rate))
ipd.display(ipd.Audio(data[1,:], rate=sampling_rate))
ipd.display(ipd.Audio(data[2,:], rate=sampling_rate))

## Miksing
Denne delen om miksing, inkludert kodecellen nedenfor kan du foreløpig ignorere. Men dersom du seinere vil teste ut algoritmen ved å selv blande opplastede uavhengige signaler så kan funksjonene nedenfor komme til nytte.

In [4]:
def normalize_rowsums(A):
    """Divide each row in A by its sum.
    
    The sum of each row in the result is 1.0."""
    the_sum = np.sum(A,1)
    A = A / the_sum.reshape((3,1))
    return A

def random_mixing_matrix(signals, observations):
    """ Creates a random matrix
    
    Each element is a small positive number, not too close to 0.
    (1/11, 5/7).
    """
    A = 0.25 + np.random.rand(observations, signals)
    return normalize_rowsums(A)


In [5]:
A = random_mixing_matrix(num_signals, num_signals)
data_mixed = normalize_audio(A @ data)

In [6]:
import IPython.display as ipd

ipd.display(ipd.Audio(data_mixed[0,:], rate=sampling_rate))
ipd.display(ipd.Audio(data_mixed[1,:], rate=sampling_rate))
ipd.display(ipd.Audio(data_mixed[2,:], rate=sampling_rate))

## Preprosessering
I den etterfølgende cellen skal du skrive funksjonene <font color='blue'> center_rows </font> og <font color='blue'> whiten_rows </font> for å gjøre de preprosesseringsstegene som er omtalt i prosjektbeskrivelsen. I hvert tilfelle er variabelen Z et array av dimensjon $d\times N$ av miksede signaler. d er lik antall lydkilder og N er lik antall samlpler.

In [7]:
import numpy as np

def center_rows(Z):    
    mus = np.mean(Z, axis =1).reshape(Z.shape[0],1)  #Z.shape[0] = d (antall mikrofoner)
    #lager en vektor av gjennomsnittet av hver rad og endrer vektoren fra radvektor til kolonnevektor
    return Z - mus
    


def whiten_rows(Z):
    C = np.cov(Z)
    U, S, _ = np.linalg.svd( C, full_matrices=False )
    T  = U @ np.diag( 1 / np.sqrt(S) ) @ U.T
    Zw = np.matmul(T,Z)
    return Zw , T
    
    
    
    #Return whitened version of Z and the matrix for the transform, say Zw, T, where Zw=T*Z
    

    # Your code goes here.
    # Hints: The covariance matrix can be obtained by the function cov in numpy, call it C.
    # The following two statements compute T (inverse square root of C).
    #U, S, _ = np.linalg.svd(C, full_matrices=False)
    #T  = U @ np.diag(1 / np.sqrt(S)) @ U.T
 

In [8]:
#tester

Z = np.array( [[1,2,5],
           [1,4,6], 
            [3,4,7] ])



S = np.array( [[0.1, 0.3, 0.4, 0.8, 0.9],
    [3.2, 2.4, 2.4, 0.1, 5.5],
    [10.0, 8.2, 4.3, 2.6, 0.9]] )

print(center_rows(S))

#print(whiten_rows(S))

        

[[-0.4  -0.2  -0.1   0.3   0.4 ]
 [ 0.48 -0.32 -0.32 -2.62  2.78]
 [ 4.8   3.   -0.9  -2.6  -4.3 ]]


## Hovediterasjonen - maksimering av ikke-gaussiskhet

In [9]:
def normalize_rownorms(Z):
    norm = np.linalg.norm (Z, axis = 1).reshape(Z.shape[0],1) #normaliserer hver rad i Z og legger i en egen vektor
    return Z/norm  #dividerer hver rad med denne normen
  
    

In [10]:
#tester    
print (normalize_rownorms(Z))


[[0.18257419 0.36514837 0.91287093]
 [0.13736056 0.54944226 0.82416338]
 [0.34874292 0.46499055 0.81373347]]


In [11]:
def decorrelate_weights(W):
    Wt = np.transpose(W)
    Wp = np.matmul(W,Wt)
    
    
    U, S, _ = np.linalg.svd( Wp, full_matrices=False )
    Wd  = U @ np.diag( 1 / S ) @ U.T @ W                #inverserer matrisen og mult. med W
    
    return Wd

""" This is the orthogonalization step (or decorrelation step) The dxd input matrix W is projected onto an 
    orthogonal matrix by the transformation Wd = (WW^T)^{-1} W as described in the note. The single output 
    argument is the projected W-matrix (Wd)
    Hint: 1Use a similar technque for the inverse square root as in the whitening step
    """



' This is the orthogonalization step (or decorrelation step) The dxd input matrix W is projected onto an \n    orthogonal matrix by the transformation Wd = (WW^T)^{-1} W as described in the note. The single output \n    argument is the projected W-matrix (Wd)\n    Hint: 1Use a similar technque for the inverse square root as in the whitening step\n    '

In [12]:
#tester
print(Z)
print (decorrelate_weights(Z))

[[1 2 5]
 [1 4 6]
 [3 4 7]]
[[-0.28571429 -0.78571429  0.57142857]
 [-0.42857143  0.57142857 -0.14285714]
 [ 0.57142857  0.07142857 -0.14285714]]


In [13]:
#funksjonen regner ut W_k1 fra W_k
# der input W (dxd) og Zcw (dxN)
def update_W(W, Zcw):
     
    #optimering
    s_k = W@Zcw #matrisemultiplikasjon
    
    G = 4 * np.power(s_k, 3) 
    dG = 12* np.power(s_k, 2)
    
    N = Zcw.shape[1] #finner antall kolonner i matrisen Zcw
    EdG = np.mean(dG, axis=1) #finner forventningsveriden av den deriverte av funksjonen G, dG
   
    W_pluss = 1/N * G @ Zcw.T @ np.diag(EdG)@ W
    
    
    # ortogonalisering
    W_k1 = decorrelate_weights(W_pluss)
    
    
    return W_k1


In [14]:
print( update_W(Z,Z))

[[ 1.14710878e-08  2.24441826e-09 -5.05843939e-09]
 [-8.67559813e-10  4.68568058e-09 -2.44096942e-09]
 [-1.92235855e-09 -2.95878916e-09  2.34985776e-09]]


In [15]:
# sjekker avviket mellom Wk og Wk1
# antar at input-matrisen er normalisert

def measure_of_convergence(W1, W2):
    summ = 0
   
    summ = np.sum(W1*W2, axis = 1)
    delta = np.amax(1 - np.absolute(summ) )
    return delta 
    
    """This function computes an error estimate for the maximisation iteration, it computes the convergence
    criterion given in the note. 
    Input: W1 is the previous iterate, and W2 is the one just computed. # ? hva mener de her?
    Output: The quantity delta defined in the note.
    Typical numpy-functions to use: numpy.sum, numpy.absolute, numpy.amax.
    
    Your code goes here:
    """
print (measure_of_convergence(Z,Z)) 
    

-29


In [16]:
import warnings

#d er antall mikrofoner/lydkilder
# N er antall signaler
def fast_ICA(Z, d, tol=1e-10, max_iter=100):
    
    Z_C = center_rows(Z)
    Zcw, T = whiten_rows(Z_C) #forbereder matrisen til neste steg
    
    
    W_0 = np.random.rand(d,d) #lager random matrise
    Wk = normalize_rownorms(W_0) #normaliserer den random matrisen
    
    delta = 1 #definerer delta
    iterasjoner = 0 
    
    while (delta > tol and iterasjoner < max_iter):
        Wk1 = update_W(Wk, Zcw)
        delta = measure_of_convergence(Wk,Wk1)
        Wk = Wk1    
    
        iterasjoner += 1
    Wk = Wk1
    Z_ica = Wk@Zcw
    
    return Z_ica, Wk
    
""" 
This is the function that organises all the work.
    
Input: Z is the unprocessed data
           signals_to_find: in our case, always d the number of sources
           tol is the tolerance, default value 1.0e-10
           max_iter abort after max_iter iterations if not converged, (to avoid infinite loop)
Output: Z_ica, the separated signals (dxN matrix, approximating the sources)
            W The final converged W-matrix (dxd)
            Also some other variables of interest can be returned if desired
"""
# center the rows of Z
# whiten the centered rows
# Put W_0 = W to a random initial value and normalise the rows to length 1
# Initialise some variables to prepare for the while-loop (such as delta)
# while delta>tol and number_of_iterations < max_iter:
#      do an iteration to get a new W-iterate 
#      Compute the error estimate to update delta
# Clean up, check if converged or max_iter attained
    


' \nThis is the function that organises all the work.\n    \nInput: Z is the unprocessed data\n           signals_to_find: in our case, always d the number of sources\n           tol is the tolerance, default value 1.0e-10\n           max_iter abort after max_iter iterations if not converged, (to avoid infinite loop)\nOutput: Z_ica, the separated signals (dxN matrix, approximating the sources)\n            W The final converged W-matrix (dxd)\n            Also some other variables of interest can be returned if desired\n'

In [17]:
Z, W = fast_ICA(data_mixed,3, tol=1e-10, max_iter=100)

In [18]:
ipd.display(ipd.Audio(Z[0,:], rate=sampling_rate))
ipd.display(ipd.Audio(Z[1,:], rate=sampling_rate))
ipd.display(ipd.Audio(Z[2,:], rate=sampling_rate))