# Emotions transfer learning

## By Rafael Pérez and Pablo Sarricolea

In [None]:
import pyedflib 
import numpy as np
import matplotlib.pyplot as plt
import libtlda
from libtlda.tca import TransferComponentClassifier
import scipy.signal as sig
import scipy.fftpack as fp
import os
import glob
import sklearn
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KNeighborsClassifier
from sklearn.multioutput import MultiOutputClassifier
from random import randrange
import random
%matplotlib inline

## Data loading

Constants needed depending on the data

In [None]:

fs=160
samples = 20000 #30 secs
directory_name='S001'

In [None]:
def loadEEG(file_name, file_labels,fs, samples):
    
    file = pyedflib.EdfReader(file_name)
    nOfSignals = file.signals_in_file
    sigbufs = np.zeros((nOfSignals,samples))
    for i in np.arange(nOfSignals):
        sigbufs[i,0:file.getNSamples()[0]]=file.readSignal(i)
    #sigbufs=sigbufs[:,0:samples]   
    x = np.arange(len(sigbufs[0]))/fs*1000
    labels = np.loadtxt(file_labels,dtype=str)
    sup = np.array(labels[:,1],dtype = float)
    sup=sup/160*1000
    labels[:,1]=sup
    labels=labels[:,(1,2,-1)]


    for i in range(len(labels[:,1])):
        if (labels[i,1]=="T0"):
            labels[i,1]=0
        elif (labels[i,1]=="T1"):
            labels[i,1]=1
        else: labels[i,1]=2
    
    
    labels = np.matrix(labels,dtype=float)
    labels[:,2]*=1000
    labels=np.matrix(labels,dtype=int)
    
    
    
    return x,sigbufs,labels

In [None]:
def loadDirectory(directory_name,fs,samples):
    fileEDF=glob.glob('S001/*.edf')
    
    for i in range(len(fileEDF)):
        fileEDF[i]="rdedfann -r "+ fileEDF[i]+" -F 160 >> "+ fileEDF[i]+".txt"
    
    file = open("S001/rdedfann.sh","w")

    file.write("#!/bin/bash\n")

    for i in fileEDF:
        file.write(i+'\n')

    file.close() 

    #move to the right directory
    #in bash: fromdos rdedfann.sh
    #sh rdedfann.sh
    fileEDF=glob.glob('S001/*.edf')
    fileLabels=glob.glob('S001/*.txt')
    patients = len(fileEDF)
    channels=64*patients
    eegMatrix =  np.zeros(shape=(64*patients,samples))
    j=0
    labelsF=np.zeros(shape=(patients,samples))

    for i in range(patients):
        
        _,eegMatrix[j:(i+1)*64,:],labels=loadEEG(fileEDF[i],fileLabels[i],fs,samples)
        index =0

        for k in range(len(labels)):
            nSamples=int(labels[k,2]/(1000/fs))
            labelsF[i,index:(index+nSamples-1)]=labels[k,1]*1
            index+=nSamples
        
        j=(i+1)*64
    return eegMatrix,labelsF
        

In [None]:

eegMatrix,labels=loadDirectory('S001',fs,samples)

print(eegMatrix.shape)
print(labels.shape)

## Preprocessing

Filtering

In [None]:
fNotch=60
fcs = fNotch/(fs/2)
nfilterb,nfiltera = sig.iirnotch(fcs,30)
fcs = 0.05 / (fs / 2.) 
b, a = sig.butter(5, fcs, 'highpass')

for i in range(896):
    eegMatrix[i,:] = sig.lfilter(nfilterb,nfiltera,eegMatrix[i,:])
    eegMatrix[i,:] = sig.filtfilt (b, a, eegMatrix[i,:])


Scaling

In [None]:
eegMatrixS=eegMatrix
scaler=sklearn.preprocessing.MinMaxScaler(feature_range=(0, 1), copy=True)
print(eegMatrixS.shape)
scaler.fit(eegMatrixS.T)
eegMatrixS=scaler.transform(eegMatrixS.T)
print(eegMatrixS.shape)

In [None]:
labels=labels.T
chunks=15
int(np.floor(samples/chunks))

## TCA approach

