In [8]:

# sphinx_gallery_thumbnail_number = 2

# Authors: Robert Luke <mail@robertluke.net>
#
# License: BSD (3-clause)

# Import common libraries
from collections import defaultdict
from copy import deepcopy
from itertools import compress
from pprint import pprint

# Import Plotting Library
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import numpy as np
import os
import pywt
import mne_nirs
import mne



# Import StatsModels
import statsmodels.formula.api as smf
from mne import Epochs, events_from_annotations, set_log_level
from mne.preprocessing.nirs import (
    beer_lambert_law,
    optical_density,
    scalp_coupling_index,
    temporal_derivative_distribution_repair,
    
)

# Import MNE processing
from mne.viz import plot_compare_evokeds

# Import MNE-BIDS processing
from mne_bids import BIDSPath, read_raw_bids


# Import MNE-NIRS processing
from mne_nirs.channels import get_long_channels, picks_pair_to_idx
from mne_nirs.datasets import fnirs_motor_group
from mne_nirs.signal_enhancement import (enhance_negative_correlation, short_channel_regression)
from mne_nirs.channels import (get_long_channels,
                               get_short_channels,
                               picks_pair_to_idx)

from collections import defaultdict
import numpy as np
from itertools import compress
from sklearn.decomposition import PCA

from mne_nirs.experimental_design import make_first_level_design_matrix
from mne_nirs.statistics import run_glm, statsmodels_to_results
from nilearn.plotting import plot_design_matrix
from mne.annotations import Annotations
# Set general parameters
set_log_level("WARNING")  # Don't show info, as it is repetitive for many subjects

# Preprocessing

In [9]:
def reject_epochs(max_data, min_data, condition_name, threshold_factor=2, max_reject_ratio=0.25):
    """Drop the worst-offending epochs of one condition based on amplitude bounds.

    A channel is flagged in an epoch when its maximum exceeds
    ``mean_max + threshold_factor * std_max`` or its minimum falls below
    ``mean_min - threshold_factor * std_min``, using the reference statistics
    hard-coded below for ``condition_name``. Epochs with at least one flagged
    channel are ranked by number of flagged channels, then by summed excess
    beyond the bounds (both descending). At most
    ``int(max_reject_ratio * n_epochs)`` of them are rejected.

    Parameters
    ----------
    max_data, min_data : 2-D arrays indexed as ``[epoch, channel]``
        Per-epoch, per-channel maxima and minima.
    condition_name : str
        One of ``"Control"``, ``"Noise"``, ``"Speech"``.
    threshold_factor : number
        Multiplier applied to the reference standard deviations.
    max_reject_ratio : float
        Upper limit on the fraction of epochs that may be rejected.

    Returns
    -------
    tuple
        ``(cleaned_max, cleaned_min, index_dict)`` where the arrays have the
        rejected rows removed and ``index_dict`` maps
        ``"<CONDITION>_GOOD_IDX"`` / ``"<CONDITION>_BAD_IDX"`` to sorted lists
        of row indices into the input arrays.

    Raises
    ------
    KeyError
        If ``condition_name`` has no entry in the reference table.
    """
    reference_stats = {
        "Control": {"mean_max": 4.16e-6, "mean_min": -4.23e-6, "std_max": 2.51e-6, "std_min": 2.32e-6},
        "Noise": {"mean_max": 6.08e-6, "mean_min": -3.61e-6, "std_max": 2.55e-6, "std_min": 1.54e-6},
        "Speech": {"mean_max": 7.16e-6, "mean_min": -3.88e-6, "std_max": 3.16e-6, "std_min": 1.5e-6},
    }
    stats = reference_stats[condition_name]

    ceiling = stats["mean_max"] + (threshold_factor * stats["std_max"])
    floor = stats["mean_min"] - (threshold_factor * stats["std_min"])

    n_epochs = max_data.shape[0]

    # One entry per offending epoch: (row, flagged-channel count, summed excess).
    offenders = []
    for row in range(n_epochs):
        peaks = max_data[row, :]
        troughs = min_data[row, :]

        too_high = peaks > ceiling
        too_low = troughs < floor
        n_flagged = np.sum(too_high | too_low)

        if n_flagged == 0:
            continue

        excess = np.sum(peaks[too_high] - ceiling) + np.sum(floor - troughs[too_low])
        offenders.append((row, n_flagged, excess))

    # Worst first: most flagged channels, then largest excess (stable sort).
    offenders.sort(key=lambda entry: (-entry[1], -entry[2]))

    quota = int(max_reject_ratio * n_epochs)
    rejected_epochs = [entry[0] for entry in offenders[:quota]]

    rejected_lookup = set(rejected_epochs)
    kept_rows = [row for row in range(n_epochs) if row not in rejected_lookup]

    prefix = condition_name.upper()
    index_dict = {
        f"{prefix}_GOOD_IDX": kept_rows,
        f"{prefix}_BAD_IDX": sorted(rejected_epochs),
    }

    return (
        np.delete(max_data, rejected_epochs, axis=0),
        np.delete(min_data, rejected_epochs, axis=0),
        index_dict,
    )


