Skip to content

Commit

Permalink
Gate staging upload on session creation.
Browse files Browse the repository at this point in the history
  • Loading branch information
FredLoney committed Feb 20, 2015
1 parent eadd933 commit 5049cc7
Showing 1 changed file with 49 additions and 52 deletions.
101 changes: 49 additions & 52 deletions qipipe/pipeline/staging.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,16 @@ def _create_workflow(self, scan_type, base_dir=None):
self._logger.debug("The %s workflow input node is %s with fields %s" %
(workflow.name, input_spec.name, in_fields))

# Create the session, if necessary.
find_session_xfc = XNATFind(project=project(), create=True)
# Create the session, if necessary. The gate blocks upload until the
# session is created.
find_session_xfc = XNATFind(project=project(), modality='MR', create=True)
find_session = pe.Node(find_session_xfc, name='find_session')
workflow.connect(input_spec, 'subject', find_session, 'subject')
workflow.connect(input_spec, 'session', find_session, 'session')
session_gate_xfc = Gate(fields=['session', 'xnat_id'])
session_gate = pe.Node(session_gate_xfc, name='session_gate')
workflow.connect(input_spec, 'session', session_gate, 'session')
workflow.connect(find_session, 'xnat_id', session_gate, 'xnat_id')

# The series iterator.
iter_series_fields = ['series', 'dest']
Expand Down Expand Up @@ -267,38 +272,21 @@ def _create_workflow(self, scan_type, base_dir=None):
# If the scan type is T1, then compress the corrected DICOM files.
# T2 scan DICOM files omit DICOM compress and upload.
if scan_type == 't1':
# Compress the DICOM files.
compress_dicom = pe.Node(Compress(), name='compress_dicom')
workflow.connect(fix_dicom, 'out_file', compress_dicom, 'in_file')
workflow.connect(iter_series, 'dest', compress_dicom, 'dest')

# TODO - if there are sporadic upload race conditions, then restore
# the commented code below, else remove it.
#
# # Force the DICOM upload to follow session create.
# # Since only one upload task can run at a time for a given series,
# # this upload gate node is a JoinNode that collects the iterated
# # scan DICOM files.
# upload_dicom_gate_xfc = Gate(fields=['xnat_id', 'scan', 'files'])
# upload_dicom_gate = pe.JoinNode(upload_dicom_gate_xfc,
# joinsource='iter_dicom',
# joinfield='files',
# name='upload_dicom_gate')
# workflow.connect(find_session, 'xnat_id', upload_dicom_gate, 'xnat_id')
# workflow.connect(iter_series, 'series', upload_dicom_gate, 'scan')
# workflow.connect(compress_dicom, 'out_file', upload_dicom_gate, 'files')

# Upload the compressed DICOM files.
upload_dicom_xfc = XNATCopy(project=project(), resource='DICOM',
skip_existing=True)
upload_dicom = pe.JoinNode(upload_dicom_xfc, joinsource='iter_dicom',
joinfield='in_files', name='upload_dicom')
workflow.connect(input_spec, 'subject', upload_dicom, 'subject')
workflow.connect(input_spec, 'session', upload_dicom, 'session')
# workflow.connect(upload_dicom_gate, 'scan', upload_dicom, 'scan')
# workflow.connect(upload_dicom_gate, 'files', upload_dicom, 'in_files')
workflow.connect(session_gate, 'session', upload_dicom, 'session')
workflow.connect(iter_series, 'series', upload_dicom, 'scan')
workflow.connect(compress_dicom, 'out_file', upload_dicom, 'in_files')

# Stack the scan into a 3D NiFTI file.
# Stack the scan volume into a 3D NiFTI file.
suffix = "_%s" % scan_type
stack_xfc = DcmStack(embed_meta=True,
out_format="series%(SeriesNumber)03d" + suffix)
Expand All @@ -307,46 +295,55 @@ def _create_workflow(self, scan_type, base_dir=None):

workflow.connect(fix_dicom, 'out_file', stack, 'dicom_files')

