# Explore Sense42 Raw EEG Data
Load and inspect the parquet EEG files and their associated label `.txt` files.

In [None]:
import os
import glob
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
from collections import defaultdict

In [None]:
DATA_DIR = "data/sense42_raw_eeg_extracted_rawonly"

# Sorted directory listing, then one sub-list per file extension of interest.
all_files = sorted(os.listdir(DATA_DIR))
parquet_files, txt_files, json_files = (
    [name for name in all_files if name.endswith(ext)]
    for ext in (".parquet", ".txt", ".json")
)

# Report the total and the per-extension counts in a single write.
print(
    "\n".join(
        [
            f"Total files: {len(all_files)}",
            f"  .parquet : {len(parquet_files)}",
            f"  .txt     : {len(txt_files)}",
            f"  .json    : {len(json_files)}",
        ]
    )
)

## 1. Dataset Structure Overview

In [None]:
# Parse filenames to understand the dataset structure.
# Names are expected to look like <participant>_seg<NN>_eeg_raw[_<label>].<ext>
participants = set()
segments_per_participant = defaultdict(set)
label_types = set()

for f in all_files:
    if not f.startswith("P"):
        continue
    # A stray "P..." file without a "_seg" marker is not a segment file;
    # skip it instead of failing with IndexError on the split result.
    pid, marker, seg_rest = f.partition("_seg")  # e.g. P001 / _seg / 04_eeg_raw_mental.txt
    if not marker:
        continue
    participants.add(pid)

    seg_id = seg_rest.split("_")[0]  # e.g. 04
    segments_per_participant[pid].add(seg_id)

    # Label type = whatever follows "eeg_raw" once the extension is stripped;
    # an empty remainder (the base file itself) is reported as "raw".
    basename = os.path.splitext(f)[0]
    after_raw = basename.split("eeg_raw")[-1]  # e.g. "_mental" or ""
    label_types.add(after_raw.lstrip("_") or "raw")

seg_counts = [len(v) for v in segments_per_participant.values()]
if participants:
    print(f"Participants: {len(participants)} ({min(participants)} .. {max(participants)})")
    print(f"Segments per participant: min={min(seg_counts)}, max={max(seg_counts)}, mean={np.mean(seg_counts):.1f}")
else:
    # min()/max() raise ValueError on an empty collection, so report instead.
    print("No participant segment files found.")
print(f"Label types: {sorted(label_types)}")

## 2. Load a Single Parquet File

In [None]:
# Pick the first parquet file as an example
sample_parquet = parquet_files[0]
sample_path = os.path.join(DATA_DIR, sample_parquet)
print(f"Loading: {sample_parquet}")
print(f"File size: {os.path.getsize(sample_path) / 1024 / 1024:.2f} MB")

# --- Method A: Inspect with pyarrow (only schema and metadata are accessed) ---
try:
    pf = pq.ParquetFile(sample_path)
    print("\n--- PyArrow ParquetFile metadata ---")
    print(f"Schema: {pf.schema_arrow}")
    # Count columns from the schema's field names. The previous
    # hasattr(..., 'num_fields') guard fell back to 'N/A'; pyarrow.Schema does
    # not appear to expose that attribute, so the count was never shown.
    print(f"Num columns: {len(pf.schema_arrow.names)}")
    print(f"Num row groups: {pf.metadata.num_row_groups}")
    print(f"Num rows: {pf.metadata.num_rows}")
except Exception as e:
    # Best-effort exploration: report the failure and keep the notebook going.
    print(f"PyArrow read failed: {e}")

In [None]:
# --- Method B: Read into pandas DataFrame ---
# Every step stays inside the try so that any failure is reported, not raised.
try:
    df = pd.read_parquet(sample_path)

    # Structural overview: dimensions, column names, per-column dtypes.
    print(f"Shape: {df.shape}")
    print(f"Columns: {list(df.columns)}")
    print(f"Dtypes:\n{df.dtypes}")

    # Deep memory footprint, converted from bytes to MB.
    mem_bytes = df.memory_usage(deep=True).sum()
    print(f"\nMemory usage: {mem_bytes / 1024 / 1024:.2f} MB")

    # First rows, then summary statistics.
    display(df.head())
    display(df.describe())
