In [4]:
 %pip install torch torchvision torchaudio

Note: you may need to restart the kernel to use updated packages.


In [5]:
%pip install --upgrade pip

Note: you may need to restart the kernel to use updated packages.


In [None]:
import numpy as np

In [1]:
import torch

# Read the serialized dataset from disk.
# NOTE(review): torch.load relies on Python pickling (depending on the PyTorch
# version / weights_only setting), which can run arbitrary code -- only open
# files that come from a trusted source.
dataset_file = 'eeg_signals_raw_with_mean_std.pth'
data = torch.load(dataset_file)

# Show the container type, then the top-level keys it exposes
for summary in (type(data), data.keys()):
    print(summary)

<class 'dict'>
dict_keys(['dataset', 'labels', 'images', 'means', 'stddevs'])


In [2]:
import numpy as np

# Pull each top-level entry of the loaded dict out as a NumPy array.
# 'dataset' holds one dict per trial, so eeg_data ends up as an array of dicts.
eeg_data, labels, means, stds = (
    np.array(data[key]) for key in ('dataset', 'labels', 'means', 'stddevs')
)

  means = np.array(data['means'])
  stds = np.array(data['stddevs'])


In [None]:
# Peek at the first trial record: its Python type and the fields it carries
first_trial = eeg_data[0]
print(type(first_trial))
print(first_trial.keys())

<class 'dict'>
dict_keys(['eeg', 'image', 'label', 'subject'])


In [4]:
# Report the array shape of the 'eeg' entry for the first few trials
preview_count = 5
for i in range(preview_count):
    print(f"Sample {i} shape: {np.array(eeg_data[i]['eeg']).shape}")

Sample 0 shape: (128, 500)
Sample 1 shape: (128, 511)
Sample 2 shape: (128, 532)
Sample 3 shape: (128, 492)
Sample 4 shape: (128, 511)


  print(f"Sample {i} shape: {np.array(eeg_data[i]['eeg']).shape}")


In [None]:
target_length = 532  # number of time points every trial is brought to

eeg_signals_fixed = []
for sample in eeg_data:
    trial = np.array(sample['eeg'])  # shape: [128, T]
    n_missing = target_length - trial.shape[1]

    if n_missing > 0:
        # Too short: append zeros at the end of the time axis
        trial = np.pad(trial, ((0, 0), (0, n_missing)), mode='constant')
    elif n_missing < 0:
        # Too long: keep only the first target_length time points
        trial = trial[:, :target_length]

    eeg_signals_fixed.append(trial)

# All trials now share one shape, so they can be stacked along a new first axis
eeg_data_array = np.stack(eeg_signals_fixed)
print("Final EEG array shape:", eeg_data_array.shape)

  eeg = np.array(sample['eeg'])  # shape: [128, T]


Final EEG array shape: (11965, 128, 532)


In [6]:
from scipy.signal import butter, filtfilt

In [7]:
def butter_bandpass(lowcut, highcut, fs, order=5):
    """Design a Butterworth band-pass filter.

    Args:
        lowcut: Lower band edge, in the same units as ``fs``.
        highcut: Upper band edge, in the same units as ``fs``.
        fs: Sampling rate of the signal the filter is meant for.
        order: Filter order handed to ``scipy.signal.butter``.

    Returns:
        The ``(b, a)`` numerator / denominator coefficient arrays.
    """
    half_rate = 0.5 * fs  # Nyquist frequency
    # butter() expects the band edges as fractions of the Nyquist frequency
    band_edges = [lowcut / half_rate, highcut / half_rate]
    return butter(order, band_edges, btype='band')

def apply_bandpass_filter(data, lowcut=0.5, highcut=45.0, fs=128.0, order=5):
    """Band-pass filter ``data`` with a forward-backward Butterworth filter.

    Args:
        data: Signal array; ``filtfilt`` filters along its last axis.
        lowcut: Lower band edge, in the same units as ``fs``.
        highcut: Upper band edge, in the same units as ``fs``.
        fs: Sampling rate of ``data``.
        order: Order of the Butterworth design.

    Returns:
        The filtered signal, with the same shape as ``data``.
    """
    numerator, denominator = butter_bandpass(lowcut, highcut, fs, order=order)
    # filtfilt runs the filter in both directions
    return filtfilt(numerator, denominator, data)

