## Data Cleaning Test

In [44]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import pywt
from scipy import signal, stats
from sklearn.preprocessing import StandardScaler
import re
import contextlib

In [45]:
# Import data method
def load_eeg_data(filepath: str) -> tuple[pd.DataFrame, float]:
    """Load EEG data with automatic sample rate detection.
    
    Args:
        filepath: Path to CSV file containing EEG data
        
    Returns:
        Tuple containing:
        - DataFrame with EEG channels and sample index
        - Sampling frequency in Hz
    """
    # Helper function to find sample rate in a row
    def find_sample_rate(row: str) -> float | None:
        for value in row.split(';'):
            with contextlib.suppress(ValueError, TypeError):
                if 100 <= (sf := float(value)) <= 1000:
                    return sf
        return None

    # Read first two lines
    with open(filepath) as f:
        lines = [f.readline().strip(), f.readline().strip()]

    # Detect sample rate and header presence
    sfreq = find_sample_rate(lines[0]) or find_sample_rate(lines[1]) or 250.0
    skiprows = 2 if sfreq in (find_sample_rate(lines[0]), find_sample_rate(lines[1])) else 0

    # Load data with pandas
    df = pd.read_csv(
        filepath,
        sep=';',
        skiprows=skiprows,
        usecols=range(11),
        names=['SampleIndex', 'FP1', 'FP2', 'Channel3', 'Channel4', 'Channel5',
               'Channel6', 'Channel7', 'Channel8', 'Channel9', 'Channel10'],
        dtype=np.float32,
        engine='c'
    ).reset_index(drop=True)

    # Validate sample rate
    if not 100 <= sfreq <= 1000:
        raise ValueError(f"Invalid sample rate {sfreq}Hz detected")

    return df, float(sfreq)

def clean_eeg_data(df, sfreq):
    """Main cleaning pipeline"""
    # 1. Remove constant invalid values (-187500)
    eeg_channels = ['FP1', 'FP2', 'Channel3', 'Channel4', 'Channel5']
    df[eeg_channels] = df[eeg_channels].replace(-187500.02, np.nan)
    
    # 2. Handle missing values
    df = df.ffill().bfill()
    
    # 3. Remove non-EEG columns (using ACTUAL existing columns)
    df = df[['SampleIndex'] + eeg_channels]  # Now matches loaded columns
    
    # 4. Convert to μV to Volts
    df[eeg_channels] /= 1e6  # Convert from μV to V
    
    # 5. Outlier removal using Hampel filter
    for ch in eeg_channels:
        median = df[ch].rolling(window=100, center=True).median()
        mad = np.abs(df[ch] - median).rolling(window=100, center=True).median()
        df[ch] = np.where(np.abs(df[ch] - median) > 3*mad, median, df[ch])
    
    # 6. Bandpass filtering (1-40 Hz) with Nyquist check
    nyquist = 0.5 * sfreq
    low = 1.0
    high = min(40.0, nyquist * 0.95)  # Ensure we stay below Nyquist
    
    # Validate frequency range
    if low >= high:
        raise ValueError(f"Invalid filter range: low={low}Hz, high={high}Hz (Nyquist={nyquist}Hz)")
    
    sos = signal.butter(2, [low, high], btype='bandpass', fs=sfreq, output='sos')
    for ch in eeg_channels:
        df[ch] = signal.sosfiltfilt(sos, df[ch])
    
    
    # 7. Notch filter (50 Hz)
    b, a = signal.iirnotch(50, 30, fs=sfreq)
    for ch in eeg_channels:
        df[ch] = signal.filtfilt(b, a, df[ch])
    
    return df

# Denoise Method
def wavelet_denoise(signal_data, wavelet='db4', level=3):
    """Improved wavelet denoising with auto-padding"""
    # Calculate required length (next multiple of 2^level)
    required_length = ((len(signal_data) + (2 ** level - 1)) // (2 ** level)) * (2 ** level)
    padded_signal = np.pad(signal_data, (0, required_length - len(signal_data)), 
                         mode='edge')
    
    # Now calculate max_level based on the padded signal length
    max_level = pywt.swt_max_level(len(padded_signal))
    adjusted_level = min(level, max_level)
    
    if adjusted_level < 1:
        raise ValueError(f"Cannot perform SWT with level {adjusted_level}. Need at least level 1.")
        
    # Perform SWT with adjusted level
    coeffs = pywt.swt(padded_signal, wavelet, level=adjusted_level)

    # Adaptive thresholding
    sigma = np.median(np.abs(coeffs[-1][1])) / 0.6745
    threshold = sigma * np.sqrt(2 * np.log(len(padded_signal)))
    
    # Apply threshold to detail coefficients
    denoised_coeffs = [coeffs[0]]  # Keep approximation coefficients
    for c in coeffs[1:]:
        denoised_coeffs.append(pywt.threshold(c, threshold, mode='soft'))
    
    # Reconstruct signal
    denoised = pywt.iswt(denoised_coeffs, wavelet)
    return denoised[:len(signal_data)]  # Remove padding


In [46]:
# Load data
raw_df, sfreq = load_eeg_data("EEG-EyeBlinks/EEG-IO/S00_data.csv")

print(raw_df.iloc[0].to_dict())  # First row of loaded data
print("Sample rate:", sfreq)  # Should show 250 for your data

# Clean data
clean_df = clean_eeg_data(raw_df, sfreq)

# Apply wavelet denoising to kept channel
eeg_channels = ['FP1', 'FP2', 'Channel3', 'Channel4', 'Channel5']

# Apply wavelet denoising
for ch in eeg_channels:
    clean_df[ch] = wavelet_denoise(clean_df[ch].values)

{'SampleIndex': 0.00390625, 'FP1': 8077.9873046875, 'FP2': -8250.5869140625, 'Channel3': -35346.33203125, 'Channel4': -187500.015625, 'Channel5': 187500.0, 'Channel6': 187500.0, 'Channel7': -187500.015625, 'Channel8': 187500.0, 'Channel9': 0.0, 'Channel10': 0.0}
Sample rate: 250.0


In [47]:
clean_df.head()

Unnamed: 0,SampleIndex,FP1,FP2,Channel3,Channel4,Channel5
0,0.003906,9.1e-05,6e-05,-0.000227,2.55027e-15,1.11377e-15
1,0.007812,0.000189,0.000119,-0.000181,2.700346e-15,9.736745e-16
2,0.011719,0.000269,0.000162,-9.4e-05,2.844937e-15,8.29934e-16
3,0.015625,0.00028,0.000167,-0.000195,2.955379e-15,6.643253e-16
4,0.019531,0.000269,0.000164,-0.000443,3.091443e-15,4.964823e-16
