Data Augmentation

In [None]:
# Data augmentation
class EEGAugmentation:
    """Random augmentations for EEG trials.

    The per-trial methods index time along axis 1 and (in
    ``channel_dropout``) channels along axis 0, so they expect 2-D arrays
    laid out as ``(channels, time)``. All randomness is drawn from the
    global ``np.random`` state; call ``np.random.seed`` beforehand for
    reproducible output.
    """

    def __init__(self, noise_std=0.01, time_shift_max=50, freq_shift_max=2):
        """Store the default strengths used by the augmentation methods.

        Args:
            noise_std: Default standard deviation for ``add_gaussian_noise``.
            time_shift_max: Largest circular shift, in samples, that
                ``time_shift`` may apply in either direction.
            freq_shift_max: Default largest shift, in FFT bins, that
                ``frequency_shift`` may apply in either direction.
        """
        self.noise_std = noise_std
        self.time_shift_max = time_shift_max
        self.freq_shift_max = freq_shift_max

    def add_gaussian_noise(self, data, std=None):
        """Return ``data`` plus zero-mean Gaussian noise.

        Args:
            data: Array to perturb.
            std: Noise standard deviation; falls back to ``self.noise_std``
                when ``None``.
        """
        if std is None:
            std = self.noise_std
        noise = np.random.normal(0, std, data.shape)
        return data + noise

    def time_shift(self, data):
        """Circularly shift ``data`` along axis 1 by a random offset.

        The offset is drawn uniformly from
        ``[-time_shift_max, time_shift_max]`` with both ends included.
        Samples that fall off one end wrap around to the other. A zero
        offset returns ``data`` itself, unchanged.
        """
        # randint's upper bound is exclusive: the +1 makes the range
        # symmetric and keeps time_shift_max == 0 from raising ValueError.
        shift = np.random.randint(-self.time_shift_max, self.time_shift_max + 1)
        if shift == 0:
            return data
        # A positive offset moves samples towards index 0.
        return np.roll(data, -shift, axis=1)

    def amplitude_scale(self, data, scale_range=(0.75, 1.25)):
        """Multiply ``data`` by one factor drawn uniformly from ``scale_range``."""
        low, high = scale_range[0], scale_range[1]
        return data * np.random.uniform(low, high)

    def frequency_shift(self, data, fs=250, max_shift=None):
        """Shift the spectrum of a real signal by a random number of FFT bins.

        The one-sided spectrum along axis 1 is moved up or down by an
        integer drawn uniformly from ``[-max_shift, max_shift]``. The DC
        bin stays in place, bins shifted past either end are dropped and
        vacated bins are zero-filled. Working on the one-sided spectrum
        keeps the result a real signal whose components have actually
        moved; rolling the two-sided FFT and taking the real part would
        instead multiply the signal by a cosine.

        Args:
            data: Real-valued array with time along axis 1.
            fs: Unused; kept so existing callers that pass it still work.
            max_shift: Largest shift in bins; falls back to
                ``self.freq_shift_max`` when ``None``.

        Returns:
            Real array with the same shape as ``data``.
        """
        if max_shift is None:
            max_shift = self.freq_shift_max
        n_samples = data.shape[1]
        spectrum = np.fft.rfft(data, axis=1)
        shift = np.random.randint(-max_shift, max_shift + 1)
        if shift != 0:
            n_bins = spectrum.shape[1]
            offset = abs(shift)
            # Number of non-DC bins that remain in range after the shift.
            kept = n_bins - 1 - offset
            shifted = np.zeros_like(spectrum)
            shifted[:, 0] = spectrum[:, 0]
            if kept > 0:
                if shift > 0:
                    shifted[:, 1 + offset:] = spectrum[:, 1:1 + kept]
                else:
                    shifted[:, 1:1 + kept] = spectrum[:, 1 + offset:]
            spectrum = shifted
        # Pass n explicitly so odd-length signals keep their length.
        return np.fft.irfft(spectrum, n=n_samples, axis=1)

    def channel_dropout(self, data, dropout_prob=0.1):
        """Return a copy of ``data`` with randomly chosen rows set to zero.

        Each index along axis 0 is zeroed independently with probability
        ``dropout_prob``. The input array is not modified.
        """
        keep = np.random.random(data.shape[0]) > dropout_prob
        augmented_data = data.copy()
        augmented_data[~keep] = 0
        return augmented_data

    def mixup(self, data1, data2, alpha=0.2):
        """Blend two arrays as ``lam * data1 + (1 - lam) * data2``.

        ``lam`` is drawn from a ``Beta(alpha, alpha)`` distribution.
        """
        lam = np.random.beta(alpha, alpha)
        return lam * data1 + (1 - lam) * data2

    def time_warping(self, data, sigma=0.2):
        """Resample ``data`` along axis 1 at randomly perturbed positions.

        A random walk (cumulative sum of ``N(0, sigma)`` steps) is added to
        the regular sample positions. The result is clipped to the valid
        index range and truncated to integers, and ``data`` is read at
        those indices. The output has the same number of time steps.
        """
        time_steps = data.shape[1]
        random_warps = np.random.normal(size=time_steps) * sigma
        drift = np.cumsum(random_warps)
        positions = np.arange(time_steps) + drift
        warped_indices = np.clip(positions, 0, time_steps - 1).astype(int)
        return data[:, warped_indices]

    def augment_batch(self, data, p=0.5):
        """Return a copy of ``data`` with random augmentations per sample.

        Each sample (index along axis 0) is selected with probability
        ``p``. A selected sample then passes through every augmentation
        below, in order, each applied independently with its own
        probability. The input is left untouched.

        Args:
            data: Batch of trials; must support ``.copy()``, ``len`` and
                item assignment by sample index.
            p: Probability that a given sample is augmented at all.
        """
        augmented_data = data.copy()
        # (probability, transform) pairs, applied in this order.
        pipeline = (
            (0.4, self.add_gaussian_noise),
            (0.3, self.time_shift),
            (0.3, self.amplitude_scale),
            (0.2, self.frequency_shift),
            (0.1, self.channel_dropout),
            (0.2, self.time_warping),
        )

        for i in range(len(data)):
            if np.random.random() >= p:
                continue
            for prob, transform in pipeline:
                if np.random.random() < prob:
                    augmented_data[i] = transform(augmented_data[i])

        return augmented_data

In [None]:
# Build the augmented training set and the final pooled dataset.
augmenter = EEGAugmentation(noise_std=0.015, time_shift_max=25)

# Three independently augmented copies of the first-session data; within each
# copy a sample is altered with probability 0.6.
augmented_versions = []
for _ in range(3):
    X_aug = augmenter.augment_batch(X_first_session, p=0.6)
    augmented_versions += [X_aug]

# Stack the original data followed by every augmented copy. The labels are
# repeated once per stacked copy so that they stay aligned with the samples.
n_copies = 1 + len(augmented_versions)
X_train_combined = np.concatenate([X_first_session, *augmented_versions])
y_train_combined = np.concatenate([y_train] * n_copies)

print(f"Combined training data shape: {X_train_combined.shape}")

# Append the second-session data and its labels to form the final dataset.
# NOTE(review): augmented copies and second-session samples are pooled here;
# if X/y are later split at random, variants of one trial could land on both
# sides of the split - confirm against the downstream split.
X = np.concatenate([X_train_combined, X_second_session])
y = np.concatenate([y_train_combined, y_test])

print(f"Total dataset shape: {X.shape}")
print(f"Label distribution: {np.bincount(y.astype(int))}")