In [4]:
import mne
import numpy as np
from scipy.signal import welch

left_data = mne.io.read_raw_fif('cleaned_eeg_data_left2.fif', preload=True)
right_data = mne.io.read_raw_fif('cleaned_eeg_data_right.fif', preload=True)
up_data = mne.io.read_raw_fif('cleaned_eeg_data_up.fif', preload=True)
down_data = mne.io.read_raw_fif('cleaned_eeg_data_down.fif', preload=True)

# left_data = mne.io.read_raw_fif('LEFT.fif', preload=True)
# right_data = mne.io.read_raw_fif('RIGHT.fif', preload=True)
# up_data = mne.io.read_raw_fif('UP.fif', preload=True)
# down_data = mne.io.read_raw_fif('DOWN.fif', preload=True)

epoch_duration = 1  
overlap = 0.5  
left_events = mne.make_fixed_length_events(left_data, duration=epoch_duration)
right_events = mne.make_fixed_length_events(right_data, duration=epoch_duration)
up_events = mne.make_fixed_length_events(up_data, duration=epoch_duration)
down_events = mne.make_fixed_length_events(down_data, duration=epoch_duration)

left_epochs = mne.Epochs(left_data, events=left_events, tmin=0, tmax=epoch_duration, baseline=None, preload=True)
right_epochs = mne.Epochs(right_data, events=right_events, tmin=0, tmax=epoch_duration, baseline=None, preload=True)
up_epochs = mne.Epochs(up_data, events=up_events, tmin=0, tmax=epoch_duration, baseline=None, preload=True)
down_epochs = mne.Epochs(down_data, events=down_events, tmin=0, tmax=epoch_duration, baseline=None, preload=True)

freq_bands = {'Delta': (0.5, 4),
              'Theta': (4, 8),
              'Alpha': (8, 13),
              'Beta': (13, 30),#should be more prevalent during our game
              'Gamma': (30, 40)}  

# def compute_avg_band_powers(epochs):
#     avg_band_powers = []
#     for epoch in epochs:
#         channel_band_powers = []
#         for channel_data in epoch:
#             for band in freq_bands.values():
#                 fmin, fmax = band
#                 freqs, psd = welch(channel_data, left_data.info['sfreq'], nperseg=left_data.info['sfreq']*epoch_duration, scaling='density')
#                 idx_band = np.logical_and(freqs >= fmin, freqs <= fmax)
#                 band_power = np.mean(psd[idx_band])
#                 channel_band_powers.append(band_power)
#         avg_band_powers.append(np.mean(channel_band_powers))
#     return avg_band_powers

# left_avg_band_powers = compute_avg_band_powers(left_epochs)
# right_avg_band_powers = compute_avg_band_powers(right_epochs)
# up_avg_band_powers = compute_avg_band_powers(up_epochs)
# down_avg_band_powers = compute_avg_band_powers(down_epochs)

# labels = np.concatenate((
#     np.zeros(len(left_avg_band_powers)),        # Left category labeled as 0
#     np.ones(len(right_avg_band_powers)),       # Right category labeled as 1
#     np.full(len(up_avg_band_powers), 2),       # Up category labeled as 2
#     np.full(len(down_avg_band_powers), 3)      # Down category labeled as 3
# ))

# all_avg_band_powers = (
#     left_avg_band_powers + 
#     right_avg_band_powers +
#     up_avg_band_powers +
#     down_avg_band_powers
# )

# X = np.array(all_avg_band_powers)
def compute_avg_band_amplitudes(epochs):
    avg_band_amplitudes = []
    for epoch in epochs:
        channel_band_amplitudes = []
        for channel_data in epoch:
            for band in freq_bands.values():
                fmin, fmax = band
                sp = np.fft.fft(channel_data)
                freq = np.fft.fftfreq(len(channel_data), d=1/left_data.info['sfreq'])
                freq = freq[1:int(np.ceil(len(channel_data) / 4))]  
                sp = sp[1:int(np.ceil(len(channel_data) / 4))]
                sp = np.sqrt(sp.real**2 + sp.imag**2)
                band_indices = np.logical_and(freq >= fmin, freq <= fmax)
                band_amplitude = np.mean(sp[band_indices])
                channel_band_amplitudes.append(band_amplitude)
        avg_band_amplitudes.append(np.mean(channel_band_amplitudes))
    return avg_band_amplitudes

