In [58]:
import numpy as np
from mne.decoding import CSP
from scipy import stats
from scipy.signal import filtfilt, iirdesign
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.svm import SVC
from scipy.signal import resample
from scipy.stats import zscore
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from matplotlib.ticker import MultipleLocator
from sklearn.model_selection import cross_val_score

# Two-entry colormap: first entry pure red (#FF0000), second pure blue (#0000FF).
cm_bright = ListedColormap(["#FF0000", "#0000FF"])
# Tick locator that places major ticks at integer multiples of 2.
majorLocator = MultipleLocator(2)
# Trial order inside one recording cycle appears to be N, L, R, U, D
# (N is presumably a neutral/rest period -- confirm against the protocol).
# One binary problem is set up per pair of directions, in the order
# LR, LU, LD, RU, RD, UD.
LEFT, RIGHT, UP, DOWN = range(4)

# Start time (seconds) associated with each direction label, indexed by label.
_direction_start_times = (6, 12, 18, 24)

# Every unordered pair of direction labels, in lexicographic order.
labels_list = [
    [first, second]
    for first in range(4)
    for second in range(first + 1, 4)
]
# Start times matching labels_list entry for entry.
start_times_list = [
    [_direction_start_times[first], _direction_start_times[second]]
    for first, second in labels_list
]

# Electrode names, and the data-file columns loaded via np.loadtxt(usecols=...).
# NOTE(review): the two lists are presumably paired position by position.
channel_names = [
    "C3",
    "C4",
    "Cz",
    "Pz",
    "Fz",
]
channel_index = [
    4,
    8,
    7,
    6,
    5,
]
# Old data
# channel_index = [1,3,2]
# start_times_list = [[6,12]]
# labels_list = [[LEFT, RIGHT]]
# clf_list = []


def create_fbcsp(low_freq, n_filters=12, band_overlap=2,
                 band_width=4, n_csp_components=3, n_jobs=1, sfreq=250.):
    """Build a filter-bank CSP feature extractor.

    One ``Pipeline`` (band-pass filter followed by CSP) is created per
    frequency band and all of them are combined in a ``FeatureUnion``, so the
    resulting transformer concatenates the CSP features of every band.

    Parameters
    ----------
    low_freq : int
        Lower edge (Hz) of the first band.
    n_filters : int
        Number of bands.
    band_overlap : int
        Overlap (Hz) between consecutive bands; consecutive bands start
        ``band_width - band_overlap`` Hz apart.
    band_width : int
        Width (Hz) of every band.
    n_csp_components : int
        ``n_components`` passed to each ``CSP``.
    n_jobs : int
        ``n_jobs`` passed to the ``FeatureUnion``.
    sfreq : float
        Sampling frequency (Hz) passed to every ``BandPassFilter``. Defaults
        to 250., the filter's own default, so existing callers are unaffected.
        Set it to the real sampling rate of the data being filtered.

    Returns
    -------
    FeatureUnion
        Union of the per-band pipelines, named ``"pipe<low edge>"``.

    Raises
    ------
    ValueError
        If ``band_width - band_overlap`` is not positive, because no
        increasing sequence of bands can be built.
    """
    step = band_width - band_overlap
    if step <= 0:
        raise ValueError(
            "band_width must be larger than band_overlap "
            "(got band_width=%r, band_overlap=%r)" % (band_width, band_overlap)
        )
    # Lower edge of each band: low_freq, low_freq + step, ...
    bands = range(low_freq, low_freq + n_filters * step, step)
    pipeline_list = [
        ("pipe%d" % low, Pipeline([
            ("filter", BandPassFilter(low, low + band_width, sfreq=sfreq)),
            ("csp", CSP(n_components=n_csp_components))
        ]))
        for low in bands
    ]
    return FeatureUnion(pipeline_list, n_jobs=n_jobs)
