Skip to content

Commit

Permalink
Make the realigned 4D time series in the register processing.
Browse files Browse the repository at this point in the history
  • Loading branch information
FredLoney committed May 24, 2017
1 parent 68dfcce commit 145523b
Showing 1 changed file with 103 additions and 132 deletions.
235 changes: 103 additions & 132 deletions qipipe/pipeline/qipipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,30 +213,11 @@ def _run_with_xnat_input(actions, *inputs, **opts):
raise ArgumentError("The XNAT scan object does not exist: %s" %
path)

# The workflow options are augmented from the base options.
wf_opts = dict(opts)

# If modeling will be performed on a specified registration
# resource, then check for an existing 4D registration time
# series.
if 'model' in actions:
reg_rsc = opts.get('registration_resource')
if reg_rsc:
reg_ts_name = reg_rsc + '_ts.nii.gz'
file_obj = xnat.find_one(prj, sbj, sess, scan=scan,
resource=reg_rsc,
file=reg_ts_name)
if file_obj:
wf_opts['realigned_time_series'] = reg_ts_name

# Make the workflow.
workflow = QIPipelineWorkflow(prj, sbj, sess, scan, actions, **wf_opts)
# If a registration resource was specified, then set the flag
# to check for registered scans.
has_reg_opt = opts.get('registration_resource') != None
workflow = QIPipelineWorkflow(prj, sbj, sess, scan, actions, **opts)

# Run the workflow.
workflow.run_with_scan_download(prj, sbj, sess, scan, actions,
has_reg_opt)
workflow.run_with_scan_download(prj, sbj, sess, scan, actions)



Expand Down Expand Up @@ -488,8 +469,7 @@ def run_with_dicom_input(self, actions, scan_input):
(scan_input.subject, scan_input.session,
scan_input.scan))

def run_with_scan_download(self, project, subject, session, scan, actions,
is_existing_registration_resource):
def run_with_scan_download(self, project, subject, session, scan, actions):
"""
Runs the execution workflow on downloaded scan image files.
Expand All @@ -498,9 +478,6 @@ def run_with_scan_download(self, project, subject, session, scan, actions,
:param session: the session name
:param scan: the scan number
:param actions: the workflow actions
:param is_existing_registration_resource: flag indicating
whether an existing registration resource name was
specified
"""
self.logger.debug("Processing the %s %s %s scan %d volumes..." %
(project, subject, session, scan))
Expand All @@ -516,7 +493,7 @@ def run_with_scan_download(self, project, subject, session, scan, actions,
# If the registration resource already exists in XNAT, then
# partition the scan image file names into those which are
# already registered and those which need to be registered.
if is_existing_registration_resource:
if self.registration_resource:
registered, unregistered = self._partition_registered(
xnat, project, subject, session, scan, scan_volumes
)
Expand All @@ -529,37 +506,37 @@ def run_with_scan_download(self, project, subject, session, scan, actions,
if registered:
if unregistered:
self.logger.debug("Skipping registration of %d"
" previously registered %s %s %s"
" Scan %d volumes:" %
(len(registered), project, subject,
session, scan))
" previously registered %s %s %s"
" scan %d volumes:" %
(len(registered), project, subject,
session, scan))
self.logger.debug("%s" % registered)
else:
self.logger.debug("Skipping %s %s %s scan %d"
" registration, since all volumes"
" are already registered." %
(project, subject, session, scan))
" registration, since all volumes"
" are already registered." %
(project, subject, session, scan))
if unregistered:
self.logger.debug("Registering %d %s %s %s scan %d"
" volumes:" %
(len(unregistered), project, subject,
session, scan))
" volumes:" %
(len(unregistered), project, subject,
session, scan))
self.logger.debug("%s" % sorted(unregistered))
elif unregistered and is_existing_registration_resource:
elif unregistered and self.registration_resource:
self.logger.error("The pipeline %s %s %s scan %d register"
" action is not specified but there are"
" %d unregistered volumes:" %
(project, subject, session, scan,
len(unregistered)))
self.logger.error("%s" % sorted(unregistered))
" action is not specified but there are"
" %d unregistered volumes:" %
(project, subject, session, scan,
len(unregistered)))
self.logger.error(str(sorted(unregistered)))
raise ArgumentError("The pipeline %s %s %s scan %d register"
" action is not specified but there are"
" unregistered volumes" %
(project, subject, session, scan))
else:
self.logger.debug("Processing %d %s %s %s scan %d volumes:" %
(len(registered), project, subject, session,
scan))
(len(registered), project, subject, session,
scan))
self.logger.debug("%s" % sorted(registered))

