Skip to content

Commit

Permalink
Upload all DICOM files in one node.
Browse files Browse the repository at this point in the history
  • Loading branch information
FredLoney committed Apr 21, 2015
1 parent 2599011 commit b06b1d7
Showing 1 changed file with 61 additions and 4 deletions.
65 changes: 61 additions & 4 deletions qipipe/pipeline/staging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
from collections import defaultdict
from nipype.pipeline import engine as pe
from nipype.interfaces.utility import IdentityInterface
from nipype.interfaces.utility import (IdentityInterface, Function)
from nipype.interfaces.dcmstack import DcmStack
from .. import project
from ..interfaces import (Gate, FixDicom, Compress, XNATFind, XNATCopy)
Expand Down Expand Up @@ -278,15 +278,54 @@ def _create_workflow(self, base_dir=None):
compress_dicom = pe.Node(Compress(), name='compress_dicom')
workflow.connect(fix_dicom, 'out_file', compress_dicom, 'in_file')
workflow.connect(iter_volume, 'dest', compress_dicom, 'dest')

# Collect the compressed DICOM files, as follows:
# * The volume DICOM files are collected into a list.
# * The volume DICOM file lists are merged into a scan
# DICOM file list.
# * The combined scan DICOM file list is uploaded to XNAT.
#
# The collection involves a two-step JoinNode, first on
# the volume, then on the scan. The second scan JoinNode
# calls a merge function to merge the lists from all of
# the first JoinNodes. All of this is necessary for the
# following reasons:
# * XNAT concurrent file upload tasks to the same resource
# results in a pyxnat.core.errors.DatabaseError with the
# useless message:
# 'The server encountered an unexpected condition'
# * Nipype Merge is inflexible (see the merge function
# comment below)
# Since concurrent upload is not supported, all of the
# compressed DICOM files must be collected into a single
# list which is uploaded in a single upload task when
# deployed in a clustered environment such as SGE.
collect_vol_dicom_xfc = IdentityInterface(fields=['dicom_files'])
collect_vol_dicom = pe.JoinNode(collect_vol_dicom_xfc,
joinsource='iter_dicom',
joinfield='dicom_files',
name='collect_vol_dicom')
workflow.connect(compress_dicom, 'out_file',
collect_vol_dicom, 'dicom_files')
collect_scan_dicom_xfc = Function(input_names=['lists'],
output_names=['out_list'],
function=merge)
collect_scan_dicom = pe.JoinNode(collect_scan_dicom_xfc,
joinsource='iter_volume',
joinfield='lists',
name='collect_scan_dicom')
workflow.connect(collect_vol_dicom, 'dicom_files',
collect_scan_dicom, 'lists')

# Upload the compressed DICOM files.
upload_dicom_xfc = XNATCopy(project=project(), resource='DICOM',
skip_existing=True)
upload_dicom = pe.JoinNode(upload_dicom_xfc, joinsource='iter_dicom',
joinfield='in_files', name='upload_dicom')
upload_dicom = pe.Node(upload_dicom_xfc, name='upload_dicom')
workflow.connect(input_spec, 'subject', upload_dicom, 'subject')
workflow.connect(input_spec, 'session', upload_dicom, 'session')
workflow.connect(scan_gate, 'scan', upload_dicom, 'scan')
workflow.connect(compress_dicom, 'out_file', upload_dicom, 'in_files')
workflow.connect(collect_scan_dicom, 'out_list',
upload_dicom, 'in_files')

# Stack the scan volume into a 3D NiFTI file.
# TODO - obtain the DICOM volume_tag below from the AIRCCollection via
Expand Down Expand Up @@ -353,3 +392,21 @@ def _create_workflow(self, base_dir=None):
self.depict_workflow(workflow)

return workflow


def merge(lists):
"""
Merges the given lists. This function works around the following
Nipype Merge node limitation:
* The Nipype Merge initializer requires the number of lists to merge.
This merge function accepts an arbitrary number of lists.
:param lists: this lists to merge
:return: the merged lists
"""
merger = lambda x,y: x + y

return reduce(merger, lists)

0 comments on commit b06b1d7

Please sign in to comment.