# Loading Files and Pre-Processing

In [None]:
import mne
import numpy as np
from scipy.signal import welch, stft
from scipy.stats import skew, kurtosis, entropy
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.decomposition import PCA
from sklearn.feature_selection import mutual_info_classif, SelectKBest
from collections import Counter

# List of EDF file paths to load (one recording per file)
edf_files = [
    r"C:\Users\kyled\Downloads\Ind1.edf",
    r"C:\Users\kyled\Downloads\Ind3.edf",
    r"C:\Users\kyled\Downloads\Ind4.edf",
    r"C:\Users\kyled\Downloads\Ind5.edf",
]

# Vocabulary used for labelling and the length of one labelled epoch.
# Epoch i of the concatenated recording is later labelled words[i % len(words)],
# so every file must contribute exactly len(words) * epoch_duration seconds,
# otherwise the labels drift relative to the data after the first file.
words = ['yes', 'no', 'more', 'stop', 'help', 'want', 'eat', 'drink', 'I', 'you']
epoch_duration = 30  # seconds

crop_start = 60  # seconds skipped at the start of each recording
# NOTE(review): assumes the labelled task spans len(words) consecutive epochs
# starting at crop_start in every recording - confirm against the protocol.
crop_stop = crop_start + len(words) * epoch_duration

# Reading the first file's header to use as a reference for channel names.
# preload=False: only the channel list is needed here; the signal itself is
# loaded in the loop below, so loading it here as well would be wasted work.
print("Loading reference EDF file...")
reference_raw = mne.io.read_raw_edf(edf_files[0], preload=False)
reference_channels = reference_raw.info['ch_names']

raw_objects = []
for file_path in edf_files:
    print(f"Loading and cropping EDF file: {file_path}...")
    raw = mne.io.read_raw_edf(file_path, preload=True)
    # include_tmax=False keeps exactly (crop_stop - crop_start) seconds, so the
    # 30-second grid built below stays aligned with file boundaries.
    raw.crop(tmin=crop_start, tmax=crop_stop, include_tmax=False)
    raw.pick_channels(reference_channels)
    raw_objects.append(raw)

# Concatenating all loaded and processed objects
print("Concatenating all processed EDF files...")
raw = mne.concatenate_raws(raw_objects)

# Pre-processing the combined data: 4-30 Hz band-pass
print("Filtering data...")
raw.filter(4, 30)
sfreq = raw.info['sfreq']

# Build the epoch grid from the sample count rather than from
# np.arange(0, raw.times[-1] - epoch_duration, ...): arange excludes its end
# point, which silently drops the last complete epoch.
samples_per_epoch = int(round(epoch_duration * sfreq))
n_epochs = raw.n_times // samples_per_epoch
start_times = np.arange(n_epochs, dtype=float) * epoch_duration
end_times = start_times + epoch_duration

# Feature Extraction

In [None]:
def extract_features(epoch_data, sfreq):
    """Compute nine summary features for every row of ``epoch_data``.

    Parameters
    ----------
    epoch_data : 2-D array
        One row per channel, samples along axis 1.
    sfreq : float
        Sampling frequency, used for the Welch and STFT window lengths.

    Returns
    -------
    numpy.ndarray
        Array with one row per channel and nine columns, in this order:
        mean, standard deviation, skewness, kurtosis, mean Welch PSD in
        (4, 8] Hz, (8, 12] Hz and (12, 30] Hz, mean STFT magnitude, and the
        ``scipy.stats.entropy`` of the rectified signal.
    """
    # Welch PSD with one-second segments
    welch_freqs, welch_psd = welch(epoch_data, sfreq, nperseg=int(sfreq))

    def band_mean(low, high):
        # Average PSD over the half-open band (low, high]
        in_band = (welch_freqs > low) & (welch_freqs <= high)
        return welch_psd[:, in_band].mean(axis=1)

    # STFT with half-second segments; average magnitude over time, then frequency
    stft_coeffs = stft(epoch_data, fs=sfreq, nperseg=int(sfreq / 2))[2]
    stft_level = np.abs(stft_coeffs).mean(axis=2).mean(axis=1)

    # scipy's entropy normalises each rectified row to sum to one
    rectified = np.abs(epoch_data)
    channel_entropy = np.array([entropy(row) for row in rectified])

    columns = (
        np.mean(epoch_data, axis=1),
        np.std(epoch_data, axis=1),
        skew(epoch_data, axis=1),
        kurtosis(epoch_data, axis=1),
        band_mean(4, 8),     # theta
        band_mean(8, 12),    # alpha
        band_mean(12, 30),   # beta
        stft_level,
        channel_entropy,
    )
    return np.stack(columns, axis=1)

