In [60]:
import pandas as pd
import numpy as np
import torch
import os
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import ConfusionMatrixDisplay
from tensorflow.keras.utils import to_categorical
import pickle
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
from scipy.signal import butter, lfilter
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import torch.nn as nn


In [20]:
# ---------------------------------------------------------------------------
# Notebook configuration, data loading and train/test split.
# ---------------------------------------------------------------------------

# Vote column names -- presumably the per-class label columns of train.csv;
# not referenced by the cells visible in this notebook (TODO confirm usage).
TARGETS = ['seizure_vote', 'lpd_vote', 'gpd_vote', 'lrda_vote', 'grda_vote', 'other_vote']

# EEG channels selected from each recording before preprocessing.
FEATURES = ['Fp1', 'T3', 'C3', 'O1', 'Fp2', 'C4', 'T4', 'O2']

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Number of train.csv rows per training chunk.
CHUNK_SIZE = 20000

# Resume controls read by the training / evaluation cells.
RESUME = False
LAST_ITER = 1

# Input locations (Kaggle dataset mounts).
MODEL_PATH = "/kaggle/input/dec11/pytorch/default/1/lstm_model(2).pth"
FILE_PATH = "/kaggle/input/hms-harmful-brain-activity-classification/"
TEST_FOLDER = os.path.join(FILE_PATH, "test_eegs/")
SPECTROGRAM_FOLDER = os.path.join(FILE_PATH, "train_spectrograms")
TRAIN_CSV = os.path.join(FILE_PATH, "train.csv")

df = pd.read_csv(TRAIN_CSV)

# Hold out 10% of the rows; the remainder is cut into consecutive
# CHUNK_SIZE-row frames so training can proceed one chunk at a time.
train_df_temp, test_df = train_test_split(df, test_size=0.1, random_state=1)
train_df = [
    train_df_temp.iloc[chunk_start:chunk_start + CHUNK_SIZE]
    for chunk_start in range(0, len(train_df_temp), CHUNK_SIZE)
]

