MAINT: Refactor the workflow to use Nipype iterables
This move will make it easier to integrate susceptibility distortion correction (SDC), and in particular
the SDC scheduling @mattcieslak was preparing.
oesteban committed Apr 20, 2020
1 parent 9d95e73 · commit 630b715
Showing 6 changed files with 121 additions and 157 deletions.
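
For readers unfamiliar with the mechanism named in the commit title: assigning an `iterables` list to a Nipype node makes the engine clone that node, and everything connected downstream of it, once per listed value, which is what replaces the explicit Python loop over DWI runs in `base.py` below. A minimal, self-contained sketch, not part of this commit (node names and file paths are illustrative):

```python
# Minimal Nipype iterables sketch -- illustrative only, not part of this commit.
from nipype.pipeline import engine as pe
from nipype.interfaces import utility as niu

# The engine expands ``inputnode`` (and every node downstream of it) once per
# value of the iterable field, instead of building one sub-workflow per DWI
# file in a Python ``for`` loop.
inputnode = pe.Node(niu.IdentityInterface(fields=["dwi_file"]), name="inputnode")
inputnode.iterables = [("dwi_file", ["/data/sub-01_run-1_dwi.nii.gz",
                                     "/data/sub-01_run-2_dwi.nii.gz"])]

buffernode = pe.Node(niu.IdentityInterface(fields=["dwi_file"]), name="buffernode")

wf = pe.Workflow(name="iterables_demo", base_dir="/tmp")
wf.connect(inputnode, "dwi_file", buffernode, "dwi_file")
wf.run()  # the graph downstream of inputnode runs once per DWI file
```
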
4 changes: 4 additions & 0 deletions dmriprep/config/__init__.py
@@ -254,6 +254,9 @@ class nipype(_Config):
"""Number of processes (compute tasks) that can be run in parallel (multiprocessing only)."""
omp_nthreads = os.cpu_count()
"""Number of CPUs a single process can access for multithreaded execution."""
parameterize_dirs = False
"""The node’s output directory will contain full parameterization of any iterable, otherwise
parameterizations over 32 characters will be replaced by their hash."""
plugin = 'MultiProc'
"""NiPype's execution plugin."""
plugin_args = {
@@ -302,6 +305,7 @@ def init(cls):
'crashfile_format': cls.crashfile_format,
'get_linked_libs': cls.get_linked_libs,
'stop_on_first_crash': cls.stop_on_first_crash,
'parameterize_dirs': cls.parameterize_dirs,
}
})

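`parameterize_dirs` is a standard Nipype execution option; the hunks above only expose it through dmriprep's config module and forward it to Nipype when `init()` runs. Per the docstring added above, disabling it makes Nipype hash any iterable parameterization longer than 32 characters instead of spelling it out in working-directory names. A sketch of the equivalent direct Nipype call, shown here for reference and independent of dmriprep's config machinery:

```python
# Equivalent plain-Nipype configuration -- sketch, not dmriprep code.
from nipype import config as ncfg

# With parameterize_dirs disabled, iterable parameterizations longer than
# 32 characters are replaced by their hash in node working-directory names,
# keeping paths short when the iterable payload is a long tuple of file paths.
ncfg.update_config({"execution": {"parameterize_dirs": False}})
```
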
151 changes: 75 additions & 76 deletions dmriprep/workflows/base.py
@@ -18,7 +18,7 @@
from ..interfaces import DerivativesDataSink, BIDSDataGrabber
from ..interfaces.reports import SubjectSummary, AboutSummary
from ..utils.bids import collect_data
from .dwi import init_dwi_preproc_wf
from .dwi.base import init_early_b0ref_wf


def init_dmriprep_wf():
@@ -40,42 +40,42 @@ def init_dmriprep_wf():
wf = init_dmriprep_wf()
"""
dmriprep_wf = Workflow(name='dmriprep_wf')
dmriprep_wf = Workflow(name="dmriprep_wf")
dmriprep_wf.base_dir = config.execution.work_dir

freesurfer = config.workflow.run_reconall
if freesurfer:
fsdir = pe.Node(
BIDSFreeSurferDir(
derivatives=config.execution.output_dir,
freesurfer_home=os.getenv('FREESURFER_HOME'),
freesurfer_home=os.getenv("FREESURFER_HOME"),
spaces=config.workflow.spaces.get_fs_spaces()),
name='fsdir_run_%s' % config.execution.run_uuid.replace('-', '_'),
name=f"fsdir_run_{config.execution.run_uuid.replace('-', '_')}",
run_without_submitting=True)
if config.execution.fs_subjects_dir is not None:
fsdir.inputs.subjects_dir = str(config.execution.fs_subjects_dir.absolute())

for subject_id in config.execution.participant_label:
single_subject_wf = init_single_subject_wf(subject_id)

single_subject_wf.config['execution']['crashdump_dir'] = str(
config.execution.output_dir / "dmriprep" / "-".join(("sub", subject_id))
single_subject_wf.config["execution"]["crashdump_dir"] = str(
config.execution.output_dir / "dmriprep" / f"sub-{subject_id}"
/ "log" / config.execution.run_uuid
)

for node in single_subject_wf._get_all_nodes():
node.config = deepcopy(single_subject_wf.config)
if freesurfer:
dmriprep_wf.connect(fsdir, 'subjects_dir',
single_subject_wf, 'inputnode.subjects_dir')
dmriprep_wf.connect(fsdir, "subjects_dir",
single_subject_wf, "fsinputnode.subjects_dir")
else:
dmriprep_wf.add_nodes([single_subject_wf])

# Dump a copy of the config file into the log directory
log_dir = config.execution.output_dir / 'dmriprep' / 'sub-{}'.format(subject_id) \
/ 'log' / config.execution.run_uuid
log_dir = config.execution.output_dir / "dmriprep" / f"sub-{subject_id}" \
/ "log" / config.execution.run_uuid
log_dir.mkdir(exist_ok=True, parents=True)
config.to_filename(log_dir / 'dmriprep.toml')
config.to_filename(log_dir / "dmriprep.toml")

return dmriprep_wf

@@ -102,7 +102,7 @@ def init_single_subject_wf(subject_id):
from dmriprep.config.testing import mock_config
from dmriprep.workflows.base import init_single_subject_wf
with mock_config():
wf = init_single_subject_wf('THP0005')
wf = init_single_subject_wf("THP0005")
Parameters
----------
@@ -115,24 +115,24 @@ def init_single_subject_wf(subject_id):
FreeSurfer's ``$SUBJECTS_DIR``
"""
name = "single_subject_%s_wf" % subject_id
name = f"single_subject_{subject_id}_wf"
subject_data = collect_data(
config.execution.layout,
subject_id)[0]

if 'flair' in config.workflow.ignore:
subject_data['flair'] = []
if 't2w' in config.workflow.ignore:
subject_data['t2w'] = []
if "flair" in config.workflow.ignore:
subject_data["flair"] = []
if "t2w" in config.workflow.ignore:
subject_data["t2w"] = []

anat_only = config.workflow.anat_only

# Make sure we always go through these two checks
if not anat_only and not subject_data['dwi']:
if not anat_only and not subject_data["dwi"]:
raise Exception(f"No DWI data found for participant {subject_id}. "
"All workflows require DWI images.")

if not subject_data['t1w']:
if not subject_data["t1w"]:
raise Exception(f"No T1w images found for participant {subject_id}. "
"All workflows require T1w images.")

@@ -165,34 +165,34 @@ def init_single_subject_wf(subject_id):
"""
spaces = config.workflow.spaces
reportlets_dir = str(config.execution.work_dir / 'reportlets')
reportlets_dir = str(config.execution.work_dir / "reportlets")

inputnode = pe.Node(niu.IdentityInterface(fields=['subjects_dir']),
name='inputnode')
fsinputnode = pe.Node(niu.IdentityInterface(fields=["subjects_dir"]),
name="fsinputnode")

bidssrc = pe.Node(BIDSDataGrabber(subject_data=subject_data, anat_only=anat_only),
name='bidssrc')
name="bidssrc")

bids_info = pe.Node(BIDSInfo(
bids_dir=config.execution.bids_dir, bids_validate=False), name='bids_info')
bids_dir=config.execution.bids_dir, bids_validate=False), name="bids_info")

summary = pe.Node(SubjectSummary(std_spaces=spaces.get_spaces(nonstandard=False),
nstd_spaces=spaces.get_spaces(standard=False)),
name='summary', run_without_submitting=True)
name="summary", run_without_submitting=True)

about = pe.Node(AboutSummary(version=config.environment.version,
command=' '.join(sys.argv)),
name='about', run_without_submitting=True)
command=" ".join(sys.argv)),
name="about", run_without_submitting=True)

ds_report_summary = pe.Node(
DerivativesDataSink(base_directory=reportlets_dir,
desc='summary', keep_dtype=True),
name='ds_report_summary', run_without_submitting=True)
desc="summary", keep_dtype=True),
name="ds_report_summary", run_without_submitting=True)

ds_report_about = pe.Node(
DerivativesDataSink(base_directory=reportlets_dir,
desc='about', keep_dtype=True),
name='ds_report_about', run_without_submitting=True)
desc="about", keep_dtype=True),
name="ds_report_about", run_without_submitting=True)

# Preprocessing of T1w (includes registration to MNI)
anat_preproc_wf = init_anat_preproc_wf(
@@ -205,77 +205,76 @@ def init_single_subject_wf(subject_id):
output_dir=str(config.execution.output_dir),
reportlets_dir=reportlets_dir,
skull_strip_fixed_seed=config.workflow.skull_strip_fixed_seed,
skull_strip_mode='force',
skull_strip_mode="force",
skull_strip_template=Reference.from_string(
config.workflow.skull_strip_template)[0],
spaces=spaces,
t1w=subject_data['t1w'],
t1w=subject_data["t1w"],
)

workflow.connect([
(inputnode, anat_preproc_wf, [('subjects_dir', 'inputnode.subjects_dir')]),
(bidssrc, bids_info, [(('t1w', fix_multi_T1w_source_name), 'in_file')]),
(inputnode, summary, [('subjects_dir', 'subjects_dir')]),
(bidssrc, summary, [('t1w', 't1w'),
('t2w', 't2w'),
('dwi', 'dwi')]),
(bids_info, summary, [('subject', 'subject_id')]),
(bids_info, anat_preproc_wf, [(('subject', _prefix), 'inputnode.subject_id')]),
(bidssrc, anat_preproc_wf, [('t1w', 'inputnode.t1w'),
('t2w', 'inputnode.t2w'),
('roi', 'inputnode.roi'),
('flair', 'inputnode.flair')]),
(bidssrc, ds_report_summary, [(('t1w', fix_multi_T1w_source_name), 'source_file')]),
(summary, ds_report_summary, [('out_report', 'in_file')]),
(bidssrc, ds_report_about, [(('t1w', fix_multi_T1w_source_name), 'source_file')]),
(about, ds_report_about, [('out_report', 'in_file')]),
(fsinputnode, anat_preproc_wf, [("subjects_dir", "inputnode.subjects_dir")]),
(bidssrc, bids_info, [(("t1w", fix_multi_T1w_source_name), "in_file")]),
(fsinputnode, summary, [("subjects_dir", "subjects_dir")]),
(bidssrc, summary, [("t1w", "t1w"),
("t2w", "t2w"),
("dwi", "dwi")]),
(bids_info, summary, [("subject", "subject_id")]),
(bids_info, anat_preproc_wf, [(("subject", _prefix), "inputnode.subject_id")]),
(bidssrc, anat_preproc_wf, [("t1w", "inputnode.t1w"),
("t2w", "inputnode.t2w"),
("roi", "inputnode.roi"),
("flair", "inputnode.flair")]),
(bidssrc, ds_report_summary, [(("t1w", fix_multi_T1w_source_name), "source_file")]),
(summary, ds_report_summary, [("out_report", "in_file")]),
(bidssrc, ds_report_about, [(("t1w", fix_multi_T1w_source_name), "source_file")]),
(about, ds_report_about, [("out_report", "in_file")]),
])

# Overwrite ``out_path_base`` of smriprep's DataSinks
for node in workflow.list_node_names():
if node.split('.')[-1].startswith('ds_'):
workflow.get_node(node).interface.out_path_base = 'dmriprep'
if node.split(".")[-1].startswith("ds_"):
workflow.get_node(node).interface.out_path_base = "dmriprep"

if anat_only:
return workflow

# Append the dMRI section to the existing anatomical excerpt
# That way we do not need to stream down the number of bold datasets
anat_preproc_wf.__postdesc__ = (anat_preproc_wf.__postdesc__ or '') + f"""
anat_preproc_wf.__postdesc__ = (anat_preproc_wf.__postdesc__ or "") + f"""
Diffusion data preprocessing
: For each of the {len(subject_data["dwi"])} dwi scans found per subject
(across all sessions), the following preprocessing was performed."""

for dwi_file in subject_data['dwi']:
dwi_preproc_wf = init_dwi_preproc_wf(dwi_file)

workflow.connect([
(anat_preproc_wf, dwi_preproc_wf,
[(('outputnode.t1w_preproc', _pop), 'inputnode.t1w_preproc'),
('outputnode.t1w_mask', 'inputnode.t1w_mask'),
('outputnode.t1w_dseg', 'inputnode.t1w_dseg'),
('outputnode.t1w_aseg', 'inputnode.t1w_aseg'),
('outputnode.t1w_aparc', 'inputnode.t1w_aparc'),
('outputnode.t1w_tpms', 'inputnode.t1w_tpms'),
('outputnode.template', 'inputnode.template'),
('outputnode.anat2std_xfm', 'inputnode.anat2std_xfm'),
('outputnode.std2anat_xfm', 'inputnode.std2anat_xfm'),
# Undefined if --fs-no-reconall, but this is safe
('outputnode.subjects_dir', 'inputnode.subjects_dir'),
('outputnode.subject_id', 'inputnode.subject_id'),
('outputnode.t1w2fsnative_xfm', 'inputnode.t1w2fsnative_xfm'),
('outputnode.fsnative2t1w_xfm', 'inputnode.fsnative2t1w_xfm')]),
layout = config.execution.layout
inputnode = pe.Node(niu.IdentityInterface(fields=["dwi_data"]),
name="inputnode")
inputnode.iterables = [(
"dwi_data", tuple([
(dwi, layout.get_bvec(dwi), layout.get_bval(dwi),
layout.get_metadata(dwi)["PhaseEncodingDirection"])
for dwi in subject_data["dwi"]
])
)]
split_info = pe.Node(niu.Function(
function=_unpack, output_names=["dwi_file", "bvec", "bval", "pedir"]),
name="split_info", run_without_submitting=True)

early_b0ref_wf = init_early_b0ref_wf()
workflow.connect([
(inputnode, split_info, [("dwi_data", "in_tuple")]),
(split_info, early_b0ref_wf, [("dwi_file", "inputnode.dwi_file"),
("bvec", "inputnode.in_bvec"),
("bval", "inputnode.in_bval")]),
])

return workflow


def _prefix(subid):
return '-'.join(('sub', subid.lstrip('sub-')))
return "-".join(("sub", subid.lstrip("sub-")))


def _pop(inlist):
if isinstance(inlist, (list, tuple)):
return inlist[0]
return inlist
def _unpack(in_tuple):
return in_tuple
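
Taken together, the new `inputnode.iterables` and the `split_info` node implement an iterate-then-unpack pattern: each iterable value is a tuple of (DWI file, bvec, bval, phase-encoding direction), and because the `Function` node returns that tuple while declaring several `output_names`, Nipype maps the tuple elements onto named outputs that downstream nodes can connect to individually. A standalone sketch of the same pattern (file names and the phase-encoding value are made up):

```python
# Standalone sketch of the iterate-then-unpack pattern above; paths are made up.
from nipype.pipeline import engine as pe
from nipype.interfaces import utility as niu


def _unpack(in_tuple):
    return in_tuple


inputnode = pe.Node(niu.IdentityInterface(fields=["dwi_data"]), name="inputnode")
inputnode.iterables = [("dwi_data", [
    ("/data/sub-01_dwi.nii.gz", "/data/sub-01_dwi.bvec",
     "/data/sub-01_dwi.bval", "j-"),
])]

# Returning the tuple from a Function interface that declares several
# output_names splits it into separately connectable outputs.
split_info = pe.Node(
    niu.Function(function=_unpack,
                 output_names=["dwi_file", "bvec", "bval", "pedir"]),
    name="split_info",
)

wf = pe.Workflow(name="unpack_demo", base_dir="/tmp")
wf.connect(inputnode, "dwi_data", split_info, "in_tuple")
```
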
9 changes: 0 additions & 9 deletions dmriprep/workflows/dwi/__init__.py
@@ -1,9 +0,0 @@
"""Pre-processing dMRI workflows."""

from .base import init_dwi_preproc_wf
from .util import init_dwi_reference_wf

__all__ = [
'init_dwi_preproc_wf',
'init_dwi_reference_wf',
]