In [None]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

#### Problem 2 - Solving the cocktail party problem with ICA

"Independent component analysis was originally developed to deal with problems that are closely related to the cocktail-party problem. The goal is to find a linear representation of nongaussian data so that the components are statistically independent, or as independent as possible. Such a representation seems to capture the essential structure of the data in many applications, including feature extraction and signal separation." More details on ICA can be found in https://www.cs.helsinki.fi/u/ahyvarin/papers/NN00new.pdf
<br>

In this problem, we take mixed sounds and images, and apply ICA to them to separate the sources.

<span style="color:blue"> <i> 1. Read the 3 sound files, and plot them as a function of time.  (In order to better see the features, you may plot them with some offsets.) </i></span> <br>

In [None]:
from scipy.io import wavfile

In [None]:
fs, data1 = wavfile.read('mix_sound1.wav')
fs, data2 = wavfile.read('mix_sound2.wav')
fs, data3 = wavfile.read('mix_sound3.wav')
#fs is the sample rate, i.e., how many data points in one second.

data = np.float64(np.c_[data1, data2, data3])

In [None]:
plt.figure(figsize = (10,7))

...

plt.xlabel('Time [second]')
plt.ylabel('Sound signal + offset')
plt.show()

<span style="color:blue"> <i> 2. Now run the following cells and play the sounds. </i></span> <br>

In [None]:
from IPython.display import Audio

In [None]:
Audio('mix_sound1.wav')

In [None]:
Audio('mix_sound2.wav')

In [None]:
Audio('mix_sound3.wav')

You can tell there are 3 signals mixed in these sounds. You can think of these sounds as recorded by 3 different recorders that are at different distances from the 3 sources. In other words,
<br><br>
\begin{equation}
 \boldsymbol X = \boldsymbol A \boldsymbol S + \boldsymbol \mu \tag{1}
\end{equation}
<br><br>
where $\boldsymbol X$ is a vector of these 3 sounds, $\boldsymbol S$ is a vector of 3 signals, $\boldsymbol \mu$ is a vector of the mean of $\boldsymbol X$, and $\boldsymbol A$ is the mixing matrix. 
<br><br>
Next, using sklearn's FastICA class, we will separate the signals from these sounds.
<br><br>
(1) Define the FastICA model:

&nbsp; **ica = FastICA(algorithm='parallel')**

(2) Using "fit_transform," fit the model with the data and obtain the signals

&nbsp; **S = ica.fit_transform(data)**
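
The two steps above can be tried on synthetic data before touching the sound files. The following is a minimal sketch, not the solution to this problem: the three sources, the mixing matrix `A_true`, and the `random_state` value are all assumptions made for illustration. Because ICA recovers sources only up to order, sign, and scale, the check compares true and estimated sources by absolute correlation.

```python
import numpy as np
from sklearn.decomposition import FastICA

# Synthetic stand-in for the recordings: three independent, non-Gaussian sources.
t = np.linspace(0, 8, 2000)
S_true = np.c_[np.sin(2 * t),            # sinusoid
               np.sign(np.sin(3 * t)),   # square wave
               2 * (t % 1) - 1]          # sawtooth

# Hypothetical mixing matrix; each row plays the role of one recorder.
A_true = np.array([[1.0, 1.0, 1.0],
                   [0.5, 2.0, 1.0],
                   [1.5, 1.0, 2.0]])
X = S_true @ A_true.T                    # shape (Nsample, Nsignal)

ica = FastICA(n_components=3, algorithm='parallel', random_state=0)
S_est = ica.fit_transform(X)             # shape (Nsample, Nsignal)

# Absolute correlation between every true source and every estimated source.
corr = np.abs(np.corrcoef(S_true.T, S_est.T)[:3, 3:])
best_match = corr.max(axis=1)            # best estimate for each true source
print(S_est.shape, best_match.round(3))
```

A value of `best_match` close to 1 for each source indicates that some estimated component matches it up to sign and scale.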

<span style="color:blue"> <i> 3. Find the 3 signals in the sound files. Plot the signals. (Again, you may plot them with some offsets.)</i></span> <br>

In [None]:
from sklearn.decomposition import FastICA

In [None]:
...

In [None]:
plt.figure(figsize = (10,7))

...

plt.xlabel('Time [second]')
plt.ylabel('Sound signal + offset')
plt.show()

You will find that the amplitudes of the signals are very small. This is because FastICA whitens the data before applying ICA, so that the covariance matrix of the signals is I. We can amplify the signals by multiplying them by a large number.
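
A fixed amplification factor works only if no amplified sample leaves the int16 range (at most 32767 in magnitude). As a hedged alternative, the sketch below rescales a signal relative to its own peak before casting. The helper `to_int16` and its `peak` parameter are hypothetical names introduced here, not part of the assignment.

```python
import numpy as np

def to_int16(signal, peak=0.9):
    """Scale a float signal so its largest sample sits at `peak` of the int16 range."""
    signal = np.asarray(signal, dtype=np.float64)
    max_abs = np.max(np.abs(signal))
    if max_abs == 0:
        # An all-zero signal stays silent.
        return np.zeros(signal.shape, dtype=np.int16)
    scaled = signal / max_abs * peak * np.iinfo(np.int16).max
    return scaled.astype(np.int16)

# Tiny-amplitude stand-in for one recovered ICA component.
rng = np.random.default_rng(0)
s = 1e-3 * rng.standard_normal(1000)
pcm = to_int16(s)
print(pcm.dtype, np.abs(pcm).max())
```

The result could then be passed to `wavfile.write` in place of `np.int16(Amp*S[:,0])`.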

<span style="color:blue"> <i> 4. Now let's save the signals as wav files and play the sounds. </i></span> <br>

In [None]:
Amp = 1e6

wavfile.write('signal_sound1.wav', fs, np.int16(Amp*S[:,0]))
wavfile.write('signal_sound2.wav', fs, np.int16(Amp*S[:,1]))
wavfile.write('signal_sound3.wav', fs, np.int16(Amp*S[:,2]))

Play the sounds.

In [None]:
...

In [None]:
...

In [None]:
...

Now we can reconstruct the mixed sounds with the signals. The mixing matrix is given by ica.mixing_, and the mean of the data is given by ica.mean_. Note that the $X$ and $S$ from equation (1) are matrices of shape (Nsignal, Nsample), but the data and the signal you get from FastICA are matrices of shape (Nsample, Nsignal).
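
The shape convention is easy to get wrong, so here is a small sketch of the two equivalent layouts using random stand-ins. The arrays `S_rows`, `A`, and `mu` are assumptions that merely have the same shapes as the output of `fit_transform`, `ica.mixing_`, and `ica.mean_`.

```python
import numpy as np

rng = np.random.default_rng(0)
n_sample, n_signal = 500, 3

# Hypothetical stand-ins with the shapes FastICA would provide.
S_rows = rng.laplace(size=(n_sample, n_signal))   # like the output of fit_transform
A = rng.normal(size=(n_signal, n_signal))         # like ica.mixing_
mu = rng.normal(size=n_signal)                    # like ica.mean_

# Equation (1) in column form: X has shape (Nsignal, Nsample).
X_cols = A @ S_rows.T + mu[:, None]

# The same relation in row form, matching the (Nsample, Nsignal) layout of `data`.
X_rows = S_rows @ A.T + mu

print(np.allclose(X_cols.T, X_rows))
```

Transposing equation (1) turns $\boldsymbol A \boldsymbol S$ into $\boldsymbol S^T \boldsymbol A^T$, which is why the row form multiplies by `A.T` on the right.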

<span style="color:blue"> <i> 5. Reconstruct the sounds from the source signals, and show that the reconstructed sounds are very close to the given sounds using np.allclose. </i></span> <br>

In [None]:
#mixing matrix
A = ...
mu = ...

X = ...

In [None]:
print(np.allclose(X, data))

<span style="color:blue"> <i> 6. ICA requires the data to be centered and whitened. The FastICA class from sklearn centers and whitens the data automatically. Now let's disable this preprocessing with ica = FastICA(whiten=False), and then redo the analysis in parts 3 and 4. Plot and play the sounds. Does ICA work without data preprocessing?</i></span> <br>

In [None]:
...

In [None]:
plt.figure(figsize = (10,7))

...

plt.xlabel('Time [second]')
plt.ylabel('Sound signal + offset')
plt.show()

Save the sounds.

In [None]:
...

Play the sounds.

In [None]:
...

In [None]:
...

In [None]:
...

<span style="color:blue"> <i> 7. Principal component analysis (PCA) also tries to interpret the underlying structure of the data, by decomposing the data into linear combinations of the principal components. Now let's apply PCA to the sounds and see if the signals are cleanly separated in the principal components. Plot the principal components, save them as wav files and play the sounds. How does it compare to parts 3 and 4? </i></span> <br>

In [None]:
from sklearn.decomposition import PCA

In [None]:
...

In [None]:
plt.figure(figsize = (10,7))

...

plt.xlabel('Time [second]')
plt.ylabel('Sound signal + offset')
plt.show()

Save the sounds.

In [None]:
...

Play the sounds.

In [None]:
...

In [None]:
...

In [None]:
...

Now let's take a look at another example. Suppose now we have some linearly mixed images, and we are going to find the original photos with ICA. (This example is from https://github.com/vishwajeet97/Cocktail-Party-Problem)

<span style="color:blue"> <i> 8. Load in photos, and plot them. </i></span> <br>

In [None]:
import matplotlib.image as mpimg

In [None]:
img1=mpimg.imread('unos.jpg')
img2=mpimg.imread('dos.jpg')
img3=mpimg.imread('tres.jpg')

In [None]:
...

Each image is a 2D array. You will need to flatten the data for the following analysis.
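
As a sketch of the flattening step, the example below uses small random arrays in place of the photos. The array sizes and the name `data_img` are assumptions for illustration. Each image becomes one column, giving the same (Nsample, Nsignal) layout used for the sounds, and a column can be reshaped back into an image afterwards.

```python
import numpy as np

# Three hypothetical grayscale "images" of identical shape.
rng = np.random.default_rng(0)
height, width = 4, 6
images = [rng.integers(0, 256, size=(height, width)) for _ in range(3)]

# Flatten each image to 1D and stack the results as columns: (Npixel, Nimage).
data_img = np.column_stack([img.ravel() for img in images]).astype(np.float64)

# After the analysis, a column can be reshaped back into an image.
restored = data_img[:, 0].reshape(height, width)
print(data_img.shape, np.array_equal(restored, images[0]))
```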

<span style="color:blue"> <i> 9. Redo the analysis in parts 3 and 4. Separate the original photos and plot them. Note that the sign of the signals recovered by ICA may not be correct, so you may need to multiply some of the photos by -1. </i></span> <br>

In [None]:
...

Plot the original photos.

In [None]:
...

The ICA algorithm tries to find the most non-Gaussian directions of the given data. FastICA uses the KL divergence between the data and a standard Gaussian (negentropy) to characterize the non-Gaussianity. Another way to measure non-Gaussianity is to use the Wasserstein distance between the data and a standard Gaussian. In 1D, the Wasserstein distance, also called the earth mover's distance, has a closed-form solution in terms of the cumulative distribution function (CDF). Below we provide the code for doing ICA using the Wasserstein distance. The code searches for the most non-Gaussian directions by maximizing the Wasserstein distance between the data and a Gaussian.
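
To make the 1D closed form concrete: the $p$-Wasserstein distance between two 1D distributions is the $L^p$ distance between their inverse CDFs (quantile functions). The sketch below approximates this for a sample versus a standard Gaussian by comparing the sorted sample against Gaussian quantiles at midpoint levels. It is a NumPy illustration written for this note, not the provided PyTorch code, and the helper name `wasserstein_to_gaussian` is hypothetical.

```python
import numpy as np
from statistics import NormalDist

def wasserstein_to_gaussian(x, p=2):
    """Approximate the 1D p-Wasserstein distance between a sample and N(0, 1)."""
    x_sorted = np.sort(np.asarray(x, dtype=np.float64))
    n = len(x_sorted)
    # Midpoint quantile levels (i + 0.5) / n, strictly inside (0, 1).
    levels = (np.arange(n) + 0.5) / n
    gauss_quantiles = np.array([NormalDist().inv_cdf(u) for u in levels])
    return np.mean(np.abs(x_sorted - gauss_quantiles) ** p) ** (1 / p)

rng = np.random.default_rng(0)
n = 5000
gaussian_sample = rng.standard_normal(n)
# Uniform sample with zero mean and unit variance, like whitened data.
uniform_sample = rng.uniform(-np.sqrt(3), np.sqrt(3), n)

d_gauss = wasserstein_to_gaussian(gaussian_sample)
d_unif = wasserstein_to_gaussian(uniform_sample)
print(d_gauss < d_unif)
```

A Gaussian sample should give a distance near zero, while a non-Gaussian sample with the same mean and variance gives a clearly larger one, which is what makes the distance usable as a non-Gaussianity measure.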

<span style="color:blue"> <i> 10. Perform ICA on the mixed photos from Q9 (do the following steps) </i></span> <br>

1. Whiten the data with sklearn.decomposition.PCA(whiten=True)
2. Run ICA with Wasserstein distance: $A$ = ICA_Wasserstein($X_{\mathrm{whiten}}$)
3. Recover the Signal S from the whitened data and mixing matrix $A$. Note that $\mu=0$ because of the whitening, and the shape of $X$ of equation (1) is (Nsignal, Nsample).
4. Plot the signals (original photos).
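
Steps 1 and 3 can be sketched on synthetic data as follows. This is an illustration under stated assumptions, not the solution: the data are random, and a random orthogonal matrix stands in for the result of `ICA_Wasserstein`, on the assumption that the returned matrix is orthogonal (as the orthogonality-preserving update in the provided code suggests). For an orthogonal $A$, $A^{-1} = A^T$, so equation (1) with $\mu = 0$ gives $S = A^T X$.

```python
import numpy as np
from sklearn.decomposition import PCA

rng = np.random.default_rng(0)
# Hypothetical mixed data in (Nsample, Nsignal) layout with a nonzero mean.
S_true = rng.laplace(size=(2000, 3))
A_mix = np.array([[1.0, 0.5, 0.2],
                  [0.3, 1.0, 0.6],
                  [0.4, 0.2, 1.0]])
X = S_true @ A_mix.T + 5.0

# Step 1: whitening removes the mean and makes the covariance the identity.
X_whiten = PCA(whiten=True).fit_transform(X)
mean_is_zero = np.allclose(X_whiten.mean(axis=0), 0, atol=1e-6)
cov_is_identity = np.allclose(np.cov(X_whiten.T), np.eye(3), atol=1e-6)
print(mean_is_zero, cov_is_identity)

# Step 3, with a random orthogonal matrix standing in for the ICA result:
# if X_whiten.T = A @ S and A is orthogonal, then S = A.T @ X_whiten.T.
A, _ = np.linalg.qr(rng.normal(size=(3, 3)))
S = A.T @ X_whiten.T                     # shape (Nsignal, Nsample)
print(np.allclose(A @ S, X_whiten.T))
```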

In [None]:
import torch
import torch.optim as optim

def Percentile(input, percentiles):
    """
    Find the percentiles of a tensor along the last dimension.
    Adapted from https://github.com/aliutkus/torchpercentile/blob/master/torchpercentile/percentile.py
    """
    percentiles = percentiles.double()
    in_sorted, in_argsort = torch.sort(input, dim=-1)
    positions = percentiles * (input.shape[-1]-1) / 100
    floored = torch.floor(positions)
    ceiled = floored + 1
    ceiled[ceiled > input.shape[-1] - 1] = input.shape[-1] - 1
    weight_ceiled = positions-floored
    weight_floored = 1.0 - weight_ceiled
    d0 = in_sorted[..., floored.long()] * weight_floored
    d1 = in_sorted[..., ceiled.long()] * weight_ceiled
    result = d0+d1
    return result


def SlicedWassersteinDistanceG(x, pg, q, p, perdim=True):
    if q is None:
        px = torch.sort(x, dim=-1)[0]
    else:
        px = Percentile(x, q)

    if perdim:
        WD = torch.mean(torch.abs(px-pg) ** p)
    else:
        WD = torch.mean(torch.abs(px-pg) ** p, dim=-1)
    return WD


def SWD_prepare(Npercentile=100, device=torch.device("cuda:0"), gaussian=True):
    start = 50 / Npercentile
    end = 100-start
    q = torch.linspace(start, end, Npercentile, device=device)
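    if gaussian:
        #Gaussian quantiles at the percentiles q: sqrt(2)*erfinv(2u-1) is the inverse CDF of N(0,1)
        pg = 2**0.5 * torch.erfinv(2*q/100-1)
        return q, pg
    #without a Gaussian reference, only the percentiles are needed
    return q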

    
def maxSWDdirection(x, x2='gaussian', n_component=None, maxiter=200, Npercentile=None, p=2, eps=1e-6):

    #if x2 == 'gaussian', find the direction of max sliced Wasserstein distance between x and gaussian
    #otherwise, find the direction of max sliced Wasserstein distance between x and x2
    #(that branch calls SlicedWassersteinDistance, which is not defined in this notebook)

    if x2 != 'gaussian':
        assert x.shape[1] == x2.shape[1]
        if x2.shape[0] > x.shape[0]:
            x2 = x2[torch.randperm(x2.shape[0])][:x.shape[0]]
        elif x2.shape[0] < x.shape[0]:
            x = x[torch.randperm(x.shape[0])][:x2.shape[0]]

    ndim = x.shape[1]
    if n_component is None:
        n_component = ndim

    q = None
    if x2 == 'gaussian':
        if Npercentile is None:
            q, pg = SWD_prepare(len(x), device=x.device)
            q = None
        else:
            q, pg = SWD_prepare(Npercentile, device=x.device)
    elif Npercentile is not None:
        q = SWD_prepare(Npercentile, device=x.device, gaussian=False)


    #initialize w. algorithm from https://arxiv.org/pdf/math-ph/0609050.pdf
    wi = torch.randn(ndim, n_component, device=x.device)
    Q, R = torch.linalg.qr(wi)  #torch.qr is deprecated in favor of torch.linalg.qr
    L = torch.sign(torch.diag(R))
    w = (Q * L).T

    lr = 0.1
    down_fac = 0.5
    up_fac = 1.5
    c = 0.5

    #algorithm from http://noodle.med.yale.edu/~hdtag/notes/steifel_notes.pdf
    #note that here w = X.T
    #use backtracking line search
    w1 = w.clone()
    w.requires_grad_(True)
    if x2 == 'gaussian':
        loss = -SlicedWassersteinDistanceG(w @ x.T, pg, q, p)
    else:
        loss = -SlicedWassersteinDistance(w @ x.T, w @ x2.T, q, p)
    loss1 = loss
    for i in range(maxiter):
        GT = torch.autograd.grad(loss, w)[0]
        w.requires_grad_(False)
        WT = w.T @ GT - GT.T @ w
        e = - w @ WT #dw/dlr
        m = torch.sum(GT * e) #dloss/dlr

        lr /= down_fac
        while loss1 > loss + c*m*lr:
            lr *= down_fac
            if 2*n_component < ndim:
                UT = torch.cat((GT, w), dim=0).double()
                V = torch.cat((w.T, -GT.T), dim=1).double()
                w1 = (w.double() - lr * w.double() @ V @ torch.pinverse(torch.eye(2*n_component, dtype=torch.double, device=x.device)+lr/2*UT@V) @ UT).to(torch.get_default_dtype())
            else:
                w1 = (w.double() @ (torch.eye(ndim, dtype=torch.double, device=x.device)-lr/2*WT.double()) @ torch.pinverse(torch.eye(ndim, dtype=torch.double, device=x.device)+lr/2*WT.double())).to(torch.get_default_dtype())

            w1.requires_grad_(True)
            if x2 == 'gaussian':
                loss1 = -SlicedWassersteinDistanceG(w1 @ x.T, pg, q, p)
            else:
                loss1 = -SlicedWassersteinDistance(w1 @ x.T, w1 @ x2.T, q, p)

        if torch.max(torch.abs(w1-w)) < eps:
            w = w1
            break

        lr *= up_fac
        loss = loss1
        w = w1
    if x2 == 'gaussian':
        WD = SlicedWassersteinDistanceG(w @ x.T, pg, q, p, perdim=False)
    else:
        WD = SlicedWassersteinDistance(w @ x.T, w @ x2.T, q, p, perdim=False)
    return w.T, WD**(1/p)

In [None]:
def ICA_Wasserstein(x):
    
    A, WD = maxSWDdirection(torch.tensor(x, dtype=torch.float32))
        
    return A.detach().numpy()

In [None]:
#load the data
...

In [None]:
#Whiten the data
...

In [None]:
#ICA
...

In [None]:
#Recover S
...

Plot the original photos.

In [None]:
...