In [5]:
import os
os.environ["KERAS_BACKEND"] = "jax" # you can also use tensorflow or torch

import keras_cv
import keras
from keras import ops
import tensorflow as tf

import cv2
import pandas as pd
import numpy as np
from glob import glob
from tqdm.notebook import tqdm
import joblib

import matplotlib.pyplot as plt 

In [6]:
class CFG:
    """Namespace of constants read by the rest of the notebook."""

    # --- Run control -------------------------------------------------------
    verbose = 1  # Verbosity level
    seed = 42  # Seed handed to the random number generators

    # --- Model and input ---------------------------------------------------
    preset = "efficientnetv2_b2_imagenet"  # Pretrained classifier preset name
    image_size = [400, 300]  # Input size; same [400, 300] the decoder reshapes to

    # --- Training schedule -------------------------------------------------
    epochs = 13  # Number of training epochs
    batch_size = 64  # Samples per batch
    lr_mode = "cos"  # LR scheduler mode: one of "cos", "step", "exp"
    drop_remainder = True  # Whether incomplete batches are dropped

    # --- Data split and labels ---------------------------------------------
    num_classes = 6  # Number of classes in the dataset
    fold = 0  # Fold index used as validation data
    class_names = ['Seizure', 'LPD', 'GPD', 'LRDA', 'GRDA', 'Other']
    # Integer id -> class name, ids follow the order of class_names
    label2name = {index: name for index, name in enumerate(class_names)}
    # Class name -> integer id (inverse of label2name)
    name2label = dict(zip(class_names, range(len(class_names))))

In [7]:
# Apply the configured seed (CFG.seed) through Keras' global seeding helper
# so that repeated runs of the notebook start from the same random state.
keras.utils.set_random_seed(CFG.seed)

In [9]:
# Load the test dataset.
# Later cells read these columns from it: eeg_id, eeg2_path,
# eeg_label_offset_seconds and class_label.
test_df = pd.read_csv("/kaggle/input/test-eeg/test_data_eeg.csv")

In [11]:
# Directory holding the competition's parquet files
BASE_PATH = "/kaggle/input/hms-harmful-brain-activity-classification"

# Directory that receives the converted .npy arrays, one sub-folder per split
EEG_DIR = "/tmp/dataset/hms-hbac"
os.makedirs(f"{EEG_DIR}/train_eegs", exist_ok=True)
os.makedirs(f"{EEG_DIR}/test_eegs", exist_ok=True)

In [13]:
# Define a function to process a single eeg_id
def process_eeg(eeg_id, split="train"):
    """Convert one EEG parquet file into a float32 ``.npy`` array on disk.

    Reads ``{BASE_PATH}/{split}_eegs/{eeg_id}.parquet``, replaces NaN with 0,
    drops the first column, transposes, casts to float32 and saves the result
    as ``{EEG_DIR}/{split}_eegs/{eeg_id}.npy``.

    Args:
        eeg_id: Identifier used to build both the input and output file names.
        split: Folder prefix (``"train"`` or ``"test"``) for both locations.

    Returns:
        None. The only effect is the file written by ``np.save``.
    """
    source_path = f"{BASE_PATH}/{split}_eegs/{eeg_id}.parquet"
    target_path = f"{EEG_DIR}/{split}_eegs/{eeg_id}.npy"

    frame = pd.read_parquet(source_path).fillna(0)  # NaN -> 0
    # NOTE(review): [:, 1:] discards the first parquet column. The original
    # comment describes the layout as (Time, Freq) -> (Freq, Time); confirm the
    # first column of the EEG files really is a non-signal column.
    signal = frame.values[:, 1:].T
    signal = signal.astype("float32")
    np.save(target_path, signal)

# Collect every distinct eeg_id referenced by the test dataframe
eeg_ids = test_df.eeg_id.unique()

# Convert all of them with joblib (loky backend, n_jobs=-1).
# The split argument is "train", so files are read from and written to the
# train_eegs folders.
_ = joblib.Parallel(n_jobs=-1, backend="loky")(
    joblib.delayed(process_eeg)(one_id, "train")
    for one_id in tqdm(eeg_ids, total=eeg_ids.size)
)

  0%|          | 0/5678 [00:00<?, ?it/s]

