# In the name of God.

# Final Project

# Epileptic Seizure Prediction Using EEG Signals or Video

In [None]:
# Mount Google Drive to access EEG data
from google.colab import drive
drive.mount('/content/drive')

# Check the contents of the chb directory
import os

# Path
chb_path = '/content/drive/MyDrive/G_Pr/chb'

# List the folders (patients)
print("Available patient folders:")
print(os.listdir(chb_path))

In [None]:
import glob

# List .edf files for each patient
for patient in os.listdir(chb_path):
    patient_folder = os.path.join(chb_path, patient)
    edf_files = sorted(glob.glob(os.path.join(patient_folder, "*.edf")))

    print(f"{patient}: {len(edf_files)} EDF files")
    for edf_file in edf_files[:3]:  # Just show first 3 as preview in each folder
        print("   ", os.path.basename(edf_file))
    print("   ...")

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Install necessary packages
!pip install mne matplotlib numpy

# --- Load one EDF file to inspect EEG signals ---

import mne
import matplotlib.pyplot as plt

# Path to one sample EDF file (you can change the patient and file later)
edf_sample_path = os.path.join(chb_path, "chb01", "chb01_01.edf")

# Load the EDF file
raw = mne.io.read_raw_edf(edf_sample_path, preload=True)
print(raw)

# Plot EEG signals (first 30 seconds)
raw.plot(duration=30, n_channels=23, scalings='auto', show=True)

In [None]:
# Load a specific EDF file and plot EEG signals

import mne
import matplotlib.pyplot as plt

# Path to a specific EDF file (e.g., chb01_01.edf from subject chb01)
edf_file = '/content/drive/MyDrive/G_Pr/chb/chb01/chb01_01.edf'

# Load the EEG data from the EDF file with preload=True to load it into memory
raw = mne.io.read_raw_edf(edf_file, preload=True)

# Display basic information about the EEG recording
print(raw.info)

# Plot the first 10 seconds of EEG data using 20 channels and automatic scaling
raw.plot(duration=10, n_channels=20, scalings='auto')

In [None]:
# Extract raw EEG data and timestamps

# Get data and timestamps
eeg_data, times = raw.get_data(return_times=True)

print("EEG data shape:", eeg_data.shape)   # (channels, samples)
print("Timestamps shape:", times.shape)    # (samples,)
print("Sample rate:", raw.info['sfreq'])   # sampling frequency

In [None]:
import os
import mne

# Root directory containing the EEG data
root_dir = "/content/drive/MyDrive/G_Pr/chb"

# Band-pass filter range (1 to 40 Hz)
low_freq = 1.0
high_freq = 40.0

# Get the list of patient directories
patients = sorted([p for p in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, p))])

# Iterate through all patients and their EDF files
for patient in patients:
    patient_path = os.path.join(root_dir, patient)
    edf_files = sorted([f for f in os.listdir(patient_path) if f.endswith('.edf')])

    print(f"\n Patient {patient} - {len(edf_files)} EDF files")

    for edf_file in edf_files:
        edf_path = os.path.join(patient_path, edf_file)
        print(f" Loading {edf_file}...")

        try:
            # Load the raw EDF file
            raw = mne.io.read_raw_edf(edf_path, preload=True, verbose=False)

            # Apply band-pass filter from 1 to 40 Hz
            raw.filter(l_freq=low_freq, h_freq=high_freq, verbose=False)

            # Extract the data and corresponding timestamps
            data, timestamps = raw.get_data(return_times=True)
            print(f" Done. Data shape: {data.shape}")

        except Exception as e:
            print(f" Failed to process {edf_file}: {e}")

In [None]:
import os
import re

