In [None]:
import os
import numpy as np
import pandas as pd
from scipy.signal import butter, lfilter
from sklearn.decomposition import FastICA
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from mne.decoding import CSP
import math
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV

In [None]:
# Root folder that holds the metadata CSVs and the per-task EEG recordings.
base_path = './'

# Read the three metadata tables in a fixed order: train, validation, test.
train_df, validation_df, test_df = (
    pd.read_csv(os.path.join(base_path, csv_name))
    for csv_name in ('train.csv', 'validation.csv', 'test.csv')
)

# Function to load a trial's EEG data
def load_trial_data(row, base_path='.'):
    """Return the rows of EEGdata.csv that belong to the trial described by ``row``.

    Args:
        row: Mapping-like record providing 'id', 'task', 'subject_id',
            'trial_session' and 'trial'.
        base_path: Root folder containing the per-task recording tree.

    Returns:
        DataFrame slice holding one trial's worth of samples
        (2250 rows for 'MI', 1750 rows for any other task).
    """
    # The split folder is derived from the id range.
    trial_id = row['id']
    dataset = 'train' if trial_id <= 4800 else ('validation' if trial_id <= 4900 else 'test')

    # One EEGdata.csv per task / split / subject / session.
    eeg_path = f"{base_path}/{row['task']}/{dataset}/{row['subject_id']}/{row['trial_session']}/EEGdata.csv"
    recording = pd.read_csv(eeg_path)

    # 9 s * 250 Hz for MI, 7 s * 250 Hz otherwise (SSVEP).
    n_samples = 2250 if row['task'] == 'MI' else 1750

    # Trials are numbered from 1 and stored back to back in the file.
    first = (int(row['trial']) - 1) * n_samples
    return recording.iloc[first:first + n_samples]

In [None]:
# Column names selected from each trial DataFrame in preprocess_trial.
EEG_CHANNELS = ['FZ', 'C3', 'CZ', 'C4', 'PZ', 'PO7', 'OZ', 'PO8']
# Sampling rate in Hz; used for filter design and to turn the crop window into sample indices.
SAMPLE_RATE = 250
# (low, high) band-pass cutoffs handed to apply_bandpass, in the same unit as SAMPLE_RATE.
BANDPASS = (8, 30)
# (start, end) of the window kept from each trial, in seconds (multiplied by SAMPLE_RATE).
CROP_WINDOW = (1.5, 7)
# Shared FastICA instance: apply_ica either refits it or reuses its last fit (transform_only=True).
ica_model = FastICA(random_state=42, max_iter=1000)

In [None]:
# --- Signal Processing ---
def butter_bandpass(lowcut, highcut, fs, order=5):
    """Design a Butterworth band-pass filter.

    Args:
        lowcut: Lower cutoff frequency, in the same unit as ``fs``.
        highcut: Upper cutoff frequency, in the same unit as ``fs``.
        fs: Sampling frequency.
        order: Filter order passed to ``scipy.signal.butter``.

    Returns:
        Tuple ``(b, a)`` of numerator and denominator coefficients.
    """
    nyquist = 0.5 * fs
    # butter() expects the band edges normalised by the Nyquist frequency.
    normalised_band = [lowcut / nyquist, highcut / nyquist]
    numerator, denominator = butter(order, normalised_band, btype='band')
    return numerator, denominator

def apply_bandpass(data, lowcut, highcut, fs):
    """Band-pass filter ``data`` along axis 0 with the default-order Butterworth design.

    Args:
        data: Array-like signal; filtering runs along the first axis.
        lowcut: Lower cutoff frequency, in the same unit as ``fs``.
        highcut: Upper cutoff frequency, in the same unit as ``fs``.
        fs: Sampling frequency.

    Returns:
        The filtered signal as returned by ``scipy.signal.lfilter``.
    """
    coefficients = butter_bandpass(lowcut, highcut, fs)
    # Single forward pass (lfilter), applied independently to every column.
    return lfilter(*coefficients, data, axis=0)

# --- ICA Cleaning ---
def apply_ica(data, transform_only=False):
    """Project ``data`` through the shared ``ica_model`` and back.

    Args:
        data: 2-D array; the number of ICA components is set to ``data.shape[1]``.
        transform_only: When True, reuse the model's existing fit instead of
            refitting on ``data`` (so an earlier fitting call is required).

    Returns:
        The result of ``inverse_transform`` applied to the ICA sources.

    NOTE(review): every component is kept, so this round trip looks like it
    reconstructs the input rather than removing anything — confirm whether
    some components were meant to be dropped before the inverse transform.
    """
    # The module-level model is mutated on every call.
    ica_model.n_components = data.shape[1]
    to_sources = ica_model.transform if transform_only else ica_model.fit_transform
    sources = to_sources(data)
    return ica_model.inverse_transform(sources)

