In [1]:
import scipy.io as spio

from __future__ import division

import numpy as np
import scipy as sp
from matplotlib import pyplot as plt
from matplotlib import ticker
import matplotlib as mpl
from os import path

from scipy.io import loadmat

from wyrm.types import Data

from wyrm import plot
#plot.beautify()
from wyrm.types import Data
from wyrm import processing as proc

DEBUG:matplotlib.backends:backend module://ipykernel.pylab.backend_inline version unknown


In [2]:
TRAIN_A = '../BCI_Comp_III_Wads_2004/data/Subject_A_Train.mat'
TRAIN_B = '../BCI_Comp_III_Wads_2004/data/Subject_B_Train.mat'

TEST_A = '../BCI_Comp_III_Wads_2004/data/Subject_A_Test.mat'
TEST_B = '../BCI_Comp_III_Wads_2004/data/Subject_B_Test.mat'

TRUE_LABELS_A = 'WQXPLZCOMRKO97YFZDEZ1DPI9NNVGRQDJCUVRMEUOOOJD2UFYPOO6J7LDGYEGOA5VHNEHBTXOO1TDOILUEE5BFAEEXAW_K4R3MRU'
TRUE_LABELS_B = 'MERMIROOMUHJPXJOHUVLEORZP3GLOO7AUFDKEFTWEOOALZOP9ROCGZET1Y19EWX65QUYU7NAK_4YCJDVDNGQXODBEV2B5EFDIDNR'

MATRIX = ['abcdef',
          'ghijkl',
          'mnopqr',
          'stuvwx',
          'yz1234',
          '56789_']

MARKER_DEF_TRAIN = {'target': ['target'], 'nontarget': ['nontarget']}
MARKER_DEF_TEST = {'flashing': ['flashing']}

SEG_IVAL = [0, 700]

In [3]:
def load_bci_data(filename):
    """Load the BCI Competition III Data Set 2.
    This method loads the data set and converts it into Wyrm's ``Data``
    format. Before you use it, you have to download the data set in
    Matlab format and unpack it. The directory with the extracted files
    must contain the ``Subject_*.mat``- and the ``eloc64.txt`` files.
    .. note::
        If you need the true labels of the test sets, you'll have to
        download them separately from
        http://bbci.de/competition/iii/results/index.html#labels
    Parameters
    ----------
    filename : str
        The path to the matlab file to load
    Returns
    -------
    cnt : continuous `Data` object
    Examples
    --------
    >>> dat = load_bcicomp3_ds2('/home/foo/data/Subject_A_Train.mat')
    """
    STIMULUS_CODE = {
        0 : "blankMatrix",
        # cols from left to right
        1 : "agmsy5",
        2 : "bhntz6",
        3 : "ciou17",
        4 : "djpv28",
        5 : "ekqw39",
        6 : "flrx4_",
        # rows from top to bottom
        7 : "abcdef",
        8 : "ghijkl",
        9 : "mnopqr",
        10: "stuvwx",
        11: "yz1234",
        12: "56789_"
        }

    # load the matlab data
    data_mat = loadmat(filename)
    # load the channel names (the same for all datasets
    eloc_file = path.sep.join([path.dirname(filename), 'eloc64.txt'])
    with open(eloc_file) as fh:
        data = fh.read()
    channels = []
    for line in data.splitlines():
        if line:
            chan = line.split()[-1]
            chan = chan.replace('.', '')
            channels.append(chan)
    # fix the channel names, some letters have the wrong capitalization
    for i, s in enumerate(channels):
        s2 = s.upper()
        s2 = s2.replace('Z', 'z')
        s2 = s2.replace('FP', 'Fp')
        channels[i] = s2
    # The signal is recorded with 64 channels, bandpass filtered
    # 0.1-60Hz and digitized at 240Hz. The format is Character Epoch x
    # Samples x Channels
    data = data_mat['Signal']
    data = data.astype('double')
    # For each sample: 1 if a row/colum was flashed, 0 otherwise
    flashing = data_mat['Flashing'].reshape(-1)
    #flashing = np.flatnonzero((np.diff(a) == 1)) + 1
    ##Creates an array where only the initial intensifications of each series appear
    tmp = []
    for i, _ in enumerate(flashing):
        if i == 0:
            tmp.append(flashing[i])
            continue
        if flashing[i] == flashing[i-1] == 1:
            tmp.append(0)
            continue
        tmp.append(flashing[i])
    flashing = np.array(tmp)
    # For each sample: 0 when no row/colum was intensified,
    # 1..6 for intensified columns, 7..12 for intensified rows
    stimulus_code = data_mat['StimulusCode'].reshape(-1)
    stimulus_code = stimulus_code[flashing == 1]
    # 0 if no row/col was intensified or the intensified did not contain
    # the target character, 1 otherwise
    stimulus_type = data_mat.get('StimulusType', np.array([])).reshape(-1)
    # The target characters
    target_chars = data_mat.get('TargetChar', np.array([])).reshape(-1)
    fs = 240
    data = data.reshape(-1, 64)
    timeaxis = np.linspace(0, data.shape[0] / fs * 1000, data.shape[0], endpoint=False)
    dat = Data(data=data, axes=[timeaxis, channels], names=['time', 'channel'], units=['ms', '#'])
    dat.fs = fs
    # preparing the markers
    target_mask = np.logical_and((flashing == 1), (stimulus_type == 1)) if len(stimulus_type) > 0 else []
    nontarget_mask = np.logical_and((flashing == 1), (stimulus_type == 0)) if len(stimulus_type) > 0 else []
    flashing = (flashing == 1)
    flashing = [[i, 'flashing'] for i in timeaxis[flashing]]
    targets = [[i, 'target'] for i in timeaxis[target_mask]]
    nontargets = [[i, 'nontarget'] for i in timeaxis[nontarget_mask]]
    dat.stimulus_code = stimulus_code[:]
    stim = []
    for i,_ in enumerate(flashing):
        stim.append([flashing[i][0], STIMULUS_CODE[stimulus_code[i]]])
    stimulus_code = stim
    #stimulus_code = zip([t for t, _ in flashing], [STIMULUS_CODE[i] for i in stimulus_code])
    #Raises error "TypeError: '<' not supported between instances of 'tuple' and 'list'" when calling sort() 
    #stimulus_code =[[t for t,_ in flashing], [STIMULUS_CODE[i] for i in stimulus_code]]
    #print(type(stimulus_code), type(flashing), type(targets), type(nontargets))
    markers = flashing[:]
    markers.extend(targets)
    markers.extend(nontargets)
    markers.extend(stimulus_code)
    markers.sort()
    dat.markers = markers[:]
    return dat


