Skip to content

Commit

Permalink
RF: Update derivative cache spec, calculate per-BOLD, reuse boldref2fmap (#3078)
Browse files Browse the repository at this point in the history

## Changes proposed in this pull request

This PR continues work on #2207, addressing precomputed derivatives. We were
able to simplify the IO spec, since we only work with affine transforms, and
the queries now need both the entities from the specific BOLD series being
corrected and the ID of the fieldmap used to correct it.
  • Loading branch information
effigies committed Aug 24, 2023
2 parents b7bf767 + ad77079 commit 9118abc
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 224 deletions.
5 changes: 4 additions & 1 deletion fmriprep/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ class environment(_Config):
class nipype(_Config):
"""Nipype settings."""

crashfile_format = "txt"
crashfile_format = "pklz"
"""The file format for crashfiles, either text or pickle."""
get_linked_libs = False
"""Run NiPype's tool to enlist linked libraries for every interface."""
Expand All @@ -314,6 +314,8 @@ class nipype(_Config):
"raise_insufficient": False,
}
"""Settings for NiPype's execution plugin."""
remove_unnecessary_outputs = False
"""Clean up unused outputs after running"""
resource_monitor = False
"""Enable resource monitor."""
stop_on_first_crash = True
Expand Down Expand Up @@ -357,6 +359,7 @@ def init(cls):
"crashdump_dir": str(execution.log_dir),
"crashfile_format": cls.crashfile_format,
"get_linked_libs": cls.get_linked_libs,
"remove_unnecessary_outputs": cls.remove_unnecessary_outputs,
"stop_on_first_crash": cls.stop_on_first_crash,
"check_version": False, # disable future telemetry
}
Expand Down
85 changes: 18 additions & 67 deletions fmriprep/data/io_spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,77 +23,28 @@
}
},
"transforms": {
"bold2boldref": {
"forward": {
"datatype": "func",
"extension": [
"h5",
"txt"
],
"from": "orig",
"to": "boldref",
"suffix": "xfm",
"mode": "image"
},
"reverse": {
"datatype": "func",
"extension": [
"h5",
"txt"
],
"from": "boldref",
"to": "orig",
"suffix": "xfm",
"mode": "image"
}
"hmc": {
"datatype": "func",
"from": "orig",
"to": "boldref",
"mode": "image",
"suffix": "xfm",
"extension": ".txt"
},
"boldref2anat": {
"forward": {
"datatype": "func",
"extension": [
"h5",
"txt"
],
"from": "boldref",
"to": "T1w",
"suffix": "xfm",
"mode": "image"
},
"reverse": {
"datatype": "func",
"extension": [
"h5",
"txt"
],
"from": "T1w",
"to": "boldref",
"suffix": "xfm",
"mode": "image"
}
"datatype": "func",
"from": "orig",
"to": "anat",
"mode": "image",
"suffix": "xfm",
"extension": ".txt"
},
"boldref2fmap": {
"forward": {
"datatype": "func",
"extension": [
"h5",
"txt"
],
"from": "boldref",
"to": "fmap",
"suffix": "xfm",
"mode": "image"
},
"reverse": {
"datatype": "func",
"extension": [
"h5",
"txt"
],
"from": "fmap",
"to": "boldref",
"suffix": "xfm",
"mode": "image"
}
"datatype": "func",
"from": "orig",
"mode": "image",
"suffix": "xfm",
"extension": ".txt"
}
}
},
Expand Down
65 changes: 49 additions & 16 deletions fmriprep/utils/bids.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,23 @@
from pathlib import Path

from bids.layout import BIDSLayout
from bids.utils import listify
from packaging.version import Version

from ..data import load
from ..data import load as load_data


def collect_derivatives(
derivatives_dir: Path,
subject_id: str,
entities: dict,
fieldmap_id: str | None,
spec: dict | None = None,
patterns: ty.List[str] | None = None,
):
"""Gather existing derivatives and compose a cache."""
if spec is None or patterns is None:
_spec, _patterns = tuple(
json.loads(load.readable("data/io_spec.json").read_text()).values()
json.loads(load_data.readable("io_spec.json").read_text()).values()
)