def normalize_for_plot(df: "pd.DataFrame | np.ndarray") -> "pd.DataFrame | np.ndarray":
    """Z-score every column: subtract the column mean and divide by the column std.

    Works on both DataFrames and NumPy arrays (statistics are taken along
    axis 0 using the container's own ``mean``/``std``, so the ddof default of
    the respective library applies).

    Args:
        df: Samples in rows, channels/features in columns.

    Returns:
        Object of the same kind as ``df`` holding the standardised values.
        Columns with zero spread (e.g. a flat channel) are returned as zeros
        instead of NaN/inf.
    """
    centered = df - df.mean(axis=0)
    spread = df.std(axis=0)
    # A constant column has std == 0; divide by 1 there so 0/0 does not
    # inject NaN into downstream steps.
    if isinstance(spread, pd.Series):
        spread = spread.mask(spread == 0, 1.0)
    else:
        spread = np.where(spread == 0, 1.0, spread)
    return centered / spread

# --- Trial Preprocessing ---
def preprocess_trial(trial_df: pd.DataFrame, transform_only=False) -> np.ndarray:
    """Turn one raw trial into a cropped, standardised channel-by-time array.

    Args:
        trial_df: Raw trial samples; must contain every column in EEG_CHANNELS.
        transform_only: Forwarded to apply_ica; when True the shared ICA model
            is reused instead of being refitted on this trial.

    Returns:
        NumPy array of shape (n_channels, n_samples) — not a DataFrame.
    """
    # 1. Select EEG channels
    eeg = trial_df[EEG_CHANNELS].values

    # 2. Bandpass filter
    eeg = apply_bandpass(eeg, *BANDPASS, fs=SAMPLE_RATE)

    # 3. ICA artifact removal
    eeg = apply_ica(eeg, transform_only)

    # 4. Crop to the configured window (seconds -> sample indices)
    start_idx = math.floor(CROP_WINDOW[0] * SAMPLE_RATE)
    end_idx = math.floor(CROP_WINDOW[1] * SAMPLE_RATE)
    eeg = eeg[start_idx:end_idx, :]

    # 5. Z-score normalization (per channel)
    eeg = normalize_for_plot(eeg)

    return eeg.T  # return shape: (n_channels, n_samples)

In [None]:
def extract_csp_features(X_list, y_list, n_components=4, for_deep_learning=False):
    """Fit a CSP decomposition on the given trials and return the transformed data.

    Args:
        X_list: Sequence of trials; converted to an array of shape
            (n_trials, n_channels, n_samples).
        y_list: Sequence of class labels, one per trial.
        n_components: Number of CSP components.
        for_deep_learning: When True the CSP is configured with
            ``transform_into='csp_space'``; otherwise with
            ``transform_into='average_power'`` and ``log=True``.

    Returns:
        Tuple ``(X_csp, csp)`` of the transformed trials and the fitted CSP object.
    """
    trials = np.array(X_list)  # shape: (n_trials, n_channels, n_samples)
    labels = np.array(y_list)

    # Only the output representation differs between the two modes.
    if for_deep_learning:
        csp_options = {'transform_into': 'csp_space'}
    else:
        csp_options = {'transform_into': 'average_power', 'log': True}
    csp = CSP(n_components, **csp_options)

    features = csp.fit_transform(trials, labels)
    return features, csp

In [None]:
# def compare_eeg_preprocessing(original_df: pd.DataFrame, processed_df: pd.DataFrame, channels=None, sample_rate=250, seconds=5):
#     """
#     Plot original vs preprocessed EEG signals for selected channels and duration.
#     """
#     if channels is None:
#         channels = ['C3', 'CZ', 'C4']

#     n_samples = sample_rate * seconds
#     time = np.arange(n_samples) / sample_rate

#     fig, axs = plt.subplots(len(channels), 1, figsize=(12, 2.5 * len(channels)), sharex=True)

#     for idx, ch in enumerate(channels):
#         axs[idx].plot(time, original_df[ch][:n_samples], label='Original', alpha=0.7)
#         axs[idx].plot(time, processed_df[ch][:n_samples], label='Preprocessed', alpha=0.7)
#         axs[idx].set_title(f"Channel: {ch}")
#         axs[idx].legend(loc='upper right')
#         axs[idx].set_ylabel('Amplitude (μV)')

#     axs[-1].set_xlabel('Time (s)')
#     plt.tight_layout()
#     plt.suptitle("EEG Signal Before vs After Preprocessing", fontsize=16, y=1.02)
#     plt.show()

In [None]:
# trial_data = load_trial_data(train_df.iloc[0], base_path)
# preprocessed = preprocess_trial(trial_data)
# compare_eeg_preprocessing(normalize_for_plot(trial_data), preprocessed)

In [None]:
from tqdm import tqdm

