In [1]:
import re, glob
import numpy as np
import pandas as pd
import nibabel as nib
import matplotlib.pyplot as plt
from pathlib import Path

from nilearn.glm.first_level import make_first_level_design_matrix, FirstLevelModel
from nilearn.glm.contrasts import compute_fixed_effects
from nilearn.plotting import plot_stat_map, plot_design_matrix
from nilearn.glm import threshold_stats_img

In [None]:
# --- PATHS ---
# FMRIPREP_ROOT = Path("/neurospin/motif-stroke/7T_protocol/pilots/derivatives/fmriprep")
FMRIPREP_ROOT = Path("/volatile/home/sb283337/Bureau/7T-fMRI-Motor-Stroke/data/fmriprep")
EVENTS_DIR    = Path("/volatile/home/sb283337/Bureau/7T-fMRI-Motor-Stroke/data/events")
LOGS_DIR      = Path("/volatile/home/sb283337/Bureau/7T-fMRI-Motor-Stroke/data/log")
SEQ_DIR       = Path("/volatile/home/sb283337/Bureau/7T-fMRI-Motor-Stroke/data/seq")
RESULTS_DIR   = Path("/volatile/home/sb283337/Bureau/7T-fMRI-Motor-Stroke/results_industrial")

# --- SETTINGS ---
TASK, SPACE, TR = "motif4limbs", "MNI152NLin2009cAsym", 1.6
BLANK_S = 1.0
PAUSE_S = 4.8
STIM_S  = 2.2 
RESP_DUR_S = 0.2
SMOOTHING_FWHM = 3.5

# --- CONTRASTS
CONTRASTS = {
    "task_gt_baseline": "0.25*(main_gauche + main_droite + pied_gauche + pied_droit)",
    "hand_vs_foot": "0.5*(main_gauche + main_droite) - 0.5*(pied_gauche + pied_droit)",
    "global_right_vs_left": "0.5*(main_droite + pied_droit) - 0.5*(main_gauche + pied_gauche)",
    "right_vs_left_hand": "main_droite - main_gauche",
    "right_vs_left_foot": "pied_droit - pied_gauche"

}


# A "Menu" of coordinates for different body parts (MNI Space)
ROI_COORDS = {
    "hand_knob":    (0, -30, 60),    # Classic view for Hand/Finger areas
    "medial_wall":  (0, -30, 70),    # High axial view for Feet (closer to the midline)
    "motor_strip":  (0, -25, 55),    # Slightly lower/forward to catch the full M1/S1 range
    "lateral_view": (-40, -20, 50)   # Side view if you want to check Left M1 specifically
}

# Mapping contrasts to their best viewing coordinates
CONTRAST_VIEWS = {
    "task_gt_baseline":     "motor_strip",   # Show everything that moved
    "hand_vs_foot":         "hand_knob",     # Hand knob is the best reference here
    "global_right_vs_left": "motor_strip",   # Show the whole left vs right brain split
    "right_vs_left_hand":   "hand_knob",     # Focus on the hand area
    "right_vs_left_foot":   "medial_wall"    # Focus on the foot area (near the top/middle)
}


METHOD = "events"  # "sequence" OR "log"
BASE_RESULTS_DIR = Path("results_V01")

In [3]:

def get_motif_files(sub_id):
    """Strictly finds motif4limbs tasks and extracts run/dir."""
    func_dir = FMRIPREP_ROOT / f"sub-{sub_id}" / "func"
    # Ensure we use the exact task name
    pattern = f"sub-{sub_id}_task-{TASK}_dir-*_run-*_space-{SPACE}_desc-preproc_bold.nii.gz"
    bolds = sorted(func_dir.glob(pattern))
    
    run_data = []
    for b in bolds:
        # Improved Regex to catch 'dir' and 'run'
        run_match = re.search(r"run-(\d+)", b.name)
        dir_match = re.search(r"dir-([a-z]+)", b.name)
        
        if run_match and dir_match:
            run_num = run_match.group(1)
            direc = dir_match.group(1)
            
            run_data.append({
                "run": run_num,
                "dir": direc,
                "bold": b,
                "mask": func_dir / b.name.replace("desc-preproc_bold.nii.gz", "desc-brain_mask.nii.gz"),
                "conf": func_dir / b.name.replace(f"space-{SPACE}_desc-preproc_bold.nii.gz", "desc-confounds_timeseries.tsv")
            })
    return run_data