left_avg_band_amplitudes = compute_avg_band_amplitudes(left_epochs)
right_avg_band_amplitudes = compute_avg_band_amplitudes(right_epochs)
up_avg_band_amplitudes = compute_avg_band_amplitudes(up_epochs)
down_avg_band_amplitudes = compute_avg_band_amplitudes(down_epochs)
all_avg_band_amplitudes = left_avg_band_amplitudes + up_avg_band_amplitudes+down_avg_band_amplitudes+right_avg_band_amplitudes

X = np.array(all_avg_band_amplitudes)
labels = np.concatenate((
    np.zeros(len(left_avg_band_amplitudes)),        # Left category labeled as 0
    np.ones(len(right_avg_band_amplitudes)),       # Right category labeled as 1
    np.full(len(up_avg_band_amplitudes), 2),       # Up category labeled as 2
    np.full(len(down_avg_band_amplitudes), 3)      # Down category labeled as 3
))

Opening raw data file cleaned_eeg_data_left2.fif...
Isotrak not found
    Range : 0 ... 10527 =      0.000 ...    82.242 secs
Ready.
Reading 0 ... 10527  =      0.000 ...    82.242 secs...
Opening raw data file cleaned_eeg_data_right.fif...
Isotrak not found
    Range : 0 ... 16255 =      0.000 ...   126.992 secs
Ready.
Reading 0 ... 16255  =      0.000 ...   126.992 secs...
Opening raw data file cleaned_eeg_data_up.fif...
Isotrak not found
    Range : 0 ... 10136 =      0.000 ...    79.188 secs
Ready.
Reading 0 ... 10136  =      0.000 ...    79.188 secs...
Opening raw data file cleaned_eeg_data_down.fif...
Isotrak not found
    Range : 0 ... 18327 =      0.000 ...   143.180 secs
Ready.
Reading 0 ... 18327  =      0.000 ...   143.180 secs...
Not setting metadata
82 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded Raw for 82 events and 129 original time points ...
0 bad epochs dropped
Not setting metadata
127 matching events fou

  left_data = mne.io.read_raw_fif('cleaned_eeg_data_left2.fif', preload=True)
  right_data = mne.io.read_raw_fif('cleaned_eeg_data_right.fif', preload=True)
  up_data = mne.io.read_raw_fif('cleaned_eeg_data_up.fif', preload=True)
  down_data = mne.io.read_raw_fif('cleaned_eeg_data_down.fif', preload=True)


In [5]:
num_left_epochs = len(left_epochs)
num_right_epochs = len(right_epochs)

print(f"Number of epochs for left data: {num_left_epochs}")
print(f"Number of epochs for right data: {num_right_epochs}")

num_up_epochs = len(up_epochs)
num_down_epochs = len(down_epochs)

print(f"Number of epochs for up data: {num_up_epochs}")
print(f"Number of epochs for down data: {num_down_epochs}")

Number of epochs for left data: 82
Number of epochs for right data: 126
Number of epochs for up data: 79
Number of epochs for down data: 143


In [9]:
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
import numpy as np

def exclude_one_epoch_for_testing(epochs):
    testing_epochs = []
    rest_epochs = []

    for idx, epoch in enumerate(epochs):
        if idx == 5:  
            testing_epochs.append(epoch)
        else:
            rest_epochs.append(epoch)

    return testing_epochs, rest_epochs

left_testing, left_training = exclude_one_epoch_for_testing(left_epochs)
right_testing, right_training = exclude_one_epoch_for_testing(right_epochs)
up_testing, up_training = exclude_one_epoch_for_testing(up_epochs)
down_testing, down_training = exclude_one_epoch_for_testing(down_epochs)