# Set the workflow input.
Expand All @@ -573,8 +550,8 @@ def run_with_scan_download(self, project, subject, session, scan, actions,
self._run_workflow(self.workflow)
if unregistered:
self.logger.debug("Registered %d %s %s %s scan %d volumes:" %
(len(unregistered), project, subject, session,
scan))
(len(unregistered), project, subject,
session, scan))
self.logger.debug("%s" % unregistered)

def _set_roi_inputs(self, *inputs):
Expand Down Expand Up @@ -668,12 +645,19 @@ def _create_workflow(self, subject, session, scan, actions, **opts):
if 'register' in actions:
reg_inputs = ['subject', 'session','scan', 'reference_index',
'mask', 'in_files', 'opts']
# The registration function keyword options.

# The registration technique option is required
# for the registration action.
if not self.registration_technique:
raise ArgumentError('Missing the registration technique')

# The registration resource option is set if and
# only if we are resuming a previously interrupted
# registration process.
if not self.registration_resource:
reg_rsc = registration.generate_resource_name()
self.registration_resource = reg_rsc

# Spell out the registration workflow options rather
# than delegating to this qipipeline workflow as the
# parent, since Nipype Function arguments must be
Expand All @@ -683,17 +667,21 @@ def _create_workflow(self, subject, session, scan, actions, **opts):
reg_opts['technique'] = self.registration_technique
if 'recursive_registration' in opts:
reg_opts['recursive'] = opts['recursive_registration']

# The registration function.
reg_xfc = Function(input_names=reg_inputs,
output_names=['out_files'],
function=register)
reg_node = pe.Node(reg_xfc, name='register')
reg_node.inputs.opts = reg_opts

# The fixed reference volume index option.
reg_ref_opt = opts.pop('registration_reference', None)
if reg_ref_opt:
reg_node.inputs.reference_index = int(reg_ref_opt) - 1
self.logger.debug("The registration reference is %d." %
reg_node.inputs.reference_index)