def parse_summary_file(summary_path):
    seizure_dict = {}
    current_edf = None

    with open(summary_path, 'r') as f:
        for line in f:
            line = line.strip()

            # Detect EDF file name
            if line.endswith('.edf'):
                current_edf = line
                seizure_dict[current_edf] = []

            # Detect seizure start/end
            elif 'Seizure' in line and 'Start Time' in line:
                start_time = float(re.findall(r'(\d+\.?\d*)', line)[0])
                seizure_dict[current_edf].append({'start': start_time})
            elif 'Seizure' in line and 'End Time' in line:
                end_time = float(re.findall(r'(\d+\.?\d*)', line)[0])
                if seizure_dict[current_edf]:
                    seizure_dict[current_edf][-1]['end'] = end_time

    return seizure_dict

In [None]:
# --- Read seizure info for all patients from their summary files ---

base_dir = "/content/drive/MyDrive/G_Pr/chb"
patient_folders = sorted([f for f in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, f))])

all_seizure_data = {}

for patient_id in patient_folders:
    summary_path = os.path.join(base_dir, patient_id, f"{patient_id}-summary.txt")
    if os.path.exists(summary_path):
        print(f"Parsing: {summary_path}")
        seizure_info = parse_summary_file(summary_path)
        all_seizure_data[patient_id] = seizure_info
    else:
        print(f"Summary file not found for {patient_id}")

print("\n Finished parsing all patients.")

In [None]:
import os
import numpy as np
import pandas as pd
import mne

# Root directory
root_dir = '/content/drive/MyDrive/G_Pr/chb'

# Output list
all_features = []
expected_channels = None  # Number of channels to be used

for patient_folder in os.listdir(root_dir):
    patient_path = os.path.join(root_dir, patient_folder)
    if not os.path.isdir(patient_path):
        continue

    seizures = all_seizure_data.get(patient_folder, [])

    for edf_file in os.listdir(patient_path):
        if not edf_file.endswith('.edf'):
            continue

        edf_path = os.path.join(patient_path, edf_file)

        try:
            raw = mne.io.read_raw_edf(edf_path, preload=True, verbose=False)
            data, times = raw.get_data(return_times=True)
        except Exception as e:
            print(f"Error reading {edf_file}: {e}")
            continue

        # Set expected channel count from first successful file
        if expected_channels is None:
            expected_channels = data.shape[0]

        # Skip files with different number of channels
        if data.shape[0] != expected_channels:
            print(f"Skipping {edf_file} due to mismatched channel count.")
            continue

        # Seizure detection
        contains_seizure = 0
        for item in seizures:
            if not isinstance(item, (list, tuple)) or len(item) != 3:
                print(f" Warning: unexpected seizure entry format: {item}")
                continue
            seizure_file, onset, duration = item
            if seizure_file == edf_file:
                contains_seizure = 1
                break

        # Extract features
        features = []
        for channel in data:
            features.extend([
                np.mean(channel),
                np.std(channel),
                np.sum(channel ** 2) / len(channel),
            ])

        features.append(contains_seizure)
        features.append(patient_folder)
        features.append(edf_file)

        all_features.append(features)

# Create column names
column_names = []
for ch in range(expected_channels):
    column_names.extend([f'ch{ch+1}_mean', f'ch{ch+1}_std', f'ch{ch+1}_energy'])
column_names += ['seizure', 'patient', 'filename']

# Convert to DataFrame
features_df = pd.DataFrame(all_features, columns=column_names)

# Preview
features_df.head()

# Initial setting

In [None]:
import os
import numpy as np
import mne
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import pickle

# Parameters
WINDOW_SIZE = 10  # in seconds
SAMPLING_RATE = 256  # Sampling rate for CHB-MIT dataset
WINDOW_SAMPLES = WINDOW_SIZE * SAMPLING_RATE  # Number of samples per segment

EDF_DIR = '/content/chbmit'  # Path to EDF files
ANNOTATION_FILE = 'chb-summary.csv'  # Seizure summary file (default)

# Function for extracting segments from files

