In [287]:
# from google.colab import drive
# drive.mount('/content/drive')

In [288]:
# !pip install torch_geometric

In [289]:
import numpy as np
import networkx as nx
import os
import random
import torch
import torch.nn as nn
import torch.optim as optim
from torch_geometric.data import DataLoader
from torch_geometric.nn import GCNConv, global_mean_pool
from torch_geometric.data import Data

In [290]:
def readeeg(path):
  lines = np.loadtxt(path)
  eeg = [float(value) for value in lines]
  data = []
  num_samples = 7680
  num_channels = 16
  for i in range(num_channels):
    chnl = i*num_samples
    temp = []
    for j in range(num_samples):
      temp.append(eeg[j+chnl])
    data.append(temp)

  data = np.array(data)
  return data

In [291]:
def apply_laplacian_transform(eeg_data):
    laplacian_data = np.zeros_like(eeg_data)

    # Iterate over each channel
    for i in range(eeg_data.shape[0]):
        # Calculate the average of neighboring channels
        avg_neighboring = np.mean(eeg_data[:, [i-1, i+1]], axis=1, keepdims=True)

        # Subtract the average from the central channel value
        laplacian_data[:, i] = eeg_data[:, i] - avg_neighboring.flatten()

    return laplacian_data

In [292]:
def getdata(dataset, folder):
  case_path = os.path.join(dataset, folder)
  datas = []
  for filename in os.listdir(case_path):
    data = readeeg(os.path.join(case_path, filename))
    data = apply_laplacian_transform(data)
    datas.append(data)
  return datas

In [293]:
feature_names = ["F7", "F3", "F4", "F8", "T3", "C3", "Cz", "C4", "T4", "T5", "P3", "Pz", "P4", "T6", "O1", "O2"]

def get_graph(data):
  graph = nx.Graph()

  for i, feature_name in enumerate(feature_names):
      graph.add_node(i, name=feature_name)

  adj_matrix = np.corrcoef(data)
  num_channels = len(feature_names)
  for i in range(num_channels):
      graph.add_edge(i, i, weight=0.00)
      for j in range(i + 1, num_channels):
          weight = adj_matrix[i, j]  # Use correlation coefficient as edge weight
          graph.add_edge(i, j, weight=weight)

  return graph


def form_graphs(datas):
  graphs = []
  for data in datas:
    g = get_graph(data)
    graphs.append(g)

  return graphs

In [294]:
dataset_path = '/content/drive/MyDrive/Colab Notebooks/EEG_SHCEEEZ-DL/datasets/A'

laplace_norm = getdata(dataset_path, 'norm')
laplace_sch = getdata(dataset_path, 'sch')

In [295]:
norm_graphs = form_graphs(laplace_norm)
sch_graphs = form_graphs(laplace_sch)

In [296]:
norm_graphs[0][10]

AtlasView({0: {'weight': 0.3457712299320974}, 1: {'weight': 0.23630053786975563}, 2: {'weight': 0.4410845844727814}, 3: {'weight': -0.08745226728057598}, 4: {'weight': 0.30542686030149147}, 5: {'weight': 0.6346821729828586}, 6: {'weight': 0.6585225313784501}, 7: {'weight': 0.6828921089959428}, 8: {'weight': 0.2381219967921545}, 9: {'weight': 0.7239465773085957}, 10: {'weight': 0.0}, 11: {'weight': 0.9154522860836333}, 12: {'weight': 0.9090006997487697}, 13: {'weight': 0.7485677767248967}, 14: {'weight': 0.9217948588953351}, 15: {'weight': 0.8894881436147823}})

In [297]:
# import matplotlib.pyplot as plt

# graph = norm_graphs[0][10]
# degree_centrality = nx.degree_centrality(graph)

# # Create a dictionary for node colors based on centrality values
# node_colors = {node: centrality_value for node, centrality_value in degree_centrality.items()}

# # Plot the graph with nodes colored based on centrality values
# pos = nx.spring_layout(graph)
# nx.draw_networkx(graph, pos=pos, node_color=list(node_colors.values()), cmap='coolwarm')
# # nx.draw(norm_graphs[0][10], with_labels=True)
# plt.show()

In [298]:
train_ratio = 0.8
test_ratio = 0.1
validation_ratio = 0.1

def split_graphs(graphs, labels):
    combined_data = list(zip(graphs, labels))
    random.shuffle(combined_data)

    num_graphs = len(graphs)
    num_train = int(train_ratio * num_graphs)
    num_test = int(test_ratio * num_graphs)
    num_validation = num_graphs - num_train - num_test

    train_data = combined_data[:num_train]
    test_data = combined_data[num_train:num_train + num_test]
    validation_data = combined_data[num_train + num_test:]

    train_graphs, train_labels = zip(*train_data)
    test_graphs, test_labels = zip(*test_data)
    validation_graphs, validation_labels = zip(*validation_data)

    return train_graphs, test_graphs, validation_graphs, train_labels, test_labels, validation_labels

In [299]:
norm_labels = [0 for i in range(len(norm_graphs))]
sch_labels = [1 for i in range(len(sch_graphs))]

train_graphs, test_graphs, val_graphs, train_labels, test_labels, val_labels = split_graphs(norm_graphs + sch_graphs, norm_labels + sch_labels)