print("Segmenting data into 30-second epochs, then into 2-second sub-epochs, and extracting features...")
labeled_features_data = []
sub_epoch_duration = 2  # in seconds

# Loop-invariant sizes, computed once
samples_per_sub_epoch = sub_epoch_duration * int(sfreq)
sub_epochs_per_epoch = int(epoch_duration / sub_epoch_duration)

for i, (start, end) in enumerate(zip(start_times, end_times)):
    start_sample = int(start * sfreq)
    end_sample = int(end * sfreq)
    epoch_data, _ = raw[:, start_sample:end_sample]
    # Labels cycle through the vocabulary, one word per 30-second epoch
    word_label = words[i % len(words)]

    for j in range(sub_epochs_per_epoch):
        sub_start = j * samples_per_sub_epoch
        sub_end = sub_start + samples_per_sub_epoch
        sub_epoch_data = epoch_data[:, sub_start:sub_end]
        # A dedicated name keeps the loop from rebinding `features`, which is
        # assigned once, after the loop, as the stacked feature array.
        sub_epoch_features = extract_features(sub_epoch_data, sfreq)
        labeled_features_data.append((sub_epoch_features, word_label))

# The rest of the cell cannot run on an empty data set (there is nothing to
# reshape, impute or scale), so fail early with a clear message.
if not labeled_features_data:
    raise ValueError(
        "No epochs were extracted: the recording is shorter than one "
        f"{epoch_duration}-second epoch."
    )

# Extracting features and labels from the collected data
features, labels = zip(*labeled_features_data)
features = np.array(features)
labels = np.array(labels)

# Flattening the last two dimensions of the features array
features_2d = features.reshape(features.shape[0], -1)

# NOTE(review): the imputer and scaler below are fitted on ALL sub-epochs,
# before the train/validation split, so validation statistics leak into
# preprocessing. Consider fitting them on the training rows only.

# Handling NaN values: replacing them with the column mean
imputer = SimpleImputer(missing_values=np.nan, strategy='mean')
features_imputed = imputer.fit_transform(features_2d)

# Scaling the features
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features_imputed)

# Feature Selection

In [None]:
# Mutual Information
from functools import partial

num_sub_epochs_per_epoch = int(epoch_duration / sub_epoch_duration)
total_sub_epochs = num_sub_epochs_per_epoch * len(start_times)

num_features = features_scaled.shape[1]
k_best = min(num_features, 20)  # Ensure k does not exceed the number of available features

# mutual_info_classif adds random noise to continuous features, so without a
# fixed seed the selected columns can differ from run to run. The seed matches
# the random_state used for the train/validation split.
mi_score_func = partial(mutual_info_classif, random_state=42)
mi_selector = SelectKBest(mi_score_func, k=k_best)
features_mi = mi_selector.fit_transform(features_scaled, labels)

# Choosing feature set to use for further model training
selected_features = features_mi

num_features_mi = features_mi.shape[1]  # Number of features after MI

# Splitting into Train, Validation and Test sets

In [None]:
# Stratified 70/30 split of the selected features into training and validation sets
split_result = train_test_split(
    selected_features, labels, test_size=0.3, random_state=42, stratify=labels)
train_features, val_features, train_labels, val_labels = split_result

print(f"Training data size: {len(train_features)}")
print(f"Validation data size: {len(val_features)}")

# Per-label counts and set sizes for both partitions
train_label_counts = Counter(train_labels)
val_label_counts = Counter(val_labels)
total_train = len(train_labels)
total_val = len(val_labels)

# Report each label's count and share, training set first
distribution_reports = (
    ("Training set label distribution:", train_label_counts, total_train),
    ("\nValidation set label distribution:", val_label_counts, total_val),
)
for heading, label_counts, total in distribution_reports:
    print(heading)
    for label, count in label_counts.items():
        print(f"{label}: {count} ({count / total * 100:.2f}%)")