In [43]:
# cropping the signal before sci calculation
def preprocessing_glm_ROI(bids_path, subject_id, session_id, id):
    """Preprocess one fNIRS recording and fit a first-level GLM with ROI summaries.

    Steps, in order:

    1. Read the recording and set the Control/Noise/Speech annotation durations.
    2. Build a copy cropped to the task blocks delimited by the ``Xstart`` /
       ``Xend`` markers; this copy is used only for the scalp coupling index.
    3. Convert to optical density, mark channels with SCI < 0.8 as bad, apply
       TDDR and short-channel regression.
    4. Convert to haemoglobin, keep long channels, low-pass at 0.2 Hz and
       high-pass at 0.05 Hz.
    5. Epoch the data, run ``reject_epochs`` per condition, and re-annotate the
       continuous data with only the events of the retained epochs.
    6. Fit the GLM and export channel-level, ROI-level and Noise-minus-Speech
       contrast tables (estimates scaled by 1e6).

    Parameters
    ----------
    bids_path : object accepted by ``read_raw_bids``
        Location of the recording.
    subject_id, session_id
        Used in the progress message and stored in the ``"Subject"`` and
        ``"session"`` columns of the returned tables.
    id
        Stored in the ``"ID"`` column of the returned tables. (The name shadows
        the builtin but is kept for keyword-call compatibility.)

    Returns
    -------
    tuple
        ``(raw_haemo, roi, cha, con)``. When the recording is excluded (more
        than 55 bad channels, or fewer than 4 good short channels) the result
        is ``(None, None, None, None)`` so callers can always unpack four
        values.

    Raises
    ------
    ValueError
        If the number of ``Xstart``/``Xend`` markers is even, or if one of the
        three conditions has no annotation in the recording.
    """
    print(f"Processing subject {subject_id} session {session_id}...")

    conditions = ("Control", "Noise", "Speech")
    # Every exclusion path returns the same arity as the success path.
    excluded = (None, None, None, None)

    # Read data with annotations in BIDS format
    raw_intensity = read_raw_bids(bids_path=bids_path, verbose=False)
    raw_intensity.annotations.set_durations({'Control' : 5, 'Noise' : 5, 'Speech' : 5.25})

    # Event locations, converted from samples to seconds
    sfreq = raw_intensity.info['sfreq']
    Breaks, _ = mne.events_from_annotations(raw_intensity, {'Xstart': 4, 'Xend': 5})
    AllEvents, present_ids = mne.events_from_annotations(raw_intensity)
    Breaks = Breaks[:, 0] / sfreq
    LastEvent = AllEvents[-1, 0] / sfreq

    missing = [name for name in conditions if name not in present_ids]
    if missing:
        raise ValueError(f"Recording has no annotations for condition(s): {missing}")

    if len(Breaks) % 2 == 0:
        raise ValueError("Breaks array should have an odd number of elements.")

    original_duration = raw_intensity.times[-1] - raw_intensity.times[0]

    # Task blocks: consecutive marker pairs, then a final block running from
    # the last marker to 15.25 s after the last event. With a single marker
    # only the final block exists.
    segments = list(zip(Breaks[0:-1:2], Breaks[1::2]))
    segments.append((Breaks[-1], LastEvent + 15.25))

    first_start, first_stop = segments[0]
    raw_intensity_cropped = raw_intensity.copy().crop(first_start, first_stop)
    for start, stop in segments[1:]:
        raw_intensity_cropped.append(raw_intensity.copy().crop(start, stop))

    cropped_duration = raw_intensity_cropped.times[-1] - raw_intensity_cropped.times[0]
    if cropped_duration >= original_duration:
        print("WARNING: Cropping did not reduce duration!")

    # Remove break and boundary annotations from both versions
    marker_descriptions = ['Xstart', 'Xend', 'BAD boundary', 'EDGE boundary']
    for raw in (raw_intensity, raw_intensity_cropped):
        raw.annotations.delete(
            np.where(np.isin(raw.annotations.description, marker_descriptions))[0]
        )

    # Convert signal to optical density
    raw_od = optical_density(raw_intensity)
    raw_od_cropped = optical_density(raw_intensity_cropped)

    # Total number of short channels, before any are marked bad
    tot_number_of_short_channels = len(get_short_channels(raw_od).ch_names)

    # SCI is computed on the cropped data; the resulting bad-channel list is
    # applied to the full-length data, which has the same channel names.
    sci = scalp_coupling_index(raw_od_cropped, l_freq=0.7, h_freq=1.45)
    bad_channels = list(compress(raw_od.ch_names, sci < 0.8))

    if len(bad_channels) > 55:
        print(f"❌ Too many bad channels ({len(bad_channels)}). Excluding subject from analysis.")
        return excluded

    raw_od.info["bads"] = bad_channels
    raw_od = temporal_derivative_distribution_repair(raw_od)

    # Bad channels that are not among the long channels are counted as bad
    # short channels.
    bad_long_chs = get_long_channels(raw_od).info["bads"]
    len_bad_short_chs = len(bad_channels) - len(bad_long_chs)
    num_good_short_channels = tot_number_of_short_channels - len_bad_short_chs

    if num_good_short_channels < 4:
        print(f"❌ Too few good short channels ({num_good_short_channels}; need at least 4). Skipping the subject.")
        return excluded

    # Short-channel regression subtracts a scaled version of the nearest
    # short-channel signal from each long-channel signal.
    raw_od_corrected = short_channel_regression(raw_od)

    # Convert to haemoglobin and keep long channels only
    raw_haemo_bef = beer_lambert_law(raw_od_corrected, ppf=0.1)
    raw_haemo_bef = get_long_channels(raw_haemo_bef, min_dist=0.02)

    # Low-pass, then high-pass. Each call gets its own iir_params dict.
    raw_haemo_bef = raw_haemo_bef.filter(l_freq=None, h_freq=0.2,
                                         method="iir", iir_params=dict(order=5, ftype='butter'))
    raw_haemo_bef = raw_haemo_bef.filter(l_freq=0.05, h_freq=None,
                                         method="iir", iir_params=dict(order=5, ftype='butter'))

    # Epoch the filtered haemoglobin data
    all_events, all_event_dict = mne.events_from_annotations(raw_haemo_bef)
    epochs = mne.Epochs(
        raw_haemo_bef,
        all_events,
        event_id=all_event_dict,
        tmin=-5,
        tmax=15,
        reject=dict(hbo=100e-6),
        reject_by_annotation=True,
        proj=True,
        baseline=(None, 0),
        detrend=1,
        preload=True,
        verbose=None,
    )

    # === EPOCH REJECTION ===
    # Goal: new annotations that mark only the good epochs of each condition.
    epochs.drop_channels(list(epochs.info['bads']))

    data = epochs.copy().pick("hbo").get_data()
    epoch_event_ids = epochs.events[:, 2]

    # Per-epoch, per-channel extrema over epoch samples 26..104.
    # NOTE(review): which latencies these samples cover depends on the
    # sampling rate — confirm the intended window.
    time_slice = data[:, :, 26:105]
    max_values = np.max(time_slice, axis=2)
    min_values = np.min(time_slice, axis=2)

    raw_haemo = raw_haemo_bef.copy()

    combined_annotations = None
    for condition in conditions:
        # Positions of this condition among the epochs that survived mne.Epochs.
        cond_epochs = np.flatnonzero(epoch_event_ids == all_event_dict[condition])
        _, _, idx_dict = reject_epochs(
            max_values[cond_epochs, :], min_values[cond_epochs, :], condition
        )
        good_idx = idx_dict[f"{condition.upper()}_GOOD_IDX"]

        # Map the good indices back through the surviving epochs' own events,
        # so epochs already dropped by mne.Epochs cannot shift the mapping.
        good_samples = epochs.events[cond_epochs[good_idx], 0]
        # Event samples count from the start of acquisition; raw.times is
        # indexed from the first stored sample.
        onsets = raw_haemo.times[good_samples - raw_haemo.first_samp]

        condition_annotations = Annotations(
            onset=onsets,
            duration=[0] * len(onsets),
            description=[condition] * len(onsets),
        )
        if combined_annotations is None:
            combined_annotations = condition_annotations
        else:
            combined_annotations = combined_annotations + condition_annotations

    raw_haemo.set_annotations(combined_annotations)

    # Continue with GLM analysis
    isis, names = mne_nirs.experimental_design.longest_inter_annotation_interval(raw_haemo)
    design_matrix = make_first_level_design_matrix(
        raw_haemo,
        drift_model='cosine',
        high_pass=1/(2*max(isis)),
        hrf_model='spm',
        stim_dur=5.125
    )
    glm_est = run_glm(raw_haemo, design_matrix)

    # ROI definitions as [source, detector] pairs
    roi_channel_pairs = dict(
        Left=[[4, 2], [4, 3], [5, 2], [5, 3], [5, 4], [5, 5]],
        Right=[[10, 9], [10, 10], [10, 11], [10, 12], [11, 11], [11, 12]],
        Back=[[6, 6], [6, 8], [7, 6], [7, 7], [7, 8], [8, 7], [8, 8], [9, 8]],
        Front=[[1, 1], [2, 1], [3, 1], [3, 2], [12, 1]],
        Noise=[[10, 11], [5, 3], [11, 11], [10, 9], [10, 12], [10, 10]],
        Speech=[[10, 11], [10, 10], [10, 12], [11, 12], [1, 1], [10, 9], [8, 7], [7, 7], [8, 8]],
        Common=[[10, 11], [10, 9], [10, 12], [10, 10]],
        OnlyNoise=[[5, 3], [11, 11]],
        # NOTE(review): [10, 9] also appears in Common — confirm it belongs here.
        OnlySpeech=[[11, 12], [1, 1], [10, 9], [8, 7], [7, 7], [8, 8]],
        New_Speech=[[2, 1], [5, 5], [6, 8], [7, 7], [8, 8], [9, 8], [10, 9], [12, 1]],
        New_Noise=[[2, 1], [3, 1], [6, 8], [7, 6], [9, 8]],
    )

    # Channel index picks per ROI; pairs absent from the data are ignored
    roi_picks = {
        roi_name: picks_pair_to_idx(raw_haemo, pairs, on_missing="ignore")
        for roi_name, pairs in roi_channel_pairs.items()
    }

    cha = glm_est.to_dataframe()

    roi = glm_est.to_dataframe_region_of_interest(
        roi_picks, design_matrix.columns, demographic_info=True
    )

    # Noise-minus-Speech contrast built from one-hot design-matrix columns
    contrast_matrix = np.eye(design_matrix.shape[1])
    basic_conts = dict(
        [(column, contrast_matrix[i]) for i, column in enumerate(design_matrix.columns)]
    )
    contrast_noise_vs_speech = basic_conts["Noise"] - basic_conts["Speech"]

    contrast = glm_est.compute_contrast(contrast_noise_vs_speech)
    con = contrast.to_dataframe()

    # Tag every table with the recording identifiers
    roi["ID"] = cha["ID"] = con["ID"] = id
    roi["Subject"] = cha["Subject"] = con["Subject"] = subject_id
    roi["session"] = cha["session"] = con["session"] = session_id

    # Scale estimates by 1e6 (uM) for nicer plotting
    cha["theta"] = cha["theta"] * 1.0e6
    roi["theta"] = roi["theta"] * 1.0e6
    con["effect"] = con["effect"] * 1.0e6

    return raw_haemo, roi, cha, con
    