In [4]:
def build_events_sequence(run):
    """
    EXACT ORIGINAL LOGIC:
    - Onset depends on run ID (one file per run).
    - Uses specific BLANK_S and STIM_S increments to maintain timing.
    """
    seq_path = SEQ_DIR / f"stim_sequence_run-{int(run)}.csv"
    if not seq_path.exists():
        return None
    seq = pd.read_csv().sort_values(["block_id", "id"])

    rows = []
    current = 0.0 # Starting t0
    current_block = None

    for _, r in seq.iterrows():
        # Block transition logic
        if current_block is None:
            current_block = r["block_id"]
        elif r["block_id"] != current_block:
            current += PAUSE_S
            current_block = r["block_id"]

        # Blank window (shifts the onset)
        current += BLANK_S

        # Record the stimulus window
        rows.append({
            "onset": current,
            "duration": STIM_S,
            "trial_type": r["block_name"],
            "modulation": 1.0
        })

        # Move to end of stimulus
        current += STIM_S

    return pd.DataFrame(rows) if rows else None


def build_events_tsv(tsv_path, log_path):
    """
    SYNC WORKER:
    Takes two specific file paths, aligns them to the TTL, 
    and returns a Nilearn-ready DataFrame.
    """
    # 1. The actual LOAD happens here
    df_tsv = pd.read_csv(tsv_path, sep='\t')
    
    # Use default sep=',' because raw log is a .csv
    df_log = pd.read_csv(log_path)
    
    # 2. Find the TTL anchor (The "Starting Gun")
    # We look for 'TTL' or the code '116'
    ttl_rows = df_log[df_log['event'].isin(['TTL', '116'])]
    t0 = ttl_rows.iloc[0]['time']
    
    # 3. Apply the synchronization math
    events = pd.DataFrame()
    
    # Reset onset to 0.0 based on the TTL and convert to seconds
    events['onset'] = (df_tsv['onset'] - t0) / 1000.0
    
    # Convert duration to seconds
    events['duration'] = df_tsv['duration'] / 1000.0
    
    # Keep the condition names
    events['trial_type'] = df_tsv['trial_type']
    
    # 4. Filter for Nilearn
    # We remove 'fix' or 'blank' because 'nothing' is the implicit baseline
    forbidden = ['fix', 'blank', 'fixation', 'rest', 'Start', 'End']
    events = events[~events['trial_type'].isin(forbidden)].copy()
    
    # Constant modulation for the GLM
    events['modulation'] = 1.0

    return events

In [5]:
def build_the_design_matrix(bold_img, events, conf_path):
    """Matches your original build_design_matrix exactly."""
    n_scans = bold_img.shape[-1]
    tr = bold_img.header.get_zooms()[-1] # Uses header TR to ensure 9 drifts
    frame_times = np.arange(n_scans) * tr

    # Loading motion + outliers exactly as your original load_confounds
    conf = pd.read_csv(conf_path, sep="\t")
    motion = [c for c in ["trans_x","trans_y","trans_z","rot_x","rot_y","rot_z"] if c in conf.columns]
    outliers = [c for c in conf.columns if ("motion_outlier" in c) or ("non_steady_state" in c)]
    conf_sel = conf[motion + outliers].fillna(0.0)

    return make_first_level_design_matrix(
        frame_times=frame_times,
        events=events,
        hrf_model='glover',
        drift_model='cosine',
        high_pass=0.01,
        add_regs=conf_sel.values,
        add_reg_names=list(conf_sel.columns)
    )