# Building the Model

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer

class EEGTransformerEncoder(nn.Module):
    """Transformer-encoder classifier for flat per-sample feature vectors.

    Each input row is projected to ``d_model``, passed through a stack of
    pre-LayerNorm transformer encoder layers as a length-1 sequence, batch
    normalised and mapped to class log-probabilities.

    Parameters
    ----------
    input_dim : int
        Number of features per sample.
    num_classes : int
        Number of output classes.
    d_model, nhead, num_layers, dim_feedforward : int
        Transformer encoder hyper-parameters.
    dropout_rate : float
        Dropout used after the input projection, inside the encoder layers
        and before the output projection.
    noise_std : float
        Standard deviation of Gaussian noise added to the input in training
        mode only; ``0`` disables it.
    """

    def __init__(self, input_dim, num_classes, d_model=256, nhead=8, num_layers=16, dim_feedforward=512, dropout_rate=0.1, noise_std=0.01):
        super().__init__()
        self.noise_std = noise_std
        self.linear_in = nn.Linear(input_dim, d_model)
        self.dropout_in = nn.Dropout(dropout_rate)

        # Using pre-LayerNorm. batch_first=True so that dimension 0 of the
        # encoder input is the batch dimension (see forward()).
        encoder_layer = TransformerEncoderLayer(d_model=d_model, nhead=nhead,
                                                dim_feedforward=dim_feedforward,
                                                dropout=dropout_rate,
                                                activation='gelu',
                                                batch_first=True,
                                                norm_first=True)

        self.transformer_encoder = TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.batch_norm = nn.BatchNorm1d(d_model)
        self.linear_out = nn.Linear(d_model, num_classes)
        self.dropout_out = nn.Dropout(dropout_rate)

    def forward(self, x):
        """Return log-probabilities of shape ``(batch, num_classes)``.

        ``x`` is a 2-D tensor of shape ``(batch, input_dim)``.
        """
        # Input-noise augmentation, training mode only
        if self.training and self.noise_std > 0.0:
            x = x + torch.randn_like(x) * self.noise_std

        x = self.linear_in(x)
        x = self.dropout_in(x)

        # A 2-D tensor handed straight to the encoder is interpreted as ONE
        # unbatched sequence, which makes the samples of a batch attend to each
        # other and the prediction for a sample depend on its batch-mates.
        # Add an explicit length-1 sequence axis so every sample is encoded
        # independently: (batch, d_model) -> (batch, 1, d_model) -> back.
        x = self.transformer_encoder(x.unsqueeze(1)).squeeze(1)

        x = self.batch_norm(x)
        x = self.dropout_out(x)
        x = self.linear_out(x)

        return F.log_softmax(x, dim=1)

# Size the output layer from the vocabulary instead of a hard-coded 10 so it
# stays in sync if `words` is edited.
model = EEGTransformerEncoder(input_dim=num_features_mi, num_classes=len(words))
print(model)


# Creating Datasets

In [None]:
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder

class EEGDataset(Dataset):
    """In-memory dataset pairing float32 feature rows with int64 labels."""

    def __init__(self, features, labels):
        # torch.tensor copies its input, so the dataset does not alias the
        # arrays it was built from.
        self.features, self.labels = (
            torch.tensor(features, dtype=torch.float32),
            torch.tensor(labels, dtype=torch.long),
        )

    def __len__(self):
        """Number of samples (rows of ``features``)."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at ``idx``."""
        sample = self.features[idx]
        target = self.labels[idx]
        return sample, target
    
# Map the string labels to integer class ids; the encoder is fitted on the
# training labels and reused unchanged for the validation labels.
label_encoder = LabelEncoder().fit(train_labels)
train_labels_encoded = label_encoder.transform(train_labels)
val_labels_encoded = label_encoder.transform(val_labels)

# Wrap both partitions as tensor datasets
train_dataset, val_dataset = (
    EEGDataset(train_features, train_labels_encoded),
    EEGDataset(val_features, val_labels_encoded),
)

# Mini-batch loaders: only the training loader shuffles
batch_size = 256
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)


# Training Loop

In [None]:
import torch
import torch.nn.functional as F

# Training parameters
learning_rate = 1e-5
epochs = 600
l1_lambda = 0.0001  # weight of the L1 penalty added to the training loss