# Analysis

In [44]:

# NOTE: machine-specific absolute path; point this at the local BIDS dataset.
bids_root = r"C:\Datasets\Test-retest study\bids_dataset"
subject_list = sorted([d for d in os.listdir(bids_root) if d.startswith("sub-")])
subject_list = [s.replace("sub-", "") for s in subject_list]

print("Detected subjects:", subject_list)
subjects_df = pd.DataFrame(columns=["subject", "session"])

# Per-recording tables are collected in lists and concatenated once after the
# loop, instead of re-copying the growing frames on every iteration.
roi_frames = []  # region of interest results
cha_frames = []  # channel level results
con_frames = []  # channel level contrast results

# Running index of successfully processed recordings (stored in the "ID"
# column). The name shadows the builtin but is kept as the notebook uses it.
id = 0

# Loop through subjects and sessions
for sub in subject_list:
    for ses in range(1, 3):

        bids_path = BIDSPath(
            subject=f"{sub}",
            session=f"{ses:02d}",
            task="auditory",
            datatype="nirs",
            root=bids_root,
            suffix="nirs",
            extension=".snirf",
        )

        outcome = preprocessing_glm_ROI(bids_path, sub, ses, id)

        # An excluded recording is signalled by None or by a tuple whose first
        # element is None; check before unpacking so the loop cannot crash.
        if outcome is None or outcome[0] is None:
            print(f"⚠️ No data for Subject {sub}, Session {ses:02d}. Skipping...")
            continue

        raw_haemo, roi, cha, con = outcome[:4]

        roi_frames.append(roi)
        cha_frames.append(cha)
        con_frames.append(con)
        id = id + 1

