In [1]:
import pickle, copy, warnings, os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn import svm
from itertools import chain
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score
from sklearn.model_selection import KFold
warnings.filterwarnings('ignore')
from skfeature.utility.construct_W import construct_W
from scipy.sparse import diags

**Merge all subject band wise data**

In [2]:
subject_names = ['s01', 's02', 's03', 's04', 's05', 's06', 's07', 's08', 's09', 's10', 's11', 's12', 
                 's13', 's14', 's15', 's16', 's17', 's18', 's19', 's20', 's21',
                 's22', 's23', 's24', 's25', 's26', 's27', 's28', 's29', 's30', 's31', 's32']
eeg_channels = np.array(['Fp1', 'AF3', 'F3', 'F7', 'FC5', 'FC1', 'C3', 'T7', 'CP5', 'CP1', 'P3', 
                         'P7', 'PO3', 'O1', 'Oz', 'Pz', 'Fp2', 'AF4', 'Fz', 'F4', 'F8', 'FC6', 
                         'FC2', 'Cz', 'C4', 'T8', 'CP6', 'CP2', 'P4', 'P8', 'PO4', 'O2'])
class_labels = ['valence', 'arousal', 'all']
# deap dataset path
deap_dataset_path = '/Users/shyammarjit/Desktop/Brain Computer Interface/Deap Dataset/'
# put the path location of datfiles folder s.t. subject wise folder should contain datafiles
#datafiles_path = '/Users/shyammarjit/Desktop/Brain Computer Interface/Hybrid Sequential Forward channel selection (HSFCS)/Subject Dependent/band_48_fir_None_one/'
# testing
datafiles_path = '/Users/shyammarjit/Desktop/Brain Computer Interface/Hybrid Sequential Forward channel selection (HSFCS)/Subject Dependent/band_48_fir_None_two/'

In [3]:
# mergeing all band csv files
def get_band_data(band_name):
    band_paths = []
    for subject_name in subject_names:
        common_path = '/Users/shyammarjit/Desktop/Brain Computer Interface/Hybrid Sequential Forward channel selection (HSFCS)/Subject Dependent/band_48_fir_None_two/'
        if(band_name == ''):
            band_path =  common_path + subject_name + '/rawfiles/' + subject_name + '.csv'
        else:
            band_path =  common_path + subject_name + '/rawfiles/' + subject_name + '_' + band_name + '.csv'
        band_paths.append(band_path)
    band_df = pd.concat(map(pd.read_csv, band_paths), ignore_index = True)
    return band_df

In [4]:
data_theta = get_band_data('theta')
data_alpha = get_band_data('alpha')
data_beta = get_band_data('beta')
data_gamma = get_band_data('gamma')
all_band = get_band_data('')

**Get the label data for all users**

In [5]:
# deap dataset path
deap_dataset_path = '/Users/shyammarjit/Desktop/Brain Computer Interface/Deap Dataset/'

def emotion_label(labels, class_label):
    em_labels = []
    if(class_label == 'valence'):
        for i in range(0, labels.shape[0]):
            if (labels[i][0]>5): # high valence
                em_labels.append(1)
            else: # low valence
                em_labels.append(0)
        return em_labels
    elif(class_label == 'arousal'):
        for i in range(0, labels.shape[0]):
            if (labels[i][1]>5): # high arousal
                em_labels.append(1)
            else: # low arousal
                em_labels.append(0)
        return em_labels
    elif(class_label == 'all'):
        for i in range(0, labels.shape[0]):
            if (labels[i][0]>5): # high valence
                if(labels[i][1]>5): # high arousal
                    em_labels.append(1) # HVHA
                else:
                    em_labels.append(0) # HVLA
            else: # low valence
                if(labels[i][1]>5): # high arousal
                    em_labels.append(2) # LVHA
                else: # low arousal
                    em_labels.append(3) # LVLA
        return em_labels

def get_label_data(label_name):
    label_data, label_2d = [], []
    for subject in subject_names:
        with open(deap_dataset_path + subject + '.dat', 'rb') as f:
            raw_data = pickle.load(f, encoding = 'latin1')
        labels = raw_data['labels']
        label_2d.append(emotion_label(labels, label_name))
    label_data = list(chain.from_iterable(label_2d))
    label_data = pd.DataFrame(np.array([label_data]).T, index = None)
    return label_data

In [6]:
valence_data = get_label_data('valence')
arousal_data = get_label_data('arousal')
all_data = get_label_data('all')
valence_data, arousal_data, all_data = np.array(valence_data), np.array(arousal_data), np.array(all_data)
valence_data, arousal_data, all_data = valence_data.flatten(), arousal_data.flatten(), all_data.flatten()

# Fisher Score

