Only merge multi-volume scan volumes.
FredLoney committed Jun 16, 2017
1 parent 7229bb9 commit 8343bfc
Showing 1 changed file with 41 additions and 17 deletions.
58 changes: 41 additions & 17 deletions qipipe/pipeline/staging.py
@@ -20,6 +20,7 @@
 from ..helpers.constants import (SCAN_TS_BASE, SCAN_TS_FILE, VOLUME_FILE_PAT)
 from ..helpers.logging import logger
 from ..staging import (iterator, image_collection)
+from ..staging.ohsu import MULTI_VOLUME_SCAN_NUMBERS
 from ..staging.sort import sort
 from .pipeline_error import PipelineError

@@ -73,7 +74,8 @@ def run(subject, session, scan, *in_dirs, **opts):
                             ' determined from the options')
 
     # Make the scan workflow.
-    scan_wf = ScanStagingWorkflow(**opts)
+    is_multi_volume = scan in MULTI_VOLUME_SCAN_NUMBERS
+    scan_wf = ScanStagingWorkflow(is_multi_volume=is_multi_volume, **opts)
     # Sort the volumes.
     vol_dcm_dict = sort(collection, scan, *in_dirs)
     # Execute the workflow.
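The gating condition above is a plain set-membership test: a scan gets the 4D merge only if its number is registered as multi-volume. A minimal sketch of the pattern, using a hypothetical registry value (the real MULTI_VOLUME_SCAN_NUMBERS is defined in qipipe.staging.ohsu and is not shown in this diff):

    # Hypothetical stand-in for qipipe.staging.ohsu.MULTI_VOLUME_SCAN_NUMBERS;
    # the actual scan numbers live in that module.
    MULTI_VOLUME_SCAN_NUMBERS = {1}

    def is_multi_volume_scan(scan):
        """Return True if the scan number designates a multi-volume scan."""
        return scan in MULTI_VOLUME_SCAN_NUMBERS

    print(is_multi_volume_scan(1))  # True: merge the volumes into a time series
    print(is_multi_volume_scan(2))  # False: stage the single volume as-is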
@@ -110,15 +112,17 @@ class ScanStagingWorkflow(WorkflowBase):
     - *out_file*: the 3D volume stack NIfTI image file
     """
 
-    def __init__(self, **opts):
+    def __init__(self, is_multi_volume=True, **opts):
         """
+        :param is_multi_volume: flag indicating whether to include
+            volume merge tasks
         :param opts: the :class:`qipipe.pipeline.workflow_base.WorkflowBase`
             initializer keyword arguments
         """
         super(ScanStagingWorkflow, self).__init__(__name__, **opts)
 
         # Make the workflow.
-        self.workflow = self._create_workflow()
+        self.workflow = self._create_workflow(is_multi_volume)
         """
         The scan staging workflow sequence described in
         :class:`qipipe.pipeline.staging.StagingWorkflow`.
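Because is_multi_volume defaults to True, the change is backward-compatible: existing callers keep the merge behavior, and only single-volume scans opt out. A usage sketch (passing base_dir through the WorkflowBase options is an assumption here, suggested by the stage_volume change further down):

    # Previous behavior: build the volume merge tasks (the default).
    multi_wf = ScanStagingWorkflow(base_dir='/tmp/staging_work')

    # New behavior: omit the merge tasks for a single-volume scan.
    single_wf = ScanStagingWorkflow(is_multi_volume=False,
                                    base_dir='/tmp/staging_work')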
@@ -184,10 +188,13 @@ def run(self, collection, subject, session, scan, vol_dcm_dict, dest):
         # Return the (time series, volume files) result.
         return time_series, volume_files
 
-    def _create_workflow(self):
+    def _create_workflow(self, is_multi_volume=True):
         """
         Makes the staging workflow described in
         :class:`qipipe.pipeline.staging.StagingWorkflow`.
+        :param is_multi_volume: flag indicating whether to include
+            volume merge tasks
+        :return: the new workflow
         """
         self.logger.debug('Creating the scan staging workflow...')
 
@@ -231,13 +238,6 @@ def _create_workflow(self):
         )
         workflow.connect(stg_node, 'out_file', collect_vols, 'volume_files')
 
-        # Merge the volumes.
-        merge_vols_xfc = MergeNifti(out_format=SCAN_TS_BASE)
-        merge_vols = pe.Node(merge_vols_xfc, name='merge_volumes')
-        workflow.connect(input_spec, 'volume_tag', merge_vols, 'sort_order')
-        workflow.connect(collect_vols, 'volume_files', merge_vols, 'in_files')
-        self.logger.debug('Connected staging to scan time series merge.')
-
         # Upload the processed DICOM and NIfTI files.
         # The upload out_files output is the volume files.
         upload_fields = (
@@ -255,7 +255,19 @@
         workflow.connect(input_spec, 'dest', upload_node, 'dcm_dir')
         workflow.connect(collect_vols, 'volume_files',
                         upload_node, 'volume_files')
-        workflow.connect(merge_vols, 'out_file', upload_node, 'time_series')
+        if is_multi_volume:
+            # Merge the volumes.
+            merge_vols_xfc = MergeNifti(out_format=SCAN_TS_BASE)
+            merge_vols = pe.Node(merge_vols_xfc, name='merge_volumes')
+            workflow.connect(input_spec, 'volume_tag',
+                             merge_vols, 'sort_order')
+            workflow.connect(collect_vols, 'volume_files',
+                             merge_vols, 'in_files')
+            workflow.connect(merge_vols, 'out_file',
+                             upload_node, 'time_series')
+            self.logger.debug('Connected staging to scan time series merge.')
+        else:
+            upload_node.inputs.time_series = None
         self.logger.debug('Connected scan time series merge to upload.')
 
         # The output is the 4D time series and 3D NIfTI volume image files.
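A connected Nipype input cannot also be assigned a static value, so the optional merge is expressed by either wiring the upstream output into the upload node or pinning the input to None. A self-contained sketch of that conditional wiring pattern with stub interfaces (the node and field names here are illustrative, not qipipe's):

    from nipype.interfaces.utility import Function, IdentityInterface
    import nipype.pipeline.engine as pe

    def echo(value):
        return value

    wf = pe.Workflow(name='conditional_wiring_demo')
    source = pe.Node(IdentityInterface(fields=['in_value']), name='source')
    sink = pe.Node(Function(input_names=['value'], output_names=['out'],
                            function=echo),
                   name='sink')

    merge_enabled = False  # stands in for is_multi_volume
    if merge_enabled:
        # Multi-volume branch: feed the sink from the upstream node.
        wf.connect(source, 'in_value', sink, 'value')
    else:
        # No upstream producer: pin the input to a static value instead.
        sink.inputs.value = None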
@@ -264,7 +276,10 @@
                               name='output_spec')
         workflow.connect(collect_vols, 'volume_files',
                          output_spec, 'volume_files')
-        workflow.connect(merge_vols, 'out_file', output_spec, 'time_series')
+        if is_multi_volume:
+            workflow.connect(merge_vols, 'out_file', output_spec, 'time_series')
+        else:
+            output_spec.inputs.time_series = None
 
         # Instrument the nodes for cluster submission, if necessary.
         self._configure_nodes(workflow)
@@ -517,8 +532,11 @@ def stage_volume(collection, subject, session, scan, volume, in_files,
     out_dir = "%s/volume%03d" % (dest, volume)
     os.mkdir(out_dir)
 
+    # The volume workflow runs in a subdirectory.
+    base_dir = "volume%03d" % volume
+
     # Make the workflow.
-    stg_wf = VolumeStagingWorkflow(**opts)
+    stg_wf = VolumeStagingWorkflow(base_dir=base_dir, **opts)
     # Execute the workflow.
     logger(__name__).debug("Staging %s %s scan %d volume %d in %s..." %
                            (subject, session, scan, volume, out_dir))
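Both the output directory and the new workflow base directory encode the volume number with the same zero-padded format, so per-volume artifacts sort lexicographically and each volume's working files land in their own subdirectory. A worked example of the format string (the dest path is illustrative):

    volume = 3
    out_dir = "%s/volume%03d" % ("/data/dest", volume)
    base_dir = "volume%03d" % volume
    print(out_dir)   # /data/dest/volume003
    print(base_dir)  # volume003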
@@ -530,7 +548,8 @@
     return out_file
 
 
-def upload(project, subject, session, scan, dcm_dir, volume_files, time_series):
+def upload(project, subject, session, scan, dcm_dir, volume_files,
+           time_series=None):
     """
     Uploads the staged files in *dcm_dir* as follows:
     * the processed DICOM ``.dcm.gz`` files are uploaded to the
@@ -544,7 +563,8 @@ def upload(project, subject, session, scan, dcm_dir, volume_files, time_series):
     :param scan: the scan number
     :param dcm_dir: the input staged directory
     :param volume_files: the 3D scan volume files
-    :param time_series: the 4D scan time series file
+    :param time_series: the 4D scan time series file, if the scan is
+        multi-volume
     :return: the 3D volume image NIfTI files
     """
     import os
@@ -594,7 +614,11 @@ def upload(project, subject, session, scan, dcm_dir, volume_files, time_series):
        rsc = xnat.find_or_create(
            project, subject, session, scan=scan, resource='NIFTI'
        )
-       xnat.upload(rsc, time_series, *volume_files)
+       if time_series:
+           nii_files = volume_files + [time_series]
+       else:
+           nii_files = volume_files
+       xnat.upload(rsc, *nii_files)
        _logger.debug("Uploaded %d %s %s scan %d staged NIfTI files to"
                      " XNAT." % (nii_file_cnt, subject, session, scan))
 
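With time_series now optional, the NIfTI upload list is built explicitly so a None time series drops out instead of being passed positionally. A minimal sketch of the list construction (the helper name and file names are illustrative):

    def nifti_upload_list(volume_files, time_series=None):
        # Append the 4D time series only for a multi-volume scan.
        return volume_files + [time_series] if time_series else volume_files

    # Multi-volume scan: the volumes plus the merged 4D stack.
    print(nifti_upload_list(['vol001.nii.gz', 'vol002.nii.gz'],
                            'scan_ts.nii.gz'))
    # Single-volume scan: just the volume files.
    print(nifti_upload_list(['vol001.nii.gz']))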