In [None]:
def extract_segments(edf_file, seizure_starts, seizure_ends):
    raw = mne.io.read_raw_edf(edf_file, preload=True, verbose=False)
    data = raw.get_data()
    num_samples = data.shape[1]
    num_channels = data.shape[0]

    X = []
    y = []

    # Sliding over the entire file with a fixed-size window
    for start in range(0, num_samples - WINDOW_SAMPLES, WINDOW_SAMPLES):
        end = start + WINDOW_SAMPLES
        segment = data[:, start:end]

        label = 0  # Default: no seizure
        for sz_start, sz_end in zip(seizure_starts, seizure_ends):
            sz_start_sample = int(sz_start * SAMPLING_RATE)
            sz_end_sample = int(sz_end * SAMPLING_RATE)
            if end > sz_start_sample - 256 and start < sz_start_sample:
                label = 1  # Segment just before seizure onset
                break

        X.append(segment)
        y.append(label)

    return np.array(X), np.array(y)

# General function for reading files and creating dataset

In [None]:
def process_all_patients(base_dir, summary_csv):
    summary_df = pd.read_csv(summary_csv)
    all_X = []
    all_y = []

    for _, row in tqdm(summary_df.iterrows(), total=len(summary_df)):
        patient = row['patient']
        filename = row['filename']
        seizures = eval(row['seizure_intervals'])

        seizure_starts = [s[0] for s in seizures]
        seizure_ends = [s[1] for s in seizures]

        full_path = os.path.join(base_dir, patient, filename)
        if not os.path.exists(full_path):
            continue

        X, y = extract_segments(full_path, seizure_starts, seizure_ends)
        all_X.append(X)
        all_y.append(y)

    X = np.concatenate(all_X, axis=0)
    y = np.concatenate(all_y, axis=0)
    return X, y

# Creating csv summary file

In [None]:
import os
import pandas as pd
import re

SUMMARY_FILES = [
    "/content/drive/MyDrive/G_Pr/chb/chb01/chb01-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb02/chb02-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb03/chb03-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb04/chb04-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb05/chb05-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb06/chb06-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb07/chb07-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb08/chb08-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb09/chb09-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb10/chb10-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb11/chb11-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb12/chb12-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb13/chb13-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb14/chb14-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb15/chb15-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb16/chb16-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb17/chb17-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb18/chb18-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb19/chb19-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb20/chb20-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb21/chb21-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb22/chb22-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb23/chb23-summary.txt",
    "/content/drive/MyDrive/G_Pr/chb/chb24/chb24-summary.txt"

]

summary_data = []

for file_path in SUMMARY_FILES:
    with open(file_path, 'r') as file:
        lines = file.readlines()

    patient = os.path.basename(file_path).split('-')[0]
    current_edf = None
    seizure_times = []

    for line in lines:
        line = line.strip().lower()

        if "file name:" in line:
            current_edf = line.split(":")[1].strip()
            seizure_times = []

        elif "seizure start time" in line:
            # Extract number using regex
            start_time = int(re.search(r'\d+', line).group())
            seizure_times.append([start_time, -1])

        elif "seizure end time" in line and seizure_times:
            end_time = int(re.search(r'\d+', line).group())
            seizure_times[-1][1] = end_time

            summary_data.append({
                "patient": patient,
                "file": current_edf,
                "seizure_start": seizure_times[-1][0],
                "seizure_end": seizure_times[-1][1]
            })

df = pd.DataFrame(summary_data)
df.to_csv("chb-summary.csv", index=False)
print(" Summary file created: chb-summary.csv")

In [None]:
# +++
import os
import pandas as pd
import numpy as np
import mne
from sklearn.utils import shuffle
from joblib import dump

EDF_DIR = "/content/drive/MyDrive/G_Pr/chb"
SUMMARY_FILE = "chb-summary.csv"
WINDOW_SIZE = 2560  # 10 seconds at 256 Hz
STEP_SIZE = 2560    # non-overlapping windows

