/
workflow.py
192 lines (159 loc) · 6.89 KB
/
workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
"""
The workflow builder factory method.
All the checks and the construction of the workflow are done
inside this function that has pickleable inputs and output
dictionary (``retval``) to allow isolation using a
``multiprocessing.Process`` that allows aslprep to enforce
a hard-limited memory-scope.
"""
def build_workflow(config_file, retval):
    """Assemble the Nipype workflow that backs a complete ASLPrep run.

    Parameters
    ----------
    config_file
        Location of the serialized configuration. It is read with
        ``config.load`` on entry and rewritten with ``config.to_filename``
        once the workflow has been built and validated.
    retval : dict-like
        Mutable mapping used to hand results back to the caller. The keys
        ``"return_code"`` and ``"workflow"`` are written here.

    Returns
    -------
    dict-like
        The same ``retval`` object. ``"return_code"`` starts at ``1`` and ends
        up as ``0`` on success, ``126`` when no valid FreeSurfer license is
        found, ``127`` when ``check_deps`` reports missing commands, or the
        value returned by ``generate_reports`` in ``--reports-only`` mode.
    """
    from hashlib import sha256

    from fmriprep.reports.core import generate_reports
    from fmriprep.utils.bids import check_pipeline_version
    from niworkflows.utils.bids import collect_participants
    from niworkflows.utils.misc import check_valid_fs_license

    from aslprep import config, data
    from aslprep.utils.misc import check_deps
    from aslprep.workflows.base import init_aslprep_wf

    config.load(config_file)
    logger = config.loggers.workflow
    settings = config.execution
    version = config.environment.version

    # Pessimistic defaults: anything that escapes below leaves a failure code.
    retval["return_code"] = 1
    retval["workflow"] = None

    # Start-up banner, extended with the packaged NOTICE file when available.
    banner_lines = [f"Running ASLPrep version {version}"]
    notice_file = data.load.readable("NOTICE")
    if notice_file.exists():
        notice_header = f"License NOTICE {'#' * 50}"
        banner_lines[0] += "\n"
        banner_lines.extend(
            [
                notice_header,
                f"ASLPrep {version}",
                # The first line of the NOTICE file is dropped.
                *notice_file.read_text().splitlines(keepends=False)[1:],
                # Closing rule as wide as the header line.
                "#" * len(notice_header),
            ]
        )
    logger.log(25, f"\n{' ' * 9}".join(banner_lines))

    # Warn if older results exist (dataset_description.json in the output folder).
    version_warning = check_pipeline_version(
        "ASLPrep",
        version,
        settings.aslprep_dir / "dataset_description.json",
    )
    if version_warning is not None:
        logger.warning(version_warning)

    # This one is the *input* dataset's dataset_description.json.
    bids_description = settings.bids_dir / "dataset_description.json"
    if bids_description.exists():
        settings.bids_description_hash = sha256(bids_description.read_bytes()).hexdigest()

    # First check that bids_dir looks like a BIDS folder.
    participants = collect_participants(
        settings.bids_dir, participant_label=settings.participant_label
    )

    # Reports-only mode: regenerate the reports and stop, no workflow is built.
    if settings.reports_only:
        logger.log(25, "Running --reports-only on participants %s", ", ".join(participants))
        retval["return_code"] = generate_reports(
            participants,
            settings.aslprep_dir,
            settings.run_uuid,
            config=data.load("reports-spec.yml"),
            packagename="aslprep",
        )
        return retval

    # Build the main workflow.
    summary = [
        "Building ASLPrep's workflow:",
        f"BIDS dataset path: {settings.bids_dir}.",
        f"Participant list: {participants}.",
        f"Run identifier: {settings.run_uuid}.",
        f"Output spaces: {settings.output_spaces}.",
    ]
    if settings.derivatives:
        summary.append(f"Searching for derivatives: {settings.derivatives}.")
    if settings.fs_subjects_dir:
        summary.append(f"Pre-run FreeSurfer's SUBJECTS_DIR: {settings.fs_subjects_dir}.")
    logger.log(25, f"\n{' ' * 11}* ".join(summary))

    retval["workflow"] = init_aslprep_wf()

    # The FreeSurfer license is checked only after the workflow has been built.
    if not check_valid_fs_license():
        logger.critical(
            """\
ERROR: a valid license file is required for FreeSurfer to run. ASLPrep looked for an existing \
license file at several paths, in this order: 1) command line argument ``--fs-license-file``; \
2) ``$FS_LICENSE`` environment variable; and 3) the ``$FREESURFER_HOME/license.txt`` path. Get it \
(for free) by registering at https://surfer.nmr.mgh.harvard.edu/registration.html"""
        )
        retval["return_code"] = 126  # 126 == Command invoked cannot execute.
        return retval

    # Check the workflow for missing commands.
    missing_deps = check_deps(retval["workflow"])
    if missing_deps:
        bullet_items = [f"{cmd} (Interface: {iface})" for iface, cmd in missing_deps]
        logger.critical(
            "Cannot run ASLPrep. Missing dependencies:%s",
            # The leading empty item makes the first entry start on its own bullet line.
            "\n\t* ".join(["", *bullet_items]),
        )
        retval["return_code"] = 127  # 127 == command not found.
        return retval

    # Persist the (possibly updated) configuration back to the same file.
    config.to_filename(config_file)
    logger.info(
        "ASLPrep workflow graph with %d nodes built successfully.",
        len(retval["workflow"]._get_all_nodes()),
    )
    retval["return_code"] = 0
    return retval