# pd.concat raises on an empty list, so fall back to an empty frame.
df_roi = pd.concat(roi_frames, ignore_index=True) if roi_frames else pd.DataFrame()
df_cha = pd.concat(cha_frames, ignore_index=True) if cha_frames else pd.DataFrame()
df_con = pd.concat(con_frames, ignore_index=True) if con_frames else pd.DataFrame()
                


Detected subjects: ['01', '02', '03', '04', '05', '07', '08', '10', '11', '12', '13', '16', '17', '19', '21', '24']
Processing subject 01 session 1...
Processing subject 01 session 2...
Processing subject 02 session 1...
Processing subject 02 session 2...
Processing subject 03 session 1...
Processing subject 03 session 2...
Processing subject 04 session 1...
Processing subject 04 session 2...
Processing subject 05 session 1...
Processing subject 05 session 2...
Processing subject 07 session 1...
Processing subject 07 session 2...
Processing subject 08 session 1...
Processing subject 08 session 2...
Processing subject 10 session 1...
Processing subject 10 session 2...
Processing subject 11 session 1...
Processing subject 11 session 2...
Processing subject 12 session 1...
Processing subject 12 session 2...
Processing subject 13 session 1...
Processing subject 13 session 2...
Processing subject 16 session 1...
Processing subject 16 session 2...
Processing subject 17 session 1...
Processin

