In [1]:
import numpy as np
import mne
from numpy import dot
from numpy.linalg import matrix_rank, inv
from numpy.random import permutation
from scipy.linalg import eigh
from scipy.linalg import norm as mnorm

# Global constants
EPS = 1e-16
MAX_W = 1e8
ANNEAL = 0.9
MAX_STEP = 500
MIN_LRATE = 1e-6
W_STOP = 1e-6

def norm(x):
    """Computes the norm of a vector or the Frobenius norm of a
    matrix_rank

    """
    return mnorm(x.ravel())


class ica:

    def __init__(self, n_components=10):
        self.n_comp = n_components

    def fit(self, x2d):
        x_white, self.white, self.dewhite\
            = pca_whiten(x2d, self.n_comp)
        self.mix, self.sources, self.unmix\
            = infomax1(x_white, self.n_comp)
        return self


def diagsqrts(w):
    """
    Returns direct and inverse square root normalization matrices
    """
    Di = np.diag(1. / (np.sqrt(w) + np.finfo(float).eps))
    D = np.diag(np.sqrt(w))
    return D, Di


def pca_whiten(x2d, n_comp, verbose=True):
    """ data Whitening
    *Input
    x2d : 2d data matrix of observations by variables
    n_comp: Number of components to retain
    *Output
    Xwhite : Whitened X
    white : whitening matrix (Xwhite = np.dot(white,X))
    dewhite : dewhitening matrix (X = np.dot(dewhite,Xwhite))
    """
    x2d_demean = x2d - x2d.mean(axis=1).reshape((-1, 1))
    NSUB, NVOX = x2d_demean.shape
    if NSUB > NVOX:
        cov = dot(x2d_demean.T, x2d_demean) / (NSUB - 1)
        w, v = eigh(cov, eigvals=(NVOX - n_comp, NVOX - 1))
        D, Di = diagsqrts(w)
        u = dot(dot(x2d_demean, v), Di)
        x_white = v.T
        white = dot(Di, u.T)
        dewhite = dot(u, D)
    else:
        cov = dot(x2d_demean, x2d_demean.T) / (NVOX - 1)
        w, u = eigh(cov, eigvals=(NSUB - n_comp, NSUB - 1))
        D, Di = diagsqrts(w)
        white = dot(Di, u.T)
        x_white = dot(white, x2d_demean)
        dewhite = dot(u, D)
    return (x_white, white, dewhite)


def w_update(weights, x_white, bias1, lrate1):
    """ Update rule for infomax
    This function recieves parameters to update W1
    * Input
    W1: unmixing matrix (must be a square matrix)
    Xwhite1: whitened data
    bias1: current estimated bias
    lrate1: current learning rate
    startW1: in case update blows up it will start again from startW1
    * Output
    W1: updated mixing matrix
    bias: updated bias
    lrate1: updated learning rate
    """
    NCOMP, NVOX = x_white.shape
    block1 = int(np.floor(np.sqrt(NVOX / 3)))
    permute1 = permutation(NVOX)
    for start in range(0, NVOX, block1):
        if start + block1 < NVOX:
            tt2 = start + block1
        else:
            tt2 = NVOX
            block1 = NVOX - start

        unmixed = dot(weights, x_white[:, permute1[start:tt2]]) + bias1
        logit = 1 - (2 / (1 + np.exp(-unmixed)))
        weights = weights + lrate1 * dot(block1 * np.eye(NCOMP) +
                                         dot(logit, unmixed.T), weights)
        bias1 = bias1 + lrate1 * logit.sum(axis=1).reshape(bias1.shape)
        # Checking if W blows up
        if (np.isnan(weights)).any() or np.max(np.abs(weights)) > MAX_W:
            print("Numeric error! restarting with lower learning rate")
            lrate1 = lrate1 * ANNEAL
            weights = np.eye(NCOMP)
            bias1 = np.zeros((NCOMP, 1))
            error = 1

            if lrate1 > 1e-6 and \
               matrix_rank(x_white) < NCOMP:
                print("Data 1 is rank defficient"
                      ". I cannot compute " +
                      str(NCOMP) + " components.")
                return (None, None, None, 1)

            if lrate1 < 1e-6:
                print("Weight matrix may"
                      " not be invertible...")
                return (None, None, None, 1)
            break
        else:
            error = 0

    return(weights, bias1, lrate1, error)

