In [1]:
import numpy as np
import scipy.io as sio
from scipy import signal
from scipy.signal import butter, lfilter, iirnotch
from brainflow.board_shim import BoardShim, BrainFlowInputParams, BoardIds
import matplotlib.pyplot as plt
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.model_selection import cross_val_score, StratifiedKFold

In [10]:
def bandpass_filter(data, low, high, fs, order=4):
    nyq = 0.5 * fs
    low /= nyq
    high /= nyq
    b, a = butter(order, [low, high], btype='band')
    return lfilter(b, a, data, axis=-1)

In [11]:
def csp(X, y, n_filters=2):
    """
    Compute CSP filters.
    X: ndarray, shape (trials, channels, samples)
    y: ndarray, shape (trials,)
    """
    classes = np.unique(y)
    covs = []


    # Compute class covariance matrices
    for c in classes:
        Xc = X[y == c]
        cov = np.mean([np.cov(trial) for trial in Xc], axis=0)
        covs.append(cov)


    # Solve generalized eigenvalue problem
    R = covs[0] + covs[1]
    eigvals, eigvecs = np.linalg.eig(np.linalg.pinv(R) @ covs[0])


    # Sort eigenvalues
    ix = np.argsort(eigvals)[::-1]
    eigvecs = eigvecs[:, ix]


    # Select filters
    filters = np.hstack([eigvecs[:, :n_filters], eigvecs[:, -n_filters:]])
    return filters.real

In [12]:
def fbcsp(X, y, fs, bands=[(8, 12), (12, 16), (16, 22), (22, 30)], n_filters=2):
    """
    X: ndarray, shape (trials, channels, samples)
    y: ndarray, labels
    fs: sampling rate
    """
    features = []
    filters_per_band = []


    for band in bands:
        # Bandpass filter
        Xf = np.array([bandpass_filter(trial, band[0], band[1], fs) for trial in X])

        # Compute CSP filters
        W = csp(Xf, y, n_filters=n_filters)
        filters_per_band.append(W)


        # Project and log-variance features
        Xc = np.array([W.T @ trial for trial in Xf])
        var = np.var(Xc, axis=2)
        var /= var.sum(axis=1, keepdims=True)
        feat = np.log(var)
        features.append(feat)

    # Concatenate across bands
    features = np.concatenate(features, axis=1)
    return features, filters_per_band

In [13]:
debug = False

# Setting up the board
if debug:
    # Create a synthetic board for debugging purposes
    board_id = BoardIds.SYNTHETIC_BOARD
    params = BrainFlowInputParams()
else:
    # Setting up the board
    params = BrainFlowInputParams()
    params.serial_number = 'UN-2023.02.30'
    board_id = BoardIds.UNICORN_BOARD
    
board = BoardShim(board_id, params)

# Getting specific board details
channels = board.get_eeg_channels(board_id) #EEG Channels
timestamp_channel = board.get_timestamp_channel(board_id) # Timestamp channel
marker_channel = board.get_marker_channel(board_id) # Marker channel for synchronization
sampling_rate = BoardShim.get_sampling_rate(board_id) # Hz

In [25]:
dataset = np.loadtxt(r"..//dataset1.csv", delimiter=",")

In [15]:
eegdata = dataset[300:, channels]
eegdata.shape

(175675, 8)

In [16]:
markers = dataset[300:, marker_channel]

In [17]:
temp_idx = np.arange(0, len(markers))
temp_idx = temp_idx[markers != 0]
temp_idx = temp_idx[markers[temp_idx] != 67]



eeg_samples = []
labels = []

for i in range(len(temp_idx)):
    temp_eeg = eegdata[temp_idx[i]:temp_idx[i]+4*sampling_rate, :]
    
    eeg_samples.append(temp_eeg)
    labels.append(markers[temp_idx[i]])

eeg_samples = np.array(eeg_samples)
labels = np.array(labels)-1

In [18]:
labels

array([0., 0., 1., 1., 1., 0., 0., 1., 0., 1., 0., 1., 1., 1., 0., 0., 0.,
       1., 0., 1., 0., 1., 0., 1., 0., 1., 1., 1., 0., 0., 1., 1., 1., 0.,
       1., 0., 1., 0., 0., 0., 0., 0., 1., 1., 0., 0., 1., 1., 0., 1., 1.,
       1., 1., 0., 0., 0., 1., 1., 0., 0.])

In [26]:
print(labels.shape)
print(eeg_samples.shape)
eeg_samples = np.transpose(eeg_samples, (0, 2, 1))

(60,)
(60, 1000, 8)


In [27]:



X_feat, filters = fbcsp(eeg_samples, labels, sampling_rate)


clf = LDA()
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(clf, X_feat, labels, cv=cv)


print("FBCSP+LDA Accuracy: %.2f ± %.2f" % (scores.mean(), scores.std()))

FBCSP+LDA Accuracy: 0.58 ± 0.15


In [34]:
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from mne.decoding import CSP  # CSP implementation in MNE

X_train, X_test, y_train, y_test = train_test_split(
    eeg_samples, labels, test_size=0.2, shuffle=False
)

# -----------------------------
# Build pipeline: CSP → LDA
# -----------------------------
clf = Pipeline([
    ("csp", CSP(n_components=2, reg=None, log=True, norm_trace=False)),
    ("lda", LDA())
])

# -----------------------------
# Train
# -----------------------------
clf.fit(X_train, y_train)

# -----------------------------
# Evaluate
# -----------------------------
train_acc = clf.score(X_train, y_train)
test_acc = clf.score(X_test, y_test)

print(f"Train accuracy: {train_acc:.2f}")
print(f"Test accuracy: {test_acc:.2f}")

Computing rank from data with rank=None
    Using tolerance 2.7e+05 (2.2e-16 eps * 8 dim * 1.5e+20  max singular value)
    Estimated rank (data): 8
    data: rank 8 computed from 8 data channels with 0 projectors
Reducing data rank from 8 -> 8
Estimating class=0.0 covariance using EMPIRICAL
Done.
Estimating class=1.0 covariance using EMPIRICAL
Done.
Train accuracy: 0.62
Test accuracy: 0.58