In [None]:
def cutInChunks(Xs,Xv,Xt,Ys,Yv,Yt) :   
    idx=0
    idx2=0
    chunkXsn=[]
    chunkXtn=[]
    chunkXvn=[]
    chunkYs=[]
    chunkYt=[]
    chunkYv=[]
    print(patTrain)
    print(Ys.shape)
    for j in range(patTrain):
        for i in (int(np.floor(samples/chunks)))*np.arange(1,chunks+1):
            chunkXsn.append(Xs[int(idx):int(i),idx2:idx2+64])
            chunkXtn.append(Xt[int(idx):int(i),idx2:idx2+64])
            chunkXvn.append(Xv[int(idx):int(i),idx2:idx2+64])
            if (np.max(Ys[int(idx):int(i),j])==1): 
                chunkYs.append(1)
            elif (np.max(Ys[int(idx):int(i),j])==2):
                chunkYs.append(2)
            idx=i
        idx=0
        idx2+=64



    idx=0
    for i in (int(np.floor(samples/chunks)))*np.arange(1,chunks+1):
        if (np.max(Yt[int(idx):int(i)])==1): 
            chunkYt.append(1)
        elif (np.max(Yt[int(idx):int(i)])==2):
            chunkYt.append(2)
        idx=i
        
        
    idx=0
    for i in (int(np.floor(samples/chunks)))*np.arange(1,chunks+1):
        if (np.max(Yv[int(idx):int(i)])==1): 
            chunkYv.append(1)
        elif (np.max(Yv[int(idx):int(i)])==2):
            chunkYv.append(2)
        idx=i

    chunkXsn=np.array(chunkXsn)
    chunkXtn=np.array(chunkXtn)
    chunkXvn=np.array(chunkXvn)
    chunkYs=np.array(chunkYs).reshape((len(chunkYs),1))
    chunkYt=np.array(chunkYt).reshape((len(chunkYt),1))
    chunkYv=np.array(chunkYt).reshape((len(chunkYv),1))
    
    print(chunkXsn.shape)
    print(chunkXtn.shape)
    print(chunkYs.shape)
    print(chunkXvn.shape)
    print(chunkYv.shape)
    
    return chunkXsn,chunkXvn,chunkXtn,chunkYs,chunkYv,chunkYt

In [None]:
def rightLeft(chunkXsn,chunkXtn,chunkYs,chunkYt):
    #ZERO PADDING
    idl,_=np.array(np.where(chunkYs==1))
    idr,_=np.array(np.where(chunkYs==2))

    chunkLefts=chunkXsn[(idl),:,:]
    chunkRights=chunkXsn[idr,:,:]

    print(chunkLefts.shape)
    print(chunkRights.shape)

    idl2,_=np.array(np.where(chunkYt==1))
    idr2,_=np.array(np.where(chunkYt==2))

    chunkLeftt = np.zeros(shape=(chunkLefts.shape))
    chunkRightt = np.zeros(shape=(chunkRights.shape))

    print(len(idr))

    chunkLeftt[0:len(idl2),:,:]=chunkXtn[(idl2),:,:]
    chunkRightt[0:len(idr2),:,:]=chunkXtn[(idr2),:,:]

    print(chunkLeftt.shape)
    print(chunkRightt.shape)

    
    #chunkRights=chunkRights.T.reshape((int(np.floor(samples/chunks)),len(idr)*channels))
    #chunkLefts=chunkLefts.T.reshape((int(np.floor(samples/chunks)),len(idl)*channels))
    
    chunkAuxR = chunkRights[0,:,:]
    chunkAuxL = chunkLefts[0,:,:]
    
    for i in np.arange(1,len(idr)):
        chunkAuxR = np.concatenate((chunkAuxR,chunkRights[i,:,:]),axis=1)
    
    for i in np.arange(1,len(idl)):
        chunkAuxL = np.concatenate((chunkAuxL,chunkLefts[i,:,:]),axis=1)
        
    chunkRights = chunkAuxR
    chunkLefts = chunkAuxL
    
    chunkAuxR = chunkRightt[0,:,:]
    chunkAuxL = chunkLeftt[0,:,:]
    
    for i in np.arange(1,len(idr2)):
        chunkAuxR = np.concatenate((chunkAuxR,chunkRightt[i,:,:]),axis=1)
    
    for i in np.arange(1,len(idl2)):
        chunkAuxL = np.concatenate((chunkAuxL,chunkRightt[i,:,:]),axis=1)
    
    chunkRightt = chunkAuxR
    chunkLeftt = chunkAuxL
    
    #chunkRightt=chunkRightt.T.reshape((int(np.floor(samples/chunks)),len(idr)*channels))
    #chunkLeftt=chunkLeftt.T.reshape((int(np.floor(samples/chunks)),len(idl)*channels))

    print(chunkLefts.shape)
    print(chunkLeftt.shape)
    
    return chunkRights,chunkLefts,chunkRightt,chunkLeftt