except Exception as err:
    print(f"Pandas read failed: {err}")

In [None]:
# --- Method C: Convert to numpy array ---
# The file is re-read so this cell does not depend on Method B having run.
try:
    df = pd.read_parquet(sample_path)
    eeg_array = df.to_numpy(dtype=np.float64)
    print(f"Numpy array shape: {eeg_array.shape}")
    print(f"Dtype: {eeg_array.dtype}")

    # NaN-aware global statistics over the whole array.
    lowest = np.nanmin(eeg_array)
    highest = np.nanmax(eeg_array)
    average = np.nanmean(eeg_array)
    print(f"Min: {lowest:.6f}, Max: {highest:.6f}, Mean: {average:.6f}")

    has_nan = np.isnan(eeg_array).any()
    print(f"Any NaNs: {has_nan}")

    corner = eeg_array[:3, :5]
    print(f"\nFirst 3 rows x first 5 cols:\n{corner}")
except Exception as err:
    print(f"Numpy conversion failed: {err}")

## 3. Load the Corresponding Label Files

In [None]:
# Suffixes of the per-segment label files: <segment>_eeg_raw_<name>.txt
LABEL_NAMES = [
    "effort",
    "frustration",
    "mental",
    "performance",
    "temporal",
]

def load_label(filepath):
    """Read a single label value from a small text or binary file.

    Decoding is attempted in this order and the first match wins:

    1. The whole content (surrounding whitespace stripped) parsed as a
       textual number -> ``float``.
    2. Exactly 4 bytes -> packed float32 in native byte order -> ``float``.
    3. Exactly 8 bytes -> packed float64 in native byte order -> ``float``.

    Args:
        filepath: Path of the label file to read.

    Returns:
        A ``float`` when one of the strategies above applies; otherwise the
        raw ``bytes`` of the file (this includes an empty file).

    Note:
        A 4-byte payload is always decoded as float32. The former int32
        fallback sat behind the same ``len(raw) == 4`` check and could never
        run, so it has been removed rather than kept as dead code.
    """
    with open(filepath, "rb") as f:
        raw = f.read()

    # Strategy 1: plain-text number. float() accepts bytes directly and
    # raises ValueError for anything that is not an ASCII numeric literal.
    try:
        return float(raw.strip())
    except ValueError:
        pass

    # Strategies 2 and 3: a packed IEEE float whose width equals the file size.
    packed_dtype = {4: np.float32, 8: np.float64}.get(len(raw))
    if packed_dtype is not None:
        return float(np.frombuffer(raw, dtype=packed_dtype)[0])

    return raw  # nothing matched: hand back the raw bytes


# Derive the segment prefix from the sample parquet name and show its labels,
# e.g. P001_seg01_eeg_raw.parquet -> P001_seg01
seg_prefix = sample_parquet.replace("_eeg_raw.parquet", "")
print(f"Segment: {seg_prefix}")
print()

for label in LABEL_NAMES:
    label_file = os.path.join(DATA_DIR, f"{seg_prefix}_eeg_raw_{label}.txt")
    # Guard clause: report a missing label file and move on to the next one.
    if not os.path.exists(label_file):
        print(f"  {label:15s} : FILE NOT FOUND")
        continue
    val = load_label(label_file)
    print(f"  {label:15s} = {val}  (size: {os.path.getsize(label_file)} bytes)")

# The un-suffixed "<segment>_eeg_raw.txt" file is optional; skip it silently if absent.
base_label = os.path.join(DATA_DIR, f"{seg_prefix}_eeg_raw.txt")
if os.path.exists(base_label):
    val = load_label(base_label)
    print(f"  {'raw':15s} = {val}  (size: {os.path.getsize(base_label)} bytes)")

## 4. Batch-Load Parquet Files into a Dict (first N files)

