Skip to content

Commit

Permalink
Make actions a required positional workflow initialization argument r…
Browse files Browse the repository at this point in the history
…ather than a keyword.
  • Loading branch information
FredLoney committed Apr 26, 2015
1 parent 9635993 commit b9227b7
Showing 1 changed file with 40 additions and 36 deletions.
76 changes: 40 additions & 36 deletions qipipe/pipeline/qipipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
VOLUME_FILE_PAT = re.compile("volume(\d{3}).nii.gz$")
"""The volume image file name pattern."""

ACTIONS = ['stage', 'roi', 'register', 'model']
"""The list of all available actions."""

def run(*inputs, **opts):
"""
Creates a :class:`qipipe.pipeline.qipipeline.QIPipelineWorkflow`
Expand All @@ -54,44 +57,44 @@ def run(*inputs, **opts):
and :class:`QIPipelineWorkflow` initializer options,
as well as the following keyword options:
:keyword collection: the AIRC collection name
:keyword actions: the workflow actions to perform
:keyword resume: flag indicating whether to resume processing on
existing sessions (default False)
"""
# Tailor the actions.
actions = opts.get('actions', None)
if not actions:
actions = opts['actions'] = _default_actions(**opts)
actions = opts.pop('actions', _default_actions(**opts))
if 'stage' in actions:
_run_with_dicom_input(*inputs, **opts)
_run_with_dicom_input(actions, *inputs, **opts)
elif 'roi' in actions:
if len(actions) > 1:
raise ArgumentError("The ROI pipeline can only be run with"
" staging or stand-alone")
_run_with_roi_input(*inputs, **opts)
_run_with_roi_input(actions, *inputs, **opts)
else:
_run_with_xnat_input(*inputs, **opts)
_run_with_xnat_input(actions, *inputs, **opts)


def _default_actions(**opts):
"""
Returns the default actions from the given options, as follows:
* The default actions always include ``model``.
* If the *registration* resource option is set, then the actions
consist of ``register`` and ``model``.
* If the ``registration`` resource option is not set, then ``stage``
is included in the actions.
* Otherwise, all of the actions in :const:`ACTIONS` are performed.
:param opts: the :meth:`run` options
:return: the default actions
"""
if 'registration' in opts:
return ['register', 'model']
else:
return ['stage', 'roi', 'register', 'model']
return ACTIONS


def _run_with_dicom_input(*inputs, **opts):
def _run_with_dicom_input(actions, *inputs, **opts):
"""
:param actions: the actions to perform
:param inputs: the DICOM directories to process
:param opts: the :meth:`run` options as well as the following keyword
options:
Expand Down Expand Up @@ -119,19 +122,19 @@ def _run_with_dicom_input(*inputs, **opts):
# The workflow options are augmented from the base options.
wf_opts = dict(opts)
# Scan 1 is scan type T1. Only T1 can do more than staging.
if scan != 1:
wf_opts['actions'] = ['stage']
wf_actions = actions if scan == 1 else ['stage']
# Create a new workflow for the current scan type.
wf_gen = QIPipelineWorkflow(**wf_opts)
wf_gen = QIPipelineWorkflow(wf_actions, **wf_opts)
# Run the workflow on each {volume: [DICOM files]} item.
wf_gen.run_with_dicom_input(collection, sbj, sess, scan, vol_dcm_dict, dest)

# Make the TCIA subject map.
map_ctp(collection, *subjects, dest=dest)


def _run_with_roi_input(*inputs, **opts):
def _run_with_roi_input(actions, *inputs, **opts):
"""
:param actions: the actions to perform
:param inputs: the session directories to process
:param opts: the :meth:`QIPipelineWorkflow.run_with_roi_input` options as
well as the following keyword options:
Expand Down Expand Up @@ -159,18 +162,19 @@ def _run_with_roi_input(*inputs, **opts):
if _scan_resource_exists(prj, sbj, sess, 1, SCAN_TS_RSC):
wf_opts['scan_time_series'] = SCAN_TS_RSC
# Make the workflow.
wf_gen = QIPipelineWorkflow(**wf_opts)
wf_gen = QIPipelineWorkflow(actions, **wf_opts)
# The input (lesion number, slice index, file path) tuples.
rois = iter_roi(collection.name, sess_dir)
# Run the workflow on the input.
wf_gen.run_with_roi_input(collection.name, sbj, sess, 1, *rois)


def _run_with_xnat_input(*inputs, **opts):
def _run_with_xnat_input(actions, *inputs, **opts):
"""
Run the pipeline with a XNAT download. Each input is a XNAT scan
path, e.g. ``/QIN/Breast012/Session03/scan/1``.
:param actions: the actions to perform
:param inputs: the XNAT scan resource paths
:param opts: the :class:`QIPipelineWorkflow` initializer options
"""
Expand Down Expand Up @@ -203,20 +207,20 @@ def _run_with_xnat_input(*inputs, **opts):
wf_opts['mask'] = MASK_RSC