In [None]:
def padding(chunkRightt,chunkLeftt,chunkRights,chunkLefts,pad=1):
    if (pad==0):
        chunkAux = np.ones(shape=chunkRights.shape)*np.mean(chunkRightt)
        chunkAux[:,0:len(chunkRightt[0])]=chunkRightt
        chunkRightt=chunkAux
        chunkAux = np.ones(shape=chunkLefts.shape)*np.mean(chunkLeftt)
        chunkAux[:,0:len(chunkLeftt[0])]=chunkLeftt
        plt.plot(chunkAux[:,0])
        plt.plot(chunkLeftt[:,0])
        chunkLeftt=chunkAux
    elif (pad == 1):
        chunkAux = np.zeros(shape=chunkRights.shape)
        chunkAux[:,0:len(chunkRightt[0])]=chunkRightt
        chunkRightt=chunkAux
        chunkAux = np.zeros(shape=chunkLefts.shape)
        chunkAux[:,0:len(chunkLeftt[0])]=chunkLeftt
        plt.plot(chunkAux[:,0])
        plt.plot(chunkLeftt[:,0])
        chunkLeftt=chunkAux
    return chunkRightt, chunkLeftt

In [None]:
def PCselection(Xs,sig=0.81):
    comp=0
    data_rescaled=Xs.T
    data_mean = np.mean(data_rescaled)
    print("mean")
    print(data_mean)
    data_center = data_rescaled - data_mean
    cov_matrix = np.cov(data_center)
    eigenval, eigenvec = np.linalg.eig(cov_matrix)
    significance = [np.abs(i)/np.sum(eigenval) for i in eigenval]
    #Plotting the Cumulative Summation of the Explained Variance
    plt.figure(figsize=(15,5))
    plt.plot(np.cumsum(significance))
    plt.xlabel('Number of Components')
    plt.ylabel('Variance (%)') #for each component
    plt.title('Pulsar Dataset Explained Variance')

    count=0
    for i in range(len(significance)):
        count += significance[i]
        if count > sig:
            print(significance[i])
            comp=i
            break
    plt.axvline(i,color='r')

    plt.show()
    return comp



In [None]:
def TCA (chunkS,chunkT,num_components,mu=1):
    clf = TransferComponentClassifier(loss='logistic', mu=1,num_components=num_components)
    C,K=clf.transfer_component_analysis(chunkS,chunkT)
    #remaping
    Xsn = np.dot(K[:int(np.floor(samples/chunks)), :], C)
    XsXt=np.concatenate((chunkS,chunkT),axis=0)
    K2 = clf.kernel(chunkT, XsXt  , type=clf.kernel_type,bandwidth=clf.bandwidth, order=clf.order)
    Xtn = np.dot(K2, C)
    
    return Xsn,Xtn



In [None]:
def scaleAfter(Xsnr,Xsnl,Xtnr,Xtnl):
    scaler1=sklearn.preprocessing.MinMaxScaler(feature_range=(0, 1), copy=True)
    scaler1.fit(Xsnr)
    XsnrSc=scaler1.transform(Xsnr)
    Xsnr = XsnrSc

    scaler1=sklearn.preprocessing.MinMaxScaler(feature_range=(0, 1), copy=True)
    scaler1.fit(Xsnl)
    XsnlSc=scaler1.transform(Xsnl)
    Xsnl = XsnlSc

    scaler1=sklearn.preprocessing.MinMaxScaler(feature_range=(0, 1), copy=True)
    scaler1.fit(Xtnr)
    XtnrSc=scaler1.transform(Xtnr)
    Xtnr = XtnrSc

    scaler1=sklearn.preprocessing.MinMaxScaler(feature_range=(0, 1), copy=True)
    scaler1.fit(Xtnl)
    XtnlSc=scaler1.transform(Xtnl)
    Xtnl = XtnlSc
    
    return Xsnr,Xsnl,Xtnr,Xtnl