In [6]:
def fit_run_and_save_nifti(bold_path, mask_path, dm, run_dir, c_name, c_expr,smoothing_fwhm=3.5):
    """
    Fits the GLM for one run and immediately saves the NIfTI files.
    """
    bold_img = nib.load(str(bold_path))
    tr = bold_img.header.get_zooms()[-1]
    
    # Standardize=False is usually better for 7T if you already scrubbed/cleaned data
    model = FirstLevelModel(
        t_r=tr, 
        mask_img=str(mask_path), 
        hrf_model='glover', 
        drift_model='cosine', 
        high_pass=0.01, 
        noise_model='ar1',
        smoothing_fwhm=smoothing_fwhm, 
        standardize=False, 
        minimize_memory=False
    ).fit(bold_img, design_matrices=dm)
    
    stats = model.compute_contrast(c_expr, output_type='all')
    
    # Save inputs for the next step (Fusion)
    stats['z_score'].to_filename(run_dir / f"{c_name}_zmap.nii.gz")
    stats['effect_size'].to_filename(run_dir / f"{c_name}_beta.nii.gz")
    stats['effect_variance'].to_filename(run_dir / f"{c_name}_variance.nii.gz")
    
    return stats

def save_brain_viz(nifti_input, output_png, title, coords=(0, -30, 60), threshold=1e-6):
    """
    Plots a NIfTI map. 
    By default, it uses a tiny threshold (1e-6) to show everything in a 'clean' map.
    """
    display = plot_stat_map(
        nifti_input,                   
        threshold=threshold,  # <--- Pass the extracted threshold here!
        cut_coords=coords,
        title=title, 
        colorbar=True,
        display_mode='ortho',
        black_bg=True             
    )
    plt.savefig(output_png)
    plt.close()