def extract_features_from_edf(edf_path, seizure_intervals):
    raw = mne.io.read_raw_edf(edf_path, preload=True, verbose=False)

    raw_data = raw.get_data()
    sfreq = int(raw.info['sfreq'])  # typically 256 Hz

    seizure_data = []
    non_seizure_data = []

    n_channels, n_samples = raw_data.shape

    for start in range(0, n_samples - WINDOW_SIZE, STEP_SIZE):
        end = start + WINDOW_SIZE
        window = raw_data[:, start:end]

        window_start_sec = start // sfreq
        window_end_sec = end // sfreq

        label = 0  # non-seizure by default
        for s_start, s_end in seizure_intervals:
            if (window_start_sec < s_end and window_end_sec > s_start):
                label = 1
                break

        if label == 1:
            seizure_data.append(window)
        else:
            non_seizure_data.append(window)

    # Balance classes
    min_len = min(len(seizure_data), len(non_seizure_data))
    seizure_data = seizure_data[:min_len]
    non_seizure_data = non_seizure_data[:min_len]

    X = np.array(seizure_data + non_seizure_data)
    y = np.array([1]*len(seizure_data) + [0]*len(non_seizure_data))

    return X, y

def build_dataset_from_summary(summary_file, edf_dir):
    df = pd.read_csv(summary_file)
    all_X = []
    all_y = []

    expected_shape = None

    for i, row in df.iterrows():
        edf_path = os.path.join(edf_dir, row["patient"], row["file"])
        if not os.path.exists(edf_path):
            print(f"File not found: {edf_path}")
            continue

        if row["seizure_start"] == -1:
            seizure_intervals = []
        else:
            seizure_intervals = [(int(row["seizure_start"]), int(row["seizure_end"]))]

        try:
            X, y = extract_features_from_edf(edf_path, seizure_intervals)
        except Exception as e:
            print(f"Error processing {edf_path}: {e}")
            continue

        # Only Keep files with same channels
        if expected_shape is None:
            expected_shape = X.shape[1:]
        elif X.shape[1:] != expected_shape:
            print(f"Skipping due to shape mismatch: {edf_path}")
            continue

        all_X.append(X)
        all_y.append(y)

    X = np.concatenate(all_X, axis=0)
    y = np.concatenate(all_y, axis=0)
    X, y = shuffle(X, y, random_state=42)

    return X, y

# Run and save
X, y = build_dataset_from_summary(SUMMARY_FILE, EDF_DIR)
print("Final dataset shape:", X.shape, y.shape)

# Save using joblib
dump((X, y), "eeg_dataset.joblib")
print("Dataset saved as eeg_dataset.joblib")

In [None]:
import joblib
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils import shuffle

# Load the dataset
X, y = joblib.load('/content/eeg_dataset.joblib')

# Ensure the data type is float32 for compatibility with Keras models
X = X.astype(np.float32)
y = y.astype(np.int32)

# Shuffle the data
X, y = shuffle(X, y, random_state=42)

# Split into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

print("Train shape:", X_train.shape)
print("Test shape:", X_test.shape)

# CNN Model

In [None]:
print("Train class distribution:", np.bincount(y_train))
print("Test class distribution:", np.bincount(y_test))

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.regularizers import l2
from sklearn.metrics import classification_report, accuracy_score, roc_auc_score

# === Reshape EEG input to [samples, time, channels] ===
X_train_reshaped = X_train.reshape((X_train.shape[0], X_train.shape[2], X_train.shape[1]))
X_test_reshaped = X_test.reshape((X_test.shape[0], X_test.shape[2], X_test.shape[1]))

# === Normalize using standardization ===
mean = np.mean(X_train_reshaped, axis=(0, 1), keepdims=True)
std = np.std(X_train_reshaped, axis=(0, 1), keepdims=True)
X_train_norm = (X_train_reshaped - mean) / (std + 1e-7)
X_test_norm = (X_test_reshaped - mean) / (std + 1e-7)