class BandPassFilter(BaseEstimator, TransformerMixin):
    """Forward-backward (``filtfilt``) IIR band-pass filter as a transformer.

    Parameters
    ----------
    low_freq, high_freq : float
        Pass-band edges in Hz. The stop-band edges are placed 0.5 Hz outside
        the pass band on each side.
    gpass : float
        Maximum pass-band loss (dB), forwarded to ``scipy.signal.iirdesign``.
    gstop : float
        Minimum stop-band attenuation (dB), forwarded to ``iirdesign``.
    sfreq : float
        Sampling frequency (Hz) used to normalise the band edges.
    ftype : str
        IIR filter type, forwarded to ``iirdesign``.

    Attributes
    ----------
    b, a : ndarray
        Numerator and denominator coefficients of the designed filter.
    """

    def __init__(self, low_freq=7., high_freq=30., gpass=0.5, gstop=10.,
                 sfreq=250., ftype='cheby2'):
        # scikit-learn's get_params()/clone() read every constructor argument
        # back from an attribute of the same name, so they must be stored
        # unmodified.
        self.low_freq = low_freq
        self.high_freq = high_freq
        self.gpass = gpass
        self.gstop = gstop
        self.sfreq = sfreq
        self.ftype = ftype
        # Design eagerly so transform() keeps working without a prior fit().
        self._design()

    def _design(self):
        """Compute the coefficients ``b``/``a`` from the current parameters."""
        nyquist = self.sfreq / 2.
        # iirdesign expects band edges normalised to the Nyquist frequency.
        wp = [self.low_freq / nyquist, self.high_freq / nyquist]
        ws = [(self.low_freq - 0.5) / nyquist,
              (self.high_freq + 0.5) / nyquist]
        self.b, self.a = iirdesign(wp, ws, self.gpass, self.gstop,
                                   ftype=self.ftype)

    def fit(self, X, y=None):
        """Refresh the filter design; no statistics are learned from ``X``."""
        # Re-design so parameters changed through set_params() take effect.
        self._design()
        return self

    def transform(self, X, y=None):
        """Return ``X`` band-pass filtered along its last axis."""
        return filtfilt(self.b, self.a, X, axis=-1)
class FeatureSelector(BaseEstimator, TransformerMixin):
    """Transformer that keeps a caller-chosen subset of feature columns."""

    def __init__(self, features):
        # Index (or list of indices) applied to axis 1 in transform().
        self.features = features

    def fit(self, X, y=None):
        """Do nothing and return ``self``; the selection is fixed up front."""
        return self

    # TODO: pick the columns automatically instead of taking them as given.
    def transform(self, X, y=None):
        """Return the columns of ``X`` designated by ``self.features``."""
        selected = self.features
        return X[:, selected]
    
def get_trials(eeg,start_times, pause_len, num_class, labels, start_timestamp, sfreq, trial_len, n_epochs, n_channels):
    """Cut fixed-length trials of two classes out of a continuous recording.

    The recording is treated as a sequence of cycles, each lasting
    ``(trial_len + pause_len) * num_class`` seconds. For every extracted pair
    of epochs, one trial of each of the two requested classes is sliced out.

    Parameters
    ----------
    eeg : ndarray, shape (n_channels, n_samples)
        Continuous signal, channels first.
    start_times : sequence of two numbers
        Start (seconds from the beginning of a cycle) of the first and of the
        second class to extract.
    pause_len : number
        Pause (seconds) following every trial.
    num_class : int
        Number of trials per cycle.
    labels : sequence of two
        Labels assigned to the first and second class respectively.
    start_timestamp : any
        Unused; kept for backward compatibility.
    sfreq : number
        Sampling frequency in Hz.
    trial_len : number
        Trial length in seconds.
    n_epochs : int
        Total number of epochs to return (must be even: they come in pairs).
    n_channels : int
        Number of channels in ``eeg``.

    Returns
    -------
    X : ndarray, shape (n_epochs, n_channels, trial_len * sfreq)
        Extracted trials; even rows hold the first class, odd rows the second.
    y : ndarray, shape (n_epochs,)
        Label of each row of ``X``.

    Raises
    ------
    ValueError
        If ``n_epochs`` is odd, or if a requested trial extends past the end
        of the recording.
    """
    if n_epochs % 2:
        raise ValueError(
            "n_epochs must be even (one epoch per class per step), got %r"
            % (n_epochs,)
        )

    # Sample counts must be integers even when sfreq / trial_len are floats.
    n_time_samples = int(round(trial_len * sfreq))
    total_trial_duration = (trial_len + pause_len) * num_class
    n_available = eeg.shape[1]

    X = np.zeros((n_epochs, n_channels, n_time_samples))
    y = []
    class_specs = (
        (start_times[0], labels[0]),
        (start_times[1], labels[1]),
    )

    # NOTE(review): `epoch` advances in steps of 2 and is also used as the
    # cycle index below, so only cycles 0, 2, 4, ... are read. Kept as-is
    # because changing it would change which data existing models see;
    # confirm whether `epoch // 2` was intended.
    for epoch in range(0, n_epochs, 2):
        for offset, (start_time, label) in enumerate(class_specs):
            start_sample = int(round(
                (epoch * total_trial_duration + start_time) * sfreq
            ))
            end_sample = start_sample + n_time_samples
            if end_sample > n_available:
                raise ValueError(
                    "trial for epoch %d needs samples [%d, %d) but the "
                    "recording only has %d samples"
                    % (epoch + offset, start_sample, end_sample, n_available)
                )
            X[epoch + offset] = eeg[:, start_sample:end_sample]
            y.append(label)
    return X, np.array(y)