In [None]:
import pandas as pd
import statsmodels.formula.api as smf

# Step 1: Keep HbO estimates of the three conditions from session 1 only.
# (To pool both sessions, drop the `session == 1` clause from the query.)
ch_summary = df_cha.query("Condition in ['Control', 'Noise', 'Speech'] and Chroma == 'hbo' and session == 1").copy()

# Step 2: Make 'Condition' categorical with 'Control' first, so it is the reference level
ch_summary["Condition"] = pd.Categorical(
    ch_summary["Condition"], categories=["Control", "Noise", "Speech"], ordered=True
)

# Step 3: Fit one mixed model per channel and collect the condition effects
result_columns = ["Channel", "Comparison", "Beta", "StdErr", "z", "p"]
results = []

for ch in ch_summary["ch_name"].unique():
    print(f"Fitting model for channel {ch}...")
    ch_data = ch_summary[ch_summary["ch_name"] == ch].copy()

    try:
        model = smf.mixedlm(
            "theta ~ Condition",  # Compare Noise & Speech vs Control
            ch_data,
            groups=ch_data["Subject"],
        ).fit(method="powell")

        # Keep only the Noise-vs-Control and Speech-vs-Control terms
        for param_name in model.params.index:
            if "Condition[T.Noise]" in param_name or "Condition[T.Speech]" in param_name:
                results.append({
                    "Channel": ch,
                    "Comparison": param_name,
                    "Beta": model.params[param_name],
                    "StdErr": model.bse[param_name],
                    "z": model.tvalues[param_name],
                    "p": model.pvalues[param_name]
                })

    except Exception as e:
        # Best effort: report the failing channel and carry on with the rest.
        print(f"Model failed for channel {ch}: {e}")