# Force the T1 3D upload to follow DICOM upload.
# Note: XNAT fails app. 80% into the T1 upload. It appears to be a
# concurrency conflict, possibly arising from the following causes:
# * the non-reentrant pyxnat's custom non-http2lib cache is corrupted
# * an XNAT archive directory access race condition
#
# However, the error cannot be isolated for the following reasons:
# * the error is sporadic and unreproducible
# * since nipype swallows non-nipype Python log messages, the upload
# and pyxnat log messages disappear
#
# This gate task serializes upload to prevent potential XNAT access
# conflicts.
# TODO - if upload works, then delete the commented lines below and
# following the line:
# workflow.connect(iter_series, 'series', upload_3d, 'scan')
#
# TODO - isolate and fix.
#
if scan_type == 't1':
upload_3d_gate_xfc = Gate(fields=['out_file', 'xnat_files'])
upload_3d_gate = pe.Node(upload_3d_gate_xfc, name='upload_3d_gate')
workflow.connect(upload_dicom, 'xnat_files', upload_3d_gate, 'xnat_files')
workflow.connect(stack, 'out_file', upload_3d_gate, 'out_file')
# # Force the T1 3D upload to follow DICOM upload.
# # Note: XNAT fails app. 80% into the T1 upload. It appears to be a
# # concurrency conflict, possibly arising from the following causes:
# # * the non-reentrant pyxnat's custom non-http2lib cache is corrupted
# # * an XNAT archive directory access race condition
# #
# # However, the error cannot be isolated for the following reasons:
# # * the error is sporadic and unreproducible
# # * since nipype swallows non-nipype Python log messages, the upload
# # and pyxnat log messages disappear
# #
# # This gate task serializes upload to prevent potential XNAT access
# # conflicts.
# #
# # TODO - isolate and fix.
# #
# if scan_type == 't1':
# upload_3d_gate_xfc = Gate(fields=['out_file', 'xnat_files'])
# upload_3d_gate = pe.Node(upload_3d_gate_xfc, name='upload_3d_gate')
# workflow.connect(upload_dicom, 'xnat_files', upload_3d_gate, 'xnat_files')
# workflow.connect(stack, 'out_file', upload_3d_gate, 'out_file')

# Upload the 3D NiFTI stack files to XNAT.
upload_3d_xfc = XNATCopy(project=project(), resource='NIFTI',
skip_existing=True)
upload_3d = pe.Node(upload_3d_xfc, name='upload_3d')
workflow.connect(input_spec, 'subject', upload_3d, 'subject')
workflow.connect(input_spec, 'session', upload_3d, 'session')
workflow.connect(session_gate, 'session', upload_3d, 'session')
workflow.connect(iter_series, 'series', upload_3d, 'scan')
# T1 upload is gated by DICOM upload.
if scan_type == 't1':
workflow.connect(upload_3d_gate, 'out_file', upload_3d, 'in_files')
else:
workflow.connect(stack, 'out_file', upload_3d, 'in_files')

# The output is the 3D NiFTI stack file.
output_spec = pe.Node(Gate(fields=['image', 'xnat_files']), name='output_spec')
# # T1 upload is gated by DICOM upload.
# if scan_type == 't1':
# workflow.connect(upload_3d_gate, 'out_file', upload_3d, 'in_files')
# else:
# workflow.connect(stack, 'out_file', upload_3d, 'in_files')
workflow.connect(stack, 'out_file', upload_3d, 'in_files')

# The output is the 3D NiFTI stack file. Make the output a Gate node
# rather than an IdentityInterface in order to prevent nipype from
# overzealously pruning it as extraneous.
output_spec_xfc = Gate(fields=['image', 'xnat_files'])
output_spec = pe.Node(output_spec_xfc, name='output_spec')
workflow.connect(stack, 'out_file', output_spec, 'image')
workflow.connect(upload_3d, 'xnat_files', output_spec, 'xnat_files')

# Instrument the nodes for cluster submission, if necessary.
self._configure_nodes(workflow)

self._logger.debug("Created the %s workflow." % workflow.name)
Expand Down

0 comments on commit 5049cc7

Please sign in to comment.