print(f'Train Size: {len(train_graphs)}, Test Size: {len(test_graphs)}, Val Size: {len(val_graphs)}')

Train Size: 67, Test Size: 8, Val Size: 9


In [300]:
class GCN(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(GCN, self).__init__()
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.conv2 = GCNConv(hidden_dim, hidden_dim)
        self.pool = global_mean_pool
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = self.conv1(x, edge_index)
        x = torch.relu(x)
        x = self.conv2(x, edge_index)
        x = self.pool(x, data.batch)  # Graph-level pooling
        x = self.fc(x)  # Classification layer
        return x

In [301]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [302]:
train_data = [(train_graphs[i], train_labels[i]) for i in range(len(train_graphs))]
test_data = [(test_graphs[i], test_labels[i]) for i in range(len(test_graphs))]
val_data = [(val_graphs[i], val_labels[i]) for i in range(len(val_graphs))]

In [303]:
train_ds = [Data(x=torch.tensor([[graph[node][n]['weight'] for n in graph.nodes] for node in graph.nodes], dtype=torch.float), edge_index=torch.tensor(list(graph.edges), dtype=torch.long).t().contiguous(), y=torch.tensor(label, dtype=torch.long)) for graph, label in train_data]
test_ds = [Data(x=torch.tensor([[graph[node][n]['weight'] for n in graph.nodes] for node in graph.nodes], dtype=torch.float), edge_index=torch.tensor(list(graph.edges), dtype=torch.long).t().contiguous(), y=torch.tensor(label, dtype=torch.long)) for graph, label in test_data]
val_ds = [Data(x=torch.tensor([[graph[node][n]['weight'] for n in graph.nodes] for node in graph.nodes], dtype=torch.float), edge_index=torch.tensor(list(graph.edges), dtype=torch.long).t().contiguous(), y=torch.tensor(label, dtype=torch.long)) for graph, label in val_data]

In [304]:
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=32)
test_loader = DataLoader(test_ds, batch_size=32)



In [305]:
input_dim = 16
hidden_dim = 128
output_dim = 2

model = GCN(input_dim, hidden_dim, output_dim).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adagrad(model.parameters(), lr=0.01)

In [306]:
num_epochs = 100

model.train()
for epoch in range(num_epochs):
    for batch_data in train_loader:
        batch_data = batch_data.to(device)
        optimizer.zero_grad()
        output = model(batch_data)
        loss = criterion(output, batch_data.y)
        loss.backward()
        optimizer.step()

    # Evaluate on the validation set
    model.eval()
    with torch.no_grad():
        val_loss = 0
        val_correct = 0
        val_total = 0
        for val_data in val_loader:
            val_data = val_data.to(device)
            val_output = model(val_data)
            val_loss += criterion(val_output, val_data.y).item()
            _, val_predicted = torch.max(val_output, dim=1)
            val_total += val_data.y.size(0)
            val_correct += (val_predicted == val_data.y).sum().item()

        val_accuracy = 100 * val_correct / val_total
        val_loss /= len(val_loader)

        print(f"Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%")

Epoch 1/100, Validation Loss: 0.6466, Validation Accuracy: 66.67%
Epoch 2/100, Validation Loss: 0.8597, Validation Accuracy: 33.33%
Epoch 3/100, Validation Loss: 0.7359, Validation Accuracy: 55.56%
Epoch 4/100, Validation Loss: 0.7636, Validation Accuracy: 55.56%
Epoch 5/100, Validation Loss: 0.8136, Validation Accuracy: 55.56%
Epoch 6/100, Validation Loss: 0.7693, Validation Accuracy: 66.67%
Epoch 7/100, Validation Loss: 0.8216, Validation Accuracy: 55.56%
Epoch 8/100, Validation Loss: 0.9596, Validation Accuracy: 33.33%
Epoch 9/100, Validation Loss: 0.7667, Validation Accuracy: 77.78%
Epoch 10/100, Validation Loss: 0.9443, Validation Accuracy: 33.33%
Epoch 11/100, Validation Loss: 0.7963, Validation Accuracy: 77.78%
Epoch 12/100, Validation Loss: 0.9056, Validation Accuracy: 55.56%
Epoch 13/100, Validation Loss: 0.8162, Validation Accuracy: 66.67%
Epoch 14/100, Validation Loss: 0.7817, Validation Accuracy: 66.67%
Epoch 15/100, Validation Loss: 0.7902, Validation Accuracy: 77.78%
Epoc

In [310]:
model.eval()
test_loss = 0
correct = 0

with torch.no_grad():
    for batch_data in train_loader:
        batch_data = batch_data.to(device)
        output = model(batch_data)
        test_loss += criterion(output, batch_data.y).item()
        _, predicted = torch.max(output, 1)
        correct += (predicted == batch_data.y).sum().item()

test_loss /= len(train_loader.dataset)
accuracy = correct / len(train_loader.dataset)

print(f"Test loss: {test_loss}, Accuracy: {accuracy}")

Test loss: 0.009688706317944313, Accuracy: 0.9402985074626866


In [316]:
model_save_path = '/content/drive/MyDrive/Colab Notebooks/EEG_SHCEEEZ-DL/models/gcnn/gcnn_laplace.h5'
torch.save(model.state_dict(), model_save_path)