# Using CrossEntropyLoss for classification. The model already returns
# log-softmax outputs; CrossEntropyLoss applies log_softmax again, which leaves
# normalised log-probabilities unchanged, so the loss value is still correct.
criterion = nn.CrossEntropyLoss()
# Use the learning_rate defined above rather than repeating the literal
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-8)

train_accuracies = []
val_accuracies = []

# Training loop.
# Batches are bound to batch_features / batch_labels so the loop does not
# overwrite the notebook-level `features` / `labels` arrays that earlier cells
# (feature selection, train/validation split) read when they are re-run.
for epoch in range(epochs):
    model.train()
    train_loss = 0.0
    train_correct = 0
    total_train = 0

    for batch_features, batch_labels in train_loader:
        optimizer.zero_grad()
        output = model(batch_features)
        loss = criterion(output, batch_labels)
        l1_norm = sum(p.abs().sum() for p in model.parameters())
        loss = loss + l1_lambda * l1_norm  # L1 regularization
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        # criterion returns the batch MEAN, so weight it by the batch size;
        # dividing the sum by total_train below then gives a per-sample average.
        n_batch = batch_labels.size(0)
        train_loss += loss.item() * n_batch
        predictions = output.argmax(dim=1)
        train_correct += (predictions == batch_labels).sum().item()
        total_train += n_batch

    train_accuracy = train_correct / total_train
    train_accuracies.append(train_accuracy)

    # Validation
    model.eval()
    val_loss = 0.0
    val_correct = 0
    total_val = 0

    with torch.no_grad():
        for batch_features, batch_labels in val_loader:
            output = model(batch_features)
            n_batch = batch_labels.size(0)
            val_loss += criterion(output, batch_labels).item() * n_batch
            predictions = output.argmax(dim=1)
            val_correct += (predictions == batch_labels).sum().item()
            total_val += n_batch

    val_accuracy = val_correct / total_val
    val_accuracies.append(val_accuracy)

    # "Loss" is the regularised training objective (cross-entropy + L1 term);
    # "Validation Loss" is plain cross-entropy.
    print(f'Epoch {epoch+1}, Loss: {train_loss / total_train}, Training Accuracy: {train_accuracy}, '
          f'Validation Loss: {val_loss / total_val}, Validation Accuracy: {val_accuracy}')

# Preparing the Test File

In [None]:
test_edf_file = r"C:\Users\kyled\Downloads\Ind2.edf"

# Preprocessing steps for the new test EDF file: same crop window, channel
# selection and 4-30 Hz band-pass as the training recordings.
print(f"Loading and preprocessing the test EDF file: {test_edf_file}...")
test_raw = mne.io.read_raw_edf(test_edf_file, preload=True)
test_raw.crop(tmin=60, tmax=361)
test_raw.pick_channels(reference_channels)
test_raw.filter(4, 30)
sfreq_test = test_raw.info['sfreq']

epoch_duration = 30
sub_epoch_duration = 2

# np.arange excludes its end point, so only complete 30-second epochs that end
# before the last sample are kept.
test_start_times = np.arange(0, test_raw.times[-1] - epoch_duration, epoch_duration)
test_end_times = test_start_times + epoch_duration

# Loop-invariant sizes, computed once
test_samples_per_sub_epoch = sub_epoch_duration * int(sfreq_test)
test_sub_epochs_per_epoch = int(epoch_duration / sub_epoch_duration)

labeled_test_features_data = []
for i, (start, end) in enumerate(zip(test_start_times, test_end_times)):
    start_sample = int(start * sfreq_test)
    end_sample = int(end * sfreq_test)
    epoch_data, _ = test_raw[:, start_sample:end_sample]
    # Labels cycle through the vocabulary, one word per 30-second epoch
    word_label = words[i % len(words)]

    for j in range(test_sub_epochs_per_epoch):
        sub_start = j * test_samples_per_sub_epoch
        sub_end = sub_start + test_samples_per_sub_epoch
        sub_epoch_data = epoch_data[:, sub_start:sub_end]
        # A dedicated name keeps the loop from rebinding the notebook-level
        # `features` variable.
        sub_epoch_features = extract_features(sub_epoch_data, sfreq_test)
        labeled_test_features_data.append((sub_epoch_features, word_label))

