-
Notifications
You must be signed in to change notification settings - Fork 15
/
misc.py
223 lines (159 loc) · 6.59 KB
/
misc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""Miscellaneous utilities."""
from __future__ import annotations
import os
import nibabel as nb
def check_deps(workflow):
    """Report workflow interfaces whose command-line tool cannot be found.

    Every node returned by ``workflow._get_all_nodes()`` is inspected. Nodes
    whose interface defines ``_cmd`` are checked by looking up the first word
    of that command with nipype's ``which``.

    Returns
    -------
    list of tuple
        Sorted ``(interface class name, command)`` pairs for each command
        that ``which`` could not locate.
    """
    from nipype.utils.filemanip import which

    missing = []
    for node in workflow._get_all_nodes():
        # Interfaces without a ``_cmd`` attribute are not checked.
        if not hasattr(node.interface, "_cmd"):
            continue
        # Only the executable (first token of the command) is looked up.
        if which(node.interface._cmd.split()[0]) is not None:
            continue
        missing.append((node.interface.__class__.__name__, node.interface._cmd))
    return sorted(missing)
def _get_series_len(asl_fname):
    """Count the volumes of an image, minus those flagged for discarding.

    Images with fewer than four dimensions count as a single volume.
    Otherwise the result is the size of the fourth dimension minus the value
    returned by ``_get_vols_to_discard`` for the image.
    """
    from aslprep.niworkflows.interfaces.registration import _get_vols_to_discard

    asl_img = nb.load(asl_fname)
    if len(asl_img.shape) >= 4:
        n_discarded = _get_vols_to_discard(asl_img)
        return asl_img.shape[3] - n_discarded
    return 1
def get_n_volumes(fname):
    """Return the length of the fourth dimension of an image file.

    Returns
    -------
    int
        ``shape[3]`` for a 4D image and ``0`` for a 3D image.

    Raises
    ------
    ValueError
        If the image is neither 3D nor 4D.
    """
    img = nb.load(fname)
    if img.ndim == 4:
        return img.shape[3]
    if img.ndim == 3:
        return 0
    raise ValueError(f"Image has {img.ndim} dimensions: {fname}")
def _create_mem_gb(asl_fname):
    """Derive memory estimates (in GB) from the on-disk size of an image.

    Returns
    -------
    asl_tlen : int
        Size of the last dimension of the image.
    mem_gb : dict
        ``"filesize"`` is the file size in GB, ``"resampled"`` is four times
        that, and ``"largemem"`` scales the file size by
        ``max(asl_tlen / 100, 1.0) + 4``.
    """
    bytes_per_gb = 1024**3
    file_gb = os.path.getsize(asl_fname) / bytes_per_gb
    n_last_dim = nb.load(asl_fname).shape[-1]
    estimates = {}
    estimates["filesize"] = file_gb
    estimates["resampled"] = file_gb * 4
    estimates["largemem"] = file_gb * (max(n_last_dim / 100, 1.0) + 4)
    return n_last_dim, estimates
def _get_wf_name(asl_fname):
    """Derive the workflow name for a supplied ASL file.

    The first underscore-separated chunk of the base name (the subject entity
    in the examples below) is dropped, ``.`` and ``-`` are turned into ``_``,
    spaces are removed, a trailing ``_asl`` suffix is swapped for ``_wf``, and
    the result is prefixed with ``asl_preproc_``.

    >>> _get_wf_name('/completely/made/up/path/sub-01_task-nback_asl.nii.gz')
    'asl_preproc_task_nback_wf'
    >>> _get_wf_name('/completely/made/up/path/sub-01_task-nback_run-01_echo-1_asl.nii.gz')
    'asl_preproc_task_nback_run_01_echo_1_wf'
    """
    from nipype.utils.filemanip import split_filename

    fname = split_filename(asl_fname)[1]
    fname_nosub = "_".join(fname.split("_")[1:])
    cleaned = fname_nosub.replace(".", "_").replace(" ", "").replace("-", "_")
    # Swap only the trailing suffix: a blanket replace would also rewrite
    # entity values that happen to contain "_asl" (e.g. "task-asl").
    if cleaned.endswith("_asl"):
        cleaned = cleaned[: -len("_asl")] + "_wf"
    return "asl_preproc_" + cleaned
def _split_spec(in_target):
"""Split space-resolution specification into space, template, and remaining info."""
space, spec = in_target
template = space.split(":")[0]
return space, template, spec
def _select_template(template):
    """Pick the first template file matching a ``(template, specs)`` pair.

    The ``suffix`` entry defaults to ``"T1w"``. The resolution is taken from
    ``res`` or ``resolution`` and falls back to ``"native"``. For a native
    request, resolution 2 is tried first and resolution 1 is used when that
    lookup raises ``RuntimeError``.
    """
    from aslprep.niworkflows.utils.misc import get_template_specs

    template_name, raw_specs = template
    template_name = template_name.split(":")[0]  # Drop any cohort modifier if present
    specs = raw_specs.copy()
    specs.setdefault("suffix", "T1w")

    # Sanitize resolution
    res = specs.pop("res", None) or specs.pop("resolution", None) or "native"
    if res == "native":
        # Map nonstandard resolutions to existing resolutions
        specs["resolution"] = 2
        try:
            matches = get_template_specs(template_name, template_spec=specs)
        except RuntimeError:
            specs["resolution"] = 1
            matches = get_template_specs(template_name, template_spec=specs)
        return matches[0]

    specs["resolution"] = res
    return get_template_specs(template_name, template_spec=specs)[0]
def _aslist(in_value):
"""Convert input to a list.
TODO: Replace with listify from something like NiMARE.
"""
if isinstance(in_value, list):
return in_value
return [in_value]
def _is_native(in_value):
"""Determine if input dictionary references native-space data."""
return in_value.get("resolution") == "native" or in_value.get("res") == "native"
def _select_last_in_list(lst):
"""Select the last element in a list."""
return lst[-1]
def _conditional_downsampling(in_file, in_mask, zoom_th=4.0):
    """Downsample the input dataset for sloppy mode.

    If none of the first three voxel sizes is smaller than ``zoom_th``, the
    inputs are returned untouched. Otherwise the image and its mask are
    resampled onto a coarser reference grid and written to
    ``desc-resampled_input.nii.gz`` and ``desc-resampled_mask.nii.gz`` in the
    current working directory.

    Parameters
    ----------
    in_file
        Path of the image to downsample.
    in_mask
        Path of the mask that accompanies ``in_file``.
    zoom_th : float, optional
        Voxel-size threshold, in the units reported by the image header.

    Returns
    -------
    tuple
        ``(in_file, in_mask)`` unchanged when no downsampling is needed,
        otherwise the absolute paths of the two new files as strings.
    """
    from pathlib import Path

    import nibabel as nb
    import nitransforms as nt
    import numpy as np

    # Import from the public ``scipy.ndimage`` namespace; the legacy
    # ``scipy.ndimage.filters`` module path is deprecated.
    from scipy.ndimage import gaussian_filter

    img = nb.load(in_file)

    zooms = np.array(img.header.get_zooms()[:3])
    if not np.any(zooms < zoom_th):
        # No voxel size is below the threshold: nothing to do.
        return in_file, in_mask

    out_file = Path("desc-resampled_input.nii.gz").absolute()
    out_mask = Path("desc-resampled_mask.nii.gz").absolute()

    shape = np.array(img.shape[:3])
    scaling = zoom_th / zooms
    newrot = np.diag(scaling).dot(img.affine[:3, :3])
    newshape = np.ceil(shape / scaling).astype(int)
    # Place the new grid so its central voxel index maps to the same world
    # coordinate as the central voxel index of the original grid.
    old_center = img.affine.dot(np.hstack((0.5 * (shape - 1), 1.0)))[:3]
    offset = old_center - newrot.dot((newshape - 1) * 0.5)
    newaffine = nb.affines.from_matvec(newrot, offset)

    # Empty image that only carries the target grid (shape + affine).
    newref = nb.Nifti1Image(np.zeros(newshape, dtype=np.uint8), newaffine)
    nt.Affine(reference=newref).apply(img).to_filename(out_file)

    mask = nb.load(in_mask)
    mask.set_data_dtype(float)
    # Smooth the mask before resampling so it can be re-binarized at 0.5.
    mdata = gaussian_filter(mask.get_fdata(dtype=float), scaling)
    floatmask = nb.Nifti1Image(mdata, mask.affine, mask.header)
    newmask = nt.Affine(reference=newref).apply(floatmask)
    hdr = newmask.header.copy()
    hdr.set_data_dtype(np.uint8)
    newmaskdata = (newmask.get_fdata(dtype=float) > 0.5).astype(np.uint8)
    nb.Nifti1Image(newmaskdata, newmask.affine, hdr).to_filename(out_mask)

    return str(out_file), str(out_mask)
def select_target(subject_id, space):
    """Choose the target identifier for a given space.

    Returns ``subject_id`` when ``space`` is ``"fsnative"``; any other space
    is returned as the target itself.
    """
    if space == "fsnative":
        return subject_id
    return space
def _select_first_in_list(lst):
"""Select the first element in a list.
If the input isn't a list, then it will return the whole input.
"""
return lst[0] if isinstance(lst, list) else lst
def _itk2lta(in_file, src_file, dst_file):
    """Write a linear transform out as ``out.lta`` in the working directory.

    The input is read with nitransforms as FreeSurfer (``"fs"``) format when
    its name ends in ``.lta`` and as ITK otherwise. ``src_file`` is passed as
    the reference when loading and ``dst_file`` as the moving image when
    writing.

    Returns
    -------
    str
        Absolute path of the written file.
    """
    from pathlib import Path

    import nitransforms as nt

    lta_path = Path("out.lta").absolute()
    if in_file.endswith(".lta"):
        in_fmt = "fs"
    else:
        in_fmt = "itk"
    xfm = nt.linear.load(in_file, fmt=in_fmt, reference=src_file)
    xfm.to_filename(lta_path, moving=dst_file, fmt="fs")
    return str(lta_path)
def _prefix(subid):
"""Add sub- prefix to subject ID, if necessary."""
return subid if subid.startswith("sub-") else f"sub-{subid}"
def readjson(jsonfile):
    """Read a JSON file into memory.

    Parameters
    ----------
    jsonfile
        Path of the JSON file to read.

    Returns
    -------
    object
        The decoded JSON content, as returned by ``json.load``.
    """
    import json

    # JSON text is UTF-8; pin the encoding instead of relying on the
    # platform's locale default, which may mis-decode non-ASCII content.
    with open(jsonfile, encoding="utf-8") as f:
        return json.load(f)
def get_template_str(template, kwargs):
    """Query templateflow and return the result converted to a string.

    ``kwargs`` is a mapping that is expanded into keyword arguments of
    ``templateflow.api.get``.
    """
    from templateflow.api import get as get_template

    found = get_template(template, **kwargs)
    return str(found)