In [4]:
def preprocessing_simple(dat, MRK_DEF, *args, **kwargs):
    """Simple preprocessing that reaches 97% accuracy.
    """
    fs_n = dat.fs / 2
    b, a = proc.signal.butter(5, [10 / fs_n], btype='low')
    dat = proc.filtfilt(dat, b, a)
   
    dat = proc.subsample(dat, 20)
    epo = proc.segment_dat(dat, MRK_DEF, SEG_IVAL)
    fv = proc.create_feature_vectors(epo)
    return fv, epo

In [94]:
dat_train = load_bci_data(TRAIN_A)
dat_test = load_bci_data(TEST_A)

In [95]:
fv_train, epo_train = preprocessing_simple(dat_train, MARKER_DEF_TRAIN, SEG_IVAL)
fv_test, _ = preprocessing_simple(dat_test, MARKER_DEF_TEST, SEG_IVAL)



### LDA algorithm feature reduction

In [314]:
x = fv_train.data #data
y = fv_train.axes[0] #labels

In [304]:
# Means of each class
mu1 = np.matrix(np.mean(x[y == 0], axis = 0)) #non target
mu2 = np.matrix(np.mean(x[y == 1], axis = 0)) #target

n1 = x[y == 0].shape[0]
n2 = x[y == 1].shape[0]
N = n1 + n2

# Total mean
mu_total = (n1 * mu1 + n2 * mu2)/N

In [305]:
# Between class matrix
Sb1 = np.transpose(mu1 - mu_total) * (mu1 - mu_total) 
Sb2 = np.transpose(mu2 - mu_total) * (mu2 - mu_total) 

Sb = n1 * Sb1 + n2 * Sb2

In [306]:
# Within class matrix
d1 = np.matrix(x[y == 0]) - mu1
d2 = np.matrix(x[y == 1]) - mu2
Sw1 = np.transpose(d1) * (d1)
Sw2 = np.transpose(d2) * (d2)

Sw = Sw1 + Sw2

In [307]:
# Transformation matrix
W = np.linalg.inv(Sw) * Sb

In [308]:
# Eigenvalues and eigenvectors
lambda_, V = np.linalg.eig(W)
idxs = lambda_.argsort()
V = V[:,idxs[-5:]] # Most robust eigenvector


### LDA Classifier

In [317]:
class LDA():
    'Linear Discriminant Classifier'
    def __init__(self):
        w = np.array([])
        b = np.array([])
    def fit(self, x, y):
        """Fit LinearDiscriminantAnalysis model according
        to the given training data.'
        
        Arguments: 
        ----------
        x: array-like, shape (n_samples, n_features)
            Training data
        y: array, shape (n_samples)
            Training labels
        """
    
        # Means for each class
        mu1 = x[y == 0].mean(axis = 0)
        mu2 = x[y == 1].mean(axis = 0)
        mu = mu1 + mu2

        # Covariance matrices for each class
        cov1 = np.cov(x[y == 0], rowvar = False)
        cov2 = np.cov(x[y == 1], rowvar = False)
        covm = cov1 + cov2

        # Weight vector
        self.w = np.dot(np.linalg.pinv(covm), (mu2 - mu1))

        # Bias term
        self.b = -0.5 * np.dot(self.w.transpose(), mu)
        
    def predict_proba(self, x):
        """Estimates the probability
        Arguments: 
        ----------
        x: array-like, shape (n_samples, n_features)
            Training data
        Returns:
        --------
        y: array, shape (n_samples)
            Estimated probabilities
        """
        y = np.dot(x,self.w) + self.b
        return y

        