In [14]:
def build_augmenter(dim=CFG.image_size):
    """Create the augmentation function applied to (images, labels) pairs.

    Three KerasCV layers are instantiated once: a MixUp layer and two
    RandomCutout layers. The returned function visits them in that order and
    applies each one only when a fresh uniform draw is below 0.5.

    Args:
        dim: Accepted for interface compatibility; the body does not read it.

    Returns:
        A callable ``augment(img, label)`` returning ``(images, labels)``.
    """
    mixup = keras_cv.layers.MixUp(alpha=2.0)
    # Full-height, narrow-width cutout (labelled "freq-masking" by the original
    # author). NOTE(review): which axis is frequency depends on the input
    # layout - confirm the label.
    full_height_cutout = keras_cv.layers.RandomCutout(
        height_factor=(1.0, 1.0),
        width_factor=(0.06, 0.1),
    )
    # Narrow-height, full-width cutout (labelled "time-masking" originally).
    full_width_cutout = keras_cv.layers.RandomCutout(
        height_factor=(0.06, 0.1),
        width_factor=(1.0, 1.0),
    )
    pipeline = (mixup, full_height_cutout, full_width_cutout)

    def augment(img, label):
        batch = {"images": img, "labels": label}
        for layer in pipeline:
            coin = tf.random.uniform([])
            if coin < 0.5:
                batch = layer(batch, training=True)
        return batch["images"], batch["labels"]

    return augment