if spec is None:
Expand All @@ -59,23 +61,20 @@ def collect_derivatives(

# search for both boldrefs
for k, q in spec["baseline"].items():
q["subject"] = subject_id
item = layout.get(return_type='filename', **q)
query = {**q, **entities}
item = layout.get(return_type='filename', **query)
if not item:
continue
derivs_cache["%s_boldref" % k] = item[0] if len(item) == 1 else item

for xfm in spec['transforms']:
for k, q in spec['transforms'][xfm].items():
# flip the X2Y naming for opposite direction
if k == 'reverse':
xfm = '2'.join(xfm.split('2')[::-1])
q = q.copy()
q['subject'] = subject_id
item = layout.get(return_type='filename', **q)
if not item:
continue
derivs_cache[xfm] = item[0] if len(item) == 1 else item
for xfm, q in spec['transforms'].items():
query = {**q, **entities}
if xfm == "boldref2fmap":
query["to"] = fieldmap_id
item = layout.get(return_type='filename', **q)
if not item:
continue
derivs_cache[xfm] = item[0] if len(item) == 1 else item
return derivs_cache


Expand Down Expand Up @@ -291,3 +290,37 @@ def check_pipeline_version(pipeline_name, cvers, data_desc):
dvers = desc.get("PipelineDescription", {}).get("Version", "0+unknown")
if Version(cvers).public != Version(dvers).public:
return "Previous output generated by version {} found.".format(dvers)


def extract_entities(file_list):
    """
    Return a dictionary of common entities given a list of files.

    Entities whose value is identical across all files keep a scalar value;
    entities that differ between files are collected into a sorted list of
    the distinct values.

    Examples
    --------
    >>> extract_entities("sub-01/anat/sub-01_T1w.nii.gz")
    {'subject': '01', 'suffix': 'T1w', 'datatype': 'anat', 'extension': '.nii.gz'}
    >>> extract_entities(["sub-01/anat/sub-01_T1w.nii.gz"] * 2)
    {'subject': '01', 'suffix': 'T1w', 'datatype': 'anat', 'extension': '.nii.gz'}
    >>> extract_entities(["sub-01/anat/sub-01_run-1_T1w.nii.gz",
    ...                   "sub-01/anat/sub-01_run-2_T1w.nii.gz"])
    {'subject': '01', 'run': [1, 2], 'suffix': 'T1w', 'datatype': 'anat', 'extension': '.nii.gz'}
    """
    from collections import defaultdict

    from bids.layout import parse_file_entities

    # Accumulate every observed value per entity, preserving first-seen
    # key order so the resulting dict matches the files' entity order.
    collected = defaultdict(list)
    for fname in listify(file_list):
        for key, value in parse_file_entities(fname).items():
            collected[key].append(value)

    def _collapse(values):
        # Deduplicate; a single distinct value is returned bare,
        # multiple distinct values as a sorted list.
        distinct = sorted(set(values))
        return distinct[0] if len(distinct) == 1 else distinct

    return {key: _collapse(vals) for key, vals in collected.items()}
38 changes: 23 additions & 15 deletions fmriprep/workflows/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,29 +133,21 @@ def init_single_subject_fit_wf(subject_id: str):
bids_filters=config.execution.bids_filters,
)[0]

deriv_cache = {}
anatomical_cache = {}
if config.execution.derivatives:
from smriprep.utils.bids import collect_derivatives as collect_anat_derivatives

from fmriprep.utils.bids import collect_derivatives as collect_func_derivatives

std_spaces = spaces.get_spaces(nonstandard=False, dim=(3,))
std_spaces.append("fsnative")
for deriv_dir in config.execution.derivatives:
deriv_cache.update(
anatomical_cache.update(
collect_anat_derivatives(
derivatives_dir=deriv_dir,
subject_id=subject_id,
std_spaces=std_spaces,
freesurfer=config.workflow.run_reconall,
)
)
deriv_cache.update(
collect_func_derivatives(
derivatives_dir=deriv_dir,
subject_id=subject_id,
)
)

inputnode = pe.Node(niu.IdentityInterface(fields=['subjects_dir']), name='inputnode')

Expand Down Expand Up @@ -184,7 +176,7 @@ def init_single_subject_fit_wf(subject_id: str):
skull_strip_mode=config.workflow.skull_strip_t1w,
skull_strip_template=Reference.from_string(config.workflow.skull_strip_template)[0],
spaces=spaces,
precomputed=deriv_cache,
precomputed=anatomical_cache,
omp_nthreads=config.nipype.omp_nthreads,
sloppy=config.execution.sloppy,
skull_strip_fixed_seed=config.workflow.skull_strip_fixed_seed,
Expand Down Expand Up @@ -214,7 +206,8 @@ def init_single_subject_fit_wf(subject_id: str):

from sdcflows import fieldmaps as fm

fmap_estimators = None
fmap_estimators = []
estimator_map = {}

if any(
(
Expand Down Expand Up @@ -332,7 +325,7 @@ def init_single_subject_fit_wf(subject_id: str):
if any(estimator.method == fm.EstimatorType.ANAT for estimator in fmap_estimators):
# fmt:off
workflow.connect([
(anat_preproc_wf, fmap_select_std, [
(anat_fit_wf, fmap_select_std, [
("outputnode.std2anat_xfm", "std2anat_xfm"),
("outputnode.template", "keys")]),
])
Expand Down Expand Up @@ -378,7 +371,7 @@ def init_single_subject_fit_wf(subject_id: str):

# fmt:off
workflow.connect([
(anat_preproc_wf, syn_preprocessing_wf, [
(anat_fit_wf, syn_preprocessing_wf, [
("outputnode.t1w_preproc", "inputnode.in_anat"),
("outputnode.t1w_mask", "inputnode.mask_anat"),
]),
Expand All @@ -397,9 +390,24 @@ def init_single_subject_fit_wf(subject_id: str):

for bold_file in subject_data['bold']:
fieldmap_id = estimator_map.get(listify(bold_file)[0])

functional_cache = {}
if config.execution.derivatives:
from fmriprep.utils.bids import collect_derivatives, extract_entities

entities = extract_entities(bold_file)

for deriv_dir in config.execution.derivatives:
functional_cache.update(
collect_derivatives(
derivatives_dir=deriv_dir,
entities=entities,
fieldmap_id=fieldmap_id,
)
)
func_fit_wf = init_bold_fit_wf(
bold_series=bold_file,
precomputed=deriv_cache,
precomputed=functional_cache,
fieldmap_id=fieldmap_id,
omp_nthreads=config.nipype.omp_nthreads,
)
Expand Down
Loading

0 comments on commit 9118abc

Please sign in to comment.