In [45]:
import random
import mne
import timeit
from const import BASE_WORKDIR, EVOKED_FULL_FNAME, COVARIANCE_FNAME, SUBJECTS_DIR
# Number of random-channel-subset dipole fits performed per subject in generate_dipoles.
ITERATIONS = 10
# Number of channels drawn (via random.sample, i.e. without replacement) for each random subset.
RANDOM_CHANNELS_COUNT = 10

def generate_dipoles(subj):
    """Fit and save dipoles for one subject, on all sensors and on random sensor subsets.

    Reads the subject's first evoked response, the covariance and the BEM
    solution, crops the evoked data to ``t >= 0`` and runs ``mne.fit_dipole``:

    * once on the full sensor array, and
    * ``ITERATIONS`` times on ``RANDOM_CHANNELS_COUNT`` randomly chosen channels.

    Files written under ``{BASE_WORKDIR}/{subj}/`` (existing files are overwritten):

    * ``full-dipoles.dip``
    * ``random-<i>-ave.fif`` and ``random-<i>-dipoles.dip`` for each iteration ``i``

    Parameters
    ----------
    subj : str
        Subject identifier used to build all input and output paths.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        Raised by ``random.sample`` if the evoked data has fewer than
        ``RANDOM_CHANNELS_COUNT`` channels.
    """
    subj_dir = f"{BASE_WORKDIR}/{subj}"
    evoked = mne.read_evokeds(f"{subj_dir}/{EVOKED_FULL_FNAME}", verbose="ERROR")[0].crop(0, None)
    cov = mne.read_cov(f"{subj_dir}/{COVARIANCE_FNAME}", verbose="ERROR")
    bem = mne.read_bem_solution(f"{SUBJECTS_DIR}/{subj}/bem/{subj}-5120-bem-sol.fif", verbose="ERROR")
    trans_path = f"{SUBJECTS_DIR}/{subj}/bem/{subj}-trans.fif"
    chs = list(evoked.info['ch_names'])

    # Fit dipoles using the full sensor array (the residual is not needed here).
    dip_full, _ = mne.fit_dipole(evoked, cov, bem, trans_path, n_jobs=-1, rank="info", verbose="ERROR")
    dip_full.save(f"{subj_dir}/full-dipoles.dip", overwrite=True)

    # Fit dipoles using a reduced number of sensors - for the sake of variability, select random locations.
    # Although the resulting locations have no biological meaning, the aim is to train a regressor model
    # that replicates the ECD algorithm.
    for iteration in range(ITERATIONS):
        # Sample channel names directly: this draws the same channels as sampling indices
        # would, but does not rely on numpy having been imported by some other cell.
        random_channels = random.sample(chs, RANDOM_CHANNELS_COUNT)
        random_evoked = evoked.copy().pick(picks=random_channels)
        random_evoked.save(f"{subj_dir}/random-{iteration}-ave.fif", overwrite=True)
        dip, _ = mne.fit_dipole(random_evoked, cov, bem, trans_path, n_jobs=-1, rank="info", verbose="ERROR")
        dip.save(f"{subj_dir}/random-{iteration}-dipoles.dip", overwrite=True)

In [None]:
import numpy as np
from util import load_subjects_list

# Run the dipole generation for every subject returned by load_subjects_list().
# generate_dipoles (defined in an earlier cell) writes its .dip/.fif results under
# BASE_WORKDIR/<subj>/, so this loop produces files rather than in-memory results.
for subj in load_subjects_list():
    # Print the subject first so progress is visible while the fits run.
    print(f"-> generate dipoles for {subj}")
    generate_dipoles(subj)

In [52]:
# Measure how long it takes to dipole-fit one epoch (1200 +/- 1 time points), full sensor array case.
subj = 'sub-V1004'
evoked = mne.read_evokeds(f"{BASE_WORKDIR}/{subj}/{EVOKED_FULL_FNAME}", verbose="ERROR")[0].crop(0, None)
cov = mne.read_cov(f"{BASE_WORKDIR}/{subj}/{COVARIANCE_FNAME}", verbose="ERROR")
bem = mne.read_bem_solution(f"{SUBJECTS_DIR}/{subj}/bem/{subj}-5120-bem-sol.fif", verbose="ERROR")
trans_path = f"{SUBJECTS_DIR}/{subj}/bem/{subj}-trans.fif"

print(evoked.data.shape)
# Time a callable instead of a source string: no globals=locals() plumbing or module
# alias is needed, and the timed code stays visible to linters and refactoring tools.
# number=1 because a single fit already takes minutes.
elapsed_time = timeit.timeit(
    lambda: mne.fit_dipole(evoked, cov, bem, trans_path, n_jobs=-1, rank="info", verbose="ERROR"),
    number=1,
)
print(elapsed_time)

(273, 1201)
602.5141278570081


In [56]:
# Measure how long it takes to dipole-fit one epoch (1200 +/- 1 time points), random channel subset case.
# NOTE(review): this cell times 'sub-V1007' while the full-array timing cell uses 'sub-V1004',
# so the two elapsed times are not a like-for-like comparison - confirm this is intended.
subj = 'sub-V1007'
evoked = mne.read_evokeds(f"{BASE_WORKDIR}/{subj}/{EVOKED_FULL_FNAME}", verbose="ERROR")[0].crop(0, None)
cov = mne.read_cov(f"{BASE_WORKDIR}/{subj}/{COVARIANCE_FNAME}", verbose="ERROR")
bem = mne.read_bem_solution(f"{SUBJECTS_DIR}/{subj}/bem/{subj}-5120-bem-sol.fif", verbose="ERROR")
trans_path = f"{SUBJECTS_DIR}/{subj}/bem/{subj}-trans.fif"
chs = list(evoked.info['ch_names'])
# Sample channel names directly: this draws the same channels as sampling indices would,
# but does not rely on numpy having been imported by some other cell.
random_channels = random.sample(chs, RANDOM_CHANNELS_COUNT)
random_evoked = evoked.copy().pick(picks=random_channels)

print(random_evoked.data.shape)
# Time a callable instead of a source string: no globals=locals() plumbing or module
# alias is needed. number=1 because a single fit already takes minutes.
elapsed_time = timeit.timeit(
    lambda: mne.fit_dipole(random_evoked, cov, bem, trans_path, n_jobs=-1, rank="info", verbose="ERROR"),
    number=1,
)
print(elapsed_time)

(10, 1201)
990.5913933620031
