Fix variable references.
FredLoney committed Jun 16, 2017
1 parent d47549f commit f5106ff
Showing 5 changed files with 97 additions and 77 deletions.
1 change: 1 addition & 0 deletions qipipe/helpers/constants.py
@@ -1,4 +1,5 @@
import os
+ import re

SUBJECT_FMT = '%s%03d'
"""
66 changes: 40 additions & 26 deletions qipipe/pipeline/qipipeline.py
@@ -112,7 +112,7 @@ def _run_with_dicom_input(actions, *inputs, **opts):
# preceding scan workflow's base directory and garble
# the results.
base_dir_opt = opts.pop('base_dir', None)
- if base_dir:
+ if base_dir_opt:
base_dir = os.path.abspath(base_dir_opt)
else:
base_dir = os.getcwd()
@@ -709,7 +709,7 @@ def _create_workflow(self, subject, session, scan, actions, **opts):
if 'stage' in actions:
stg_inputs = ['subject', 'session', 'scan', 'in_dirs', 'opts']
stg_xfc = Function(input_names=stg_inputs,
- output_names=['out_files'],
+ output_names=['time_series', 'volume_files'],
function=stage)
stg_node = pe.Node(stg_xfc, name='stage')
# It would be preferable to pass this QIPipelineWorkflow
@@ -808,15 +808,23 @@ def _create_workflow(self, subject, session, scan, actions, **opts):
mdl_node and not reg_node and not self.registration_resource
)
need_scan_ts = mask_node or roi_node or is_scan_modeling
- if need_scan_ts and not stg_node:
-     scan_ts_xfc = XNATDownload(project=self.project,
-                                resource='NIFTI',
-                                file=SCAN_TS_FILE)
-     scan_ts = pe.Node(scan_ts_xfc,
-                       name='download_scan_time_series')
-     exec_wf.connect(input_spec, 'subject', scan_ts, 'subject')
-     exec_wf.connect(input_spec, 'session', scan_ts, 'session')
-     exec_wf.connect(input_spec, 'scan', scan_ts, 'scan')
+ if need_scan_ts:
+     if stg_node:
+         scan_ts = stg_node
+     else:
+         dl_scan_ts_xfc = XNATDownload(project=self.project,
+                                       resource='NIFTI',
+                                       file=SCAN_TS_FILE)
+         dl_scan_ts = pe.Node(dl_scan_ts_xfc,
+                              name='download_scan_time_series')
+         exec_wf.connect(input_spec, 'subject', dl_scan_ts, 'subject')
+         exec_wf.connect(input_spec, 'session', dl_scan_ts, 'session')
+         exec_wf.connect(input_spec, 'scan', dl_scan_ts, 'scan')
+         # Rename the download out_file to time_series.
+         scan_ts_xfc = IdentityInterface(fields=['time_series'])
+         scan_ts = pe.Node(scan_ts_xfc, name='scan_time_series')
+         exec_wf.connect(dl_scan_ts, 'out_file',
+                         scan_ts, 'time_series')

# Registration and the scan time series require a staged
# node scan with output 'images'. If staging is enabled,
@@ -825,26 +833,31 @@ def _create_workflow(self, subject, session, scan, actions, **opts):
#
# The scan time series is required by mask and scan
# registration.
- staged = None
+ scan_volumes = None
if reg_node:
if stg_node:
- staged = stg_node
+ scan_volumes = stg_node
else:
- dl_scan_xfc = XNATDownload(project=self.project,
+ dl_vols_xfc = XNATDownload(project=self.project,
resource='NIFTI',
file='volume*.nii.gz')
- staged = pe.Node(dl_scan_xfc, name='staged')
- exec_wf.connect(input_spec, 'subject', staged, 'subject')
- exec_wf.connect(input_spec, 'session', staged, 'session')
- exec_wf.connect(input_spec, 'scan', staged, 'scan')
+ dl_vols_node = pe.Node(dl_vols_xfc, name='scan_volumes')
+ exec_wf.connect(input_spec, 'subject', dl_vols_node, 'subject')
+ exec_wf.connect(input_spec, 'session', dl_vols_node, 'session')
+ exec_wf.connect(input_spec, 'scan', dl_vols_node, 'scan')
+ # Rename the download out_files to volume_files.
+ scan_volumes_xfc = IdentityInterface(fields=['volume_files'])
+ scan_volumes = pe.Node(scan_volumes_xfc, name='scan_volume_files')
+ exec_wf.connect(dl_vols_node, 'out_files',
+                 scan_volumes, 'volume_files')
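
Both download branches above end with an IdentityInterface node whose only job is to re-publish the XNATDownload output under the field name that the staging node produces ('time_series' or 'volume_files'), so downstream connections look the same whichever source is active. A minimal standalone sketch of that rename pattern, with a stand-in source node and invented names rather than qipipe's actual nodes:

    import nipype.pipeline.engine as pe
    from nipype.interfaces.utility import Function, IdentityInterface

    def make_file():
        # Stand-in for a node that produces a single 'out_file' output.
        return '/tmp/example_time_series.nii.gz'

    source = pe.Node(Function(input_names=[], output_names=['out_file'],
                              function=make_file),
                     name='source')

    # Identity node that re-publishes 'out_file' as 'time_series'.
    rename = pe.Node(IdentityInterface(fields=['time_series']),
                     name='rename_time_series')

    wf = pe.Workflow(name='rename_sketch')
    wf.connect(source, 'out_file', rename, 'time_series')
    # Consumers now connect to (rename, 'time_series') regardless of
    # whether the file came from staging or from an XNAT download.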

# Registration and modeling require a mask and bolus arrival.
if mask_node:
exec_wf.connect(input_spec, 'subject', mask_node, 'subject')
exec_wf.connect(input_spec, 'session', mask_node, 'session')
exec_wf.connect(input_spec, 'scan', mask_node, 'scan')
if hasattr(mask_node.inputs, 'time_series'):
- exec_wf.connect(scan_ts, 'out_file',
+ exec_wf.connect(scan_ts, 'time_series',
mask_node, 'time_series')
self.logger.debug('Connected the scan time series to mask.')

@@ -873,7 +886,7 @@ def _create_workflow(self, subject, session, scan, actions, **opts):
function=bolus_arrival_index_or_zero)
bolus_arv_node = pe.Node(bolus_arv_xfc,
name='bolus_arrival_index')
- exec_wf.connect(scan_ts, 'out_file',
+ exec_wf.connect(scan_ts, 'time_series',
bolus_arv_node, 'time_series')
self.logger.debug('Connected the scan time series to the bolus'
' arrival calculation.')
@@ -884,14 +897,14 @@ def _create_workflow(self, subject, session, scan, actions, **opts):
exec_wf.connect(input_spec, 'subject', roi_node, 'subject')
exec_wf.connect(input_spec, 'session', roi_node, 'session')
exec_wf.connect(input_spec, 'scan', roi_node, 'scan')
- exec_wf.connect(scan_ts, 'out_file', roi_node, 'time_series')
+ exec_wf.connect(scan_ts, 'time_series', roi_node, 'time_series')
self.logger.debug('Connected the scan time series to ROI.')

# If registration is enabled, then register the unregistered
# staged images.
if reg_node:
# There must be staged files.
- if not staged:
+ if not scan_volumes:
raise NotFoundError('Registration requires a scan input')
exec_wf.connect(input_spec, 'subject', reg_node, 'subject')
exec_wf.connect(input_spec, 'session', reg_node, 'session')
@@ -900,14 +913,15 @@ def _create_workflow(self, subject, session, scan, actions, **opts):
# If the registration input files were downloaded from
# XNAT, then select only the unregistered files.
if stg_node:
- exec_wf.connect(staged, 'out_files', reg_node, 'in_files')
+ exec_wf.connect(scan_volumes, 'volume_files',
+                 reg_node, 'in_files')
else:
exc_regd_xfc = Function(input_names=['in_files', 'exclusions'],
output_names=['out_files'],
function=exclude_files)
exclude_registered = pe.Node(exc_regd_xfc,
name='exclude_registered')
- exec_wf.connect(staged, 'out_files',
+ exec_wf.connect(scan_volumes, 'volume_files',
exclude_registered, 'in_files')
exec_wf.connect(input_spec, 'registered',
exclude_registered, 'exclusions')
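
The exclude_registered node wraps qipipe's exclude_files helper so that volumes which were already registered are dropped from the downloaded list before registration. A rough sketch of what such a filter could look like; the body below is an assumption about its behaviour (filtering on base file name), not the actual qipipe implementation:

    def exclude_files(in_files, exclusions):
        """Return the in_files whose base name is not in exclusions."""
        # The import sits inside the body because Nipype Function nodes
        # serialize the function source and run it in isolation.
        import os
        exclusions = set(exclusions or [])
        return [f for f in in_files if os.path.basename(f) not in exclusions]

    # Example: volume003 was already registered, so only the first two remain.
    print(exclude_files(['/data/volume001.nii.gz',
                         '/data/volume002.nii.gz',
                         '/data/volume003.nii.gz'],
                        ['volume003.nii.gz']))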
@@ -978,7 +992,7 @@ def _create_workflow(self, subject, session, scan, actions, **opts):
# resource option. In that case, model the scan
# input. scan_ts is always created previously if
# is_scan_modeling is true.
- exec_wf.connect(scan_ts, 'out_file', mdl_node, 'time_series')
+ exec_wf.connect(scan_ts, 'time_series', mdl_node, 'time_series')
elif reg_node:
# merge_reg is created in the registration processing.
exec_wf.connect(merge_reg, 'out_file', mdl_node, 'time_series')
@@ -1119,7 +1133,7 @@ def stage(subject, session, scan, in_dirs, opts):
:param scan: the scan number
:param in_dirs: the input DICOM directories
:param opts: the :meth:`qipipe.pipeline.staging.run` keyword options
- :return: the 3D volume files
+ :return: the :meth:`qipipe.staging.run` result
"""
from qipipe.pipeline import staging
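
The stage wrapper documented above is the function that the pipeline exposes as a Nipype Function interface with output_names=['time_series', 'volume_files']; those names must match, in order, the values the function returns, because Function maps a returned tuple onto its named outputs positionally. A self-contained sketch of that mechanism with a placeholder body (stage_stub is illustrative, not qipipe's staging logic):

    import nipype.pipeline.engine as pe
    from nipype.interfaces.utility import Function

    def stage_stub(subject, session, scan, in_dirs, opts):
        # Placeholder: pretend staging produced a 4D time series and 3D volumes.
        time_series = '/tmp/scan_ts.nii.gz'
        volume_files = ['/tmp/volume001.nii.gz', '/tmp/volume002.nii.gz']
        # The returned tuple is mapped positionally onto output_names.
        return time_series, volume_files

    stg_xfc = Function(
        input_names=['subject', 'session', 'scan', 'in_dirs', 'opts'],
        output_names=['time_series', 'volume_files'],
        function=stage_stub
    )
    stg_node = pe.Node(stg_xfc, name='stage')
    # Downstream nodes can connect to (stg_node, 'time_series') and
    # (stg_node, 'volume_files') independently.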

31 changes: 17 additions & 14 deletions qipipe/pipeline/registration.py
@@ -29,15 +29,15 @@
"""The FNIRT registration configuration sections."""


- def run(subject, session, scan, reference, *images, **opts):
+ def run(subject, session, scan, reference, *in_files, **opts):
"""
Runs the registration workflow on the given session scan images.
:param subject: the subject name
:param session: the session name
:param scan: the scan number
:param reference: the initial fixed reference image
- :param images: the image files to register
+ :param in_files: the image files to register
:param opts: the :class:`RegistrationWorkflow` initializer
and :meth:`RegistrationWorkflow.run` options
:return: the realigned image file path array
@@ -48,7 +48,7 @@ def run(subject, session, scan, reference, *images, **opts):
# Make the workflow.
reg_wf = RegistrationWorkflow(**opts)
# Run the workflow.
- return reg_wf.run(subject, session, scan, reference, *images,
+ return reg_wf.run(subject, session, scan, reference, *in_files,
**run_opts)


@@ -161,7 +161,7 @@ def __init__(self, **opts):
self.workflow = self._create_realignment_workflow(**opts)
"""The registration realignment workflow."""

- def run(self, subject, session, scan, reference, *images, **opts):
+ def run(self, subject, session, scan, reference, *in_files, **opts):
"""
Runs the registration workflow on the given session scan images.
@@ -307,12 +307,13 @@ def _create_execution_workflow(self, reference, dest, recursive=False):
os.makedirs(dest)

# Collect the realigned images.
- collect_realigned = pe.JoinNode(IdentityInterface(fields=['in_files']),
-                                 joinsource='iter_input',
-                                 joinfield='in_files',
-                                 name='collect_realigned')
+ collect_realigned_xfc = IdentityInterface(fields=['realigned_files'])
+ collect_realigned = pe.JoinNode(
+     collect_realigned_xfc, joinsource='iter_input',
+     joinfield='realigned_files', name='collect_realigned'
+ )
exec_wf.connect(self.workflow, 'output_spec.out_file',
- collect_realigned, 'in_files')
+ collect_realigned, 'realigned_files')
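
collect_realigned is a JoinNode: the realignment workflow fans out over the scan volumes through the iter_input iterable node, and the JoinNode gathers every iteration's out_file back into a single realigned_files list. A toy fan-out/fan-in sketch of the same pattern (node and field names here are illustrative):

    import nipype.pipeline.engine as pe
    from nipype.interfaces.utility import Function, IdentityInterface

    def shout(word):
        return word.upper()

    wf = pe.Workflow(name='join_sketch')

    # Fan out: the downstream graph is expanded once per iterable value.
    iter_input = pe.Node(IdentityInterface(fields=['word']), name='iter_input')
    iter_input.iterables = ('word', ['a', 'b', 'c'])

    work = pe.Node(Function(input_names=['word'], output_names=['out'],
                            function=shout),
                   name='work')

    # Fan in: collect the output of every expansion into one list field.
    collect = pe.JoinNode(IdentityInterface(fields=['all_words']),
                          joinsource='iter_input', joinfield='all_words',
                          name='collect')

    wf.connect(iter_input, 'word', work, 'word')
    wf.connect(work, 'out', collect, 'all_words')
    # Running the workflow expands 'work' three times and hands 'collect'
    # a list of the three results.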

# Make the profile.
cr_prf_fields = ['technique', 'configuration', 'sections', 'dest']
@@ -328,7 +329,8 @@ def _create_execution_workflow(self, reference, dest, recursive=False):
# Merge the profile and registration result into one list.
concat_output = pe.Node(Merge(2), name='concat_output')
exec_wf.connect(cr_prf, 'out_file', concat_output, 'in1')
- exec_wf.connect(collect_realigned, 'in_files', concat_output, 'in2')
+ exec_wf.connect(collect_realigned, 'realigned_files',
+                 concat_output, 'in2')
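
concat_output uses Nipype's Merge utility to splice the single profile file and the realigned file list into one upload list. A small standalone example of Merge's numbered inputs (file names are placeholders):

    from nipype.interfaces.utility import Merge

    # Merge(2) exposes inputs in1 and in2 and concatenates them into 'out';
    # list inputs are flattened by default, scalars are appended.
    concat = Merge(2)
    concat.inputs.in1 = '/tmp/registration_profile.cfg'
    concat.inputs.in2 = ['/tmp/volume001_realigned.nii.gz',
                         '/tmp/volume002_realigned.nii.gz']
    result = concat.run()
    # result.outputs.out ==
    #   ['/tmp/registration_profile.cfg',
    #    '/tmp/volume001_realigned.nii.gz',
    #    '/tmp/volume002_realigned.nii.gz']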

# Upload the registration result into the XNAT registration
# resource.
@@ -339,7 +341,8 @@ def _create_execution_workflow(self, reference, dest, recursive=False):
exec_wf.connect(input_spec, 'session', upload_reg, 'session')
exec_wf.connect(input_spec, 'scan', upload_reg, 'scan')
exec_wf.connect(input_spec, 'resource', upload_reg, 'resource')
- exec_wf.connect(collect_realigned, 'in_files', upload_reg, 'in_files')
+ exec_wf.connect(collect_realigned, 'realigned_files',
+                 upload_reg, 'in_files')

# FIXME: copying the realigned images individually with a Copy
# submits a separate SGE job for each copy, contrary to the
@@ -363,7 +366,8 @@ def _create_execution_workflow(self, reference, dest, recursive=False):
# The execution output.
output_spec = pe.Node(IdentityInterface(fields=['out_files']),
name='output_spec')
- exec_wf.connect(collect_realigned, 'out_files', output_spec, 'out_files')
+ exec_wf.connect(collect_realigned, 'realigned_files',
+                 output_spec, 'out_files')

self.logger.debug("Created the %s workflow." % exec_wf.name)
# If debug is set, then diagram the workflow graph.
@@ -436,9 +440,8 @@ def _create_realignment_workflow(self, **opts):
# TODO - isolate and fix this Nipype defect.
reg_xfc = Registration(float=True, **metric_inputs)
register = pe.Node(reg_xfc, name='register')
+ realign_wf.connect(input_spec, 'reference', register, 'fixed_image')
realign_wf.connect(input_spec, 'in_file', register, 'moving_image')
- realign_wf.connect(input_spec, 'reference',
-                    register, 'moving_image')
realign_wf.connect(input_spec, 'mask',
register, 'moving_image_mask')
realign_wf.connect(input_spec, 'mask',
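The corrected connections in the last hunk send the reference to the Registration node's fixed_image input and the per-volume file to moving_image, so each volume is resampled onto the fixed reference. A minimal sketch of the same wiring outside qipipe; the file paths are placeholders, the Registration parameters that qipipe normally reads from its configuration are omitted, and ANTs itself is only needed to execute the node, not to build the graph:

    import nipype.pipeline.engine as pe
    from nipype.interfaces.ants import Registration
    from nipype.interfaces.utility import IdentityInterface

    input_spec = pe.Node(IdentityInterface(fields=['reference', 'in_file', 'mask']),
                         name='input_spec')
    # Placeholder inputs; in qipipe these arrive from upstream nodes.
    input_spec.inputs.reference = '/tmp/reference.nii.gz'
    input_spec.inputs.in_file = '/tmp/volume001.nii.gz'
    input_spec.inputs.mask = '/tmp/mask.nii.gz'

    # In qipipe the interface is built as Registration(float=True, **metric_inputs).
    register = pe.Node(Registration(), name='register')

    realign_wf = pe.Workflow(name='realign_sketch')
    # The reference stays fixed; each scan volume is moved onto it.
    realign_wf.connect(input_spec, 'reference', register, 'fixed_image')
    realign_wf.connect(input_spec, 'in_file', register, 'moving_image')
    realign_wf.connect(input_spec, 'mask', register, 'moving_image_mask')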