# === Build optimized 1D-CNN model (simplified & regularized) ===
model = Sequential([
    Input(shape=(X_train_norm.shape[1], X_train_norm.shape[2])),

    Conv1D(16, kernel_size=7, activation='relu', kernel_regularizer=l2(0.002)),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),

    Conv1D(32, kernel_size=5, activation='relu', kernel_regularizer=l2(0.002)),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),

    Conv1D(64, kernel_size=3, activation='relu', kernel_regularizer=l2(0.002)),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),

    Flatten(),
    Dropout(0.6),

    Dense(64, activation='relu', kernel_regularizer=l2(0.002)),
    BatchNormalization(),
    Dropout(0.5),

    Dense(1, activation='sigmoid')
])

# === Compile the model ===
model.compile(optimizer=Adam(learning_rate=0.0003),
              loss='binary_crossentropy',
              metrics=['accuracy'])

# === Callbacks for improved training ===
early_stop = EarlyStopping(monitor='val_loss', patience=6, restore_best_weights=True, verbose=1)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6, verbose=1)

# Save the best model during training
checkpoint = ModelCheckpoint("best_model.keras", monitor='val_loss',
                             save_best_only=True, mode='min', verbose=1)

# === Train the model ===
history = model.fit(X_train_norm, y_train,
                    epochs=100,
                    batch_size=32,
                    validation_split=0.25,
                    verbose=1,
                    callbacks=[early_stop, reduce_lr, checkpoint])

# === Load best weights if needed (optional after training) ===
model.load_weights("best_model.keras")

In [None]:
# === Predict probabilities (sigmoid output) ===
y_pred = model.predict(X_test_norm).ravel()  # Probabilities (values between 0 and 1)

# === Convert probabilities to binary class predictions ===
y_pred_binary = (y_pred >= 0.5).astype(int)

# === Evaluation Metrics ===
from sklearn.metrics import classification_report, accuracy_score, roc_auc_score

print("\nClassification Report:\n")
print(classification_report(y_test, y_pred_binary))
print("Accuracy Score:", accuracy_score(y_test, y_pred_binary))
print("ROC AUC Score:", roc_auc_score(y_test, y_pred))  # Not y_pred_binary

In [None]:
import matplotlib.pyplot as plt

# Plot training & validation accuracy values
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Train Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend()

# Plot training & validation loss values
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend()

plt.tight_layout()
plt.show()

In [None]:
!pip install mne --quiet
!pip install matplotlib numpy --quiet

In [None]:
import mne
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Mount Google Drive to access EEG data
from google.colab import drive
drive.mount('/content/drive')

# Check the contents of the chb directory
import os

In [None]:
# Path to the EEG file with a seizure
seizure_file = "/content/drive/MyDrive/G_Pr/chb/chb01/chb01_15.edf"

# Path to the normal EEG file
normal_file = "/content/drive/MyDrive/G_Pr/chb/chb01/chb01_09.edf"

# Load the EEG file with a seizure
raw_seizure = mne.io.read_raw_edf(seizure_file, preload=True)

# Load the normal EEG file
raw_normal = mne.io.read_raw_edf(normal_file, preload=True)

In [None]:
# Display information about the seizure EEG file
raw_seizure.info

# Plot the first few seconds of the seizure EEG file for inspection
raw_seizure.plot(n_channels=10, duration=10, scalings='auto')

In [None]:
def segment_raw_data(raw, segment_duration_sec=5):
    """
    Segments raw EEG data into fixed-duration windows.

    Parameters:
        raw (mne.io.Raw): Raw EEG data
        segment_duration_sec (int): Duration of each segment in seconds

    Returns:
        List of numpy arrays, each representing a segment
    """
    sampling_rate = int(raw.info['sfreq'])
    segment_samples = segment_duration_sec * sampling_rate
    data, _ = raw[:]
    num_segments = data.shape[1] // segment_samples
    segments = []

    for i in range(num_segments):
        start = i * segment_samples
        end = start + segment_samples
        segment = data[:, start:end]
        if segment.shape[1] == segment_samples:
            segments.append(segment)

    return segments