def plot_features(X, y, bands, band_width=4):
    """Scatter-plot consecutive pairs of feature columns, one subplot per band.

    Columns ``(0, 1)``, ``(2, 3)``, ... of ``X`` are plotted against each
    other, coloured by ``y``, on a 3-row grid of subplots that share both
    axes. Each subplot is titled with the corresponding entry of ``bands``.

    Parameters
    ----------
    X : ndarray, shape (n_samples, n_feats)
        Feature matrix; a trailing unpaired column is ignored.
    y : array-like, shape (n_samples,)
        Values used to colour the points through ``cm_bright``.
    bands : iterable of int
        Lower edge (Hz) of each band, in the same order as the column pairs.
    band_width : int
        Width (Hz) of each band, used only for the subplot titles.
    """
    n_feats = X.shape[1]
    n_pairs = n_feats // 2
    n_rows = 3
    # Ceiling division so every pair gets an axes even when the pair count is
    # not a multiple of the row count; equals n_feats / 6 when it divides.
    n_cols = max(1, -(-n_pairs // n_rows))
    fig, axarr = plt.subplots(n_rows, n_cols, sharex=True, sharey=True,
                              figsize=(15, 7), squeeze=False)
    flat = axarr.ravel()
    tick_spacing = 2

    for ax, low, i in zip(flat, bands, range(0, 2 * n_pairs, 2)):
        # "none" is Matplotlib's documented value for "no marker edge".
        ax.scatter(X[:, i], X[:, i + 1], c=y, edgecolor="none", alpha=0.6,
                   cmap=cm_bright)
        ax.set_title("%d - %d Hz" % (low, low + band_width))
        # A Locator is bound to the axis it is installed on, so every axis
        # needs its own instance rather than one shared object.
        ax.xaxis.set_major_locator(MultipleLocator(tick_spacing))
        ax.yaxis.set_major_locator(MultipleLocator(tick_spacing))

    fig.suptitle("CSP features", fontsize=15)




# main function
# for index, start_times in enumerate(start_times_list):
#     signal = np.loadtxt("OpenBCI-RAW-2020-03-04_20-12-23.txt", delimiter=",", skiprows=6, usecols=channel_index) #(n_time_samples,n_channels)
#     signal2 = np.loadtxt("OpenBCI-RAW-2020-03-04_20-43-28.txt", delimiter=",", skiprows=6, usecols=channel_index) #(n_time_samples,n_channels)
#     # Signal = [T samples, channels]
#     # get_trials(eeg,start_times, pause_len, num_class, labels, start_timestamp, sfreq, trial_len, n_epochs, n_channels)
#     X1, y1 = get_trials(signal.T, start_times, 2, 5, labels_list[index], "5:15:32",256, 4, 10, 5)
#     X2, y2 = get_trials(signal2.T, start_times, 2, 5, labels_list[index], "5:15:32",256, 4, 10, 5)
#     print(X1.shape,X2.shape)
#     X = np.concatenate((X1,X2),axis=0)
#     y = np.concatenate((y1,y2),axis=0)
#     X_train, X_test, y_train, y_test = train_test_split(X, y,test_size=0.3, random_state=7, stratify=y)
#     # build model
#     fbcsp = create_fbcsp(6, n_jobs=2)               
#     svc = SVC(kernel="linear")
#     clf = Pipeline([
#         ("fbcsp", fbcsp),
#         ("fselector", FeatureSelector(features=[6, 7])),
#         ("classifier", svc)
#     ])
#     # train, score, and save model
#     clf.fit(X_train, y_train)
#     score = clf.score(X_test, y_test)
#     results = clf.predict(X_test)
#     print(results)
#     clf_list.append(clf)
#     print(score)
    
# Load the columns listed in channel_index from one recording, skipping the
# first 6 lines of the file.
signal = np.loadtxt("OpenBCI-RAW-2020-03-04_20-12-23.txt", delimiter=",", skiprows=6, usecols=channel_index) #(n_time_samples,n_channels)
# Cut LEFT/RIGHT trials out of the recording (channels-first after the
# transpose): 10 epochs of 4 s at 256 Hz over 5 channels.
X, y = get_trials(signal.T, [6,12], 2, 5, [LEFT, RIGHT], "5:15:32",256, 4, 10, 5)
# Keep only the first epoch, restoring the leading epoch axis so the array
# stays 3-D.
X = np.expand_dims(X[0],axis=0)
print(X.shape)
# Vote tally with one slot per class label (LEFT..DOWN = 0..3).
results_arr = np.zeros(4)

# NOTE(review): clf_list is not defined in this section; presumably it was
# filled by the commented-out training loop in an earlier run -- confirm.
for clf in clf_list:
    result = clf.predict(X)
    print(result)
    # result is used as an index into the tally: one vote for that label.
    results_arr[result]+=1

# Majority vote; np.argmax returns the first maximum, so ties resolve to the
# lowest label.
print(np.argmax(results_arr))

(1, 5, 1024)
[0]
[0]
[0]
[1]
[1]
[3]
0