def preprocess_data(df, base_path, transform_only=False, test_data=False):
    """Load and preprocess every trial listed in ``df``.

    Args:
        df: Metadata table with one row per trial.
        base_path: Root folder passed on to load_trial_data.
        transform_only: Forwarded to preprocess_trial (reuse the fitted ICA).
        test_data: When True no labels are read and the returned ``y`` is empty.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays. ``y`` encodes "Left" as 0 and any
        other label as 1.

    Trials that raise during loading, preprocessing or label lookup are
    reported and skipped, so the outputs can be shorter than ``df``; callers
    that map results back onto ``df`` rows should compare lengths.
    """
    X, y = [], []
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="Preprocessing Trials"):
        try:
            trial_data = load_trial_data(row, base_path)
            processed = preprocess_trial(trial_data, transform_only)
            # Resolve the label before touching X or y: if the lookup fails,
            # neither list is appended to and they stay the same length.
            label = None if test_data else (0 if row["label"] == "Left" else 1)
        except Exception as e:
            print(f"Trial {idx} skipped due to error: {e}")
            continue

        X.append(processed)
        if label is not None:
            y.append(label)
    return np.array(X), np.array(y)


In [None]:
# Training rows refit the ICA per trial; validation rows reuse the existing fit.
X_train, y_train = preprocess_data(df=train_df.iloc[:2400], base_path=base_path)
X_val, y_val = preprocess_data(
    df=validation_df.iloc[:50],
    base_path=base_path,
    transform_only=True,
)

# Stack train and validation along the trial axis.
X_full = np.concatenate((X_train, X_val))
y_full = np.concatenate((y_train, y_val))

In [None]:
# Test rows carry no labels, so y_test comes back empty.
X_test, y_test = preprocess_data(
    df=test_df.iloc[:50],
    base_path=base_path,
    transform_only=True,
    test_data=True,
)

In [None]:
# Fit CSP on the training trials only, then project the held-out splits with it.
X_train_csp, csp_model = extract_csp_features(X_list=X_train, y_list=y_train, n_components=4)
X_val_csp, X_test_csp = (csp_model.transform(split) for split in (X_val, X_test))

# print("Accuracy:", accuracy_score(y_test, y_pred))
# print("Classification Report:\n", classification_report(y_test, y_pred))

# sns.heatmap(
#     confusion_matrix(y_test, y_pred),
#     annot=True,
#     fmt='d',
#     cmap='Blues',
#     xticklabels=['Left', 'Right'],
#     yticklabels=['Left', 'Right']
# )
# plt.xlabel("Predicted")
# plt.ylabel("Actual")
# plt.title("Confusion Matrix")
# plt.show()


In [None]:

def svm_pipeline(X_train, y_train, X_test):
    """Standardise the features, fit a linear SVM and predict labels for ``X_test``.

    Args:
        X_train: Training feature matrix.
        y_train: Training labels.
        X_test: Feature matrix to predict on.

    Returns:
        Predicted labels for ``X_test``.
    """
    # No gamma here: it is a coefficient of the rbf/poly/sigmoid kernels and
    # has no effect on a linear kernel.
    clf_pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', SVC(kernel='linear', C=1)),
    ])

    clf_pipeline.fit(X_train, y_train)
    return clf_pipeline.predict(X_test)

def lda_pipeline(X_train, y_train, X_test):
    """Standardise the features, fit an LDA classifier and predict labels for ``X_test``.

    Args:
        X_train: Training feature matrix.
        y_train: Training labels.
        X_test: Feature matrix to predict on.

    Returns:
        Predicted labels for ``X_test``.
    """
    stages = [('scaler', StandardScaler()), ('classifier', LDA())]
    model = Pipeline(stages)
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)
    return predictions

# Both classifiers are trained on the same CSP features and scored on the same test features.
y_pred_svm, y_pred_lda = (
    fit_and_predict(X_train_csp, y_train, X_test_csp)
    for fit_and_predict in (svm_pipeline, lda_pipeline)
)

In [None]:
def save_predictions(y_pred: np.ndarray, test_df: pd.DataFrame, filename: str):
    """Write a two-column submission file ``<filename>_submission.csv``.

    Args:
        y_pred: Predicted class codes; 0 is written as "Left", anything else as "Right".
        test_df: Metadata table with an 'id' column. Predictions are paired
            with its first ``len(y_pred)`` rows by position.
        filename: Prefix of the output file (may include a directory).

    Raises:
        ValueError: If there are more predictions than rows in ``test_df``.
    """
    y_pred = np.asarray(y_pred)
    n_pred = len(y_pred)
    if n_pred > len(test_df):
        raise ValueError(
            f"Got {n_pred} predictions but test_df only has {len(test_df)} rows"
        )

    # map y_pred to left or right
    labels = np.where(y_pred == 0, "Left", "Right")

    # Pair ids with predictions by position. Assigning the 'id' Series directly
    # would align on index labels and yield NaN/wrong ids whenever test_df does
    # not carry a default 0..n-1 index.
    ids = test_df["id"].to_numpy()[:n_pred]

    predictions_df = pd.DataFrame({"id": ids, "label": labels})
    predictions_df.to_csv(f'{filename}_submission.csv', index=False)

In [None]:
# Writes lda_submission.csv in the working directory.
save_predictions(y_pred=y_pred_lda, test_df=test_df, filename="lda")

In [None]:
# Writes svm_submission.csv in the working directory.
save_predictions(y_pred=y_pred_svm, test_df=test_df, filename="svm")