<a href="https://colab.research.google.com/github/saamehsanaaee/MontbretiaCabinet-ISP/blob/main/GLM_fMRI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Data Import

In [None]:
# @title Install dependencies
!pip install nilearn --quiet

import os
import re
import tarfile
import requests
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
import matplotlib.pyplot as plt

#print(np.__version__)
#print(pd.__version__)

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.5/10.5 MB[0m [31m36.5 MB/s[0m eta [36m0:00:00[0m
[?25h

# Parameter and Data download

In [None]:
# parameter setting
N_SUBJECTS = 100
N_PARCELS = 360 # Data aggregated into ROIs from Glasser parcellation
TR = 0.72  # Time resolution, in seconds
HEMIS = ["Right", "Left"]
RUNS   = ["LR","RL"]
N_RUNS = 2

EXPERIMENTS = {
    'MOTOR'      : {'cond':['lf','rf','lh','rh','t','cue']},
    'WM'         : {'cond':['0bk_body','0bk_faces','0bk_places','0bk_tools',
                            '2bk_body','2bk_faces','2bk_places','2bk_tools']},
    'EMOTION'    : {'cond':['fear','neut']},
    'GAMBLING'   : {'cond':['loss','win']},
    'LANGUAGE'   : {'cond':['math','story']},
    'RELATIONAL' : {'cond':['match','relation']},
    'SOCIAL'     : {'cond':['ment','rnd']}
}

TargetExperiments = ['WM', 'EMOTION', 'LANGUAGE']

TargetConditions  = ["0bk_body", "0bk_faces", "0bk_places", "0bk_tools",
                     "2bk_body", "2bk_faces", "2bk_places", "2bk_tools",
                     "fear"    , "neut"     , "math"      , "story"    ]

# Data download
fname = "hcp_task.tgz"
url = "https://osf.io/2y3fw/download"

if not os.path.isfile(fname):
  try:
    r = requests.get(url)
  except requests.ConnectionError:
    print("Download FAILED: Connection Error!")
  else:
    if r.status_code != requests.codes.ok:
      print("Download FAILED!")
    else:
      with open(fname, "wb") as fid:
        fid.write(r.content)


HCP_DIR = "./hcp_task"

with tarfile.open(fname) as tfile:
  tfile.extractall('.')

SubjectIDs = np.loadtxt(os.path.join(HCP_DIR, 'subjects_list.txt'), dtype='str')
SubjectIDs = list(SubjectIDs)

# Regions and parcels

In [None]:
### doc about the regions file will be inserted (look at regions.npy)

regions = np.load(f"{HCP_DIR}/regions.npy").T

region_info = dict(
    name=regions[0].tolist(),
    network=regions[1],
    hemi=['Right']*int(N_PARCELS/2) + ['Left']*int(N_PARCELS/2),
)

### code will be updated

# Helper functions (e.g. load time series and evs)


In [None]:
# load time series
def load_single_timeseries(subject, experiment, run, remove_mean=True):
    """Load timeseries data for a single subject and single run.

    Arguments:
        subject (str):      subject ID to load
        experiment (str):   Name of experiment
        run (int):          (0 or 1)
        remove_mean (bool): If True, subtract the parcel-wise mean
                            (typically the mean BOLD signal is not of interest)

    Returns
        ts (n_parcel x n_timepoint array): Array of BOLD data values

    """
    bold_run  = RUNS[run]
    bold_path = f"{HCP_DIR}/subjects/{subject}/{experiment}/tfMRI_{experiment}_{bold_run}"
    bold_file = "data.npy"
    ts_path   = f"{bold_path}/{bold_file}"

    if not os.path.exists(ts_path):
        raise FileNotFoundError(f"Timeseries file not found: {ts_path}")
    ts = np.load(ts_path)

    if remove_mean:
        ts = ts - ts.mean(axis=1, keepdims=True)
    return ts

In [None]:
# load evs
def load_evs(subject, experiment, run):
    """Load EVs (explanatory variables) data for one task experiment.

    Arguments:
        subject (str): subject ID to load
        experiment (str): Name of experiment
        run (int): 0 or 1

    Returns:
        evs (list of lists): A list of frames associated with each condition

    """
    frames_list = []
    task_key = f"tfMRI_{experiment}_{RUNS[run]}"
    for cond in EXPERIMENTS[experiment]["cond"]:
        ev_file  = f"{HCP_DIR}/subjects/{subject}/{experiment}/{task_key}/EVs/{cond}.txt"
        ev_array = np.loadtxt(ev_file, ndmin=2, unpack=True)
        ev       = dict(zip(["onset", "duration", "amplitude"], ev_array))

        # Determine when trial starts, rounded down
        start = np.floor(ev["onset"] / TR).astype(int)
        # Use trial duration to determine how many frames to include for trial
        duration = np.ceil(ev["duration"] / TR).astype(int)
        # Take the range of frames that correspond to this specific trial
        frames = [s + np.arange(0, d) for s, d in zip(start, duration)]
        frames_list.append(frames)

    return frames_list


def load_evs_as_dict(subject, experiment, run):
    """Load EVs (explanatory variables) data for one task experiment.

    Arguments:
        subject (str): subject ID to load
        experiment (str): Name of experiment
        run (int): 0 or 1

    Returns:
        evs (dict): A dictionary of the data associated with each condition

    """
    evs = {}
    task_key = f"tfMRI_{experiment}_{RUNS[run]}"

    for cond  in EXPERIMENTS[experiment]["cond"]:
        ev_file = f"{HCP_DIR}/subjects/{subject}/{experiment}/{task_key}/EVs/{cond}.txt"
        if not os.path.exists(ev_file):
            raise FileNotFoundError(f"EV file not found: {ev_file}")
        ev_array  = np.loadtxt(ev_file, ndmin=2, unpack=True)
        evs[cond] = dict(zip(["onset", "duration", "amplitude"], ev_array))

    return evs

In [None]:
# create data frame
def create_dataframe(subject, experiment):
    """
    Creates a dataframe that contains the parcel-based
    BOLD signals from a subject for each condition.

    Arguments:
        subject (str): subject ID to load
        experiment (str): Name of experiment

    Returns:
        A dataframe of parcel-based BOLD data
        for one subject and one experiment

    """
    all_data = []

    for run in range(2): # Run can be 0 (LR) or 1 (RL)
        try:
            ts  = load_single_timeseries(subject, experiment, run)
            evs = load_evs_as_dict(subject, experiment, run)
        except FileNotFoundError as e:
            print(e)
            continue

        n_parcels, n_timepoints = ts.shape

        for condition, ev_data in evs.items():
            onset_times = ev_data["onset"]
            durations   = ev_data["duration"]
            amplitudes  = ev_data["amplitude"]

            for onset, duration, amplitude in zip(onset_times, durations, amplitudes):
                start_frame = int(onset / TR)
                end_frame   = start_frame + int(duration / TR)

                for time_point in range(start_frame, end_frame):
                    if time_point < n_timepoints: # Ensure it is within bounds
                        row = {
                            "sunject"      : subject   ,
                            "experiment"   : experiment,
                            "run"          : RUNS[run] ,
                            "condition"    : condition ,
                            "timepoint"    : time_point,
                            "EV_onset"     : onset     ,
                            "EV_duration"  : duration  ,
                            "EV_amplitude" : amplitude
                        }
                        # Add BOLD signal data for all parcels
                        row.update({f"parcel_{i}": ts[i, time_point] for i in range(n_parcels)})
                        all_data.append(row)

    df = pd.DataFrame(all_data)
    return df

In [None]:
def extract_stats(subject, experiment):
    """Aggregates all data for a subject into
    a dictionary that can be used along with
    "gather_all_subjects_stats()" to create
    the final dataframe.

    Arguments:
        subjects    (list of str): list of SubjectIDs
        experiments (list of str): list of TargetExperiments

    Returns:
        A dictionary of all the data points
        for a subject's specific experiment.


    """
    stats_dict = {"subject": subject}
    task_path  = f"{HCP_DIR}/subjects/{subject}/{experiment}/tfMRI_{experiment}_LR/EVs"

    if os.path.exists(task_path):
        for filename in os.listdir(task_path):
            if filename == "Stats.txt":
                filepath = os.path.join(task_path, filename)
                with open(filepath, "r") as file:
                    lines = file.readlines()
                    for line in lines:
                        match = re.match(r"([\w\s-]+): ([\d.]+)", line.strip())
                        if match:
                            key, value = match.groups()
                            stats_dict[f"{experiment}_{key.strip().replace(" ", "_")}"] = float(value)

    return stats_dict

In [None]:
def gather_all_subjects_stats(subjects, experiments):
    """Creates a dataframe containing all data from
    all subjects which stores the parcel-based BOLD signals.

    Arguments:
        subjects    (list of str): list of SubjectIDs
        experiments (list of str): list of TargetExperiments

    Returns:
        A dataframe of parcel-based BOLD data
        for all subjects and all experiments

    """
    all_stats = []

    for subject in subjects:
        subject_stats = {"subject": subject}
        for experiment in experiments:
            stats = extract_stats(subject, experiment)
            subject_stats.update(stats)

            # Get the dimensions of DataFrame for this subject and experiment
            try:
                df = create_dataframe(subject, experiment)
                subject_stats[f"{experiment}_num_rows"] = df.shape[0]
                subject_stats[f"{experiment}_num_cols"] = df.shape[1]
            except FileNotFoundError:
                subject_stats[f"{experiment}_num_rows"] = None
                subject_stats[f"{experiment}_num_cols"] = None

        all_stats.append(subject_stats)

    stats_df = pd.DataFrame(all_stats)
    return stats_df

In [None]:
def save_stats_to_csv(df, filename):
    """Saves the input dataframe as a csv in working directory.

    Arguments:
        df (dataframe)
        filename (str)
    """
    df.to_csv(filename, index=False)

# Functions for the matrix construction and modeling  (TO BE ADDED!!!)

# Create dataframes

In [None]:
# create grand dataframe for all subjects

# Preprocessing

In [None]:
# preprocessing

#Regressors


In [None]:
#@title create regressor
import nibabel as nib
from nilearn.glm.first_level import make_first_level_design_matrix
from nilearn.plotting import plot_design_matrix
from nilearn.signal import clean



# decide event or task based regressor (to confirm the delay in fMRI doesn't affect results)
# for event, check ev file

Mounted at /content/drive


In [None]:
# design matrix
"""
Examples of design matrices
===========================

Three examples of design matrices specification and computation for first-level
:term:`fMRI` data analysis (event-related design, block design,
:term:`FIR` design).

This examples requires matplotlib.

"""

# %%
# Define parameters
# -----------------
# At first, we define parameters related to the images acquisition.

t_r = 0.72
n_scans = 111

print(f"repetition time is {t_r} second")
print(f"the acquisition comprises {n_scans} scans")

frame_times = (
    np.arange(n_scans) * t_r
)  # here are the corresponding frame times

# %%
# Then we define parameters related to the experimental design.

# these are the types of the different trials
conditions = EXPERIMENTS.get('WM', {}).get('cond', [])
duration = [0.1, 0.0, 0.1, 0.1, 0.0, 0.1, 0.1, 0.0, 0.1]
# these are the corresponding onset times
onsets = [30.0, 70.0, 100.0, 10.0, 30.0, 90.0, 30.0, 40.0, 60.0]
# Next, we simulate 6 motion parameters jointly observed with fMRI acquisitions
rng = np.random.default_rng(42)
motion = np.cumsum(rng.standard_normal((n_scans, 6)), 0)
# The 6 parameters correspond to three translations and three
# rotations describing rigid body motion
add_reg_names = ["tx", "ty", "tz", "rx", "ry", "rz"]

# %%
# Create design matrices
# ----------------------
# The same parameters allow us to obtain a variety of design matrices.
# We first create an events object.
import pandas as pd

events = pd.DataFrame(
    {"trial_type": conditions, "onset": onsets, "duration": duration}
)

# %%
# We sample the events into a design matrix,
# also including additional regressors.
from nilearn.glm.first_level import make_first_level_design_matrix

hrf_model = "glover"
X1 = make_first_level_design_matrix(
    frame_times,
    events,
    drift_model="polynomial",
    drift_order=3,
    add_regs=motion,
    add_reg_names=add_reg_names,
    hrf_model=hrf_model,
)

# %%
# Now we compute a block design matrix. We add duration to create the blocks.
# For this we first define an event structure that includes the duration
# parameter.

duration = 7.0 * np.ones(len(conditions))
events = pd.DataFrame(
    {"trial_type": conditions, "onset": onsets, "duration": duration}
)

# %%
# Then we sample the design matrix.

X2 = make_first_level_design_matrix(
    frame_times,
    events,
    drift_model="polynomial",
    drift_order=3,
    hrf_model=hrf_model,
)

# %%
# Finally we compute a :term:`FIR` model

events = pd.DataFrame(
    {"trial_type": conditions, "onset": onsets, "duration": duration}
)
hrf_model = "FIR"
X3 = make_first_level_design_matrix(
    frame_times,
    events,
    hrf_model="fir",
    drift_model="polynomial",
    drift_order=3,
    fir_delays=np.arange(1, 6),
)

# %%
# Here are the three designs side by side.
#
# .. note::
#
#     The events with a duration of 0 seconds are be modeled
#     using a 'delta function' in the event-related design matrix.
#
import matplotlib.pyplot as plt

fig, (ax1, ax2, ax3) = plt.subplots(
    figsize=(10, 6), nrows=1, ncols=3, constrained_layout=True
)

plot_design_matrix(X1, axes=ax1)
ax1.set_title("Event-related design matrix", fontsize=12)
plot_design_matrix(X2, axes=ax2)
ax2.set_title("Block design matrix", fontsize=12)
plot_design_matrix(X3, axes=ax3)
ax3.set_title("FIR design matrix", fontsize=12)
plt.show()


# %%
# Correlation between regressors
# ------------------------------
# We can plot the correlation between the regressors of our design matrix.
# This is important to check as highly correlated regressors can affect
# the effficieny of
# `your design <https://imaging.mrc-cbu.cam.ac.uk/imaging/DesignEfficiency>`_.
#
from nilearn.plotting import plot_design_matrix_correlation

fig, (ax1, ax2, ax3) = plt.subplots(
    figsize=(16, 5), nrows=1, ncols=3, constrained_layout=True
)

plot_design_matrix_correlation(X1, axes=ax1)
ax1.set_title("Event-related correlation matrix", fontsize=12)
plot_design_matrix_correlation(X2, axes=ax2)
ax2.set_title("Block correlation matrix", fontsize=12)
plot_design_matrix_correlation(X3, axes=ax3, tri="diag")
ax3.set_title("FIR correlation matrix", fontsize=12)
plt.show()


# %%
# Parametric modulation
# ---------------------
# By default, the fMRI GLM will expect that all events
# for a given condition have a BOLD
# response with the same amplitude.
# Sometimes, we may have specific expectations
# about how strong the BOLD response
# will be on a given event.
# This can be incorporated into the model by using **parametric modulation**,
# wherein each event has a predicted amplitude.
# This can be used both to improve model fit and to test hypotheses regarding
# how the BOLD response scales with important features of events,
# such as trial intensity or response time.
#
# Here we will assume that when a trial
# is the same condition as the previous one,
# it will elicit a less intense response.

conditions = ["c0", "c0", "c0", "c1", "c1", "c1", "c3", "c3", "c3"]
modulation = [1.0, 0.5, 0.25, 1.0, 0.5, 0.25, 1.0, 0.5, 0.25]
modulated_events = pd.DataFrame(
    {
        "trial_type": conditions,
        "onset": onsets,
        "duration": duration,
        "modulation": modulation,
    }
)

hrf_model = "glover"
X4 = make_first_level_design_matrix(
    frame_times,
    modulated_events,
    drift_model="polynomial",
    drift_order=3,
    hrf_model=hrf_model,
)

# Let's compare it to the unmodulated block design
fig, (ax1, ax2) = plt.subplots(
    figsize=(10, 6), nrows=1, ncols=2, constrained_layout=True
)

plot_design_matrix(X2, axes=ax1)
ax1.set_title("Block design matrix", fontsize=12)
plot_design_matrix(X4, axes=ax2)
ax2.set_title("Modulated block design matrix", fontsize=12)
plt.show()

In [None]:
# fit GLM

# input = design matrix & voxel-wised time-series
# output = coef (beta)

In [None]:
# contrast analysis
# set contrast for experimental vs control
# 2-back as 0.5; 0-back as -0.5

# understand whether activation in specific regions is above baseline

In [None]:
# activation flow modeling
# (decide later) whether to mute unrelated parcels --> let's try not muting first

# 2s as bin

# ?? parametric modulation
# ?? analyze temporal changes