### Imports and Prerequisite Functions

In [None]:
import mne
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from sklearn.model_selection import StratifiedKFold, cross_val_score
from toeplitzlda.classification import ToeplitzLDA
import pyxdf

import warnings

warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=RuntimeWarning)
mne.set_log_level('WARNING')

def load_and_preprocess_raw(header_file, filter_band=(0.5, 16)):
    non_eeg_channels = ["EOGvu", "x_EMGl", "x_GSR", "x_Respi", "x_Pulse", "x_Optic"]
    raw = mne.io.read_raw_brainvision(header_file, misc=non_eeg_channels, preload=True)
    raw.set_montage("standard_1020")
    raw.filter(*filter_band, method="iir")
    raw.pick_types(eeg=True)
    return raw

def load_and_preprocess_xdf(xdf_file, filter_band):
    # Load without dejittering anything yet
    data, _ = pyxdf.load_xdf(xdf_file, dejitter_timestamps=False)

    # Get stream names
    stream_names = [stream['info']['name'][0] for stream in data]

    # Find EEG stream
    idx_eeg = stream_names.index("BrainAmp BUA -")
    eeg_stream = data[idx_eeg]

    eeg_data = np.array(eeg_stream['time_series']).T  # shape: (n_channels, n_samples)
    ts_eeg = np.array(eeg_stream['time_stamps'])
    info_eeg = eeg_stream['info']

    # Find Marker stream
    idx_markers = stream_names.index("AuditoryAphasiaAudioMarker")
    markers = [int(m[0]) for m in data[idx_markers]['time_series']]
    time_stamps = np.array(data[idx_markers]['time_stamps'])

    # Create info for MNE
    ch_names = [ch['label'][0] for ch in info_eeg['desc'][0]['channels'][0]['channel']]
    sfreq = float(info_eeg['nominal_srate'][0])
    ch_types = ['eeg'] * len(ch_names)
    info = mne.create_info(ch_names=ch_names, sfreq=sfreq, ch_types=ch_types)

    onsets = time_stamps - ts_eeg[0]

    # Create RawArray
    raw = mne.io.RawArray(eeg_data, info)

    annotations = mne.Annotations(
        onset=onsets,
        duration=[0] * len(time_stamps),
        description=markers
    )

    raw.set_annotations(annotations)

    # FIX the channel type
    if 'MkIdx' in raw.ch_names:
        raw.set_channel_types({'MkIdx': 'stim'})  # or 'misc'

    # Preprocessing
    raw.set_montage("standard_1020")
    raw.filter(*filter_band, method="iir")
    raw.pick_types(eeg=True)

    return raw

def epoch_raw(raw, decimate=10):
    id_map = {
        '101': 101,
        '102': 102,
        '103': 103,
        '104': 104,
        '105': 105,
        '106': 106,
        '111': 111,
        '112': 112,
        '113': 113,
        '114': 114,
        '115': 115,
        '116': 116,
        '201': 201,
        '202': 202,
        '203': 203,
        '204': 204,
        '205': 205,
        '206': 206,
    }

    evs, _ = mne.events_from_annotations(raw, event_id=id_map)
    target_ids = list(range(111, 117))
    non_target_ids = list(range(101, 107))
    event_id = {f"Word_{i - 110}/Target": i for i in target_ids}
    event_id.update({f"Word_{i - 100}/NonTarget": i for i in non_target_ids})
    epoch = mne.Epochs(raw, events=evs, event_id=event_id, decim=decimate,
                       proj=False, tmax=1, baseline=None)
    return epoch