testing_epochs = left_testing + right_testing + up_testing + down_testing
training_epochs = left_training + right_training + up_training + down_training

X_train = np.array(compute_avg_band_amplitudes(training_epochs))

labels_train = np.concatenate((
    np.zeros(len(left_training)),        # Left category labeled as 0
    np.ones(len(right_training)),       # Right category labeled as 1
    np.full(len(up_training), 2),       # Up category labeled as 2
    np.full(len(down_training), 3)      # Down category labeled as 3
))

X_test = np.array(compute_avg_band_amplitudes(testing_epochs))

labels_test = np.concatenate((
    np.zeros(len(left_testing)),        # Left category labeled as 0
    np.ones(len(right_testing)),       # Right category labeled as 1
    np.full(len(up_testing), 2),       # Up category labeled as 2
    np.full(len(down_testing), 3)      # Down category labeled as 3
))

X_train = X_train.reshape(-1, 1)  

lda = LinearDiscriminantAnalysis()
lda.fit(X_train, labels_train)

X_test = X_test.reshape(-1, 1)  

predicted_labels = lda.predict(X_test)

prediction_probabilities = lda.predict_proba(X_test)
max_class_probabilities = prediction_probabilities.max(axis=1)

for i, (predicted_label, max_class_prob) in enumerate(zip(predicted_labels, max_class_probabilities)):
    print(f"Epoch {i + 1}: Predicted Label - {predicted_label}, Max Class Probability - {max_class_prob}")




Epoch 1: Predicted Label - 3.0, Max Class Probability - 0.363979582484867
Epoch 2: Predicted Label - 3.0, Max Class Probability - 0.3733364709397264
Epoch 3: Predicted Label - 3.0, Max Class Probability - 0.33493213097056734
Epoch 4: Predicted Label - 3.0, Max Class Probability - 0.3601286868774951


In [10]:
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

X_train, X_test, y_train, y_test = train_test_split(X, labels, test_size=0.2, random_state=42)
X_train = X_train.reshape(X_train.shape[0], -1)

lda_classifier = LinearDiscriminantAnalysis()
lda_classifier.fit(X_train, y_train)

X_test = X_test.reshape(X_test.shape[0], -1)

predictions = lda_classifier.predict(X_test)

accuracy = accuracy_score(y_test, predictions)
print(f"Accuracy: {accuracy * 100:.2f}%")

report = classification_report(y_test, predictions)
print("Classification Report:\n", report)

Accuracy: 39.53%
Classification Report:
               precision    recall  f1-score   support

         0.0       0.70      0.29      0.41        24
         1.0       0.62      0.19      0.29        26
         2.0       0.00      0.00      0.00        12
         3.0       0.32      0.92      0.48        24

    accuracy                           0.40        86
   macro avg       0.41      0.35      0.30        86
weighted avg       0.47      0.40      0.34        86



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [11]:
from joblib import dump
model_filename = 'lda_model.joblib'  
dump(lda_classifier, model_filename)

['lda_model.joblib']

In [12]:
# new_data = mne.io.read_raw_fif('cleaned_eeg_data_right.fif', preload=True)

# new_events = mne.make_fixed_length_events(new_data, duration=epoch_duration)

# new_epochs = mne.Epochs(new_data, events=new_events, tmin=0, tmax=epoch_duration, baseline=None, preload=True)

# new_avg_band_amplitudes = compute_avg_band_amplitudes(new_epochs)

# X_new = np.array(new_avg_band_amplitudes)
# X_new = X_new.reshape(X_new.shape[0], -1)

# predictions_new = lda_classifier.predict(X_new)
# predictions_probabilities = lda_classifier.predict_proba(X_new)
# print("Probabilities for each class:")
# for i, probs in enumerate(predictions_probabilities):
#     print(f"Sample {i+1}: Left: {probs[0]:.2f}, Up: {probs[1]:.2f}, Down: {probs[2]:.2f}")
