In [17]:
import os
import tempfile

import numpy as np
import polars as pl
from numpy.lib.stride_tricks import sliding_window_view
from scipy import stats
from tqdm import tqdm

In [None]:
def extract_features(
    group: pl.DataFrame,
    window_size: int = 50,
    step_size: int = 25,
    n_samples: int = 2000,
) -> pl.DataFrame:
    """Build one row of sliding-window statistics for a single event.

    Every column other than ``event_id``, ``time``, ``marker``,
    ``prev_marker`` and ``orig_marker`` is treated as a signal channel.
    Each channel is cast to float, NaNs are replaced by 0.0, and the signal
    is zero-padded at the end or truncated to exactly ``n_samples`` values.
    Windows of ``window_size`` samples are then taken every ``step_size``
    samples, and for each window the mean, the sample standard deviation
    (``ddof=1``) and the sample variance (``ddof=1``, stored as "activity")
    are recorded.

    Args:
        group: Rows belonging to one event. ``event_id``, ``prev_marker`` and
            ``marker`` are read from the first row only.
        window_size: Number of samples per window.
        step_size: Offset in samples between the starts of adjacent windows.
        n_samples: Length every channel is padded/truncated to before
            windowing.

    Returns:
        A single-row DataFrame with ``event_id``, ``prev_marker``, ``marker``
        and the columns ``{channel}_mean_window{i}``,
        ``{channel}_std_window{i}`` and ``{channel}_activity_window{i}``.

    Raises:
        ValueError: If ``window_size`` or ``step_size`` is not positive, or
            if ``n_samples`` is smaller than ``window_size``.
    """
    if window_size < 1 or step_size < 1:
        raise ValueError("window_size and step_size must be positive")
    if n_samples < window_size:
        raise ValueError("n_samples must be at least window_size")

    # Columns that describe the event rather than carry a signal
    meta_cols = ('event_id', 'time', 'marker', 'prev_marker', 'orig_marker')
    eeg_cols = [col for col in group.columns if col not in meta_cols]

    # Constant per-event values come first so they lead the output columns
    features = {
        'event_id': group['event_id'][0],
        'prev_marker': group['prev_marker'][0],
        'marker': group['marker'][0],
    }

    for col in eeg_cols:
        # Convert to a float numpy array and neutralise missing samples
        signal = group[col].cast(pl.Float64).to_numpy()
        signal = np.nan_to_num(signal, nan=0.0)

        # Force a fixed length so every event yields the same set of columns
        if len(signal) < n_samples:
            signal = np.pad(signal, (0, n_samples - len(signal)))
        else:
            signal = signal[:n_samples]

        # All windows as a strided view, thinned to every step_size-th start
        windows = sliding_window_view(signal, window_size)[::step_size]

        # The windows are reduced one at a time (rather than along an axis)
        # so the resulting values stay identical to earlier runs.
        with np.errstate(divide='ignore', invalid='ignore'):
            for window_idx, window_signal in enumerate(windows):
                features[f"{col}_mean_window{window_idx}"] = np.mean(window_signal)
                features[f"{col}_std_window{window_idx}"] = np.std(window_signal, ddof=1)
                features[f"{col}_activity_window{window_idx}"] = np.var(window_signal, ddof=1)

    return pl.DataFrame([features])