def build_boilerplate(config_file, workflow):
    """Write the citation boilerplate files into the ``logs`` output folder.

    The original summary describes this as meant to run in an isolated
    process; the function itself only loads the configuration and writes files.

    Parameters
    ----------
    config_file
        Location of the serialized configuration, read with ``config.load``.
    workflow
        Object exposing ``visit_desc()``, whose return value is the
        boilerplate text written to ``CITATION.md``.

    Notes
    -----
    Unless ``config.execution.md_only_boilerplate`` is set, ``pandoc`` is
    invoked twice (10 s timeout each) to render ``CITATION.html`` and
    ``CITATION.tex``. A failed rendering only logs a warning. The packaged
    ``boilerplate.bib`` is copied to ``CITATION.bib`` only when the LaTeX
    rendering succeeds.
    """
    from aslprep import config

    config.load(config_file)
    logs_dir = config.execution.aslprep_dir / "logs"
    text = workflow.visit_desc()
    targets = {ext: logs_dir / f"CITATION.{ext}" for ext in ("bib", "tex", "md", "html")}

    if text:
        # To please git-annex users and to keep the different renderings of the
        # same boilerplate consistent, drop any pre-existing file first.
        for stale in targets.values():
            try:
                stale.unlink()
            except FileNotFoundError:
                pass

    targets["md"].write_text(text)

    # Nothing else to do for Markdown-only runs or when the Markdown file is absent.
    if config.execution.md_only_boilerplate or not targets["md"].exists():
        return

    from shutil import copyfile
    from subprocess import CalledProcessError, TimeoutExpired, check_call

    from aslprep.data import load as load_data

    cli_log = config.loggers.cli
    pandoc_failures = (FileNotFoundError, CalledProcessError, TimeoutExpired)
    bib_source = load_data("boilerplate.bib")

    # Generate the HTML file, resolving citations.
    html_cmd = [
        "pandoc",
        "-s",
        "--bibliography",
        str(bib_source),
        "--citeproc",
        "--metadata",
        # NOTE(review): no shell is involved, so the double quotes reach pandoc
        # as part of the value - confirm that this is intended.
        'pagetitle="ASLPrep citation boilerplate"',
        str(targets["md"]),
        "-o",
        str(targets["html"]),
    ]
    cli_log.info("Generating an HTML version of the citation boilerplate...")
    try:
        check_call(html_cmd, timeout=10)
    except pandoc_failures:
        cli_log.warning("Could not generate CITATION.html file:\n%s", " ".join(html_cmd))

    # Generate the LaTeX file, resolving citations.
    tex_cmd = [
        "pandoc",
        "-s",
        "--bibliography",
        str(bib_source),
        "--natbib",
        str(targets["md"]),
        "-o",
        str(targets["tex"]),
    ]
    cli_log.info("Generating a LaTeX version of the citation boilerplate...")
    try:
        check_call(tex_cmd, timeout=10)
    except pandoc_failures:
        cli_log.warning("Could not generate CITATION.tex file:\n%s", " ".join(tex_cmd))
        return

    # Reached only when the LaTeX rendering succeeded.
    copyfile(bib_source, targets["bib"])