In [None]:
def getScores(Xsnr,Xsnl,Xtnr,Xtnl,Xvnr,Xvnl,num_compr,num_compl):
    XsnrY=np.concatenate((Xsnr,np.ones(shape=(num_compr,1)).T))
    XsnlY=np.concatenate((Xsnl,np.zeros(shape=(num_compl,1)).T))
    XsrlY=np.concatenate((XsnlY,XsnrY),axis=1)
    print(XsrlY.shape)

    XtnrY=np.concatenate((Xtnr,np.ones(shape=(num_compr,1)).T))
    XtnlY=np.concatenate((Xtnl,np.zeros(shape=(num_compl,1)).T))
    XtrlY=np.concatenate((XtnlY,XtnrY),axis=1)
    print(XsrlY.shape)
    
    XvnrY=np.concatenate((Xvnr,np.ones(shape=(num_compr,1)).T))
    XvnlY=np.concatenate((Xvnl,np.zeros(shape=(num_compl,1)).T))
    XvrlY=np.concatenate((XvnlY,XvnrY),axis=1)
    print(XvrlY.shape)
    
    
    np.random.shuffle(XsrlY.T)
    np.random.shuffle(XtrlY.T)
    np.random.shuffle(XvrlY.T)
    
    scores=[]
    scoreR=[]
    scoreL=[]

    y = XsrlY[-1,:].reshape(num_compl+num_compr)

    for i in range(100):
        knn = KNeighborsClassifier(n_neighbors=i+1)
        knn.fit(XsrlY[0:-1,:].T,y)
        #rscore=knn.score(Xtnr.T,np.ones(num_compr))
        #lscore=knn.score(Xtnl.T,np.zeros(num_compl))
        #scores.append((rscore*num_compr+lscore*num_compl)/(num_compr+num_compl))
        #scoreR.append(rscore)
        #scoreL.append(lscore)
        scores.append(knn.score(XvrlY[0:-1,:].T, XvrlY[-1,:].reshape(num_compl+num_compr)))

    maxim = np.argmax(scores)+1
    knn = KNeighborsClassifier(maxim)
    knn.fit(XsrlY[0:-1,:].T,y)
    print(XtrlY[0:-1,:].T.shape)
    print(XtrlY[-1,:].reshape(num_compl+num_compr))
                              
    testScore=knn.score(XtrlY[0:-1,:].T, XtrlY[-1,:].reshape(num_compl+num_compr))

    print(testScore)
    return testScore


In [None]:
def scoresNormal(chunkRights,chunkLefts,chunkRightt,chunkLeftt):

    ones=np.ones((1,len(chunkRights[0])))
    chunkTrainR=np.concatenate((chunkRights,ones),axis=0)


    ones=np.ones((1,len(chunkRightt[0])))
    chunkTestR=np.concatenate((chunkRightt,ones),axis=0)


    zeros=np.zeros((1,len(chunkLefts[0])))
    chunkTrainL=np.concatenate((chunkLefts,zeros),axis=0)

    zeros=np.zeros((1,len(chunkLeftt[0])))
    chunkTestL=np.concatenate((chunkLeftt,zeros),axis=0)
    chunkTrain = np.concatenate((chunkTrainL,chunkTrainR), axis=1)
    chunkTest = np.concatenate((chunkTestL,chunkTestR), axis=1)
    
    np.random.shuffle(chunkTrain.T)
    np.random.shuffle(chunkTest.T)
    scores=[]
    for i in np.arange(5,15):
        knn = KNeighborsClassifier(n_neighbors=i)
        knn.fit(chunkTrain[0:-1,:].T,chunkTrain[-1,:])
        scores.append(knn.score(chunkTest[0:-1,:].T,chunkTest[-1,:]))
    print(np.max(scores))
    return float(np.max(scores))
    