In [8]:
# Preallocate the output with the same shape and dtype as the input; values
# assigned below are cast to that dtype.
filtered_eeg_data = np.zeros_like(eeg_data_array)

# apply_bandpass_filter uses filtfilt, which filters along the last (time) axis,
# so all channels of a trial are filtered by one call instead of one call per
# channel. Each row is still filtered independently of the others.
# NOTE(review): fs=128.0 is equal to the channel count -- confirm that it is
# really the sampling rate of these recordings.
for i in range(eeg_data_array.shape[0]):         # For each sample
    filtered_eeg_data[i] = apply_bandpass_filter(eeg_data_array[i], fs=128.0)

print("Filtered EEG shape:", filtered_eeg_data.shape)

Filtered EEG shape: (11965, 128, 532)


In [15]:
# Write the filtered array to disk in NumPy's .npy format
np.save(file='filtered_eeg_data.npy', arr=filtered_eeg_data)


In [None]:
 
# Keep only the trials whose filtered signal is finite everywhere.
expected_trial_shape = (128, 532)
if filtered_eeg_data.shape[1:] != expected_trial_shape:
    # Every trial would fail the shape requirement; fail loudly instead of
    # silently discarding the whole dataset.
    raise ValueError(
        f"Expected trials of shape {expected_trial_shape}, "
        f"got {filtered_eeg_data.shape[1:]}"
    )

# One boolean per trial: True when the trial contains no NaN and no +/-inf.
# Evaluated trial by trial so no full-size temporary array is created.
# The mask is kept so that per-trial metadata (e.g. each record's 'label')
# can be filtered with exactly the same selection.
valid_trial_mask = np.fromiter(
    (np.isfinite(trial).all() for trial in filtered_eeg_data),
    dtype=bool,
    count=len(filtered_eeg_data),
)

cleaned_eeg_data = filtered_eeg_data[valid_trial_mask]  # boolean indexing copies

num_dropped = int((~valid_trial_mask).sum())
if num_dropped:
    print("Dropped corrupted trials:", num_dropped)
print("Cleaned EEG shape:", cleaned_eeg_data.shape)


Cleaned EEG shape: (11965, 128, 532)


In [17]:
def epoch_eeg_data(eeg_array, epoch_length=128):
    """Cut every trial into consecutive, non-overlapping epochs.

    Args:
        eeg_array: Array of shape (trials, channels, time).
        epoch_length: Number of time points per epoch.

    Returns:
        Array of shape (trials * epochs_per_trial, channels, epoch_length).
        Epochs of the same trial are adjacent and in temporal order; time
        points left over after the last full epoch are discarded.
    """
    n_trials, n_channels, n_times = eeg_array.shape
    n_epochs = n_times // epoch_length
    usable = n_epochs * epoch_length  # drop the incomplete tail

    # Split the time axis into (epoch index, position inside the epoch)
    windows = eeg_array[:, :, :usable].reshape(
        n_trials, n_channels, n_epochs, epoch_length
    )
    # Move the epoch axis next to the trial axis, then merge the two
    return windows.swapaxes(1, 2).reshape(-1, n_channels, epoch_length)

# Split the cleaned trials into 128-point epochs and report the resulting shape
eeg_epochs = epoch_eeg_data(cleaned_eeg_data, 128)
print("Epoch EEG shape:", eeg_epochs.shape)  # [N*4, 128, 128]


Epoch EEG shape: (47860, 128, 128)


In [18]:
# Re-read the statistics stored alongside the dataset as NumPy arrays
# (the original cell notes both as shape (128,))
means, stds = (np.array(data[key]) for key in ('means', 'stddevs'))


  means = np.array(data['means'])   # shape: (128,)
  stds = np.array(data['stddevs'])  # shape: (128,)


In [None]:
 
# Give the statistics singleton first and last axes so they broadcast
# against eeg_epochs along its second axis
stat_shape = (1, 128, 1)
means = means.reshape(stat_shape)
stds = stds.reshape(stat_shape)

# Standardise: subtract the mean, then divide by the standard deviation
normalized_eeg = (eeg_epochs - means) / stds
print("Normalized EEG shape:", normalized_eeg.shape)