class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear classification head.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of each LSTM layer's hidden state.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits.

    The submodules are named ``lstm`` and ``fc``; these names determine the
    ``state_dict`` keys, so they must not change if saved checkpoints are to
    keep loading.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # batch_first=True: inputs are laid out as (batch, seq_len, input_size).
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes).

        Args:
            x: Tensor laid out as (batch, seq_len, input_size).
        """
        # When no initial state is passed, nn.LSTM starts from zeros that match
        # the module's device and dtype. Hand-built float32 zeros would clash
        # with the weights once the model is cast to another dtype.
        out, _ = self.lstm(x)

        # Classify from the hidden state of the last time step only.
        return self.fc(out[:, -1, :])
        
def butter_lowpass_filter(data, cutoff_freq=20, sampling_rate=200, order=4):
    """Low-pass filter ``data`` along axis 0 with a digital Butterworth filter.

    Args:
        data: Array-like signal; filtering runs along the first axis.
        cutoff_freq: Cutoff frequency, in the same units as ``sampling_rate``.
        sampling_rate: Sampling frequency of ``data``.
        order: Order of the Butterworth filter.

    Returns:
        The filtered signal as returned by ``scipy.signal.lfilter``
        (single forward pass, same shape as ``data``).
    """
    # butter() takes the cutoff as a fraction of the Nyquist frequency.
    nyquist_freq = 0.5 * sampling_rate
    numerator, denominator = butter(
        order,
        cutoff_freq / nyquist_freq,
        btype='low',
        analog=False,
    )
    return lfilter(numerator, denominator, data, axis=0)

def load_eeg_data(eeg_id, base_path='/kaggle/input/hms-harmful-brain-activity-classification/train_eegs/'):
    """Read the parquet file ``<base_path>/<eeg_id>.parquet`` into a DataFrame.

    Args:
        eeg_id: Identifier used as the parquet file's base name.
        base_path: Directory that contains the parquet files.

    Returns:
        The DataFrame produced by ``pd.read_parquet``.

    Raises:
        FileNotFoundError: If the expected parquet file does not exist.
    """
    parquet_path = os.path.join(base_path, f"{eeg_id}.parquet")
    if not os.path.exists(parquet_path):
        raise FileNotFoundError(f"File {parquet_path} not found.")
    return pd.read_parquet(parquet_path)
        
def preprocess_eeg_sample(eeg_data, start_row, end_row):
    """Cut one window out of a recording and return it filtered and standardised.

    Steps: select the FEATURES channels for rows ``[start_row, end_row)``,
    impute missing values, apply the 20 Hz low-pass filter (200 Hz sampling
    rate), then standardise each channel with a StandardScaler fitted on this
    window alone.

    Args:
        eeg_data: DataFrame containing at least the FEATURES columns.
        start_row: First row (positional, inclusive) of the window.
        end_row: Last row (positional, exclusive) of the window.

    Returns:
        2-D array with one row per selected sample and one column per channel
        in FEATURES.
    """
    window = eeg_data[FEATURES].iloc[start_row:end_row, :]

    # Impute gaps with the per-channel mean of the window. A channel that is
    # entirely NaN has a NaN mean, so fall back to 0.0 for whatever is left;
    # otherwise the IIR filter would spread the NaNs through that channel's output.
    window = window.fillna(window.mean()).fillna(0.0)

    filtered = butter_lowpass_filter(window.values, cutoff_freq=20, sampling_rate=200)

    # Per-window, per-channel standardisation.
    scaler = StandardScaler()
    return scaler.fit_transform(filtered)

def accuracy(predictions, labels):
    """Fraction of rows whose arg-max over dim 1 equals the label.

    Args:
        predictions: Tensor of per-class scores; the class axis is dim 1.
        labels: Tensor of integer class indices, comparable element-wise with
            the arg-max of ``predictions``.

    Returns:
        Zero-dimensional float tensor holding the mean hit rate.
    """
    predicted_classes = predictions.argmax(dim=1)
    hits = (predicted_classes == labels).float()
    return hits.mean()


In [3]:
# Show the expert_consensus class counts for the full data and for each side
# of the train/test split.
for heading, frame in (
    ("full dataset:", df),
    ("\ntraining dataset:", train_df_temp),
    ("\ntest dataset:", test_df),
):
    print(heading)
    print(frame['expert_consensus'].value_counts())

full dataset:
expert_consensus
Seizure    20933
GRDA       18861
Other      18808
GPD        16702
LRDA       16640
LPD        14856
Name: count, dtype: int64

training dataset:
expert_consensus
Seizure    18784
GRDA       17008
Other      16883
GPD        15043
LRDA       14966
LPD        13436
Name: count, dtype: int64

test dataset:
expert_consensus
Seizure    2149
Other      1925
GRDA       1853
LRDA       1674
GPD        1659
LPD        1420
Name: count, dtype: int64


In [None]:
# Train the LSTM on the training split, one CHUNK_SIZE-row chunk at a time.
#
# The model, loss and optimizer are created ONCE, before the chunk loop, so
# every chunk keeps training the same weights. Re-creating them inside the
# loop would discard what earlier chunks learned and leave a checkpoint that
# only reflects the last chunk.

input_size = len(FEATURES)  # preprocess_eeg_sample() keeps exactly the FEATURES columns
hidden_size = 128
num_layers = 2
batch_size = 16
learning_rate = 0.001
num_epochs = 10

# Fit the encoder on the full label column so that the class -> index mapping
# is identical for every chunk, even one that happens to lack a class.
label_encoder = LabelEncoder().fit(df['expert_consensus'])
num_classes = len(label_encoder.classes_)

model = LSTMModel(input_size, hidden_size, num_layers, num_classes)
if RESUME and os.path.exists(MODEL_PATH):
    # map_location lets a checkpoint saved on GPU load on a CPU-only session.
    model.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE, weights_only=True))
model.to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

for i, part_df in enumerate(train_df):
    # Chunks before LAST_ITER are skipped only when resuming an earlier run;
    # a fresh run trains on every chunk.
    if RESUME and i < LAST_ITER:
        continue

    # Build this chunk's samples: one 50 s window per row.
    X = []
    y = []
    for _, row in part_df.iterrows():
        eeg_data = load_eeg_data(row['eeg_id'])

        start_row = int(row['eeg_label_offset_seconds'] * 200)  # 200 Hz
        end_row = start_row + (50 * 200)

        X.append(preprocess_eeg_sample(eeg_data, start_row, end_row))
        y.append(row['expert_consensus'])

    X = np.array(X)
    y = np.array(y)
    y_encoded = label_encoder.transform(y)

    X_tensor = torch.tensor(X, dtype=torch.float32)
    y_tensor = torch.tensor(y_encoded, dtype=torch.long)

    dataset = TensorDataset(X_tensor, y_tensor)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Per-chunk training history (mean loss / accuracy of each epoch).
    train_losses = []
    train_acc = []
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        running_accuracy = 0.0
        for batch_X, batch_y in dataloader:
            batch_X, batch_y = batch_X.to(DEVICE), batch_y.to(DEVICE)
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            # .item() keeps the running total a plain Python float.
            running_accuracy += accuracy(outputs, batch_y).item()

        running_loss /= len(dataloader)
        running_accuracy /= len(dataloader)
        train_losses.append(running_loss)
        train_acc.append(running_accuracy)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss:.4f}, Accuracy: {running_accuracy:.4f}")

    # Checkpoint after every chunk so an interrupted session loses at most one chunk.
    torch.save(model.state_dict(), "lstm_model.pth")

done reading, training now
done converting X to np
done converting Y to np
Shape of X (features): (20000, 10000, 8)
Shape of y (labels): (20000, 6)
Epoch [1/10], Loss: 1.7827, Accuracy: 0.2012


In [11]:
from sklearn.metrics import accuracy_score, precision_score, auc, recall_score, f1_score
from torch.utils.data import DataLoader, TensorDataset
import numpy as np

def compute_metrics(y_true, y_pred, average='weighted'):
    """Compute accuracy, precision, recall and F1 for a set of predictions.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels, aligned with ``y_true``.
        average: Averaging mode forwarded to ``precision_score``,
            ``recall_score`` and ``f1_score``.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)``.
    """
    return (
        accuracy_score(y_true, y_pred),
        precision_score(y_true, y_pred, average=average),
        recall_score(y_true, y_pred, average=average),
        f1_score(y_true, y_pred, average=average),
    )

Compute metrics

In [22]:
# Evaluate the LSTM on the held-out split: one 50 s window per unique EEG recording.
X = []
y = []
test_df = test_df.drop_duplicates(subset='eeg_id', keep='first')
for _, row in test_df.iterrows():
    eeg_data = load_eeg_data(row['eeg_id'])

    start_row = int(row['eeg_label_offset_seconds'] * 200)  # 200 Hz
    end_row = start_row + (50 * 200)

    X.append(preprocess_eeg_sample(eeg_data, start_row, end_row))
    y.append(row['expert_consensus'])

X = np.array(X)
y = np.array(y)

# Fit the encoder on the full label column (not just the test labels) so the
# class -> index mapping cannot shift if the test split lacks a class.
label_encoder = LabelEncoder().fit(df['expert_consensus'])
y_encoded = label_encoder.transform(y)
num_classes = len(label_encoder.classes_)
y_onehot = to_categorical(y_encoded, num_classes=num_classes)

input_size = X.shape[2]
hidden_size = 128
num_layers = 2
batch_size = 16

# When RESUME is set, the model already in memory is evaluated as-is;
# otherwise a fresh model is built and the saved weights are loaded into it.
if not RESUME:
    model = LSTMModel(input_size, hidden_size, num_layers, num_classes).to(DEVICE)
    if os.path.exists(MODEL_PATH):
        # weights_only=True restricts unpickling to tensor data, matching how
        # the training cell loads checkpoints.
        model.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE, weights_only=True))
    else:
        # Keep going (best effort) but make it obvious the numbers are meaningless.
        print(f"WARNING: checkpoint {MODEL_PATH} not found; evaluating randomly initialised weights.")

# Keep the full tensors on the CPU and move one batch at a time to DEVICE,
# instead of holding the whole evaluation set in device memory.
X_tensor = torch.tensor(X, dtype=torch.float32)
y_tensor = torch.tensor(y_encoded, dtype=torch.long)

dataset = TensorDataset(X_tensor, y_tensor)
# Evaluation order does not affect the metrics; no shuffling keeps runs repeatable.
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

print("Shape of X (features):", X.shape)
print("Shape of y (labels):", y_onehot.shape)

model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for batch_X, batch_y in dataloader:
        outputs = model(batch_X.to(DEVICE))
        predicted = outputs.argmax(dim=1)

        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(batch_y.numpy())



  model.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE))


Shape of X (features): (5703, 10000, 8)
Shape of y (labels): (5703, 6)


In [25]:
# Use `test_`-prefixed names: unpacking into a variable called `accuracy` would
# rebind the module-level accuracy() helper to a float, and the training cell
# (which calls accuracy() per batch) would then fail when re-run.
test_accuracy, test_precision, test_recall, test_f1 = compute_metrics(all_labels, all_preds, average='weighted')
print(f"Accuracy: {test_accuracy:.4f}, Precision: {test_precision:.4f}, Recall: {test_recall:.4f}, F1 Score: {test_f1:.4f}")

Accuracy: 0.3051, Precision: 0.2571, Recall: 0.3051, F1 Score: 0.1833


  _warn_prf(average, modifier, msg_start, len(result))


In [56]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
from sklearn.model_selection import train_test_split

class NetA(nn.Module):
    """Small CNN feature extractor: two conv + max-pool stages and a linear layer.

    Takes 3-channel images and returns a 128-dimensional, ReLU-activated
    feature vector per image. The linear layer expects 64 * 14 * 14 flattened
    inputs, which is what the two unpadded 3x3 conv / 2x2 pool stages produce
    for a 64x64 image.

    The attribute names ``a1``, ``a2`` and ``b1`` determine the ``state_dict``
    keys and are therefore left unchanged.
    """

    def __init__(self):
        super().__init__()
        self.a1 = nn.Conv2d(3, 32, 3)
        self.a2 = nn.Conv2d(32, 64, 3)
        self.b1 = nn.Linear(64 * 14 * 14, 128)

    def forward(self, x):
        """Map a batch of images to a batch of 128-dimensional feature vectors."""
        # Each stage: convolution -> ReLU -> 2x2 max-pool.
        for conv in (self.a1, self.a2):
            x = F.max_pool2d(F.relu(conv(x)), 2)
        # Flatten everything except the batch axis, then project to 128 features.
        return F.relu(self.b1(x.flatten(1)))

class CNN_LSTM(nn.Module):
    """Run a CNN on every frame of a sequence and feed the features to a sequence model.

    Args:
        cnn_model: Module mapping a batch of frames ``(N, C, H, W)`` to
            per-frame feature vectors ``(N, feature_dim)``.
        lstm_model: Module consuming ``(batch, timesteps, feature_dim)``.
    """

    def __init__(self, cnn_model, lstm_model):
        super().__init__()
        self.cnn = cnn_model
        self.lstm = lstm_model

    def forward(self, x):
        """Return ``lstm_model``'s output for the CNN features of ``x``.

        Args:
            x: Either a 4-D batch of single frames ``(B, C, H, W)``, treated as
                sequences of length one, or a 5-D batch of frame sequences
                ``(B, T, C, H, W)``.

        Raises:
            ValueError: If ``x`` is neither 4-D nor 5-D.
        """
        if x.dim() == 4:
            # A plain image batch is a sequence with a single time step. The
            # time axis must be added explicitly: indexing x[:, t] on a 4-D
            # batch would slice the channel axis and hand the CNN a 3-D tensor.
            x = x.unsqueeze(1)
        elif x.dim() != 5:
            raise ValueError(
                f"expected input of shape (B, C, H, W) or (B, T, C, H, W), got {tuple(x.shape)}"
            )

        batch_size, timesteps = x.shape[:2]

        # Fold time into the batch axis so the CNN sees all frames in one call,
        # then unfold the features back into (batch, timesteps, feature_dim).
        frames = x.reshape(batch_size * timesteps, *x.shape[2:])
        features = self.cnn(frames)
        features = features.reshape(batch_size, timesteps, -1)

        return self.lstm(features)


# Assemble the CNN -> LSTM classifier. The LSTM's input_size (128) matches the
# width of the feature vector produced by NetA's final linear layer.
cnn_model = NetA()
lstm_model = LSTMModel(input_size=128, hidden_size=64, num_layers=2, num_classes=6)

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
combined_model = CNN_LSTM(cnn_model, lstm_model).to(DEVICE)

# Note: these rebind the notebook-level `criterion` and `optimizer` names.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(combined_model.parameters(), lr=0.001)

In [None]:
# df = pd.read_csv(TRAIN_CSV)

# df["spectrogram_path"] = df["spectrogram_id"].apply(lambda x: os.path.join(SPECTROGRAM_FOLDER, f"{x}.parquet"))

# train_df, val_df = train_test_split(df, test_size=0.1, random_state=1)

# class EEGDataset(Dataset):
#     def __init__(self, dataframe, transform=None):
#         self.dataframe = dataframe
#         self.transform = transform
#         self.eps = 1e-6

#         self.targets = dataframe['expert_consensus'].apply(pd.to_numeric, errors='coerce').fillna(0)

#         label_encoder = LabelEncoder()
#         self.encoded_targets = label_encoder.fit_transform(self.targets)
#         self.num_classes = len(np.unique(self.encoded_targets))

#     def __len__(self):
#         return len(self.dataframe)

#     def __getitem__(self, idx):
#         row = self.dataframe.iloc[idx]
#         spectrogram_path = row["spectrogram_path"]
#         spectrogram_data = self._process_spectrogram(spectrogram_path)
        
#         target = torch.tensor(self.encoded_targets[idx], dtype=torch.long)
#         return spectrogram_data, target

#     def _process_spectrogram(self, path):
#         data = pd.read_parquet(path)
#         data = data.fillna(-1).values[:, 1:].T 
#         data = np.clip(data, np.exp(-6), np.exp(10)) 
        
#         spectrogram_tensor = torch.unsqueeze(torch.tensor(data, dtype=torch.float32), dim=0) 
#         if self.transform:
#             spectrogram_tensor = self.transform(spectrogram_tensor)
#         return spectrogram_tensor

# class ResizeTransform:
#     def __init__(self, size=(64, 64)):
#         self.size = size

#     def __call__(self, tensor):
#         return transforms.functional.resize(tensor, self.size)

# transform = transforms.Compose([
#     ResizeTransform(size=(64, 64)) 
# ])

# train_dataset = EEGDataset(train_df, transform=transform)
# val_dataset = EEGDataset(val_df, transform=transform)

# train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2)
# val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=2)

# cnn_model = NetA()
# lstm_model = LSTMModel(input_size=128, hidden_size=64, num_layers=2, num_classes=train_dataset.num_classes)
# combined_model = CNN_LSTM(cnn_model, lstm_model)
# combined_model.to(DEVICE)

# optimizer = torch.optim.Adam(combined_model.parameters(), lr=0.001)
# criterion = nn.BCEWithLogitsLoss() 

# num_epochs = 10
# true_labels = []
# predictions = []

# for epoch in range(num_epochs):
#     combined_model.train()
#     running_loss = 0.0

#     for images, targets in train_loader:
#         images, targets = images.to(DEVICE), targets.to(DEVICE)
#         optimizer.zero_grad()
#         outputs = combined_model(images)
#         loss = criterion(outputs, targets.float())

#         loss.backward()
#         optimizer.step()

#         running_loss += loss.item()

#     print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss / len(train_loader)}")


# combined_model.eval()
# val_loss = 0.0
# with torch.no_grad():
#     for images, targets in val_loader:
#         images, targets = images.to(DEVICE), targets.to(DEVICE)

#         outputs = combined_model(images)
#         loss = criterion(outputs, targets.float()) 
#         val_loss += loss.item()

#         true_labels.append(targets.cpu().numpy())
#         predictions.append((outputs > 0.5).cpu().numpy()) 


# true_labels = np.concatenate(true_labels, axis=0)
# predictions = np.concatenate(predictions, axis=0)