def subject_fusion_and_save_data(run_stats_list, mask_path, out_dir, c_name):
    """
    Computes Fixed Effects Fusion, applies FDR, and saves all NIfTIs.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    
    # 1. Gather maps
    eff_imgs = [s['effect_size'] for s in run_stats_list]
    var_imgs = [s['effect_variance'] for s in run_stats_list]
    
    # 2. Compute Fixed Effects (The Math)
    maps = compute_fixed_effects(eff_imgs, var_imgs, mask_path)
    contrast_map, var_map, t_map, z_map = maps[0], maps[1], maps[2], maps[3]
    
    # 3. Apply FDR (The Voxel Police)
    z_fdr, fdr_thresh = threshold_stats_img(z_map, alpha=0.05, height_control='fdr')
    print(f"[Statistics] FDR Threshold for {c_name}: Z > {fdr_thresh:.2f}")

    # 4. Save NIfTIs immediately (Safety First)
    contrast_map.to_filename(out_dir / f"{c_name}_TOTAL_beta.nii.gz")
    var_map.to_filename(out_dir / f"{c_name}_TOTAL_variance.nii.gz")
    t_map.to_filename(out_dir / f"{c_name}_TOTAL_tstat.nii.gz")
    z_map.to_filename(out_dir / f"{c_name}_TOTAL_zmap_uncorrected.nii.gz")
    z_fdr.to_filename(out_dir / f"{c_name}_TOTAL_zmap_FDR.nii.gz")

    # --- 5. RETURN DICTIONARY (Efficiency) ---
    return {
        'beta': contrast_map,
        'variance': var_map,
        't_stat': t_map,
        'z_uncorrected': z_map,
        'z_fdr': z_fdr,
        'fdr_threshold': fdr_thresh
                }



### What each one tells you (for your own retrieval)

* **`beta_map`**: The "Signal." Use this to see the **percent signal change**.  "how strong was the activation?"
* **`var_map`**: The "Noise." Areas with high movement or artifacts will show up as bright spots here. Itâ€™s a great diagnostic tool to see if a run was low quality.
* **`t_map`**: The "Raw Ratio." This is simply . Itâ€™s the traditional way of looking at fMRI stats before converting to Z-scores.
* **`z_map`**: The "Final Word." This is the map you use for your **figures**. It scales the T-map so that you can use a standard threshold (like 3.1 for ).



In [7]:
# ==============================================================================
# THE MASTER DRIVER (Full 5-Map Industrial Version)
# ==============================================================================
SUB = "02"             


data_runs = get_motif_files(SUB) 

print(f"ðŸš€ Starting Industrial Pipeline for Subject {SUB}...")
print(f"ðŸ“‚ Method: {METHOD.upper()}")

for c_name, c_expr in CONTRASTS.items():
    print(f"\n" + "="*60)
    print(f"PROCESSING CONTRAST: {c_name}")
    print("="*60)

    # PATH: results_V01 / sub-01 / log_method / contrast_name
    SUB_METHOD_DIR = BASE_RESULTS_DIR / f"sub-{SUB}" / f"{METHOD}_method" / c_name
    SUB_METHOD_DIR.mkdir(parents=True, exist_ok=True)
    
    view_key = CONTRAST_VIEWS.get(c_name, "hand_knob")
    coords = ROI_COORDS[view_key]
    run_stats_list = []
    
    # --- STEP A: PROCESS INDIVIDUAL RUNS ---
    for run_data in data_runs:
        run_id = run_data['run']
        run_dir = SUB_METHOD_DIR / f"run-{run_id}_dir-{run_data['dir']}"
        run_dir.mkdir(exist_ok=True)
        
  
        if METHOD == "sequence":
            # The original theoretical method
            ev = build_events_sequence(int(run_id))
        else:
            # THE INDUSTRIAL LOG METHOD
            # We define the specific paths for this subject and run
            tsv_files = list(EVENTS_DIR.glob(f"*sub-{int(SUB)}_run-{int(run_id)}*_events.tsv"))
            log_files = list(LOGS_DIR.glob(f"*sub-{int(SUB)}_run-{int(run_id)}*.csv"))
            
            ev = build_events_tsv(tsv_files[0], log_files[0])
            

        # Build Design Matrix & Fit
        dm = build_the_design_matrix(nib.load(str(run_data['bold'])), ev, run_data['conf'])
        
        print(f"  > [Run {run_id}] Fitting GLM...")
        stats = fit_run_and_save_nifti(run_data['bold'], run_data['mask'], dm, run_dir, c_name, c_expr,SMOOTHING_FWHM)
        run_stats_list.append(stats)
        
        # QC Plot for the individual run
        save_brain_viz(stats['z_score'], run_dir / f"{c_name}_run_viz.png", 
                       title=f"Run {run_id} QC: {c_name}", coords=coords, threshold=2.0)

    # --- STEP B: FUSE & CHECKPOINT ---
    if not run_stats_list: 
        continue

    print(f"  > [Subject Fusion] Combining {len(run_stats_list)} runs...")
    total_dir = SUB_METHOD_DIR / "combined_total"
    
    results = subject_fusion_and_save_data(run_stats_list, data_runs[0]['mask'], total_dir, c_name)

    # --- STEP C: THE 5 DIAGNOSTIC MAPS ---
    print(f"  > Generating 5 Diagnostic Maps...")

 # 1. BETA (Raw Effect Size - No threshold)
    save_brain_viz(results['beta'], total_dir / "TOTAL_01_beta.png", 
                   f"Beta (Effect): {c_name}", coords=coords, threshold=0)

    # 2. VARIANCE (Noise Distribution - No threshold)
    save_brain_viz(results['variance'], total_dir / "TOTAL_02_variance.png", 
                   f"Var (Noise): {c_name}", coords=coords)

    save_brain_viz(results['t_stat'], total_dir / "TOTAL_03_tstat.png", 
                   f"T-Stat (>2.0): {c_name}", coords=coords)

    # 4. Z-MAP UNCORRECTED (The "Standard" 3.1)
    save_brain_viz(results['z_uncorrected'], total_dir / "TOTAL_04_zmap_un.png", 
                   f"Z (Uncorr >3.1): {c_name}", coords=coords)

    # 5. Z-MAP FDR (The "Scientific Truth" - Dynamic Extracted Threshold)
    # We use the threshold extracted by threshold_stats_img inside your fusion function
    fdr_val = results['fdr_threshold']
    save_brain_viz(results['z_fdr'], total_dir / "TOTAL_05_zmap_FDR.png", 
                   f"Z (FDR q<0.05, Z > {fdr_val:.2f}): {c_name}", 
                   coords=coords, threshold=fdr_val)

    del results # Memory safety

print("\n" + "="*60)
print(f"âœ… PIPELINE COMPLETE: Sub-{SUB} / {METHOD}_method")

ðŸš€ Starting Industrial Pipeline for Subject 02...
ðŸ“‚ Method: EVENTS

PROCESSING CONTRAST: task_gt_baseline
[make_first_level_design_matrix] A 'modulation' column was found in the given events data and is used.
  > [Run 01] Fitting GLM...


  model = FirstLevelModel(


[make_first_level_design_matrix] A 'modulation' column was found in the given events data and is used.
  > [Run 03] Fitting GLM...


  model = FirstLevelModel(


[make_first_level_design_matrix] A 'modulation' column was found in the given events data and is used.
  > [Run 02] Fitting GLM...


  model = FirstLevelModel(


  > [Subject Fusion] Combining 3 runs...


  z_fdr, fdr_thresh = threshold_stats_img(z_map, alpha=0.05, height_control='fdr')


[Statistics] FDR Threshold for task_gt_baseline: Z > inf
  > Generating 5 Diagnostic Maps...


  display = plot_stat_map(



PROCESSING CONTRAST: hand_vs_foot
[make_first_level_design_matrix] A 'modulation' column was found in the given events data and is used.
  > [Run 01] Fitting GLM...


  model = FirstLevelModel(


[make_first_level_design_matrix] A 'modulation' column was found in the given events data and is used.
  > [Run 03] Fitting GLM...


  model = FirstLevelModel(


[make_first_level_design_matrix] A 'modulation' column was found in the given events data and is used.
  > [Run 02] Fitting GLM...


  model = FirstLevelModel(


  > [Subject Fusion] Combining 3 runs...
[Statistics] FDR Threshold for hand_vs_foot: Z > 3.49
  > Generating 5 Diagnostic Maps...

PROCESSING CONTRAST: global_right_vs_left
[make_first_level_design_matrix] A 'modulation' column was found in the given events data and is used.
  > [Run 01] Fitting GLM...


  model = FirstLevelModel(


[make_first_level_design_matrix] A 'modulation' column was found in the given events data and is used.
  > [Run 03] Fitting GLM...


  model = FirstLevelModel(


[make_first_level_design_matrix] A 'modulation' column was found in the given events data and is used.
  > [Run 02] Fitting GLM...


  model = FirstLevelModel(


  > [Subject Fusion] Combining 3 runs...


  z_fdr, fdr_thresh = threshold_stats_img(z_map, alpha=0.05, height_control='fdr')


[Statistics] FDR Threshold for global_right_vs_left: Z > inf
  > Generating 5 Diagnostic Maps...


  display = plot_stat_map(
  model = FirstLevelModel(



PROCESSING CONTRAST: right_vs_left_hand
[make_first_level_design_matrix] A 'modulation' column was found in the given events data and is used.
  > [Run 01] Fitting GLM...
[make_first_level_design_matrix] A 'modulation' column was found in the given events data and is used.
  > [Run 03] Fitting GLM...


  model = FirstLevelModel(


[make_first_level_design_matrix] A 'modulation' column was found in the given events data and is used.
  > [Run 02] Fitting GLM...


  model = FirstLevelModel(


  > [Subject Fusion] Combining 3 runs...


  z_fdr, fdr_thresh = threshold_stats_img(z_map, alpha=0.05, height_control='fdr')


[Statistics] FDR Threshold for right_vs_left_hand: Z > inf
  > Generating 5 Diagnostic Maps...


  display = plot_stat_map(



PROCESSING CONTRAST: right_vs_left_foot
[make_first_level_design_matrix] A 'modulation' column was found in the given events data and is used.
  > [Run 01] Fitting GLM...


  model = FirstLevelModel(


[make_first_level_design_matrix] A 'modulation' column was found in the given events data and is used.
  > [Run 03] Fitting GLM...


  model = FirstLevelModel(


[make_first_level_design_matrix] A 'modulation' column was found in the given events data and is used.
  > [Run 02] Fitting GLM...


  model = FirstLevelModel(


  > [Subject Fusion] Combining 3 runs...


  z_fdr, fdr_thresh = threshold_stats_img(z_map, alpha=0.05, height_control='fdr')


[Statistics] FDR Threshold for right_vs_left_foot: Z > inf
  > Generating 5 Diagnostic Maps...

âœ… PIPELINE COMPLETE: Sub-02 / events_method


  display = plot_stat_map(