# infomax1: single modality infomax


def infomax1(x_white, verbose=False):
    """Computes ICA infomax in whitened data
    Decomposes x_white as x_white=AS
    *Input
    x_white: whitened data (Use PCAwhiten)
    verbose: flag to print optimization updates
    *Output
    A : mixing matrix
    S : source matrix
    W : unmixing matrix
    """
    NCOMP = x_white.shape[0]
    # Initialization
    weights = np.eye(NCOMP)
    old_weights = np.eye(NCOMP)
    d_weigths = np.zeros(NCOMP)
    old_d_weights = np.zeros(NCOMP)
    lrate = 0.005 / np.log(NCOMP)
    bias = np.zeros((NCOMP, 1))
    change = 1
    angle_delta = 0
    if verbose:
        print("Beginning ICA training...")
    step = 1

    while step < MAX_STEP and change > W_STOP:

        (weights, bias, lrate, error) = w_update(weights, x_white, bias, lrate)

        if error != 0:
            step = 1
            error = 0
            lrate = lrate * ANNEAL
            weights = np.eye(NCOMP)
            old_weights = np.eye(NCOMP)
            d_weigths = np.zeros(NCOMP)
            old_d_weights = np.zeros(NCOMP)
            bias = np.zeros((NCOMP, 1))
        else:
            d_weigths = weights - old_weights
            change = norm(d_weigths)**2

            if step > 2:
                angle_delta = np.arccos(
                    np.sum(d_weigths * old_d_weights) /
                    (norm(d_weigths) * norm(old_d_weights) + 1e-8)
                ) * 180 / np.pi

            old_weights = np.copy(weights)

            if angle_delta > 60:
                lrate = lrate * ANNEAL
                old_d_weights = np.copy(d_weigths)
            elif step == 1:
                old_d_weights = np.copy(d_weigths)

            if verbose and change < W_STOP:
                print("Step %d: Lrate %.1e,"
                      "Wchange %.1e,"
                      "Angle %.2f" % (step, lrate,
                                      change, angle_delta))

        step = step + 1

    # A,S,W
    return (inv(weights), dot(weights, x_white), weights)

# Single modality ICA


def ica1(x_raw, ncomp, verbose=False):
    '''
    Single modality Independent Component Analysis
    '''
    if verbose:
        print("Whitening data...")
    
    x_white, _, dewhite = pca_whiten(x_raw, ncomp)
    if verbose:
        print('x_white shape: %d, %d' % x_white.shape)
        print("Done.")
    if verbose:
        print("Running INFOMAX-ICA ...")
    mixer, sources, unmixer = infomax1(x_white, verbose)
    mixer = dot(dewhite, mixer)

    scale = sources.std(axis=1).reshape((-1, 1))
    sources = sources / scale
    scale = scale.reshape((1, -1))
    mixer = mixer * scale

    if verbose:
        print("Done.")
    return (mixer, sources, unmixer)


if __name__ == "__main__":
    file_path = '/data/users2/mjafarlou1/MEG/stc/sub-01/sub-01_task-RDR_run-29_meg.stc-lh.stc'
    stc = mne.read_source_estimate(file_path)

    n_components = 200
    mixer, sources, unmixer = ica1(stc.data, n_components, verbose=True)

Whitening data...


  w, u = eigh(cov, eigvals=(NSUB - n_comp, NSUB - 1))


x_white shape: 200, 265505
Done.
Running INFOMAX-ICA ...
Beginning ICA training...


  logit = 1 - (2 / (1 + np.exp(-unmixed)))


Numeric error! restarting with lower learning rate
Numeric error! restarting with lower learning rate
Numeric error! restarting with lower learning rate
Numeric error! restarting with lower learning rate
Numeric error! restarting with lower learning rate
Numeric error! restarting with lower learning rate
Numeric error! restarting with lower learning rate
Numeric error! restarting with lower learning rate
Done.


In [2]:
sources.shape

(200, 265505)

In [3]:
unmixer.shape

(200, 200)

In [4]:
mixer.shape

(5124, 200)

In [5]:
stc.data.shape

(5124, 265505)