In [None]:
# Segment both seizure and normal data into 5-second chunks
seizure_segments = segment_raw_data(raw_seizure, segment_duration_sec=5)
normal_segments = segment_raw_data(raw_normal, segment_duration_sec=5)

print(f"Number of seizure segments: {len(seizure_segments)}")
print(f"Number of normal segments: {len(normal_segments)}")

In [None]:
import mne

# Load the EDF file (you can change to your own path)
raw = mne.io.read_raw_edf("/content/drive/MyDrive/G_Pr/chb/chb01/chb01_15.edf", preload=True)

# Print the list of EEG channel names
print("Channel names in this EDF file:")
print(raw.ch_names)

# Creating Spectogram image from segments

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import spectrogram
from tqdm import tqdm

def save_multi_channel_spectrograms(segments, raw_info, label, output_dir):
    selected_channels = ['FP1-F3', 'F7-T7', 'T7-P7']
    channel_indices = [raw_info['ch_names'].index(ch) for ch in selected_channels if ch in raw_info['ch_names']]

    label_dir = os.path.join(output_dir, label)
    os.makedirs(label_dir, exist_ok=True)

    for i, segment in tqdm(enumerate(segments), total=len(segments)):
        selected_signals = [segment[idx] for idx in channel_indices]
        combined_signal = np.mean(selected_signals, axis=0)

        f, t, Sxx = spectrogram(combined_signal, fs=256)

        plt.figure(figsize=(2.24, 2.24))
        plt.pcolormesh(t, f, 10 * np.log10(Sxx), shading='gouraud')
        plt.axis('off')
        plt.tight_layout(pad=0)
        filename = os.path.join(label_dir, f'{label}_{i}.png')
        plt.savefig(filename, dpi=100, bbox_inches='tight', pad_inches=0)
        plt.close()

In [None]:
save_multi_channel_spectrograms(seizure_segments, raw.info, label='seizure', output_dir='/content/eeg_spectrograms')
save_multi_channel_spectrograms(normal_segments, raw.info, label='normal', output_dir='/content/eeg_spectrograms')

In [None]:
!pip install opencv-python

In [None]:
import cv2
import os
from natsort import natsorted  # For numerical sorting of files

# Path to the folder where spectrogram images are stored
image_folder = '/content/eeg_spectrograms/normal/'

# Output video file name
video_name = 'eeg_spectrogram_video.avi'

# Get the list of images and sort them numerically
images = natsorted([img for img in os.listdir(image_folder) if img.endswith(".png")])

# Read the dimensions of the first image to set the video size
first_frame = cv2.imread(os.path.join(image_folder, images[0]))
height, width, _ = first_frame.shape

# Define the output video using OpenCV
video_writer = cv2.VideoWriter(video_name, cv2.VideoWriter_fourcc(*'XVID'), 5, (width, height))

# Add images to the video
for image in images:
    img_path = os.path.join(image_folder, image)
    frame = cv2.imread(img_path)
    video_writer.write(frame)

video_writer.release()
print(" Video successfully created:", video_name)

In [None]:
from google.colab.patches import cv2_imshow
import cv2

cap = cv2.VideoCapture('eeg_spectrogram_video.avi')

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    cv2_imshow(frame)
    # Pause to view the frame - waitKey doesn't work in Colab, so we use sleep instead
    import time
    time.sleep(0.5)  # 0.5 seconds between each frame

cap.release()

In [None]:
# Convert video from AVI to MP4 using ffmpeg
!ffmpeg -i eeg_spectrogram_video.avi -vcodec libx264 eeg_spectrogram_video.mp4

In [None]:
from IPython.display import HTML
from base64 import b64encode

mp4_file = "eeg_spectrogram_video.mp4"

# Read the video and convert it to HTML for playback inside Colab
mp4 = open(mp4_file, 'rb').read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML(f"""
<video width=600 controls>
    <source src="{data_url}" type="video/mp4">
</video>
""")

