# converts epochs data to simulated data

The code in this notebook loads in epoched, preprocessed data. All but the first 3 epochs are dropped, then the sEEG data is replaced with simulated powerlaw noise.

In [None]:
import numpy as np
import mne
import os


In [None]:
def sim_powerlaw_custom(n_seconds, exponent, fs, random_state=None):
    """
    Simulate a 1/f^exponent time series using spectral synthesis.

    Parameters
    ----------
    n_seconds : float
        Length of signal in seconds.
    exponent : float
        Power law exponent (e.g., -1 = pink, -2 = brown).
    fs : float
        Sampling frequency.
    random_state : int or None
        Seed for reproducibility.

    Returns
    -------
    sig : 1d array
        Simulated time series.
    """
    rng = np.random.default_rng(random_state)
    n_samples = int(round(n_seconds * fs))

    # Frequencies for rfft
    freqs = np.fft.rfftfreq(n_samples, 1/fs)
    # Avoid divide by zero
    freqs[0] = 1.0

    # Generate random complex spectrum with magnitude ~ 1/f^(exponent/2)
    phases = rng.uniform(0, 2*np.pi, len(freqs))
    magnitudes = 1.0 / (freqs ** (abs(exponent) / 2.0))

    spectrum = magnitudes * (np.cos(phases) + 1j*np.sin(phases))

    # Ensure Hermitian symmetry for real signal
    sig = np.fft.irfft(spectrum, n_samples)

    # Normalize variance
    sig = sig / np.std(sig)
    return sig


def anonymize_epochs_with_powerlaw(
    epochs,
    exponent=-1.0,
    random_state=None,
    preserve_channel_std=False,
    preserve_channel_mean=False,
):
    """
    Replace data in an MNE Epochs object with custom simulated power-law noise.
    """
    rng = np.random.default_rng(random_state)

    epochs_sim = epochs.copy()
    orig_data = epochs_sim.get_data()  # shape: (n_epochs, n_channels, n_times)
    n_epochs, n_channels, n_times = orig_data.shape

    sfreq = float(epochs_sim.info['sfreq'])
    n_seconds = n_times / sfreq

    sim_data = np.zeros_like(orig_data, dtype=np.float64)

    for ep in range(n_epochs):
        for ch in range(n_channels):
            seed = int(rng.integers(0, 2**31 - 1))
            sim_ts = sim_powerlaw_custom(n_seconds, exponent, fs=sfreq, random_state=seed)

            # Safety: resample to exact length if mismatch
            if sim_ts.size != n_times:
                t_sim = np.linspace(0, n_seconds, num=sim_ts.size, endpoint=False)
                t_tgt = np.linspace(0, n_seconds, num=n_times, endpoint=False)
                sim_ts = np.interp(t_tgt, t_sim, sim_ts)

            sim_data[ep, ch, :] = sim_ts

    # Match per-channel statistics
    if preserve_channel_std or preserve_channel_mean:
        orig_std = orig_data.std(axis=(0, 2))
        orig_mean = orig_data.mean(axis=(0, 2))

        for ch in range(n_channels):
            if preserve_channel_std:
                std_sim = sim_data[:, ch, :].std()
                if std_sim > 0:
                    sim_data[:, ch, :] *= orig_std[ch] / std_sim
            if preserve_channel_mean:
                mean_sim = sim_data[:, ch, :].mean()
                sim_data[:, ch, :] += (orig_mean[ch] - mean_sim)

    epochs_sim._data = sim_data.astype(orig_data.dtype)
    return epochs_sim

def anonymize_and_trim_epochs(
    in_path,
    out_path,
    exponent=-1.0,
    random_state=None,
    preserve_channel_std=True,
    preserve_channel_mean=True,
    max_epochs=3,
    max_size_mb=100
):
    """
    Load epochs, keep first few, anonymize with 1/f noise, and save.

    Parameters
    ----------
    in_path : str
        Path to the input epochs .fif file.
    out_path : str
        Path to save anonymized epochs.
    exponent : float
        Exponent for power-law noise.
    random_state : int or None
        Random seed for reproducibility.
    preserve_channel_std : bool
        Scale simulated data to match original channel std.
    preserve_channel_mean : bool
        Shift simulated data to match original channel mean.
    max_epochs : int
        Keep only this many epochs.
    max_size_mb : int
        Maximum allowed size (in MB). Raises warning if exceeded.
    """
    print(f"Loading: {in_path}")
    epochs = mne.read_epochs(in_path, preload=True)

    # Keep only the first `max_epochs` epochs
    epochs = epochs[:max_epochs]

    # Anonymize
    epochs_anon = anonymize_epochs_with_powerlaw(
        epochs,
        exponent=exponent,
        random_state=random_state,
        preserve_channel_std=preserve_channel_std,
        preserve_channel_mean=preserve_channel_mean,
    )

    # Save
    epochs_anon.save(out_path, overwrite=True)
    size_mb = os.path.getsize(out_path) / (1024 * 1024)
    if size_mb > max_size_mb:
        print(f"⚠️ WARNING: {out_path} is {size_mb:.2f} MB (> {max_size_mb} MB)")
    else:
        print(f"✅ Saved {out_path} ({size_mb:.2f} MB)")

In [None]:
subjs = ['1', '2', '3', '4', '5', '6', '7']

for subj in subjs:
    path = subj+'_sim_epo.fif'
    save_path = './sim/'+subj+'_sim_epo.fif'
    anonymize_and_trim_epochs(path, save_path, exponent=-1.0, random_state=99)