# Nothing below can run on an empty test set, so fail early with a clear message.
if not labeled_test_features_data:
    raise ValueError(
        "No test epochs were extracted: the test recording is shorter than one "
        f"{epoch_duration}-second epoch."
    )

# Extracting features and labels from the test data
test_features, test_labels = zip(*labeled_test_features_data)
test_features = np.array(test_features)
test_labels = np.array(test_labels)

# Flattening the last two dimensions of the test features array
test_features_2d = test_features.reshape(test_features.shape[0], -1)

# Reuse the imputer, scaler and feature selector fitted on the training data;
# they are only applied here (transform), never refitted.
test_features_imputed = imputer.transform(test_features_2d)
test_features_scaled = scaler.transform(test_features_imputed)
test_features_mi = mi_selector.transform(test_features_scaled)

# Encoding string labels to integers for the test set
test_labels_encoded = label_encoder.transform(test_labels)

test_dataset = EEGDataset(test_features_mi, test_labels_encoded)

# Defining test DataLoader
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# Testing the Model

### Calculating Metrics

In [None]:
from sklearn.metrics import precision_recall_fscore_support
test_loss = 0.0
test_correct = 0
total_test = 0

all_test_predictions = []
all_test_labels = []

# Evaluate on the held-out recording. Batches use their own names so the loop
# does not overwrite the notebook-level `features` / `labels` variables.
model.eval()
with torch.no_grad():
    for batch_features, batch_labels in test_loader:
        output = model(batch_features)
        # criterion returns the batch MEAN; weight by batch size so that
        # test_loss / total_test is a true per-sample average.
        n_batch = batch_labels.size(0)
        test_loss += criterion(output, batch_labels).item() * n_batch
        predictions = output.argmax(dim=1)
        test_correct += (predictions == batch_labels).sum().item()
        total_test += n_batch
        all_test_predictions.extend(predictions.cpu().numpy())
        all_test_labels.extend(batch_labels.cpu().numpy())

test_accuracy = test_correct / total_test

print(f'Test Loss: {test_loss / total_test}, Test Accuracy: {test_accuracy}')

# Calculating macro-averaged precision, recall, and F1-score.
# zero_division=0 scores a class that is never predicted as 0 without emitting
# an UndefinedMetricWarning.
precision, recall, f1_score, _ = precision_recall_fscore_support(
    all_test_labels, all_test_predictions, average='macro', zero_division=0)
print(f'Test Precision: {precision:.4f}, Test Recall: {recall:.4f}, Test F1 Score: {f1_score:.4f}')

### Plotting the Confusion Matrix

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
import numpy as np

# Tick labels must follow the integer ids assigned by the LabelEncoder, which
# orders classes by sorted label value - NOT by the order of the `words` list.
# Reading the names from the encoder keeps every row/column labelled correctly.
label_names = list(label_encoder.classes_)

# Computing the confusion matrix over every encoded class id
cm = confusion_matrix(all_test_labels, all_test_predictions, labels=np.arange(len(label_names)))

plt.figure(figsize=(10, 7))
ax = sns.heatmap(cm, annot=False, fmt='d', cmap='Blues', xticklabels=label_names, yticklabels=label_names)

# Cells darker than half the maximum count get white text, the rest black
threshold = cm.max() / 2.

# Annotating every cell manually so the text colour can depend on the background
for (i, j), cell_count in np.ndenumerate(cm):
    ax.text(j + 0.5, i + 0.5, cell_count,
            horizontalalignment='center',
            verticalalignment='center',
            color="white" if cell_count > threshold else "black")

plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.tight_layout()
plt.show()

### Plotting Training and Validation Accuracy over Epochs

In [None]:
from sklearn.metrics import precision_recall_fscore_support
import matplotlib.pyplot as plt
# Accuracy curves recorded by the training loop, one point per epoch
fig, ax = plt.subplots(figsize=(10, 5))
accuracy_curves = (
    ('Training Accuracy', train_accuracies),
    ('Validation Accuracy', val_accuracies),
)
for curve_label, curve_values in accuracy_curves:
    ax.plot(curve_values, label=curve_label)
ax.set_title('Training and Validation Accuracy over Epochs')
ax.set_xlabel('Epochs')
ax.set_ylabel('Accuracy')
ax.legend()
plt.show()