Skip to content

Commit

Permalink
Subsume set_inputs in the run methods.
Browse files Browse the repository at this point in the history
  • Loading branch information
FredLoney committed Jun 6, 2017
1 parent cccbcc7 commit b3b9c0f
Showing 1 changed file with 54 additions and 50 deletions.
104 changes: 54 additions & 50 deletions qipipe/pipeline/staging.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from nipype.interfaces.utility import (IdentityInterface, Function)
from nipype.interfaces.dcmstack import DcmStack
import qixnat
from ..interfaces import (StickyIdentityInterface, FixDicom, Compress, XNATFind, XNATUpload)
from ..interfaces import (
StickyIdentityInterface, FixDicom, Compress, XNATFind, XNATUpload
)
from .workflow_base import WorkflowBase
from ..helpers.logging import logger
from ..staging import iterator
Expand Down Expand Up @@ -65,14 +67,10 @@ def run(subject, session, scan, *in_dirs, **opts):

# Make the scan workflow.
scan_wf = ScanStagingWorkflow(**opts)

# Sort the volumes.
vol_dcm_dict = sort(collection, scan, *in_dirs)
# Set the inputs.
scan_wf.set_inputs(collection, subject, session, scan, vol_dcm_dict, dest)

# Execute the workflow.
return scan_wf.run()
return scan_wf.run(collection, subject, session, scan, vol_dcm_dict, dest)


