# Classification

In [None]:
import mne
import numpy as np 
import matplotlib.pyplot as plt
import torch
import os
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset, DataLoader
import torch.nn as nn
import torch.optim as optim

In [None]:
# Root folder of the downloaded recordings; every sub-directory in it is scanned.
main_folder = './EEGData/MNE-eegbci-data/files/eegmmidb/1.0.0'
subdirectories = [entry.path for entry in os.scandir(main_folder) if entry.is_dir()]
opened_files = []
closed_files = []

for subdirectory in subdirectories:
    files = os.listdir(subdirectory)
    if not files:
        print(f"No files found in {subdirectory}")
        continue
    for file in files:
        full_path = os.path.join(subdirectory, file)
        # Names ending in "01.edf" are the eyes-opened recordings
        if file.endswith('01.edf'):
            opened_files.append(full_path)
        # Names ending in "02.edf" are the eyes-closed recordings
        if file.endswith('02.edf'):
            closed_files.append(full_path)

large_open_data_raw = []
large_closed_data_raw = []

# Positions (in `opened_files` order) of the recordings that are skipped outright.
# NOTE(review): the original list contained `34.47`, a float that can never
# equal an integer index, so it excluded nothing. It is read here as the two
# indices 34 and 47 — confirm against the intended exclusion list.
excluded_indices = {
    1, 4, 5, 6, 12, 13, 15, 17, 19, 22, 29, 31, 33, 34, 47, 50, 51, 58, 60,
    71, 76, 77, 79, 81, 86, 87, 88, 90, 95, 100,
}
# Number of samples every channel must have for a recording pair to be kept.
expected_samples = 9760

for i, open_path in enumerate(opened_files):
    if i in excluded_indices:
        continue
    # The eyes-opened and eyes-closed files are paired purely by list position.
    data1 = mne.io.read_raw_edf(open_path, preload=True, verbose=False).get_data(verbose=False)
    data2 = mne.io.read_raw_edf(closed_files[i], preload=True, verbose=False).get_data(verbose=False)
    # Keep the pair only when both arrays have the same shape and the expected
    # number of samples per row. Comparing shapes also covers the case where the
    # two files have different channel counts, which previously raised an
    # IndexError in the row-by-row loop.
    if data1.shape == data2.shape and data1.shape[1] == expected_samples:
        large_open_data_raw.append(data1)
        large_closed_data_raw.append(data2)

print(len(large_open_data_raw))
print(len(large_closed_data_raw))

In [None]:
# Sanity check on the raw recordings, eyes-opened list first, then eyes-closed:
# print a recording's index when it does not have 64 rows, and print it again
# for every row whose length is not 9760.
for recordings in (large_open_data_raw, large_closed_data_raw):
    for i, recording in enumerate(recordings):
        if len(recording) != 64:
            print(i)
        for row in recording:
            if len(row) != 9760:
                print(i)

## Plotting

In [None]:
def single_channel_plot(data, channel, title):
    """Draw one row of ``data`` as a wide line chart and display it.

    Args:
        data: Array indexed as ``data[channel, :]``.
        channel: Index of the row to draw.
        title: Text shown above the plot.
    """
    _, ax = plt.subplots(figsize=(20, 3))
    trace = data[channel, :]
    ax.plot(trace)
    ax.set_title(title)
    plt.show()

def plot_all_channels(data, title):
    """Overlay every row of ``data`` in a single figure and display it.

    Args:
        data: Array exposing ``.T``; the transpose is handed to the plot call,
            so each row of ``data`` becomes one line.
        title: Text shown above the plot.
    """
    _, ax = plt.subplots(figsize=(20, 10))
    transposed = data.T
    ax.plot(transposed)
    ax.set_title(title)
    plt.show()