# Creating Dataset and Training CNN with PyTorch

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, models
from PIL import Image
import os

# Definition of Customized EEG dataset

In [None]:
class EEGDataset(Dataset):
    def __init__(self, image_dir, label, transform=None):
        self.image_dir = image_dir
        self.label = label
        self.transform = transform
        self.image_paths = [os.path.join(image_dir, fname) for fname in os.listdir(image_dir) if fname.endswith('.png')]

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image = Image.open(self.image_paths[idx]).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, self.label

In [None]:
# Transformations (resize to 224x224 and normalize for ResNet)
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5]*3, std=[0.5]*3)
])

# Create datasets
seizure_dataset = EEGDataset('/content/eeg_spectrograms/seizure', label=1, transform=transform)
normal_dataset = EEGDataset('/content/eeg_spectrograms/normal', label=0, transform=transform)

# Combine and split
full_dataset = seizure_dataset + normal_dataset
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(full_dataset, [train_size, val_size])

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Use from Pretrained ResNet18 dataset

In [None]:
import torch.nn as nn
from torchvision import models

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

base_model = models.resnet18(pretrained=True)

# Freeze all layers except the last few (Optional)
for param in base_model.parameters():
    param.requires_grad = False

# Replace the fully connected layer with one including dropout
base_model.fc = nn.Sequential(
    nn.Dropout(p=0.5),              # Dropout 50%
    nn.Linear(base_model.fc.in_features, 2)
)

model = base_model.to(device)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
import copy
import torch
import torch.nn as nn

num_epochs = 50  # Increase the number of epochs since we have EarlyStopping

# Set up Early Stopping
patience = 5
best_val_loss = float('inf')
best_model_wts = copy.deepcopy(model.state_dict())
epochs_no_improve = 0

# Add weight decay to optimizer (L2 Regularization)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)

# Scheduler to reduce LR when validation loss stops improving
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=2, factor=0.5, verbose=True)

train_losses, val_losses = [], []
train_accuracies, val_accuracies = [], []

for epoch in range(num_epochs):
    # ----------- Training ----------
    model.train()
    total_train_loss = 0
    correct_train = 0
    total_train = 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_train_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total_train += labels.size(0)
        correct_train += (predicted == labels).sum().item()

    avg_train_loss = total_train_loss / len(train_loader)
    train_accuracy = 100 * correct_train / total_train
    train_losses.append(avg_train_loss)
    train_accuracies.append(train_accuracy)

    # ----------- Validation ----------
    model.eval()
    total_val_loss = 0
    correct_val = 0
    total_val = 0

    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)

            total_val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total_val += labels.size(0)
            correct_val += (predicted == labels).sum().item()

    avg_val_loss = total_val_loss / len(val_loader)
    val_accuracy = 100 * correct_val / total_val
    val_losses.append(avg_val_loss)
    val_accuracies.append(val_accuracy)

    scheduler.step(avg_val_loss)

    print(f"Epoch [{epoch+1}/{num_epochs}] "
          f"Train Loss: {avg_train_loss:.4f}, Train Acc: {train_accuracy:.2f}% "
          f"| Val Loss: {avg_val_loss:.4f}, Val Acc: {val_accuracy:.2f}%")

    # -------- Early Stopping --------
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        best_model_wts = copy.deepcopy(model.state_dict())
        epochs_no_improve = 0
    else:
        epochs_no_improve += 1
        if epochs_no_improve >= patience:
            print(" Early stopping activated.")
            break

# Load the best weights
model.load_state_dict(best_model_wts)

In [None]:
import matplotlib.pyplot as plt

# Loss
plt.figure(figsize=(10,4))
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.title('Loss over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.show()

# Accuracy
plt.figure(figsize=(10,4))
plt.plot(train_accuracies, label='Train Accuracy')
plt.plot(val_accuracies, label='Validation Accuracy')
plt.title('Accuracy over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Accuracy (%)')
plt.legend()
plt.grid(True)
plt.show()