# Step 4: Build the results table. Explicit columns keep the "p" column
# present even when no model produced a result, so the filter cannot fail.
all_channel_results = pd.DataFrame(results, columns=result_columns)

# Step 5: Filter for significance (uncorrected p < 0.05)
significant = all_channel_results[all_channel_results["p"] < 0.05]

# View significant results
significant


Fitting model for channel S1_D1 hbo...
Fitting model for channel S2_D1 hbo...




Fitting model for channel S3_D1 hbo...
Fitting model for channel S3_D2 hbo...
Fitting model for channel S4_D2 hbo...
Fitting model for channel S4_D3 hbo...
Fitting model for channel S5_D2 hbo...
Fitting model for channel S5_D3 hbo...




Fitting model for channel S5_D4 hbo...
Fitting model for channel S5_D5 hbo...
Fitting model for channel S6_D6 hbo...




Fitting model for channel S6_D8 hbo...
Fitting model for channel S7_D6 hbo...
Fitting model for channel S7_D7 hbo...
Fitting model for channel S7_D8 hbo...




Fitting model for channel S8_D7 hbo...
Fitting model for channel S8_D8 hbo...
Fitting model for channel S9_D8 hbo...
Fitting model for channel S10_D9 hbo...
Fitting model for channel S10_D10 hbo...
Fitting model for channel S10_D11 hbo...
Fitting model for channel S10_D12 hbo...
Fitting model for channel S11_D11 hbo...
Fitting model for channel S11_D12 hbo...
Fitting model for channel S12_D1 hbo...




Unnamed: 0,Channel,Comparison,Beta,StdErr,z,p
2,S2_D1 hbo,Condition[T.Noise],-1.921488,0.768891,-2.499037,0.012453
3,S2_D1 hbo,Condition[T.Speech],-2.385879,0.768891,-3.103011,0.001916
4,S3_D1 hbo,Condition[T.Noise],-2.190981,0.812607,-2.696236,0.007013
5,S3_D1 hbo,Condition[T.Speech],-2.124317,0.812607,-2.614198,0.008944
7,S3_D2 hbo,Condition[T.Speech],-3.426156,1.09726,-3.122466,0.001793
13,S5_D2 hbo,Condition[T.Speech],-3.398747,1.199594,-2.833248,0.004608
19,S5_D5 hbo,Condition[T.Speech],-2.608841,1.285709,-2.029108,0.042447
22,S6_D8 hbo,Condition[T.Noise],-2.008476,0.771238,-2.604222,0.009208
23,S6_D8 hbo,Condition[T.Speech],-3.039126,0.771238,-3.94058,8.1e-05
24,S7_D6 hbo,Condition[T.Noise],-2.318189,1.165373,-1.989225,0.046676


In [45]:
# Group-level ROI analysis: one mixed model per region on the HbO estimates.
roi_names = [
    "Left", "Right", "Back", "Front", "Noise", "Speech",
    "Common", "OnlyNoise", "OnlySpeech", "New_Speech", "New_Noise",
]

grp_results = df_roi.query("Condition in ['Control','Noise', 'Speech']")
is_hbo = grp_results["Chroma"] == "hbo"

for roi in roi_names:
    print(f"Running mixed model for {roi}...")
    subset = grp_results[is_hbo & (grp_results["ROI"] == roi)].copy()

    # Both predictors are categorical; Condition is ordered with Control first.
    subset["session"] = subset["session"].astype("category")
    subset["Condition"] = (
        subset["Condition"]
        .astype("category")
        .cat.reorder_categories(["Control", "Noise", "Speech"], ordered=True)
    )

    # Fixed effects: Condition, session and their interaction.
    # Grouping variable for the random intercept: Subject.
    model = smf.mixedlm(
        "theta ~ Condition + session + Condition*session",
        subset,
        groups=subset["Subject"],
    )
    result = model.fit(method="powell")
    print(result.summary())


Running mixed model for Left...
                   Mixed Linear Model Regression Results