self.logger.info("Enabled registration with options %s." %
reg_opts)
else:
Expand Down Expand Up @@ -815,12 +803,12 @@ def _create_workflow(self, subject, session, scan, actions, **opts):
# a scan time series. If there is a scan time series resource
# option, then the scan time series will be downloaded.
# Otherwise, it will be created from the staged input.
need_scan_ts = (
mask_node or roi_node or (mdl_node and not reg_node)
is_scan_modeling = (
mdl_node and not reg_node and not self.registration_resource
)
need_scan_ts = mask_node or roi_node or is_scan_modeling
scan_ts = None
if need_scan_ts and not stg_node:
has_scan_ts = False
with qixnat.connect() as xnat:
has_scan_ts = _scan_file_exists(
xnat, self.project, subject, session, scan, 'NIFTI',
Expand Down Expand Up @@ -980,6 +968,28 @@ def _create_workflow(self, subject, session, scan, actions, **opts):
self.logger.debug('Connected bolus arrival to the'
' registration reference index.')

# Stack the registered images into a 4D time series.
reg_ts_name = self.registration_resource + '_ts'
merge_reg_xfc = MergeNifti(out_format=reg_ts_name)
merge_reg = pe.Node(merge_reg_xfc, name='merge_reg')
exec_wf.connect(reg_node, 'out_files', merge_reg, 'in_files')

# Upload the realigned time series to XNAT.
upload_reg_ts_xfc = XNATUpload(
project=self.project, resource=self.registration_resource,
modality='MR'
)
upload_reg_ts = pe.Node(upload_reg_ts_xfc,
name='upload_reg_time_series')
exec_wf.connect(input_spec, 'subject',
upload_reg_ts, 'subject')
exec_wf.connect(input_spec, 'session',
upload_reg_ts, 'session')
exec_wf.connect(input_spec, 'scan',
upload_reg_ts, 'scan')
exec_wf.connect(merge_reg, 'out_file',
upload_reg_ts, 'in_files')

# If the modeling workflow is enabled, then model the scan or
# realigned images.
if mdl_node:
Expand All @@ -997,88 +1007,49 @@ def _create_workflow(self, subject, session, scan, actions, **opts):
self.logger.debug('Connected bolus arrival to modeling.')

# Obtain the modeling input 4D time series.
if reg_node:
# Merge the realigned images to 4D.
reg_ts_name = self.registration_resource + '_ts'
merge_reg_xfc = MergeNifti(out_format=reg_ts_name)
merge_reg = pe.Node(merge_reg_xfc, name='merge_reg')

# If the registration resource name was specified,
# then download the previously realigned images.
if self.registration_resource:
reg_dl_xfc = XNATDownload(
project=self.project,
resource=self.registration_resource,
file='volume*.nii.gz'
if is_scan_modeling:
# There is no register action and no registration
# resource option. In that case, model the scan
# input. scan_ts is always created previously if
# is_scan_modeling is true.
exec_wf.connect(scan_ts, 'out_file', mdl_node, 'time_series')
elif reg_node:
# merge_reg is created in the registration processing.
exec_wf.connect(merge_reg, 'out_file', mdl_node, 'time_series')
self.logger.debug('Connected registration to modeling.')
else:
# Check for a previously created registration time
# series. Note that self.registration_resource is
# set since is_scan_modeling is false and reg_node
# is None.
reg_ts_name = self.registration_resource + '_ts.nii.gz'
with qixnat.connect() as xnat:
has_reg_ts = _scan_file_exists(
xnat, self.project, subject, session, scan,
self.registration_resource, reg_ts_name
)
download_reg = pe.Node(reg_dl_xfc,
name='download_realigned_images')
exec_wf.connect(input_spec, 'subject',
download_reg, 'subject')
exec_wf.connect(input_spec, 'session',
download_reg, 'session')
exec_wf.connect(input_spec, 'scan',
download_reg, 'scan')
if reg_node:
# Merge the previously and newly realigned images.
concat_reg = pe.Node(Merge(2),
name='concat_reg')
exec_wf.connect(download_reg, 'out_files',
concat_reg, 'in1')
exec_wf.connect(reg_node, 'out_files',
concat_reg, 'in2')
exec_wf.connect(concat_reg, 'out',
merge_reg, 'in_files')
else:
# All of the realigned files were downloaded.
exec_wf.connect(download_reg, 'out_files',
merge_reg, 'in_files')
elif reg_node:
# All of the realigned files were created by the
# registration workflow.
exec_wf.connect(reg_node, 'out_files',
merge_reg, 'in_files')
else:
raise ArgumentError(
'The pipeline cannot perform modeling on the'
' registration result, since the registration'
' workflow is disabled and no registration resource'
' was specified.')

# Upload the realigned time series to XNAT.
upload_reg_ts_xfc = XNATUpload(
project=self.project, resource=self.registration_resource,
modality='MR'
# The time series must have been created by the
# registration process.
if not has_reg_ts:
raise PipelineError(
"The %s %s scan %d registration resource %s does"
" not have the time series file %s" %
(subject, session. scan, self.registration_resource,
reg_ts_name)
)
# Download the registration time series.
dl_reg_ts_xfc = XNATDownload(
project=self.project,
resource=self.registration_resource,
file=reg_ts_name
)
upload_reg_ts = pe.Node(upload_reg_ts_xfc,
name='upload_reg_time_series')
exec_wf.connect(input_spec, 'subject',
upload_reg_ts, 'subject')
exec_wf.connect(input_spec, 'session',
upload_reg_ts, 'session')
exec_wf.connect(input_spec, 'scan',
upload_reg_ts, 'scan')
exec_wf.connect(merge_reg, 'out_file',
upload_reg_ts, 'in_files')

dl_reg_ts = pe.Node(dl_reg_ts_xfc,
name='download_reg_time_series')
exec_wf.connect(input_spec, 'subject', dl_reg_ts, 'subject')
exec_wf.connect(input_spec, 'session', dl_reg_ts, 'session')
exec_wf.connect(input_spec, 'scan', dl_reg_ts, 'scan')
# Pass the realigned time series to modeling.
exec_wf.connect(merge_reg, 'out_file',
mdl_node, 'time_series')
self.logger.debug('Connected registration to modeling.')
elif self.registration_resource:
# Download the XNAT time series file.
reg_ts_name = self.registration_resource + '_ts.nii.gz'
ts_dl_xfc = XNATDownload(project=self.project,
resource=self.registration_resource,
file=reg_ts_name)
reg_ts = pe.Node(ts_dl_xfc, name='download_reg_time_series')
exec_wf.connect(input_spec, 'subject', reg_ts, 'subject')
exec_wf.connect(input_spec, 'session', reg_ts, 'session')
exec_wf.connect(input_spec, 'scan', reg_ts, 'scan')
exec_wf.connect(reg_ts, 'out_file', mdl_node, 'time_series')
else:
# Model the scan input.
exec_wf.connect(scan_ts, 'out_file', mdl_node, 'time_series')
exec_wf.connect(dl_reg_ts, 'out_file', mdl_node, 'time_series')

# Set the configured workflow node inputs and plug-in options.
self._configure_nodes(exec_wf)
Expand Down Expand Up @@ -1192,7 +1163,7 @@ def stage(subject, session, scan, in_dirs, opts):
def register(subject, session, scan, in_files, opts, reference_index=0,
mask=None):
"""
A stub for the :meth:`register` method.
A facade for the :meth:`qipipe.pipeline.registration.run` method.
:Note: The *mask* and *reference_index* parameters are
registration options, but can'tbe included in the *opts*
Expand Down

0 comments on commit 145523b

Please sign in to comment.