def load_data(participant, task="Default", filter_band=(0.5, 16)):
    data_dir = Path.cwd() / "session_data" / participant / "run" / f"sub-{participant}" / "ses-S001" / "eeg"
    
    xdf_files = sorted(data_dir.glob(f"sub-{participant}_ses-S001_task-{task}_run-*_eeg.xdf"))

    if not xdf_files:
        print(f"No XDF files found in {data_dir}")
        return []
    
        # Load the data, preprocess and slice it into epochs
    epochs = list()
    for xdf_file in xdf_files:
        raw_data = load_and_preprocess_xdf(xdf_file, filter_band)
        epochs.append(epoch_raw(raw_data))

    # Overwrite epochs list to save memory
    epochs = mne.concatenate_epochs(epochs)

    # Combine 6 epochs into a single iteration (6 stimuli together form a single iteration)
    iterations = [epochs[i:i+6] for i in np.arange(0, epochs.events.shape[0],6)]

    # Assert that each iteration contains exactly 1 Target
    assert all([len(iteration["Target"]) == 1 for iteration in iterations]), "Number of targets in single iterations is unequal to 1."

    # 15 iterations form a single trial
    trials = [iterations[i:i+15] for i in np.arange(0,len(iterations),15)]

    return trials
    
def get_jumping_means(epo, boundaries):
    shape_orig = epo.get_data().shape
    X = np.zeros((shape_orig[0], shape_orig[1], len(boundaries)-1))
    for i in range(len(boundaries)-1):
        idx = epo.time_as_index((boundaries[i], boundaries[i+1]))
        idx_range = list(range(idx[0], idx[1]))
        X[:,:,i] = epo.get_data()[:,:,idx_range].mean(axis=2)
    return X

def compute_auc_per_config(trials, clf_ival_boundaries):
    results = np.zeros(len(trials))
    for t, trial in enumerate(trials):
        features, labels = list(), list()
        for iteration in trial:
            for s, stimulus in enumerate(iteration):
                features.extend([
                    get_jumping_means(iteration[s], clf_ival_boundaries).squeeze().flatten()
                ])
            labels.extend([
                1 if event > 107 else 0
                for event in iteration.events[:,2]
            ])
        X = np.array(features)
        # print(f"X shape: {X.shape}")
        y = np.array(labels)
        n_channels = trial[0].get_data().shape[1]
        clf = ToeplitzLDA(n_channels=n_channels, data_is_channel_prime=False)
        skf = StratifiedKFold(n_splits=15, shuffle=False)
        auc_scores = cross_val_score(clf, X, y, cv=skf, scoring='roc_auc')
        results[t] = auc_scores.mean()

    return results

In [None]:
participants = ["VPpdla", "VPpdlb", "VPpdlc", "VPpdld", "VPpdlf"]
step = 0.05
clf_ival_boundaries = np.arange(0.1, 0.81, step)
print(clf_ival_boundaries)

### Validation testing

In [None]:
# Classification of validation phase data
val_auc_scores = np.zeros((5,24))
for i, participant in enumerate(participants):
    print(f"Loading data of {participant}")
    trials = load_data(participant, task="Validation")
    print("Starting classification...")
    val_auc_scores[i,:] = compute_auc_per_config(trials, clf_ival_boundaries)

In [None]:
from scipy.stats import wilcoxon

optimal_aucs = list()
default_aucs = list()


# Statistical testing for each participant
for i, participant in enumerate(participants):
    split_auc = np.split(val_auc_scores[i,:], 4) # -> assume Optimal -> Default -> Optimal -> Default
    optimal_auc = np.concatenate([split_auc[0], split_auc[2]])
    default_auc = np.concatenate([split_auc[1], split_auc[3]])
    stat, p_value = wilcoxon(optimal_auc, default_auc, alternative="greater")
    print(participant)
    print(f"Optimal-EQ - AUC mean: {np.mean(optimal_auc):.3f}", f"AUC std: {np.std(optimal_auc):.3f}")
    print(f"Default-EQ - AUC mean: {np.mean(default_auc):.3f}", f"AUC std: {np.std(default_auc):.3f}")
    print(p_value, p_value < 0.05)
    print("--------------------------------------")

    optimal_aucs.append(optimal_auc)
    default_aucs.append(default_auc)

In [None]:
import matplotlib.pyplot as plt


# 
fig, axs = plt.subplots(2, 3, figsize=(14, 8), sharex=True, sharey=True)
axs = axs.flat
axs[5].set_axis_off()