In [7]:
def fisher_score(X, y):
    """
    This function implements the fisher score feature selection, steps are as follows:
    1. Construct the affinity matrix W in fisher score way
    2. For the r-th feature, we define fr = X(:,r), D = diag(W*ones), ones = [1,...,1]', L = D - W
    3. Let fr_hat = fr - (fr'*D*ones)*ones/(ones'*D*ones)
    4. Fisher score for the r-th feature is score = (fr_hat'*D*fr_hat)/(fr_hat'*L*fr_hat)-1
    Input
    -----
    X: {numpy array}, shape (n_samples, n_features)
        input data
    y: {numpy array}, shape (n_samples,)
        input class labels
    Output
    ------
    score: {numpy array}, shape (n_features,)
        fisher score for each feature
    Reference
    ---------
    He, Xiaofei et al. "Laplacian Score for Feature Selection." NIPS 2005.
    Duda, Richard et al. "Pattern classification." John Wiley & Sons, 2012.
    """
    # Construct weight matrix W in a fisherScore way
    kwargs = {"neighbor_mode": "supervised", "fisher_score": True, 'y': y}
    W = construct_W(X, **kwargs)

    # build the diagonal D matrix from affinity matrix W
    D = np.array(W.sum(axis=1))
    L = W
    tmp = np.dot(np.transpose(D), X)
    D = diags(np.transpose(D), [0])
    Xt = np.transpose(X)
    t1 = np.transpose(np.dot(Xt, D.todense()))
    t2 = np.transpose(np.dot(Xt, L.todense()))
    # compute the numerator of Lr
    D_prime = np.sum(np.multiply(t1, X), 0) - np.multiply(tmp, tmp)/D.sum()
    # compute the denominator of Lr
    L_prime = np.sum(np.multiply(t2, X), 0) - np.multiply(tmp, tmp)/D.sum()
    # avoid the denominator of Lr to be 0
    D_prime[D_prime < 1e-12] = 10000
    lap_score = 1 - np.array(np.multiply(L_prime, 1/D_prime))[0, :]

    # compute fisher score from laplacian score, where fisher_score = 1/lap_score - 1
    score = 1.0/lap_score - 1
    return np.transpose(score)

In [8]:
def get_fisher_score(em_labels):
    
    X_theta, X_alpha = np.array(data_theta), np.array(data_alpha)
    X_beta, X_gamma = np.array(data_beta), np.array(data_gamma)
    y = np.array(em_labels)
    # apply scalling of the given data
    scaler = MinMaxScaler()
    scaler.fit(X_theta)
    X_theta = scaler.transform(X_theta)
    scaler.fit(X_alpha)
    X_alpha = scaler.transform(X_alpha)
    scaler.fit(X_beta)
    X_beta = scaler.transform(X_beta)
    scaler.fit(X_gamma)
    X_gamma = scaler.transform(X_gamma)
    
    fscore_theta, fscore_alpha = fisher_score(X_theta, y), fisher_score(X_alpha, y)
    fscore_beta, fscore_gamma = fisher_score(X_beta, y), fisher_score(X_gamma, y)
    
    # Total Avearge F-Score (Theta, Alpha, Beta, Gamma)
    final_f_score = (fscore_theta + fscore_alpha + fscore_beta + fscore_gamma)/4
    fvalues = pd.Series(final_f_score)
    fvalues.index = eeg_channels
    fvalues.sort_values(ascending = False)
    fvalues.to_csv('fscore_final.csv')
    # for visualization run the below code
    # fvalues.sort_values(ascending = False).plot.bar(figsize=(10,8))
    df = fvalues.sort_values(ascending = False)
    da = pd.DataFrame(df)
    da.to_csv('channel_rank.csv')
    cr = pd.read_csv('channel_rank.csv')
    sort_channel_name = list(cr['Unnamed: 0'])
    os.remove('channel_rank.csv') # delete the csv file
    return sort_channel_name

# Classification

In [9]:
#Loading the dataset
def svmclassifier(channel_name, data):
    channel_names = []
    for i in range(0, len(channel_name)):
        draft = channel_name[i]
        channel_names.append(draft + "_alpha")
        channel_names.append(draft + "_beta")
        channel_names.append(draft + "_gamma")
        channel_names.append(draft + "_theta")
    x, y = data[channel_names], np.array(em_labels)
    # Implementing cross validation
    kf = KFold(n_splits = k, shuffle = False)
    acc_score = []
    for train_index , test_index in kf.split(x):
        x_train, x_test = x.iloc[train_index,:],x.iloc[test_index,:]
        y_train, y_test = y[train_index] , y[test_index]
        model = svm.SVC(kernel = 'poly')
        model.fit(x_train, y_train)
        pred_values = model.predict(x_test)
        #pred_values = model.predict(x_test)
        acc = accuracy_score(pred_values , y_test)
        acc_score.append(acc)
    avg_acc_score = sum(acc_score)/k
    return avg_acc_score

In [10]:
def growing_phase(channel_name):
    cn = channel_name[0]
    acc = svmclassifier([cn], all_band.copy())
    cn_list = []
    cn_list.append(cn)
    sort_cn = []
    for i in range(1, len(channel_name)):
        cur_cn = channel_name[i]
        cn_list.append(cur_cn)
        cur_acc = svmclassifier(cn_list, all_band.copy())
        if(cur_acc<acc):
            cn_list.remove(cur_cn)
        else:
            acc = cur_acc
    print('Accuracy in Growing Phase: ', acc)
    print('No of selected channels in Growing Phase: ', len(cn_list))
    print('Channels selected in Growing Phase: ', cn_list)
    return cn_list

# Main Drive Code

In [12]:
k = 15 # testing
em_labels = valence_data.copy()
valence_channel_name = get_fisher_score(em_labels)
optimal_valence_channel_name = growing_phase(valence_channel_name)
em_labels = arousal_data.copy()
arousal_channel_name = get_fisher_score(em_labels)
optimal_arousal_channel_name = growing_phase(arousal_channel_name)
em_labels = all_data.copy()
all_channel_name = get_fisher_score(em_labels)
optimal_all_channel_name = growing_phase(all_channel_name)

Accuracy in Growing Phase:  0.5640401276789786
No of selected channels in Growing Phase:  3
Channels selected in Growing Phase:  ['PO4', 'PO3', 'O2']


KeyboardInterrupt: 