class ScanStagingWorkflow(WorkflowBase):
Expand Down Expand Up @@ -119,11 +117,9 @@ def __init__(self, **opts):
:class:`qipipe.pipeline.staging.StagingWorkflow`.
"""

def set_inputs(self, collection, subject, session, scan, vol_dcm_dict,
dest):
def run(self, collection, subject, session, scan, vol_dcm_dict, dest):
"""
Sets the scan staging workflow inputs for the *input_spec* node
and the iterables.
Executes this scan staging workflow.
:param collection: the collection name
:param subject: the subject name
Expand Down Expand Up @@ -151,9 +147,21 @@ def set_inputs(self, collection, subject, session, scan, vol_dcm_dict,
# in lock-step.
iter_volume.synchronize = True

def run(self):
"""Executes this scan staging workflow."""
self._run_workflow(self.workflow)
# Execute the workflow.
wf_res = self._run_workflow(self.workflow)

# The magic incantation to get the Nipype workflow result.
output_res = next(n for n in wf_res.nodes() if n.name == 'output_spec')
results = output_res.inputs.get()['out_files']

self.logger.debug(
"Executed the %s workflow on the %s %s scan %d"
" with 3D volume results:\n%s" %
(self.workflow.name, subject, session, scan results)
)

# Return the staged 3D volume files.
return results

def _create_workflow(self):
"""
Expand Down Expand Up @@ -199,21 +207,22 @@ def _create_workflow(self):
workflow.connect(iter_volume, fld, stg_node, fld)

# Upload the processed DICOM and 3D volume NIfTI files.
# The upload out_files output is the volume files.
upload_fields = hierarchy_fields + ['project', 'in_dir']
upload_xfc = Function(input_names=upload_fields,
output_names=['out_files'],
function=upload)
upload_node = pe.Node(upload_xfc, name='upload')
upload_node.inputs.project = self.project
workflow.connect(input_spec, 'subject', upload_node, 'subject')
workflow.connect(input_spec, 'session', upload_node, 'session')
workflow.connect(input_spec, 'scan', upload_node, 'scan')
workflow.connect(input_spec, 'dest', upload_node, 'in_dir')
upload = pe.Node(upload_xfc, name='upload')
upload.inputs.project = self.project
workflow.connect(input_spec, 'subject', upload, 'subject')
workflow.connect(input_spec, 'session', upload, 'session')
workflow.connect(input_spec, 'scan', upload, 'scan')
workflow.connect(input_spec, 'dest', upload, 'in_dir')

# The output is the 3D NIfTI volume image files.
output_spec = pe.Node(IdentityInterface(fields=['out_files']),
name='output_spec')
workflow.connect(upload_node, 'out_files', output_spec, 'out_files')
output_spec = pe.Node(StickyIdentityInterface(fields=['out_files']),
name='output_spec')
workflow.connect(upload, 'out_files', output_spec, 'out_files')

# Instrument the nodes for cluster submission, if necessary.
self._configure_nodes(workflow)
Expand Down Expand Up @@ -330,11 +339,9 @@ def __init__(self, **opts):
:class:`qipipe.pipeline.staging.StagingWorkflow`.
"""

def set_inputs(self, collection, subject, session, scan, volume, dest,
*in_files):
def run(self, collection, subject, session, scan, volume, dest, *in_files):
"""
Sets the staging workflow inputs for the *input_spec* node
and the iterables.
Executes this volume staging workflow.
:param collection: the collection name
:param subject: the subject name
Expand All @@ -357,9 +364,21 @@ def set_inputs(self, collection, subject, session, scan, volume, dest,
iter_dicom = self.workflow.get_node('iter_dicom')
iter_dicom.iterables = ('dicom_file', in_files)

def run(self):
"""Executes this staging workflow."""
self._run_workflow(self.workflow)
# Execute the workflow.
wf_res = self._run_workflow(self.workflow)

# The magic incantation to get the Nipype workflow result.
output_res = next(n for n in wf_res.nodes() if n.name == 'output_spec')
result = output_res.inputs.get()['out_file']

self.logger.debug(
"Executed the %s workflow on the %s %s scan %d"
" with 3D volume result %s" %
(self.workflow.name, subject, session, scan results)
)

# Return the staged 3D volume files.
return result

def _create_workflow(self):
"""
Expand Down Expand Up @@ -411,23 +430,11 @@ def _create_workflow(self):
workflow.connect(fix_dicom, 'out_file', stack, 'dicom_files')
workflow.connect(vol_fmt, 'format', stack, 'out_format')

# Upload the 3D NIfTI stack file to XNAT.
upload_3d_xfc = XNATUpload(project=self.project, resource='NIFTI',
modality='MR')
upload_3d = pe.Node(upload_3d_xfc, name='upload_3d')
workflow.connect(input_spec, 'subject', upload_3d, 'subject')
workflow.connect(input_spec, 'session', upload_3d, 'session')
workflow.connect(input_spec, 'scan', upload_3d, 'scan')
workflow.connect(stack, 'out_file', upload_3d, 'in_files')

# The output is the 3D NIfTI stack file. Make an intermediate
# gate node to ensure that upload is completed before setting
# the output field.
output_gate_flds = ['image', 'xnat_files']
output_gate_xfc = StickyIdentityInterface(fields=output_gate_flds)
output_gate = pe.Node(output_gate_xfc, name='output_gate')
workflow.connect(stack, 'out_file', output_gate, 'image')
workflow.connect(upload_3d, 'xnat_files', output_gate, 'xnat_files')
# The output is the 3D NIfTI stack file.
output_flds = ['out_file']
output_xfc = StickyIdentityInterface(fields=output_flds)
output_spec = pe.Node(output_xfc, name='output_spec')
workflow.connect(stack, 'out_file', output_spec, 'out_file')

# Instrument the nodes for cluster submission, if necessary.
self._configure_nodes(workflow)
Expand Down Expand Up @@ -474,13 +481,10 @@ def stage_volume(collection, subject, session, scan, volume, in_files,

# Make the workflow.
stg_wf = VolumeStagingWorkflow(base_dir=base_dir, **opts)
# Set the inputs.
stg_wf.set_inputs(collection, subject, session, scan, volume,
out_dir, *in_files)
# Execute the workflow.
logger(__name__).debug("Staging %s %s scan %d volume %d in %s..." %
(subject, session, scan, volume, out_dir))
stg_wf.run()
stg_wf.run(collection, subject, session, scan, volume, out_dir, *in_files)
logger(__name__).debug("Staged %s %s scan %d volume %d in %s." %
(subject, session, scan, volume, out_dir))

Expand Down

0 comments on commit b3b9c0f

Please sign in to comment.