for i in range(5):
    axs[i].hist(default_aucs[i], bins=10, alpha=0.5, label='Default-EQ', color='darkorange')
    axs[i].hist(optimal_aucs[i], bins=10, alpha=0.5, label='Optimal-EQ', color='royalblue')
    axs[i].set_title(f'{participants[i]}')
    axs[i].set_xlabel('AUC')
    axs[i].grid(True)
    axs[i].tick_params(labelbottom=True)
    if i == 0 or i == 3:
        axs[i].set_ylabel('Frequency')
        # 

axs[0].legend()
plt.suptitle('AUC Distributions of Optimal-EQ vs. Default-EQ per participant')
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()


### Loudness Testing

In [None]:
# Classify optimization data
opt_auc_scores = np.zeros((5,60))
for i, participant in enumerate(participants):
    print(f"Loading data of {participant}")
    trials = load_data(participant, task="Default")
    print("Starting classification...")
    opt_auc_scores[i,:] = compute_auc_per_config(trials, clf_ival_boundaries)

In [None]:
import re
import pandas as pd

# Parse log files for history of words played

def parse_log_file(filepath):
    with open(filepath, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    gain_pattern = re.compile(r'Gain configuration (\d+) sampled: \[([^\]]+)\]')
    trial_pattern = re.compile(r'Trial (\d+)')
    word_pattern = re.compile(
        r'Word (?P<word>\w+?)_(?:target|non_target) played from direction (?P<direction>\d) - (?P<time>[0-9.]+)'
    )

    parsed_data = []
    current_trial = None
    current_gain = None
    seen_pairs = set()

    for line in lines:
        if "end optimization" in line.lower():
            break  # Stop parsing here

        trial_match = trial_pattern.search(line)
        if trial_match:
            current_trial = int(trial_match.group(1))
            seen_pairs = set()  # reset per trial

        gain_match = gain_pattern.search(line)
        if gain_match:
            config_id, gains = gain_match.groups()
            current_gain = list(map(float, gains.split()))

        word_match = word_pattern.search(line)
        if word_match and current_trial is not None and current_gain is not None:
            word = word_match.group("word")
            direction = int(word_match.group("direction"))
            if (word, direction) not in seen_pairs:
                parsed_data.append({
                    "trial": current_trial,
                    "word": word,
                    "direction": direction,
                    "gains": current_gain.copy()
                })
                seen_pairs.add((word, direction))

    return pd.DataFrame(parsed_data)


for i, participant in enumerate(participants):
    run_path = log_file_path = Path.cwd() / "session_data" / participant / "run" 
    log_file_path = run_path / "opt_log.log"
    
    df = parse_log_file(log_file_path)

    # Account for wrong blocks
    if participant == "VPpdla":
        block = 10
        start_idx = (block - 1) * 36  
        end_idx = start_idx + 36  
        df = df.drop(index=range(start_idx, end_idx)).reset_index(drop=True)
    elif participant == "VPpdlb":
        block = 1
        start_idx = (block - 1) * 36  
        end_idx = start_idx + 36  
        df = df.drop(index=range(start_idx, end_idx)).reset_index(drop=True)
    elif participant == "VPpdlc":
        block = 2
        start_idx = (block - 1) * 36  
        end_idx = start_idx + 36  
        df = df.drop(index=range(start_idx, end_idx)).reset_index(drop=True)

    df.to_csv(run_path / "parsed_word_data.csv", index=False)

In [None]:
import os
import wave
import numpy as np
import pyloudnorm as pyln
from eq import EQ

def load_and_eq_word(word, direction, gains, audio_base_path, eq, fs=44100):
    """
    Loads a word audio file using the wave module, applies EQ, and calculates LUFS.

    Parameters:
        word (str): The word (e.g., "Knuffel")
        direction (int): Direction index (e.g., 1 → 1.wav)
        gains (list of float): EQ gain values
        audio_base_path (str): Path to the folder containing word folders
        fs (int): Expected sampling rate

    Returns:
        processed_audio (np.ndarray): EQ'd audio (int16)
        lufs (float): Integrated loudness in LUFS
    """
    filepath = os.path.join(audio_base_path, word, f"{direction}.wav")
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Audio file not found: {filepath}")

    # Open wave file
    with wave.open(filepath, 'rb') as wf:
        n_channels = wf.getnchannels()
        sampwidth = wf.getsampwidth()
        sr = wf.getframerate()
        n_frames = wf.getnframes()
        raw_data = wf.readframes(n_frames)

    if sr != fs:
        raise ValueError(f"Expected sampling rate {fs}, got {sr}")

    if sampwidth != 2:
        raise ValueError("Only 16-bit PCM WAV files are supported.")

    # Convert byte data to int16 numpy array
    audio_np = np.frombuffer(raw_data, dtype=np.int16).reshape(-1, n_channels)
    eq.update_filter_gains(gains)
    processed_audio = eq.apply_eq(audio_np, n_channels=n_channels)

    # Compute LUFS (normalize to float [-1, 1] first)
    meter = pyln.Meter(fs, block_size= 0.100)
    mono = processed_audio.mean(axis=1) / 32768.0
    lufs = meter.integrated_loudness(mono)

    return processed_audio, lufs

In [None]:
#Calculate LUFS for each participant

freqs = [300, 500, 750]
mod_indices = [[0],[1], [2]]
eq = EQ(fs = 44100, freqs = freqs, mod_indices = mod_indices)

lufs_all = np.zeros((5,60))

for i, participant in enumerate(participants):
    run_path = log_file_path = Path.cwd() / "session_data" / participant / "run" 

    lufs_arr = []

    df = pd.read_csv(run_path / "parsed_word_data.csv")  # adjust filename if needed

    for _, row in df.iterrows():
        word = row["word"]
        direction = row["direction"]
        gains = eval(row["gains"]) if isinstance(row["gains"], str) else row["gains"]  # ensure list
        try:
            processed_audio, _lufs = load_and_eq_word(
                word=word,
                direction=direction,
                eq=eq,
                gains=gains,
                audio_base_path="stereo/"  # replace with your actual path
            )
            lufs_arr.append(_lufs)
        except Exception as e:
            print(f"Error processing {word} dir {direction} in trial {row['trial']}: {e}")

    lufs_all[i,:] = np.array(lufs_arr).reshape(-1, 6).mean(axis=1)


In [None]:
# np.save('opt.npy', opt_auc_scores)
# np.save('val.npy', val_auc_scores)

opt_auc_scores = np.load("opt.npy")
val_auc_scores = np.load("val.npy")

In [None]:
fig, axes = plt.subplots(5, 2, figsize=(8, 20), sharex=False, sharey=False)
fig.suptitle("LUFS vs. AUC score and ΔLUFS vs. ΔAUC per participant", fontsize=16)

for i, participant in enumerate(participants):
    auc_scores = opt_auc_scores[i]
    lufs = lufs_all[i]
    # print(lufs)
    delta_auc = np.diff(auc_scores)
    delta_lufs = np.diff(lufs)

    # Plot 1: Mean LUFS vs AUC
    slope, intercept = np.polyfit(lufs, auc_scores, 1)
    x_vals = np.linspace(lufs.min(), lufs.max(), 100)
    y_vals = slope * x_vals + intercept
    r, p = pearsonr(lufs, auc_scores)

    ax1 = axes[i, 0] 
    ax1.scatter(lufs, auc_scores, c='royalblue')
    ax1.plot(x_vals, y_vals, color='red', linestyle='--', label='Fit line')
    ax1.set_title(f"LUFS vs. AUC\n(r = {r:.2f}, p = {p:.2f}, AUC $\sigma$ = {np.std(auc_scores):.2f})")
    ax1.set_xlabel(f"LUFS")
    ax1.set_ylabel(f"{participant}\nAUC Score")
    ax1.grid(True)

    # Plot 2: ΔLUFS vs ΔAUC
    slope, intercept = np.polyfit(delta_lufs, delta_auc, 1)
    x_vals = np.linspace(delta_lufs.min(), delta_lufs.max(), 100)
    y_vals = slope * x_vals + intercept
    dr, p = pearsonr(delta_lufs, delta_auc)

    ax2 = axes[i, 1] 
    ax2.scatter(delta_lufs, delta_auc, c='royalblue')
    ax2.plot(x_vals, y_vals,  color='red', linestyle='--', label='Fit line')
    ax2.set_title(f"ΔLUFS vs. ΔAUC\n(r = {dr:.2f}, p = {p:.2f}, ΔAUC $\sigma$ = {np.std(delta_auc):.2f})")
    ax2.set_xlabel("ΔLUFS")
    ax2.set_ylabel("ΔAUC")
    ax2.grid(True)

plt.tight_layout(rect=[0, 0, 1, 0.96])
plt.show()


### Additional Plots


In [None]:
fig, axs = plt.subplots(2, 3, figsize=(14, 8), sharex=True, sharey=True)
axs = axs.flat
axs[5].set_axis_off()

for i, participant in enumerate(participants):
    run_path = log_file_path = Path.cwd() / "session_data" / participant / "run" 
    auc_per_config = np.load(run_path / "auc_per_config.npy")
    axs[i].hist(auc_per_config, bins=10, color='royalblue')
    axs[i].set_title(f'{participants[i]} ($\mu = {np.mean(auc_per_config):.2f}$, $\sigma$ = {np.std(auc_per_config):.2f})')
    axs[i].set_xlabel('AUC')
    axs[i].grid(True)
    axs[i].tick_params(labelbottom=True)
    if i == 0 or i == 3:
        axs[i].set_ylabel('Frequency')
        # 

axs[0].legend()
plt.suptitle('Distribution of AUC scores per trial for each participant')
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()

In [None]:
# GP plots (appendix)

import optimization

l_bounds = [1.0, 3.0, 2.5, 2.5, 2.5]
optimal_configs = [[-2.816, -3.061, -5.755],[-2.816,-1.592,3.796],[-2.081,0.857,-5.755],[-3.796,-0.367,5.755],[-6, -0.122,2.327]]

for i, participant in enumerate(participants):
    # participant = "VPpdlf"
    run_path = log_file_path = Path.cwd() / "session_data" / participant / "run" 

    configs = np.load(run_path / "configs.npy")
    auc_per_config = np.load(run_path / "auc_per_config.npy")

    gp, auc_scores_gp = optimization.fit_gp(configs, auc_per_config, lower_bound=l_bounds[i])
    optimization.plot_gp(gp, lower_bound=l_bounds[i], best_config=optimal_configs[i], participant=participant, slice_axis=1)

In [None]:
# Band gain plots

from scipy.stats import pearsonr


fig, axes = plt.subplots(5, 3, figsize=(12, 20), sharex='col', sharey='row')
fig.suptitle('Band gain vs. AUC score per participant', fontsize=16)

for i, participant in enumerate(participants):
    run_path = Path.cwd() / "session_data" / participant / "run"
    configs = np.load(run_path / "configs.npy")
    auc_scores = opt_auc_scores[i]  # shape: (n_configs,)

    for band in range(3):
        band_gains = configs[:, band]
        ax = axes[i, band] 

        # Scatter plot
        ax.scatter(band_gains, auc_scores, label='Data')

        # Fit line
        slope, intercept = np.polyfit(band_gains, auc_scores, deg=1)
        x_vals = np.linspace(band_gains.min(), band_gains.max(), 100)
        y_vals = slope * x_vals + intercept
        ax.plot(x_vals, y_vals, color='red', linestyle='--', label='Fit line')

        # Correlation coefficient
        r, p = pearsonr(band_gains, auc_scores)
        ax.set_title(f'Band {band + 1}\n(r = {r:.2f}, p = {p:.2f})')

        # Titles and labels
        if band == 0:
            ax.set_ylabel(f'{participant}\nAUC Score')
        ax.set_xlabel('Band gain (dB)')
        ax.grid(True)
        ax.tick_params(labelbottom=True)

plt.tight_layout(rect=[0, 0, 1, 0.96])
plt.show()