In [None]:
def parse_filename(filename):
    """Split a parquet filename into its participant and segment identifiers.

    For example, ``P001_seg04_eeg_raw.parquet`` yields ``("P001", "04")``.
    Raises ``IndexError`` when the name contains no ``_seg`` marker.
    """
    stem = filename.replace("_eeg_raw.parquet", "")
    tokens = stem.split("_seg")
    participant_id = tokens[0]
    segment_id = tokens[1]
    return participant_id, segment_id


# Only the first N parquet files are loaded, to keep memory use bounded.
N = 5  # increase this to load more
dataset = {}

for pf_name in parquet_files[:N]:
    pid, seg = parse_filename(pf_name)
    path = os.path.join(DATA_DIR, pf_name)
    seg_prefix = pf_name.replace("_eeg_raw.parquet", "")

    # A file that cannot be read or converted is reported and skipped.
    try:
        df = pd.read_parquet(path)
        eeg = df.to_numpy(dtype=np.float64)
    except Exception as err:
        print(f"  SKIP {pf_name}: {err}")
        continue

    # Map each label name to its expected file, then keep the ones that exist.
    label_paths = {
        name: os.path.join(DATA_DIR, f"{seg_prefix}_eeg_raw_{name}.txt")
        for name in LABEL_NAMES
    }
    labels = {
        name: load_label(label_path)
        for name, label_path in label_paths.items()
        if os.path.exists(label_path)
    }

    # One entry per (participant, segment) pair.
    dataset[(pid, seg)] = dict(eeg=eeg, labels=labels, columns=list(df.columns))
    print(f"Loaded {pf_name}: shape={eeg.shape}, labels={labels}")

print(f"\nLoaded {len(dataset)} segments into memory.")

## 5. Quick Data Summary Across All Parquet Files (metadata only)

In [None]:
# Collect one summary row per parquet file from its metadata and schema only;
# the EEG data itself is never loaded here.
summary_rows = []

for pf_name in parquet_files:
    path = os.path.join(DATA_DIR, pf_name)
    pid, seg = parse_filename(pf_name)

    # Identity columns are common to the success and the failure row.
    row = {"participant": pid, "segment": seg}
    try:
        meta = pq.read_metadata(path)
        schema = pq.read_schema(path)
        row.update(
            num_rows=meta.num_rows,
            num_cols=schema.num_fields if hasattr(schema, 'num_fields') else len(schema),
            file_size_mb=os.path.getsize(path) / 1024 / 1024,
        )
    except Exception as err:
        # Unreadable file: keep the row, blank the counts, record the error text.
        row.update(
            num_rows=None,
            num_cols=None,
            file_size_mb=os.path.getsize(path) / 1024 / 1024,
            error=str(err),
        )
    summary_rows.append(row)

summary_df = pd.DataFrame(summary_rows)
print("=== Parquet File Metadata Summary ===")
display(summary_df.describe())
display(summary_df.head(10))

## 6. Load the Extraction Summary JSON

In [None]:
import json

json_path = os.path.join(DATA_DIR, "extraction_summary.json")
if os.path.exists(json_path):
    try:
        # Decode explicitly as UTF-8 (the standard JSON encoding) instead of
        # relying on the platform's default text encoding.
        with open(json_path, "r", encoding="utf-8") as f:
            extraction_summary = json.load(f)

        # The document root may be a dict or any other JSON value; a non-dict
        # root is shown as a single pseudo-entry named "root".
        is_mapping = isinstance(extraction_summary, dict)
        print(f"Keys: {list(extraction_summary.keys()) if is_mapping else type(extraction_summary)}")

        # Pretty-print first level: containers by type and size, scalars by value.
        top_level = extraction_summary.items() if is_mapping else [("root", extraction_summary)]
        for k, v in top_level:
            if isinstance(v, (dict, list)):
                print(f"  {k}: {type(v).__name__} with {len(v)} items")
            else:
                print(f"  {k}: {v}")
    except Exception as e:
        # Best-effort exploration: report the failure, then show the start of
        # the file as raw bytes to help diagnose what it actually contains.
        print(f"Failed to load JSON: {e}")
        with open(json_path, "rb") as f:
            raw = f.read(500)
        print(f"Raw bytes (first 500): {raw}")
else:
    print("extraction_summary.json not found")