In [None]:
def fold (Xs,Xv,Xt,Ys,Yv,Yt):
    chunkXsn,chunkXvn,chunkXtn,chunkYs,chunkYv,chunkYt=cutInChunks(Xs,Xv,Xt,Ys,Yv,Yt)
    chunkRights,chunkLefts,chunkRightt,chunkLeftt=rightLeft(chunkXsn,chunkXtn,chunkYs,chunkYt)
    _,_,chunkRightv,chunkLeftv = rightLeft(chunkXsn,chunkXvn,chunkYs,chunkYv)
    
    chunkRighttp, chunkLefttp=padding(chunkRightt,chunkLeftt,chunkRights,chunkLefts,pad=0)
    chunkRightvp, chunkLeftvp=padding(chunkRightv,chunkLeftv,chunkRights,chunkLefts,pad=0)
    
    num_compr=PCselection(chunkRights)
    num_compl=PCselection(chunkLefts)
    
    Xsnr, Xtnr = TCA(chunkRights,chunkRighttp,num_components=num_compr)
    Xsnl, Xtnl = TCA(chunkLefts,chunkLefttp,num_components=num_compl)
    
    _,Xvnr=TCA(chunkRights,chunkRightvp,num_components=num_compr)
    _,Xvnl=TCA(chunkRights,chunkRightvp,num_components=num_compl)
    
    #Xsnr,Xsnl,Xtnr,Xtnl= scaleAfter(Xsnr,Xsnl,Xtnr,Xtnl)
    print("TCA completed")
    scoreTCA =getScores(Xsnr,Xsnl,Xtnr,Xtnl,Xvnr,Xvnl,num_compr,num_compl)
    scoreN=scoresNormal(chunkRights,chunkLefts,chunkRightt,chunkLeftt)
    print("fold")
    return  scoreTCA,scoreN

In [None]:

channels = 64
patTrain = 12
patVal=1
patTest=1


vector = np.arange(0,896)
vector = list(vector)
vectorY = np.arange(0,14)
scoresTCA=[]
scoresN=[]
for i in np.arange(0,13):  
    isin = np.logical_not(np.isin(vector,vector[(patTrain-i)*channels:(patTrain+patVal+patTest-i)*channels]))
    indx = np.array(np.where(isin==True))
    indx=indx.reshape((len(indx[0])))


    Xs = eegMatrixS[:,indx]
    
    #Xv = eegMatrixS[:,patTrain*channels:(patTrain+patVal)*channels]
    Xv = np.zeros((Xs.shape))
    Xv[:,0:patVal*channels] = eegMatrixS[:,(patTrain-i)*channels:(patTrain+patVal-i)*channels]

    Xt = np.zeros((Xs.shape))
    Xt[:,0:patTest*channels] = eegMatrixS[:,(patTrain+patVal-i)*channels:(patTrain+patVal+patTest-i)*channels]

    Yv = labels[:,patTrain-i:patTrain+patVal-i]

    Yt = labels[:,patTrain+patVal-i:patTrain+patVal+patTest-i]
    
    isinY = np.logical_not(np.isin(vectorY,vectorY[(patTrain-i):(patTrain+patVal+patTest-i)]))

    indxY = np.array(np.where(isinY==True))
    indxY = indxY.reshape(patTrain)

    Ys = labels[:,indxY]
    scoreTCA,scoreNormal=fold(Xs,Xv,Xt,Ys,Yv,Yt)
    scoresTCA.append(scoreTCA)
    scoresN.append(scoreNormal)


In [None]:
FIGURE_COUNTER=0
SMALL_SIZE = 8
MEDIUM_SIZE = 10
BIGGER_SIZE = 13

plt.rc('font', size=MEDIUM_SIZE)        
plt.rc('axes', titlesize=MEDIUM_SIZE)     
plt.rc('axes', labelsize=MEDIUM_SIZE)    
plt.rc('xtick', labelsize=MEDIUM_SIZE)    
plt.rc('ytick', labelsize=MEDIUM_SIZE)    
plt.rc('legend', fontsize=SMALL_SIZE)    
plt.rc('figure', titlesize=MEDIUM_SIZE) 

In [None]:
plt.plot(scoresTCA)
plt.plot(np.ones(13)*np.mean(scoresTCA))
plt.plot(scoresN)
plt.plot(np.ones(13)*np.mean(scoresN))
plt.title("With TCA vs without TCA ")
plt.legend(['accuracy after TCA',"accuracy without TCA","mean accuracy with TCA", "mean accuracy without TCA" ],loc="upper left")

plt.xlabel('Epoch');
plt.ylabel('Accuracy');