def create_ml_dataset_polars(
    df_path: str,
    output_path: str = "final_dataset.parquet",
) -> pl.DataFrame:
    """Turn a long-format recording into one feature row per event.

    The parquet file at ``df_path`` is loaded, split by ``event_id``, and each
    event (sorted by ``time``) is passed to ``extract_features``. The per-event
    rows are written to scratch parquet files, concatenated lazily, and sunk
    to ``output_path`` with zstd compression.

    Events whose processing raises are reported on stdout and skipped; the
    remaining events are still written.

    Args:
        df_path: Path of the input parquet file. It must contain ``event_id``,
            ``time``, ``marker`` and ``prev_marker`` columns; every column
            other than those and ``orig_marker`` is cast to Float64.
        output_path: Where the combined feature table is written.

    Returns:
        The combined feature table read back from ``output_path``, or an
        empty DataFrame when no event could be processed (in which case
        nothing is written).
    """
    # Create LazyFrame with proper schema
    lf = pl.scan_parquet(df_path).with_columns(
        pl.col(['prev_marker', 'marker']).cast(pl.Utf8),
        pl.all().exclude(['event_id', 'time', 'marker', 'prev_marker', 'orig_marker']).cast(pl.Float64)
    )

    # The table is materialised once and split into one frame per event.
    # The partition count doubles as the progress total, which avoids a
    # separate n_unique pass over the file.
    event_groups = lf.collect(streaming=True).partition_by(
        "event_id", as_dict=True, maintain_order=True
    )

    # Scratch files live in a private directory next to the output, so they
    # cannot collide with existing files and are removed even if a later
    # step raises.
    scratch_parent = os.path.dirname(os.path.abspath(output_path))
    with tempfile.TemporaryDirectory(prefix="ml_features_", dir=scratch_parent) as temp_dir:
        temp_files = []

        with tqdm(total=len(event_groups), desc="Processing events") as pbar:
            for event_group in event_groups.values():
                # Each partition holds exactly one event, so its id is on any row
                event_id = event_group['event_id'][0]
                try:
                    # A partition is already a single group: call the
                    # extractor directly instead of group_by + map_groups.
                    processed = extract_features(event_group.sort('time'))

                    temp_file = os.path.join(temp_dir, f"temp_{len(temp_files)}.parquet")
                    processed.write_parquet(temp_file)
                    temp_files.append(temp_file)
                except Exception as e:
                    # Best effort: report the failing event and carry on
                    print(f"Error processing event {event_id}: {str(e)}")
                finally:
                    # Advance for failures too, so the bar reaches its total
                    pbar.update(1)

        if not temp_files:
            return pl.DataFrame()

        # Stream the per-event files into the final parquet without
        # collecting them into memory first.
        (
            pl.concat([pl.scan_parquet(f) for f in temp_files])
            .sink_parquet(
                output_path,
                compression="zstd",
                statistics=True
            )
        )

    return pl.scan_parquet(output_path).collect()


# Run the pipeline on the combined recording; the call also writes the
# feature table to final_dataset.parquet in the working directory.
final_ml_dataset = create_ml_dataset_polars(
    "/home/owner/Documents/DEV/BrainLabyrinth/data/combined.parquet"
)

# Preview the first rows, then report the table dimensions
print(final_ml_dataset.head())
print("Final shape:", final_ml_dataset.shape)

Processing events: 100%|██████████| 2772/2772 [1:39:27<00:00,  2.15s/it]


: 

In [1]:
import polars as pl

# Load a previously saved feature table for inspection.
# NOTE(review): create_ml_dataset_polars writes "final_dataset.parquet" by
# default, while this cell reads "ML_dataset.parquet" — confirm which file is
# the intended one (this may be an artefact of an earlier pipeline version).
pl.read_parquet('ML_dataset.parquet')

: 

In [None]:
# Show the (rows, columns) dimensions of the feature table built by
# create_ml_dataset_polars.
final_ml_dataset.shape

In [11]:
# List the feature column names.
# NOTE(review): the output saved under this cell has no event_id column, no
# "_window{i}" suffixes, and includes ptp/skew/kurtosis/zcr/mobility/complexity
# features, none of which match what extract_features currently produces.
# It appears to come from an earlier version; re-run after regenerating the
# dataset.
final_ml_dataset.columns

['prev_marker',
 'marker',
 'Fp1_mean',
 'Fp1_std',
 'Fp1_ptp',
 'Fp1_skew',
 'Fp1_kurtosis',
 'Fp1_zcr',
 'Fp1_activity',
 'Fp1_mobility',
 'Fp1_complexity',
 'Fpz_mean',
 'Fpz_std',
 'Fpz_ptp',
 'Fpz_skew',
 'Fpz_kurtosis',
 'Fpz_zcr',
 'Fpz_activity',
 'Fpz_mobility',
 'Fpz_complexity',
 'Fp2_mean',
 'Fp2_std',
 'Fp2_ptp',
 'Fp2_skew',
 'Fp2_kurtosis',
 'Fp2_zcr',
 'Fp2_activity',
 'Fp2_mobility',
 'Fp2_complexity',
 'F7_mean',
 'F7_std',
 'F7_ptp',
 'F7_skew',
 'F7_kurtosis',
 'F7_zcr',
 'F7_activity',
 'F7_mobility',
 'F7_complexity',
 'F3_mean',
 'F3_std',
 'F3_ptp',
 'F3_skew',
 'F3_kurtosis',
 'F3_zcr',
 'F3_activity',
 'F3_mobility',
 'F3_complexity',
 'Fz_mean',
 'Fz_std',
 'Fz_ptp',
 'Fz_skew',
 'Fz_kurtosis',
 'Fz_zcr',
 'Fz_activity',
 'Fz_mobility',
 'Fz_complexity',
 'F4_mean',
 'F4_std',
 'F4_ptp',
 'F4_skew',
 'F4_kurtosis',
 'F4_zcr',
 'F4_activity',
 'F4_mobility',
 'F4_complexity',
 'F8_mean',
 'F8_std',
 'F8_ptp',
 'F8_skew',
 'F8_kurtosis',
 'F8_zcr',
 'F8_activit