Skip to content

Commit

Permalink
scan is set at staging workflow execution rather than creation.
Browse files Browse the repository at this point in the history
  • Loading branch information
FredLoney committed Apr 20, 2015
1 parent 20e3038 commit b652d7d
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 36 deletions.
10 changes: 4 additions & 6 deletions qipipe/pipeline/qipipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ def run(*inputs, **opts):
existing sessions (default False)
"""
# Tailor the actions.
actions = opts.get('actions', _default_actions(**opts))
opts['actions'] = actions
actions = opts.get('actions', None)
if not actions:
actions = opts['actions'] = _default_actions(**opts)
if 'stage' in actions:
_run_with_dicom_input(*inputs, **opts)
elif 'roi' in actions:
Expand Down Expand Up @@ -552,10 +553,7 @@ def _create_workflow(self, **opts):

# The staging workflow.
if 'stage' in actions:
scan = opts.pop('scan', None)
if not scan:
raise ArgumentError("The required staging argument scan is missing")
stg_wf = StagingWorkflow(scan, base_dir=base_dir).workflow
stg_wf = StagingWorkflow(base_dir=base_dir).workflow
self._logger.info("Enabled staging.")
else:
stg_wf = None
Expand Down
60 changes: 32 additions & 28 deletions qipipe/pipeline/staging.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ..staging import staging_helper


def set_workflow_inputs(exec_wf, collection, subject, session,
def set_workflow_inputs(exec_wf, collection, subject, session, scan,
vol_dcm_dict, dest=None):
"""
Sets the given execution workflow inputs.
Expand All @@ -23,6 +23,7 @@ def set_workflow_inputs(exec_wf, collection, subject, session,
:param collection: the AIRC collection name
:param subject: the subject name
:param session: the session name
:param scan: the scan number
:param vol_dcm_dict: the *{volume number: [dicom files]}* dictionary
:param dest: the TCIA staging destination directory (default is
the current working directory)
Expand All @@ -41,6 +42,7 @@ def set_workflow_inputs(exec_wf, collection, subject, session,
input_spec.inputs.collection = collection
input_spec.inputs.subject = subject
input_spec.inputs.session = session
input_spec.inputs.scan = scan

# Set the volume iterator inputs.
iter_volume = exec_wf.get_node('iter_volume')
Expand Down Expand Up @@ -120,6 +122,8 @@ class StagingWorkflow(WorkflowBase):
- *session*: the session name
- *scan*: the scan number
The staging workflow has two iterables:
- the *iter_volume* node with input fields *volume* and *dest*
Expand Down Expand Up @@ -165,12 +169,11 @@ class StagingWorkflow(WorkflowBase):
.. _DcmStack: http://nipy.sourceforge.net/nipype/interfaces/generated/nipype.interfaces.dcmstack.html
"""

def __init__(self, scan, **opts):
def __init__(self, **opts):
"""
If the optional configuration file is specified, then the workflow
settings in that file override the default settings.
:param scan: the scan number
:parameter opts: the following keyword options:
:keyword project: the XNAT project (default ``QIN``)
:keyword base_dir: the workflow execution directory
Expand All @@ -193,58 +196,58 @@ def __init__(self, scan, **opts):
:class:`qipipe.pipeline.staging.StagingWorkflow`.
"""

def set_inputs(self, collection, subject, session, vol_dcm_dict,
def set_inputs(self, collection, subject, session, scan, vol_dcm_dict,
dest=None):
"""
Sets the staging workflow inputs.
:param collection: the collection name
:param subject: the subject name
:param session: the session name
:param scan: the scan number
:param vol_dcm_dict: the *{volume: [dicom files]}* dictionary
:param dest: the TCIA staging destination directory (default is
the current working directory)
"""
set_workflow_inputs(self.workflow, collection, subject, session,
vol_dcm_dict, dest)
scan, vol_dcm_dict, dest)

def run(self):
"""Executes the staging workflow."""
self._run_workflow(self.workflow)

def _create_workflow(self, scan, base_dir=None):
def _create_workflow(self, base_dir=None):
"""
Makes the staging workflow described in
:class:`qipipe.pipeline.staging.StagingWorkflow`.
:param scan: the scan number
:param base_dir: the workflow execution directory
(default is a new temp directory)
:return: the new workflow
"""
self._logger.debug("Creating the %d DICOM processing workflow..." %
scan)
self._logger.debug('Creating the DICOM processing workflow...')

# The Nipype workflow object.
workflow = pe.Workflow(name='staging', base_dir=base_dir)

# The workflow input.
in_fields = ['collection', 'subject', 'session']
in_fields = ['collection', 'subject', 'session', 'scan']
input_spec = pe.Node(IdentityInterface(fields=in_fields),
name='input_spec')
self._logger.debug("The %s workflow input node is %s with fields %s" %
(workflow.name, input_spec.name, in_fields))

# Create the session, if necessary. The gate blocks upload until the
# session is created.
find_session_xfc = XNATFind(project=project(), modality='MR', create=True)
find_session = pe.Node(find_session_xfc, name='find_session')
workflow.connect(input_spec, 'subject', find_session, 'subject')
workflow.connect(input_spec, 'session', find_session, 'session')
session_gate_xfc = Gate(fields=['session', 'xnat_id'])
session_gate = pe.Node(session_gate_xfc, name='session_gate')
workflow.connect(input_spec, 'session', session_gate, 'session')
workflow.connect(find_session, 'xnat_id', session_gate, 'xnat_id')
# Create the scan, if necessary. The gate blocks upload until the
# scan is created.
find_scan_xfc = XNATFind(project=project(), modality='MR', create=True)
find_scan = pe.Node(find_scan_xfc, name='find_scan')
workflow.connect(input_spec, 'subject', find_scan, 'subject')
workflow.connect(input_spec, 'session', find_scan, 'session')
workflow.connect(input_spec, 'scan', find_scan, 'scan')
scan_gate_xfc = Gate(fields=['scan', 'xnat_id'])
scan_gate = pe.Node(scan_gate_xfc, name='scan_gate')
workflow.connect(input_spec, 'scan', scan_gate, 'scan')
workflow.connect(find_scan, 'xnat_id', scan_gate, 'xnat_id')

# The volume iterator.
iter_volume_fields = ['volume', 'dest']
Expand All @@ -254,7 +257,6 @@ def _create_workflow(self, scan, base_dir=None):
" fields %s" % (workflow.name, iter_volume.name,
iter_volume_fields))


# The DICOM file iterator.
iter_dicom_fields = ['volume', 'dicom_file']
iter_dicom = pe.Node(IdentityInterface(fields=iter_dicom_fields),
Expand All @@ -277,12 +279,13 @@ def _create_workflow(self, scan, base_dir=None):
workflow.connect(fix_dicom, 'out_file', compress_dicom, 'in_file')
workflow.connect(iter_volume, 'dest', compress_dicom, 'dest')
# Upload the compressed DICOM files.
upload_dicom_xfc = XNATCopy(project=project(), scan=scan,
resource='DICOM', skip_existing=True)
        upload_dicom_xfc = XNATCopy(project=project(), resource='DICOM',
skip_existing=True)
upload_dicom = pe.JoinNode(upload_dicom_xfc, joinsource='iter_dicom',
joinfield='in_files', name='upload_dicom')
workflow.connect(input_spec, 'subject', upload_dicom, 'subject')
workflow.connect(session_gate, 'session', upload_dicom, 'session')
workflow.connect(input_spec, 'session', upload_dicom, 'session')
workflow.connect(scan_gate, 'scan', upload_dicom, 'scan')
workflow.connect(compress_dicom, 'out_file', upload_dicom, 'in_files')

# Stack the scan volume into a 3D NiFTI file.
Expand Down Expand Up @@ -323,12 +326,13 @@ def _create_workflow(self, scan, base_dir=None):
# workflow.connect(stack, 'out_file', upload_3d_gate, 'out_file')

# Upload the 3D NiFTI stack files to XNAT.
upload_3d_xfc = XNATCopy(project=project(), scan=scan,
resource='NIFTI', skip_existing=True)
upload_3d_xfc = XNATCopy(project=project(), resource='NIFTI',
skip_existing=True)
upload_3d = pe.Node(upload_3d_xfc, name='upload_3d')
workflow.connect(input_spec, 'subject', upload_3d, 'subject')
workflow.connect(session_gate, 'session', upload_3d, 'session')
# # 3D upload is gated by DICOM upload.
workflow.connect(input_spec, 'session', upload_3d, 'session')
workflow.connect(scan_gate, 'scan', upload_3d, 'scan')
# 3D upload is gated by DICOM upload.
# workflow.connect(upload_3d_gate, 'out_file', upload_3d, 'in_files')
workflow.connect(stack, 'out_file', upload_3d, 'in_files')

Expand Down
4 changes: 2 additions & 2 deletions test/unit/pipeline/test_staging.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ def _test_collection(self, collection):
for sbj, sess, scan, vol_dcm_dict in iter_stage(collection, *inputs,
dest=dest):
work_dir = os.path.join(work, 'scan', str(scan))
stg_wf = staging.StagingWorkflow(scan, base_dir=work_dir)
stg_wf.set_inputs(collection, sbj, sess, vol_dcm_dict,
stg_wf = staging.StagingWorkflow(base_dir=work_dir)
stg_wf.set_inputs(collection, sbj, sess, scan, vol_dcm_dict,
dest=dest)
stg_wf.run()
# Verify the result.
Expand Down

0 comments on commit b652d7d

Please sign in to comment.