def build_decoder(with_labels=True, target_size=CFG.image_size, dtype=32):
    """Create the per-sample decoding function used by the tf.data pipeline.

    Args:
        with_labels: When true the returned function also one-hot encodes a
            label; otherwise only the signal is decoded.
        target_size: Accepted for interface compatibility; the body uses the
            literal sizes 400 and 300 and does not read this argument.
        dtype: Divisor applied to 1024 to get how many leading float32 values
            are skipped as the file header (32 -> 32 values).

    Returns:
        ``decode_with_labels(path, offset, label) -> (signal, one_hot)`` or
        ``decode_signal(path, offset) -> signal``.
    """

    def decode_signal(path, offset=None):
        # Interpret the whole .npy file as float32 and skip the header part.
        # assumes the header occupies exactly 1024 // dtype float32 slots - TODO confirm
        raw = tf.io.decode_raw(tf.io.read_file(path), tf.float32)
        spec = raw[1024 // dtype:]
        # NOTE(review): assumes the saved array has 400 rows; process_eeg stores
        # values[:, 1:].T, so verify that matches the converted EEG files.
        spec = tf.reshape(spec, [400, -1])

        if offset is not None:
            # Halve the offset before using it as the start column, then take
            # up to 300 columns from there.
            start = offset // 2
            window = spec[:, start:start + 300]

            # Right-pad with zeros when fewer than 300 columns were available
            # so the output is always [400, 300].
            missing = tf.math.maximum(0, 300 - tf.shape(window)[1])
            window = tf.pad(window, [[0, 0], [0, missing]])
            spec = tf.reshape(window, [400, 300])

        # Clip into [exp(-4), exp(8)] so the log never sees 0, then take the log
        lower, upper = tf.math.exp(-4.0), tf.math.exp(8.0)
        spec = tf.math.log(tf.clip_by_value(spec, lower, upper))

        # Standardise with the mean / std taken over the whole array
        centered = spec - tf.math.reduce_mean(spec)
        scaled = centered / (tf.math.reduce_std(centered) + 1e-6)

        # Repeat the single channel three times ("ImageNet" weights expect 3)
        return tf.tile(scaled[..., None], [1, 1, 3])

    def decode_label(label):
        # Integer class id -> float32 one-hot vector of length CFG.num_classes
        one_hot = tf.cast(tf.one_hot(label, CFG.num_classes), tf.float32)
        return tf.reshape(one_hot, [CFG.num_classes])

    def decode_with_labels(path, offset=None, label=None):
        return (decode_signal(path, offset), decode_label(label))

    if with_labels:
        return decode_with_labels
    return decode_signal


def build_dataset(paths, offsets=None, labels=None, batch_size=32, cache=True,
                  decode_fn=None, augment_fn=None,
                  augment=False, repeat=True, shuffle=1024,
                  cache_dir="", drop_remainder=False):
    """Assemble the tf.data pipeline that feeds the model.

    Stages, in order: slice -> decode -> cache -> repeat -> shuffle -> batch
    -> augment -> prefetch. Every stage after decode is optional and governed
    by the matching argument.

    Args:
        paths: File paths of the stored signals.
        offsets: Per-sample offsets forwarded to the decoder.
        labels: Per-sample class ids; when None the dataset has no labels.
        batch_size: Number of samples per batch.
        cache: Whether to cache the decoded samples.
        decode_fn: Decoder; defaults to ``build_decoder(labels is not None)``.
        augment_fn: Batch augmenter; defaults to ``build_augmenter()``.
        augment: Whether ``augment_fn`` is mapped over the batches.
        repeat: Whether the dataset repeats indefinitely.
        shuffle: Shuffle buffer size; a falsy value disables shuffling.
        cache_dir: Passed to ``Dataset.cache``; created first when it is not
            empty and ``cache`` is exactly True.
        drop_remainder: Whether an incomplete final batch is dropped.

    Returns:
        The configured ``tf.data.Dataset``.
    """
    if cache is True and cache_dir != "":
        os.makedirs(cache_dir, exist_ok=True)

    # Fall back to the notebook's default decoder / augmenter
    decode_fn = build_decoder(labels is not None) if decode_fn is None else decode_fn
    augment_fn = build_augmenter() if augment_fn is None else augment_fn

    autotune = tf.data.experimental.AUTOTUNE
    if labels is None:
        components = (paths, offsets)
    else:
        components = (paths, offsets, labels)

    dataset = tf.data.Dataset.from_tensor_slices(components)
    dataset = dataset.map(decode_fn, num_parallel_calls=autotune)
    if cache:
        dataset = dataset.cache(cache_dir)
    if repeat:
        dataset = dataset.repeat()
    if shuffle:
        dataset = dataset.shuffle(shuffle, seed=CFG.seed)
        # Element order is allowed to be non-deterministic when shuffling
        options = tf.data.Options()
        options.experimental_deterministic = False
        dataset = dataset.with_options(options)
    dataset = dataset.batch(batch_size, drop_remainder=drop_remainder)
    if augment:
        # Augmentation runs on whole batches, after batching
        dataset = dataset.map(augment_fn, num_parallel_calls=autotune)
    return dataset.prefetch(autotune)

In [15]:
# Test split: file paths, integer label offsets and class ids taken from test_df
test_paths = test_df["eeg2_path"].values
test_offsets = test_df["eeg_label_offset_seconds"].values.astype(int)
test_labels = test_df["class_label"].values

# Single ordered pass: no repeat, no shuffle, no augmentation; decoded
# samples are cached.
test_ds = build_dataset(
    test_paths,
    test_offsets,
    test_labels,
    batch_size=CFG.batch_size,
    repeat=False,
    shuffle=False,
    augment=False,
    cache=True,
)

In [17]:
# Load the saved Keras model from the uploaded dataset.
# compile=True is passed so the loaded model is compiled on load.
model = keras.models.load_model('/kaggle/input/best-model-eeg/best_model_eeg.keras', compile=True)

In [18]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Class probabilities for every sample yielded by test_ds
y_pred_probs = model.predict(test_ds)

# Predicted class = index of the largest probability in each row
y_pred = y_pred_probs.argmax(axis=1)

# Fraction of samples whose predicted class equals the true class
accuracy = accuracy_score(y_true=test_labels, y_pred=y_pred)
print("Test Accuracy:", accuracy)

# The remaining scores use average='weighted'
precision = precision_score(y_true=test_labels, y_pred=y_pred, average='weighted')
print("Precision:", precision)

recall = recall_score(y_true=test_labels, y_pred=y_pred, average='weighted')
print("Recall:", recall)

f1 = f1_score(y_true=test_labels, y_pred=y_pred, average='weighted')
print("F1 Score:", f1)


[1m167/167[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m90s[0m 505ms/step
Test Accuracy: 0.3008426966292135
Precision: 0.4611666718845289
Recall: 0.3008426966292135
F1 Score: 0.2575258788372279
