Skip to content

Commit

Permalink
stage_volume requires a collection parameter.
Browse files Browse the repository at this point in the history
  • Loading branch information
FredLoney committed Jun 5, 2017
1 parent c2b62f1 commit 6caef0e
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions qipipe/pipeline/staging.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def run(subject, session, scan, *in_dirs, **opts):
# Sort the volumes.
vol_dcm_dict = sort(collection, scan, *in_dirs)
# Set the inputs.
scan_wf.set_inputs(subject, session, scan, vol_dcm_dict)
scan_wf.set_inputs(collection, subject, session, scan, vol_dcm_dict, dest)

# Execute the workflow.
return scan_wf.run()
Expand Down Expand Up @@ -119,8 +119,8 @@ def __init__(self, **opts):
:class:`qipipe.pipeline.staging.StagingWorkflow`.
"""

def set_inputs(self, collection, subject, session, scan, dest,
vol_dcm_dict):
def set_inputs(self, collection, subject, session, scan, vol_dcm_dict,
dest):
"""
Sets the scan staging workflow inputs for the *input_spec* node
and the iterables.
Expand All @@ -129,8 +129,8 @@ def set_inputs(self, collection, subject, session, scan, dest,
:param subject: the subject name
:param session: the session name
:param scan: the scan number
:param dest: the destination directory
:param vol_dcm_dict: the input {volume: DICOM files} dictionary
:param dest: the destination directory
"""
# Set the top-level inputs.
input_spec = self.workflow.get_node('input_spec')
Expand Down Expand Up @@ -167,7 +167,8 @@ def _create_workflow(self):
workflow = pe.Workflow(name='stage_scan')

# The workflow input.
in_fields = ['collection', 'subject', 'session', 'scan', 'dest']
hierarchy_fields = ['subject', 'session', 'scan']
in_fields = hierarchy_fields + ['collection', 'dest']
input_spec = pe.Node(IdentityInterface(fields=in_fields),
name='input_spec')
self.logger.debug("The %s workflow input node is %s with fields %s" %
Expand All @@ -182,7 +183,9 @@ def _create_workflow(self):
(workflow.name, iter_volume.name, iter_fields))

# The volume staging node wraps the stage_volume function.
stg_inputs = in_fields + iter_fields + ['base_dir', 'opts']
stg_inputs = (
in_fields + iter_fields + ['collection', 'base_dir', 'opts']
)
stg_xfc = Function(input_names=stg_inputs, output_names=['out_dir'],
function=stage_volume)
stg_node = pe.Node(stg_xfc, name='stage')
Expand All @@ -196,7 +199,8 @@ def _create_workflow(self):
workflow.connect(iter_volume, fld, stg_node, fld)

# Upload the processed DICOM and 3D volume NIfTI files.
upload_xfc = Function(input_names=['in_dir'],
upload_fields = hierarchy_fields + ['project', 'in_dir']
upload_xfc = Function(input_names=upload_fields,
output_names=['out_files'],
function=upload)
upload_node = pe.Node(upload_xfc, name='upload')
Expand Down Expand Up @@ -436,13 +440,14 @@ def _create_workflow(self):
return workflow


def stage_volume(subject, session, scan, volume, in_files, dest,
base_dir, opts):
def stage_volume(collection, subject, session, scan, volume, in_files,
dest, base_dir, opts):
"""
Stages the given volume. The processed DICOM ``.dcm.gz`` files
and merged 3D NIfTI volume ``.nii.gz`` file are placed in the
*dest*/*volume* subdirectory.
:param collection: the collection name
:param subject: the subject name
:param session: the session name
:param scan: the scan number
Expand All @@ -456,6 +461,8 @@ def stage_volume(subject, session, scan, volume, in_files, dest,
"""
import os
import shutil
from qipipe.helpers.logging import logger
from qipipe.pipeline.staging import VolumeStagingWorkflow

# The destination is a subdirectory.
out_dir = "%s/volume%03d" % (dest, volume)
Expand Down Expand Up @@ -497,12 +504,15 @@ def upload(project, subject, session, scan, in_dir):
"""
import glob
import qixnat
from qipipe.helpers.logging import logger

# The DICOM files to upload.
dcm_files = glob.iglob("%s/volume*/*.dcm.gz" % in_dir)
# Upload the compressed DICOM files in one action.
_logger.debug("Uploading the %s %s scan %d staged DICOM files to XNAT..." %
(subject, session, scan))
logger(__name__).debug(
"Uploading the %s %s scan %d staged DICOM files to XNAT..." %
(subject, session, scan)
)
with qixnat.connect() as xnat:
# The target XNAT scan DICOM resource object.
# The modality option is required if it is necessary to
Expand Down

0 comments on commit 6caef0e

Please sign in to comment.