Normalized EEG shape: (47860, 128, 128)


In [None]:
import numpy as np
import torch

# ------------ Configuration ------------
token_size = 4
embedding_dim = 1024
batch_size = 8
device = torch.device("cpu")
seed = 0  # fixes the random initialisation of the embedding layer

# ------------ Validate Input ------------
assert isinstance(eeg_epochs, np.ndarray), "EEG must be a NumPy array"
N, C, T = eeg_epochs.shape
assert C == 128 and T == 128, "EEG must be padded to (N, 128, 128)"
assert T % token_size == 0, "Time steps must be divisible by token size"
num_tokens = T // token_size

# ------------ Normalize ------------
# Mean / std are taken over the first and last axes, leaving one value per
# index of the middle axis; the small constant guards against division by zero.
means = eeg_epochs.mean(axis=(0, 2), keepdims=True)
stds = eeg_epochs.std(axis=(0, 2), keepdims=True) + 1e-6
normalized_eeg = (eeg_epochs - means) / stds

# ------------ Define Embedding Layer ------------
# The layer is used with its random initial weights, so seed the generator to
# make the embeddings reproducible across kernel restarts.
torch.manual_seed(seed)
embedding_layer = torch.nn.Linear(token_size, embedding_dim).to(device)
embedding_layer.eval()

# ------------ Process in Batches ------------
try:
    # Each epoch yields C * num_tokens * embedding_dim float32 values (16 MiB
    # with the settings above), so the result only fits in RAM for small N.
    # Writing into one preallocated buffer avoids holding both a list of
    # batches and its concatenated copy at the same time.
    embedding_buffer = torch.empty(
        (N, C, num_tokens, embedding_dim), dtype=torch.float32
    )

    with torch.no_grad():  # disable gradients to save memory
        for start in range(0, N, batch_size):
            end = min(start + batch_size, N)

            # Convert the batch (B, 128, 128) to a float32 tensor on the device
            batch_tensor = torch.as_tensor(
                normalized_eeg[start:end], dtype=torch.float32, device=device
            )

            # Tokenize: (B, 128, 32, 4). nn.Linear acts on the last axis only,
            # so no flattening is needed: output is (B, 128, 32, 1024).
            tokens = batch_tensor.reshape(-1, C, num_tokens, token_size)
            embedding_buffer[start:end] = embedding_layer(tokens).cpu()

    # Publish the result only once every batch has been written
    final_embeddings = embedding_buffer  # (N, 128, 32, 1024)
    # Per-batch views of the same memory, kept under the previous name
    all_embeddings = list(final_embeddings.split(batch_size))
    print("✅ Final Embeddings Shape:", final_embeddings.shape)

except Exception as e:
    print("❌ Error during processing:", e)
    # Re-raise so later cells do not run with missing embeddings
    raise


In [None]:
%pip install numpy matplotlib scipy


Note: you may need to restart the kernel to use updated packages.


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.fft import fft

 
# The array must be 3-D; unpack its axis sizes
assert normalized_eeg.ndim == 3, "normalized_eeg should be (N, 128, 128)"
N, C, T = normalized_eeg.shape

fs = 128  # sample rate in Hz
half = T // 2  # number of non-negative frequency bins that are plotted
freqs = np.fft.fftfreq(T, d=1/fs)[:half]

# Which samples / channels get a panel
num_samples_to_plot = 3
channels_to_plot = [0, 10, 20]

# One figure per sample, one panel per selected channel
for i in range(num_samples_to_plot):
    fig, axes = plt.subplots(
        1, len(channels_to_plot), figsize=(15, 4), squeeze=False
    )
    for ax, ch in zip(axes[0], channels_to_plot):
        signal = normalized_eeg[i, ch, :]
        fft_vals = fft(signal)
        # Power = squared magnitude of the first half of the spectrum
        power_spectrum = np.abs(fft_vals[:half]) ** 2

        ax.plot(freqs, power_spectrum)
        ax.set_title(f"Sample {i}, Channel {ch}")
        ax.set_xlabel("Frequency (Hz)")
        ax.set_ylabel("Power")
        ax.grid(True)
    fig.tight_layout()
    plt.show()
