# EEG - Flow

## 4. Fit ICA decompositions

Last edit: 21.06.2023 19:30

This step fits 2 ICA decompositions per file. Multiple files are run in parallel, similar to step 1.

In [None]:
import multiprocessing as mp
import time
from itertools import product
from typing import List

from mne import set_log_level as set_log_level_mne

from eeg_flow import set_log_level
from eeg_flow.utils import parallel
from eeg_flow.tasks import fit_icas

In [None]:
set_log_level_mne("WARNING")
set_log_level("INFO")

The parameters of the files to process are defined below. Locks are created to prevent someone else from running the same task and from writing the same derivatives.

In [None]:
PARTICIPANTS_WITH_GROUPS: List[str] = []  # List of str "Pxx-Gy", e.g. ["P02-G2"]
TASKS: List[str]                    = []  # ["oddball"], ["UT"] or ["oddball", "UT"]
RUNS: List[int]                     = []  # [1], [2] or [1, 2]

inputs = [
    (t[0].split("-") + list(t[1:]))
    for t in product(PARTICIPANTS_WITH_GROUPS, TASKS, RUNS)
]
print(inputs)

The variable `inputs` is a list of lists. Each sublist defines one file by its participant, group, task and run attributes, and is one set of input variables for `fit_icas`, which is picked up by a worker (process) and executed. For each execution, the created derivatives are:
- ICA decomposition used to clean-up mastoids (`_step4_1st_ica.fif`)
- ICA decomposition used to clean-up signal (`_step4_2nd_ica.fif`)
- ICLabel proposed labels (`_step4_iclabel.xlsx`)
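
To make the structure of `inputs` concrete, here is a minimal sketch that builds it the same way as the cell above. The participant, task and run values are hypothetical placeholders chosen for illustration, not files from an actual dataset.

```python
from itertools import product

# Hypothetical values, for illustration only.
participants_with_groups = ["P02-G2", "P03-G1"]
tasks = ["oddball", "UT"]
runs = [1, 2]

# Split "Pxx-Gy" into participant and group, then append task and run.
inputs = [
    t[0].split("-") + list(t[1:])
    for t in product(participants_with_groups, tasks, runs)
]

print(len(inputs))  # 8 combinations: 2 participants x 2 tasks x 2 runs
print(inputs[0])    # ['P02', 'G2', 'oddball', 1]
```

Each of the 8 sublists would be handed to one call of `fit_icas`.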

In [None]:
%%time
current_time = time.strftime("%H:%M:%S")
print("Start time", current_time)

assert len(inputs) != 0  # sanity-check
# each job will need 2 threads and mp.cpu_count() usually returns
# the number of threads (usually 2 per core), thus let's spawn at most
# 1 process per core, keeping 2 cores free, and always at least 1 process.
n_jobs = max(1, min(len(inputs), (mp.cpu_count() // 2) - 2))
parallel(fit_icas, n_jobs, inputs)
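
The worker-count arithmetic can be checked in isolation with a small sketch. The helper `pick_n_jobs` and its `reserved_cores` parameter are hypothetical names introduced here for illustration and are not part of `eeg_flow`. The sketch assumes 2 threads per core and clamps the result to at least one process, so that machines with few cores still get a worker.

```python
def pick_n_jobs(n_inputs: int, n_threads: int, reserved_cores: int = 2) -> int:
    """Return the number of worker processes (hypothetical helper).

    Assumes 2 threads per core, keeps `reserved_cores` cores free,
    and never returns less than 1.
    """
    n_cores = n_threads // 2
    return max(1, min(n_inputs, n_cores - reserved_cores))


print(pick_n_jobs(n_inputs=3, n_threads=16))   # 3: fewer files than free cores
print(pick_n_jobs(n_inputs=10, n_threads=16))  # 6: limited by 8 cores minus 2
print(pick_n_jobs(n_inputs=5, n_threads=4))    # 1: clamped, 2 cores minus 2 is 0
```

On a real machine, `n_threads` would come from `mp.cpu_count()`.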