Model:                    MixedLM       Dependent Variable:       theta    
No. Observations:         96            Method:                   REML     
No. Groups:               16            Scale:                    4.8431   
Min. group size:          6             Log-Likelihood:           -211.5980
Max. group size:          6             Converged:                Yes      
Mean group size:          6.0                                              
---------------------------------------------------------------------------
                                 Coef.  Std.Err.   z    P>|z| [0.025 0.975]
---------------------------------------------------------------------------
Intercept                        -0.163    0.588 -0.277 0.782 -1.315  0.989
Condition[T.Noise]               -0.865    0.778 -1.112 0.266 -2.390  0.660
Condition[T.Speech]              -0.329    0.778 -0.423 0.672 -1.854  1.196

In [47]:
from scipy.stats import ttest_rel
import pandas as pd

rois = ["Left", "Right", "Back", "Front", "Noise", "Speech", "Common", "OnlyNoise", "OnlySpeech", "New_Speech", "New_Noise"]


def _paired_ttest_vs_control(roi_frame, roi_name, condition):
    """Run and print a paired t-test of Control against ``condition`` for one ROI.

    HbO rows of the two conditions are paired on ``Subject`` and ``session``.

    Parameters
    ----------
    roi_frame : pandas.DataFrame
        Rows of a single ROI with ``Condition``, ``Chroma``, ``Subject``,
        ``session`` and ``theta`` columns.
    roi_name : str
        ROI label used in the printed report.
    condition : str
        Condition compared against ``"Control"``.

    Returns
    -------
    tuple or None
        ``(t_statistic, p_value)``, or ``None`` when no pairs could be formed.
    """
    is_hbo = roi_frame["Chroma"] == "hbo"
    control_rows = roi_frame[(roi_frame["Condition"] == "Control") & is_hbo]
    condition_rows = roi_frame[(roi_frame["Condition"] == condition) & is_hbo]

    suffix = f"_{condition.lower()}"
    paired = pd.merge(
        control_rows, condition_rows,
        on=["Subject", "session"],
        suffixes=("_control", suffix)
    )
    if paired.empty:
        print(f"⚠️  No matching data for Control vs {condition} in ROI: {roi_name}")
        return None

    t_stat, p_val = ttest_rel(paired["theta_control"], paired[f"theta{suffix}"])
    print(f"🔹 Paired t-test (Control vs {condition}) – ROI: {roi_name}")
    print(f"   t = {t_stat:.4f}, p = {p_val:.4f}")
    return t_stat, p_val


# One record per completed test, so the statistics can be reused later
# instead of being read back from the printed report.
ttest_records = []

for roi in rois:
    df_roi_subset = df_roi[df_roi["ROI"] == roi]

    for condition in ("Speech", "Noise"):
        outcome = _paired_ttest_vs_control(df_roi_subset, roi, condition)
        if outcome is not None:
            ttest_records.append(
                {"ROI": roi, "Comparison": f"Control vs {condition}", "t": outcome[0], "p": outcome[1]}
            )

    print("-" * 60)

ttest_results = pd.DataFrame(ttest_records, columns=["ROI", "Comparison", "t", "p"])


🔹 Paired t-test (Control vs Speech) – ROI: Left
   t = 1.6444, p = 0.1102
🔹 Paired t-test (Control vs Noise) – ROI: Left
   t = 1.7396, p = 0.0919
------------------------------------------------------------
🔹 Paired t-test (Control vs Speech) – ROI: Right
   t = 0.7064, p = 0.4852
🔹 Paired t-test (Control vs Noise) – ROI: Right
   t = -0.3566, p = 0.7238
------------------------------------------------------------
🔹 Paired t-test (Control vs Speech) – ROI: Back
   t = 5.3728, p = 0.0000
🔹 Paired t-test (Control vs Noise) – ROI: Back
   t = 2.3721, p = 0.0241
------------------------------------------------------------
🔹 Paired t-test (Control vs Speech) – ROI: Front
   t = 2.5301, p = 0.0167
🔹 Paired t-test (Control vs Noise) – ROI: Front
   t = 1.9955, p = 0.0548
------------------------------------------------------------
🔹 Paired t-test (Control vs Speech) – ROI: Noise
   t = 0.9244, p = 0.3624
🔹 Paired t-test (Control vs Noise) – ROI: Noise
   t = -0.1757, p = 0.8617
-------------