def plot_all_channels_subplots(data, n_cols=8):
    """Draw each channel of ``data`` in its own panel of a subplot grid.

    The grid is sized from the number of rows in ``data`` rather than a fixed
    64, so recordings with fewer channels no longer raise an IndexError. With
    64 rows and the default ``n_cols`` the layout is the same 8 x 8 grid as
    before.

    Args:
        data: Array indexed as ``data[i, :]`` with one row per channel.
        n_cols: Number of panels per grid row.

    Raises:
        ValueError: If ``n_cols`` is smaller than 1.
    """
    if n_cols < 1:
        raise ValueError("n_cols must be at least 1")

    n_channels = len(data)
    # Ceiling division: enough rows to hold every channel.
    n_rows = -(-n_channels // n_cols)

    plt.figure(figsize=(20, 10))
    for i in range(n_channels):
        plt.subplot(n_rows, n_cols, i + 1)
        plt.plot(data[i, :])
        plt.title(f"Channel {i}")
    plt.show()

# Visual spot-check of the first raw recording of each condition.
first_open = large_open_data_raw[0]
first_closed = large_closed_data_raw[0]

single_channel_plot(first_open, 0, "Channel 0 for eyes opened")
single_channel_plot(first_closed, 0, "Channel 0 for eyes closed")
plot_all_channels(first_open, "All channels for eyes opened")
plot_all_channels(first_closed, "All channels for eyes closed")
plot_all_channels_subplots(first_open)
plot_all_channels_subplots(first_closed)

## Pre-Processing

In [None]:
# Band edges handed to MNE's filter. `l_freq` is the lower edge of the pass
# band (the high-pass cutoff) and `h_freq` the upper edge (the low-pass cutoff);
# the earlier comments had the two terms swapped.
low_freq = 0.1  # Lower pass-band edge in Hz
high_freq = 35  # Upper pass-band edge in Hz


def _load_filtered(edf_path):
    """Read one EDF file and return its filtered samples.

    The sampling frequency is taken from the file's own ``info['sfreq']``; the
    band edges come from the module-level ``low_freq`` / ``high_freq``.
    """
    raw = mne.io.read_raw_edf(edf_path, preload=True, verbose=False)
    return mne.filter.filter_data(
        raw.get_data(),
        sfreq=raw.info['sfreq'],
        l_freq=low_freq,
        h_freq=high_freq,
        fir_design="firwin",
        verbose=False,
    )


# Every discovered file is filtered here; no recordings are excluded at this stage.
large_open_data = [_load_filtered(path) for path in opened_files]
large_closed_data = [_load_filtered(path) for path in closed_files]

# Overlay plots of the first filtered recording of each condition ...
for filtered, heading in (
    (large_open_data[0], "All channels for eyes opened"),
    (large_closed_data[0], "All channels for eyes closed"),
):
    plot_all_channels(filtered, heading)

# ... followed by per-channel grids, raw then filtered, for each condition.
for recording in (
    large_open_data_raw[0],
    large_open_data[0],
    large_closed_data_raw[0],
    large_closed_data[0],
):
    plot_all_channels_subplots(recording)


# Shape check on the filtered eyes-closed recordings: print a recording's index
# when it does not have 64 rows, and again for every row not 9760 samples long.
for i, recording in enumerate(large_closed_data):
    if len(recording) != 64:
        print(i)
    for row in recording:
        if len(row) != 9760:
            print(i)

## PyTorch Set Up

In [None]:
# Seed both random number generators so that shuffles and weight
# initialisation are repeatable between runs.
np.random.seed(0)
torch.manual_seed(0)

# Use the GPU when one is available, otherwise fall back to the CPU.
use_gpu = torch.cuda.is_available()
device = torch.device("cuda") if use_gpu else torch.device("cpu")
print(device)

## Single Channel Classification

### Dataset

In [None]:
# Collect the last-epoch training loss and test accuracy of every channel's model.
all_losses, all_test_accuracies = [], []


class FNNClassifierSingle(nn.Module):
    """Feed-forward classifier with one hidden layer: Linear -> ReLU -> Linear.

    Args:
        input_size: Number of features per sample once the input is flattened.
        hidden_size: Width of the hidden layer.
        num_classes: Number of output logits.
    """

    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Flatten each sample and return logits of shape (batch, num_classes)."""
        # reshape behaves like view on contiguous input but also accepts
        # non-contiguous tensors (e.g. transposed batches), where view raises.
        x = x.reshape(x.size(0), -1)
        # Use the module's own activation instead of leaving self.relu unused.
        x = self.relu(self.fc1(x))
        return self.fc2(x)

input_size = 9760
hidden_size = 128
num_classes = 2

# Stack the recordings once instead of rebuilding the arrays four times per channel.
# NOTE(review): assumes all recordings share one (channels, samples) shape so
# the stacked arrays are 3-D — confirm against the loading cell.
open_recordings = np.array(large_open_data_raw)
closed_recordings = np.array(large_closed_data_raw)

# Train one model per channel. The channel count is read from the data; the
# previous hard-coded range(63) stopped one short of the 64 channels that the
# shape-check cell expects.
for channel in range(open_recordings.shape[1]):
    data = np.concatenate((open_recordings[:, channel, :], closed_recordings[:, channel, :]), axis=0)
    # Label convention in this cell: 0 = eyes opened, 1 = eyes closed.
    labels = np.concatenate((np.zeros(len(open_recordings)), np.ones(len(closed_recordings))), axis=0)
    data = torch.Tensor(data).to(device)
    labels = torch.Tensor(labels).to(device).long()

    random_indices = np.arange(len(data))
    np.random.shuffle(random_indices)
    data = data[random_indices]
    labels = labels[random_indices]

    print(data.shape)
    print(labels.shape)

    test_size = 0.2
    # Split row indices rather than the tensors themselves, then index the
    # tensors. This keeps the dtype (float features, int64 labels) and device
    # set above. The earlier code re-wrapped the split outputs with the legacy
    # torch.Tensor(...) float constructor, which is not a valid conversion for
    # an int64 label tensor.
    train_idx, test_idx = train_test_split(np.arange(len(data)), test_size=test_size, random_state=42)
    train_data, test_data = data[train_idx], data[test_idx]
    train_labels, test_labels = labels[train_idx], labels[test_idx]

    batch_size = 32

    train_dataset = TensorDataset(train_data, train_labels)
    test_dataset = TensorDataset(test_data, test_labels)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # A fresh model and optimizer for every channel.
    fnn_model = FNNClassifierSingle(input_size, hidden_size, num_classes).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(fnn_model.parameters(), lr=0.001)

    num_epochs = 100
    losses = []
    test_accuracies = []

    for epoch in range(num_epochs):
        running_loss = 0.0

        for inputs, targets in train_loader:
            optimizer.zero_grad()
            outputs = fnn_model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        # Mean of the per-batch losses for this epoch.
        epoch_loss = running_loss / len(train_loader)
        losses.append(epoch_loss)

        print(f'Epoch [{epoch + 1}/{num_epochs}] Loss: {epoch_loss:.4f}')

        # Accuracy on the held-out split after every epoch.
        correct_test = 0
        total_test = 0
        with torch.no_grad():
            for inputs, targets in test_loader:
                outputs = fnn_model(inputs)
                predicted = outputs.argmax(dim=1)
                total_test += targets.size(0)
                correct_test += (predicted == targets).sum().item()

        test_accuracy = 100 * correct_test / total_test
        test_accuracies.append(test_accuracy)

    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(losses)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss')

    plt.subplot(1, 2, 2)
    plt.plot(test_accuracies)
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.title('Test Set Accuracy')

    plt.show()

    # Record the last-epoch figures for the per-channel summary plots.
    final_test_accuracy = test_accuracies[-1]
    all_losses.append(losses[-1])
    all_test_accuracies.append(test_accuracies[-1])
    print(f'Final accuracy on the test dataset: {final_test_accuracy:.2f}%')

In [None]:
class FNNClassifierSingle(nn.Module):
    """Single-hidden-layer network mapping a flattened sample to class logits.

    Args:
        input_size: Flattened feature count of one sample.
        hidden_size: Number of hidden units.
        num_classes: Number of logits produced per sample.
    """

    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return logits of shape (batch, num_classes) for a batch ``x``."""
        # reshape tolerates non-contiguous input, which view rejects.
        flat = x.reshape(x.size(0), -1)
        # Route the activation through self.relu so the attribute is actually used.
        hidden = self.relu(self.fc1(flat))
        return self.fc2(hidden)

# Network dimensions for the single-channel model: input width, hidden width
# and number of output classes.
input_size, hidden_size, num_classes = 9760, 128, 2

In [None]:
# One figure per collected metric, plotted against the channel index.
for series, y_label, heading in (
    (all_losses, 'Loss', 'Train Set Loss'),
    (all_test_accuracies, 'Accuracy (%)', 'Test Set Accuracy'),
):
    plt.plot(series)
    plt.xlabel('Channel #')
    plt.ylabel(y_label)
    plt.title(heading)
    plt.show()

In [None]:
channel = 0

# Stack each condition once; the slices below reuse these arrays.
# NOTE(review): assumes all recordings share one (channels, samples) shape — confirm.
open_recordings = np.array(large_open_data_raw)
closed_recordings = np.array(large_closed_data_raw)

data = np.concatenate((open_recordings[:, channel, :], closed_recordings[:, channel, :]), axis=0)
# Label convention in this cell: 0 = eyes opened, 1 = eyes closed.
labels = np.concatenate((np.zeros(len(open_recordings)), np.ones(len(closed_recordings))), axis=0)
data = torch.Tensor(data).to(device)
labels = torch.Tensor(labels).to(device).long()

random_indices = np.arange(len(data))
np.random.shuffle(random_indices)
data = data[random_indices]
labels = labels[random_indices]

print(data.shape)
print(labels.shape)

test_size = 0.2
# Split row indices and index the tensors with them, so features stay float and
# labels stay int64 on `device`. The earlier code re-wrapped the split outputs
# with the legacy torch.Tensor(...) float constructor, which is not a valid
# conversion for an int64 label tensor.
train_idx, test_idx = train_test_split(np.arange(len(data)), test_size=test_size, random_state=42)
train_data, test_data = data[train_idx], data[test_idx]
train_labels, test_labels = labels[train_idx], labels[test_idx]

batch_size = 32

train_dataset = TensorDataset(train_data, train_labels)
test_dataset = TensorDataset(test_data, test_labels)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

class FNNClassifierSingle(nn.Module):
    """Fully connected classifier: flatten, Linear, ReLU, Linear.

    Args:
        input_size: Number of input features after flattening a sample.
        hidden_size: Size of the hidden layer.
        num_classes: Number of classes, i.e. logits per sample.
    """

    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Compute unnormalised class scores, one row per sample in ``x``."""
        # reshape instead of view: same result on contiguous batches, and no
        # error on non-contiguous ones.
        x = x.reshape(x.size(0), -1)
        # The activation goes through self.relu, which was previously unused.
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Model dimensions for the single-channel network.
input_size, hidden_size, num_classes = 9760, 128, 2

fnn_model = FNNClassifierSingle(input_size, hidden_size, num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(fnn_model.parameters(), lr=0.001)

num_epochs = 2000
losses = []

for epoch in range(num_epochs):
    running_loss = 0.0
    correct, total = 0, 0

    for inputs, targets in train_loader:
        optimizer.zero_grad()
        logits = fnn_model(inputs)
        batch_loss = criterion(logits, targets)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()
        # Running count of correct training predictions in this epoch.
        guesses = torch.max(logits.data, 1)[1]
        total += targets.size(0)
        correct += (guesses == targets).sum().item()

    # Mean per-batch loss for the epoch.
    epoch_loss = running_loss / len(train_loader)
    losses.append(epoch_loss)

    print(f'Epoch [{epoch + 1}/{num_epochs}] Loss: {epoch_loss:.4f}')

# Training-loss curve over all epochs.
plt.plot(losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()

# Accuracy on the held-out split, measured once after training.
correct, total = 0, 0

with torch.no_grad():
    for inputs, targets in test_loader:
        guesses = torch.max(fnn_model(inputs).data, 1)[1]
        total += targets.size(0)
        correct += (guesses == targets).sum().item()

print(f'Accuracy of the network on the test dataset: {100 * correct / total:.2f}%')

## All Channel Classification

In [None]:
# Stack each condition once and reuse the arrays for both data and labels.
# NOTE(review): assumes all recordings share one (channels, samples) shape — confirm.
closed_recordings = np.array(large_closed_data_raw)
open_recordings = np.array(large_open_data_raw)

data = torch.Tensor(
    np.concatenate((closed_recordings, open_recordings), axis=0)).to(device)

# Label convention in this cell: 0 = eyes closed, 1 = eyes opened.
# NOTE(review): this is the reverse of the single-channel cells, which label
# eyes opened as 0 — keep that in mind when comparing per-class results.
labels = torch.Tensor(
    np.concatenate((
        np.zeros(len(closed_recordings)), np.ones(len(open_recordings))), axis=0)).to(device).long()

random_indices = np.arange(len(data))
np.random.shuffle(random_indices)
data = data[random_indices]
labels = labels[random_indices]

print(data.shape)
print(labels.shape)

test_size = 0.2
# Split row indices and index the tensors with them, so features stay float and
# labels stay int64 on `device`. The earlier code re-wrapped the split outputs
# with the legacy torch.Tensor(...) float constructor, which is not a valid
# conversion for an int64 label tensor.
train_idx, test_idx = train_test_split(np.arange(len(data)), test_size=test_size, random_state=42)
train_data, test_data = data[train_idx], data[test_idx]
train_labels, test_labels = labels[train_idx], labels[test_idx]

batch_size = 16

train_dataset = TensorDataset(train_data, train_labels)
test_dataset = TensorDataset(test_data, test_labels)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
class FNNClassifier(nn.Module):
    """Feed-forward classifier over a flattened multi-channel sample.

    Architecture: Linear -> ReLU -> Linear.

    Args:
        input_size: Total number of features per sample after flattening.
        hidden_size: Width of the hidden layer.
        num_classes: Number of output logits.
    """

    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Flatten all non-batch dimensions and return (batch, num_classes) logits."""
        # reshape handles non-contiguous batches as well, where view raises.
        x = x.reshape(x.size(0), -1)
        # Use the module's activation instead of leaving self.relu unused.
        x = self.relu(self.fc1(x))
        return self.fc2(x)

# Network dimensions for the all-channel model; the input width is the product
# 64 * 9760 of the two per-recording sizes used throughout the notebook.
input_size, hidden_size, num_classes = 64 * 9760, 128, 2

In [None]:
fnn_model = FNNClassifier(input_size, hidden_size, num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(fnn_model.parameters(), lr=0.001)

num_epochs = 500
losses = []
test_accuracies = []
best_test_loss = float('inf')
patience = 20  # epochs without a better test loss before training stops
no_improvement = 0

for epoch in range(num_epochs):
    running_loss = 0.0

    for inputs, targets in train_loader:
        optimizer.zero_grad()
        outputs = fnn_model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Mean per-batch training loss for this epoch.
    epoch_loss = running_loss / len(train_loader)
    losses.append(epoch_loss)

    # Evaluate before reporting, so the loss and accuracy curves plotted below
    # cover the same epochs. Previously the plot ran first and the accuracy
    # curve was one epoch behind (empty at epoch 0).
    correct_test = 0
    total_test = 0
    test_loss = 0.0  # sum of per-batch losses over the test loader
    with torch.no_grad():
        for inputs, targets in test_loader:
            outputs = fnn_model(inputs)
            predicted = outputs.argmax(dim=1)
            total_test += targets.size(0)
            correct_test += (predicted == targets).sum().item()
            test_loss += criterion(outputs, targets).item()

    test_accuracy = 100 * correct_test / total_test
    test_accuracies.append(test_accuracy)

    if epoch % 10 == 0:
        print(f'Epoch [{epoch}/{num_epochs}] Loss: {epoch_loss:.4f}')

    if epoch % 100 == 0:
        plt.figure(figsize=(12, 5))
        plt.subplot(1, 2, 1)
        plt.plot(losses)
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Training Loss')

        plt.subplot(1, 2, 2)
        plt.plot(test_accuracies)
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy (%)')
        plt.title('Test Set Accuracy')

        plt.show()

    # Check for early stopping.
    # NOTE(review): the stopping criterion uses the same split that the final
    # accuracy is reported on; a separate validation split would avoid that.
    if test_loss < best_test_loss:
        best_test_loss = test_loss
        no_improvement = 0
    else:
        no_improvement += 1

    if no_improvement >= patience:
        # `epoch` is zero-based, so epoch + 1 epochs have been completed.
        print(f'Early stopping after {epoch + 1} epochs due to no improvement in test loss.')
        break

# Report the accuracy measured after the last completed epoch.
final_test_accuracy = test_accuracies[-1]
print(f'Final accuracy on the test dataset: {final_test_accuracy:.2f}%')

# Side-by-side curves: training loss on the left, test accuracy on the right.
plt.figure(figsize=(12, 5))
panels = (
    (losses, 'Loss', 'Training Loss'),
    (test_accuracies, 'Accuracy (%)', 'Test Set Accuracy'),
)
for position, (series, y_label, heading) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(series)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.title(heading)
plt.show()