In [318]:
x_test = fv_test.data
clf = LDA()
clf.fit(x, y)
pred = clf.predict_proba(x_test)

### Character prediction 

In [254]:
def predict_character(pred, n_characters, labels):
    pred_target = pred
    #unscramble_idx = fv_test.stimulus_code.reshape(100, 15, 12).argsort()
    unscramble_idx = fv_test.stimulus_code.reshape(n_characters, -1, 12).argsort()
    static_idx = np.indices(unscramble_idx.shape)
    #lda_out_prob = pred.reshape(100, 15, 12)
    lda_out_prob = pred.reshape(n_characters, -1, 12)
    lda_out_prob = lda_out_prob[static_idx[0], static_idx[1], unscramble_idx]

    # destil the result of the 15 runs
    lda_out_prob = lda_out_prob.sum(axis=1)
    lda_out_prob = lda_out_prob.argsort()


    cols = lda_out_prob[lda_out_prob <= 5].reshape(n_characters, -1)
    rows = lda_out_prob[lda_out_prob > 5].reshape(n_characters, -1)
    text = ''
    for i in range(n_characters):
        row = rows[i][-1]-6
        col = cols[i][-1]
        letter = MATRIX[row][col]
        text += letter
    print()
    print('Constructed labels: %s' % text.upper())
    print('True labels       : %s' % labels)
    a = np.array(list(text.upper()))
    b = np.array(list(TRUE_LABELS_A))
    accuracy = np.count_nonzero(a == b) / len(a)
    print('Accuracy: %.1f%%' % (accuracy * 100))

In [319]:
predict_character(pred, 100, TRUE_LABELS_A)


Constructed labels: WQXPLZCOMRKOW7YFZDEZ1DPI9NNVGRPDJCUVRMEUOOOJD2UFYPOO6J7LDGYEGOA5VTNEHBWXOO1TDOILUEE5BFAFEXAW_K3R3MRU
True labels       : WQXPLZCOMRKO97YFZDEZ1DPI9NNVGRQDJCUVRMEUOOOJD2UFYPOO6J7LDGYEGOA5VHNEHBTXOO1TDOILUEE5BFAEEXAW_K4R3MRU
Accuracy: 94.0%


## Comparison Scikitlearn LDA and Mine

In [332]:
import time
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
cf = LinearDiscriminantAnalysis()
t_start = time.clock()
cf.fit(x,y)
pred1 = cf.predict_proba(x_test)
t_end = time.clock()

print('Time: {} s'.format(t_end - t_start))

Time: 3.398927415973617 s


In [333]:
import time
cf = LDA()
t_start = time.clock()
cf.fit(x,y)
pred2 = cf.predict_proba(x_test)
t_end = time.clock()

print('Time: {} s'.format(t_end - t_start))

Time: 1.089816254420981 s


In [338]:
print('\nSklearn LDA')
predict_character(pred1[:,1], 100, TRUE_LABELS_A)


Sklearn LDA

Constructed labels: WQXPLZCOMRKO97YFZDEZ1DPI9NNVGRQDJCUVRMEUOUOJD2UFYPOO6J7LDAYEGOA5VHNE9BWXOO1TDOILUEE5BFAREXAWRK4R3MRU
True labels       : WQXPLZCOMRKO97YFZDEZ1DPI9NNVGRQDJCUVRMEUOOOJD2UFYPOO6J7LDGYEGOA5VHNEHBTXOO1TDOILUEE5BFAEEXAW_K4R3MRU
Accuracy: 94.0%


In [335]:
print('\nMy LDA')
predict_character(pred2, 100, TRUE_LABELS_A)


My LDA

Constructed labels: WQXPLZCOMRKOW7YFZDEZ1DPI9NNVGRPDJCUVRMEUOOOJD2UFYPOO6J7LDGYEGOA5VTNEHBWXOO1TDOILUEE5BFAFEXAW_K3R3MRU
True labels       : WQXPLZCOMRKO97YFZDEZ1DPI9NNVGRQDJCUVRMEUOOOJD2UFYPOO6J7LDGYEGOA5VHNEHBTXOO1TDOILUEE5BFAEEXAW_K4R3MRU
Accuracy: 94.0%


In [336]:
pred2 == pred1[:,1]

array([False, False, False, ..., False, False, False])