### 🧠 Step 1: Sliding Window Epoching (1s windows with 50% overlap)
This replaces the older method of using one fixed-length chunk per trial. We now create multiple overlapping 1-second windows from the 10s trial to increase data samples and temporal resolution. This enhances model training diversity.

In [None]:
import numpy as np
import mne

def sliding_window_epochs(raw, window_size_sec=1.0, overlap=0.5):
    sfreq = int(raw.info['sfreq'])
    window_size = int(window_size_sec * sfreq)
    step_size = int(window_size * (1 - overlap))
    data = raw.get_data()
    n_samples = data.shape[1]
    epochs = []
    for start in range(0, n_samples - window_size + 1, step_size):
        stop = start + window_size
        epochs.append(data[:, start:stop])
    return epochs


In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import os
import mne
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score,confusion_matrix
import tensorflow
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.utils import plot_model, to_categorical
from tensorflow.keras.layers import Dense, Activation, Conv1D, MaxPooling1D, GlobalAveragePooling1D, Flatten, Dropout, BatchNormalization, Input,UpSampling1D
from tensorflow.keras.layers import concatenate, Lambda, Conv2D, MaxPooling2D, GlobalAveragePooling2D,LSTM
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import EarlyStopping

### 🔁 Step 2: Time-Domain Data Augmentation
This step introduces artificial variations to the training data using:
- Random time shifts (jitter)
- Gaussian noise
- Random channel dropout

This helps the model become more robust and prevents overfitting.

In [None]:
import numpy as np

def augment_batch(batch, jitter_max=6, noise_std=0.005, dropout_rate=0.1):
    augmented = []
    for trial in batch:
        x = np.copy(trial)
        # Time jitter (shift up to ±6 samples)
        shift = np.random.randint(-jitter_max, jitter_max + 1)
        x = np.roll(x, shift, axis=1)
        # Add Gaussian noise
        x += noise_std * np.random.randn(*x.shape)
        # Channel dropout
        if dropout_rate > 0:
            mask = np.random.rand(x.shape[0]) < dropout_rate
            x[mask, :] = 0
        augmented.append(x)
    return np.array(augmented)


In [None]:
fo = 'Digit/'

In [None]:
sizearr = []
X = np.zeros((230,14,1280))
Y = np.zeros((230,))
ctr = 0
for fi in os.listdir(fo):
    data = mne.io.read_raw_edf(os.path.join(fo,fi))
    raw_data = data[2:16][0]*1000
    raw_data = raw_data[:,0:1280]
    #a = raw_data.shape
    
    _,cls = fi.split('_')
    Y[ctr] = int(cls[0])
    X[ctr,:,:] = raw_data
    ctr = ctr+1

    #sizearr.append(a[1])

In [None]:
X_new = np.zeros((36110,32,14))
Y_new = np.zeros((36110,))
npt = 32
stride = 8
ctr = 0
for i in range(0,230):
    y = Y[i]
    a= X[i,:,:]
    a = a.transpose()
    val = 0
    while val<=(len(a)-npt):
        x = a[val:val+npt,:]
        X_new[ctr,:,:] = x
        Y_new[ctr] = y
        val = val+stride
        ctr = ctr+1

In [None]:

# STEP 2: Data Augmentation in the Time Domain

import numpy as np

def add_jitter(X, sigma=0.01):
    return X + np.random.normal(loc=0., scale=sigma, size=X.shape)

def window_slicing(X, window_ratio=0.9):
    n_samples, time_len, channels = X.shape
    new_time_len = int(time_len * window_ratio)
    starts = np.random.randint(0, time_len - new_time_len + 1, size=n_samples)
    X_sliced = np.array([x[start:start + new_time_len] for x, start in zip(X, starts)])
    # Pad sliced windows back to original length
    X_padded = np.zeros((n_samples, time_len, channels))
    for i in range(n_samples):
        X_padded[i, :new_time_len, :] = X_sliced[i]
    return X_padded

# Apply augmentation
X_aug_jitter = add_jitter(X)
X_aug_slice = window_slicing(X)

# Combine original and augmented data
X_combined = np.concatenate([X, X_aug_jitter, X_aug_slice], axis=0)
y_combined = np.concatenate([y, y, y], axis=0)

print(f"Original shape: {X.shape}, Augmented shape: {X_combined.shape}")


In [None]:
X_train, X_test, Y_train, Y_test = train_test_split(X_new, Y_new, test_size=0.2, random_state=1)
i1 = Input(shape=(32,14))
x1 = BatchNormalization()(i1)
x1 = Conv1D(128, kernel_size=10,strides=1,activation='relu',padding='same')(x1)
x1 = BatchNormalization()(x1)
x1 = MaxPooling1D(2)(x1)
x1 = LSTM(256,activation='tanh')(x1)
x1 = BatchNormalization()(x1)
x1 = Dense(128, activation='relu')(x1)
x1 = Dropout(0.5)(x1)
output = Dense(10, activation='softmax')(x1)
model = Model(inputs=i1, outputs=output)# summarize layers
model.compile(loss='categorical_crossentropy',optimizer='adam',metrics=['accuracy'])
es = EarlyStopping(monitor='val_accuracy', verbose=1, patience=10)
model.fit(X_train, y=to_categorical(Y_train),validation_split=0.2,epochs=500, batch_size=128,verbose=1,callbacks=[es])
pred = model.predict(X_test)
Y_pred = np.argmax(pred,axis=1)
print(accuracy_score(Y_pred,Y_test))
sns.heatmap(confusion_matrix(Y_test,Y_pred), annot=True,fmt='g')