# Every post-stage action requires a 4D scan time series.
ts_actions = (action for action in opts[actions] if action != 'stage')
ts_actions = (action for action in actions if action != 'stage')
if any(ts_actions):
if _scan_resource_exists(prj, sbj, sess, scan, SCAN_TS_RSC):
wf_opts['scan_time_series'] = SCAN_TS_RSC

# If modeling will be performed on a specified registration
# resource, then check for an existing 4D registration time series.
if 'model' in opts['actions'] and 'registration' in opts:
if 'model' in actions and 'registration' in opts:
reg_ts_rsc = opts['registration'] + '_ts'
if _scan_resource_exists(prj, sbj, sess, scan, reg_ts_rsc):
wf_opts['realigned_time_series'] = reg_ts_rsc

# Execute the workflow.
wf_gen = QIPipelineWorkflow(**wf_opts)
wf_gen = QIPipelineWorkflow(actions, **wf_opts)
wf_gen.run_with_scan_download(xnat, prj, sbj, sess, scan)


Expand Down Expand Up @@ -288,6 +292,8 @@ class QIPipelineWorkflow(WorkflowBase):
- *session*: the session name
- *scan*: the scan number
In addition, if the staging or registration workflow is enabled
then the *iter_volume* node iterables input includes the
following fields:
Expand All @@ -301,20 +307,20 @@ class QIPipelineWorkflow(WorkflowBase):
- The staging workflow input is the QIN workflow input.
- The mask and reference workflow input images is the newly or
previously staged scan NiFTI image files.
- The mask workflow input is the newly created or previously staged
scan NiFTI image files.
- The modeling workflow input images is the combination of previously
and newly realigned image files.
- The modeling workflow input is the combination of the previously
uploaded and newly realigned image files.
The pipeline execution workflow is also available as the *workflow*
instance variable. The workflow input node is named *input_spec*
with the same input fields as the
:class:`qipipe.staging.RegistrationWorkflow` workflow *input_spec*.
The pipeline workflow is available as the
:attr:`qipipe.pipeline.qipipeline.QIPipelineWorkflow.workflow`
instance variable.
"""

def __init__(self, **opts):
def __init__(self, actions, **opts):
"""
:param actions: the actions to perform
:param opts: the :class:`qipipe.staging.WorkflowBase`
initialization options as well as the following options:
:keyword base_dir: the workflow execution directory
Expand All @@ -337,7 +343,7 @@ def __init__(self, **opts):
self.modeling_resource = None
"""The modeling XNAT resource name."""

self.workflow = self._create_workflow(**opts)
self.workflow = self._create_workflow(actions, **opts)
"""
The pipeline execution workflow. The execution workflow is executed
by calling the :meth:`run_with_dicom_input` or
Expand Down Expand Up @@ -443,15 +449,15 @@ def _partition_registered(self, xnat, project, subject, session, scan, files):

return reg_files, unreg_files

def _create_workflow(self, **opts):
def _create_workflow(self, actions, **opts):
"""
Builds the reusable pipeline workflow described in
:class:`qipipe.pipeline.qipipeline.QIPipeline`.
:param actions: the actions to perform
:param opts: the constituent workflow initializer options
:return: the Nipype workflow
"""
self._logger.debug("Building the QIN pipeline execution workflow...")

# This is a long method body with the following stages:
#
Expand All @@ -460,7 +466,7 @@ def _create_workflow(self, **opts):
# 3. Tie together the constituent workflows.
#
# The constituent workflows are created in back-to-front order,
# i.e. modeling, registration, reference, mask, staging.
# i.e. modeling, registration, mask, roi, staging.
# This order makes it easier to determine whether to create
# an upstream workflow depending on the presence of downstream
# workflows, e.g. the mask is not created if registration
Expand All @@ -469,6 +475,7 @@ def _create_workflow(self, **opts):
# By contrast, the workflows are tied together in front-to-back
# order.

self._logger.debug("Building the QIN pipeline execution workflow...")
# The work directory used for the master workflow and all
# constituent workflows.
if 'base_dir' in opts:
Expand All @@ -480,7 +487,6 @@ def _create_workflow(self, **opts):
exec_wf = pe.Workflow(name='qipipeline', base_dir=base_dir)

# The workflow options.
actions = opts.get('actions', ['stage', 'roi', 'register', 'model'])
mask_rsc = opts.get('mask')
scan_ts_rsc = opts.get('scan_time_series')
reg_rsc = opts.get('registration')
Expand Down Expand Up @@ -603,8 +609,6 @@ def _create_workflow(self, **opts):
# Stitch together the workflows:

# If staging is enabled, then stage the DICOM input.
# Otherwise, if the registration, mask or reference
# workflow is enabled, then download the scan files.
if stg_wf:
for field in input_spec.inputs.copyable_trait_names():
exec_wf.connect(input_spec, field,
Expand Down

0 comments on commit b9227b7

Please sign in to comment.