
Commit 1da9680

_run_workflow is always on self.workflow.
FredLoney committed Jun 30, 2017
1 parent b3fa63d commit 1da9680
Showing 4 changed files with 35 additions and 31 deletions.
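
The theme across all four files: the WorkflowBase helpers (_run_workflow,
_configure_plugin, _inspect_workflow_inputs) previously took the workflow as a
parameter, but every caller passed self.workflow, so the parameter is dropped
and the helpers operate on the attribute directly. A minimal sketch of the
pattern (simplified stand-in classes, not the actual qipipe code):

    class Workflow(object):
        """Stand-in for a Nipype workflow."""
        def run(self):
            return 'result graph'

    class WorkflowBase(object):
        def __init__(self):
            self.workflow = Workflow()

        # Before: def _run_workflow(self, workflow): return workflow.run()
        # After: no parameter; the helper reads self.workflow.
        def _run_workflow(self):
            return self.workflow.run()

    print(WorkflowBase()._run_workflow())   # => 'result graph'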
4 changes: 2 additions & 2 deletions qipipe/pipeline/mask.py
@@ -95,7 +95,7 @@ def run(self, subject, session, scan, time_series):
         out_file = os.path.join(self.base_dir, 'mask.nii.gz')
         self.set_inputs(subject, session, scan, time_series, out_file)
         # Execute the workflow.
-        self._run_workflow(self.workflow)
+        self._run_workflow()
         self.logger.debug("Created the %s %s scan %s time series %s mask XNAT"
                           " resource %s in %s." %
                           (subject, session, scan, time_series, MASK_RESOURCE,
@@ -127,7 +127,7 @@ def _create_workflow(self, **opts):
             (default False)
         :return: the Workflow object
         """
-        self.logger.debug('Creating the mask reusable workflow...')
+        self.logger.debug('Building the mask workflow...')
         workflow = pe.Workflow(name='mask', base_dir=self.base_dir)

         # The workflow input.
8 changes: 4 additions & 4 deletions qipipe/pipeline/modeling.py
@@ -270,8 +270,8 @@ def run(self, subject, session, scan, time_series, **opts):
             " time series %s..." %
             (self.workflow.name, subject, session, scan, time_series)
         )
-        wf_res = self._run_workflow(self.workflow)
-        # If dry-run is set, then there is no result.
+        wf_res = self._run_workflow()
+        # If dry_run is set, then there is no result.
         if not wf_res:
             return {}
         # The magic incantation to get the Nipype workflow result.
@@ -306,8 +306,6 @@ def _create_workflow(self, **opts):
         :return: the Nipype workflow
         """
         self.logger.debug("Building the modeling workflow...")
-
-        # The supervisory workflow.
         exec_wf = pe.Workflow(name='modeling', base_dir=self.base_dir)

         # The default modeling technique is the OHSU proprietary modeling
@@ -455,6 +453,7 @@ def _create_airc_workflow(self, **opts):
         :param opts: the PK modeling parameters
         :return: the Nipype Workflow
         """
+        self.logger.debug('Building the AIRC modeling workflow...')
         workflow = pe.Workflow(name='airc', base_dir=self.base_dir)

         # The modeling profile configuration sections.
@@ -634,6 +633,7 @@ def _create_mock_workflow(self, **opts):
         :param opts: the PK modeling parameters
         :return: the Nipype Workflow
         """
+        self.logger.debug('Building the mock modeling workflow...')
         workflow = pe.Workflow(name='mock', base_dir=self.base_dir)

         # The modeling profile configuration sections.
5 changes: 2 additions & 3 deletions qipipe/pipeline/roi.py
@@ -105,7 +105,7 @@ def run(self, subject, session, scan, time_series, *inputs):
         # Execute the workflow.
         self.logger.info("Executing the %s workflow on %s %s scan %d..." %
                          (self.workflow.name, subject, session, scan))
-        self._run_workflow(self.workflow)
+        self._run_workflow()
         self.logger.info("Executed the %s workflow on %s %s scan %d." %
                          (self.workflow.name, subject, session, scan))

@@ -165,8 +165,7 @@ def _create_workflow(self, **opts):
         :param opts: the workflow creation options:
         :return: the execution workflow
         """
-        self.logger.debug("Creating the ROI execution workflow...")
-
+        self.logger.debug("Building the ROI workflow...")
         # The execution workflow.
         workflow = pe.Workflow(name='roi', base_dir=self.base_dir)

49 changes: 27 additions & 22 deletions qipipe/pipeline/workflow_base.py
@@ -121,7 +121,8 @@ def __init__(self, name, **opts):
         self.config_dir = cfg_dir
         """The workflow node inputs configuration directory."""

-        self.configuration = self._load_configuration(name)
+        cfg_name = name.split('.')[-1]
+        self.configuration = self._load_configuration(cfg_name)
         """The workflow node inputs configuration."""

         config_s = pprint.pformat(self.configuration)
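
The new cfg_name line maps a possibly qualified workflow name to its trailing
component, so the configuration lookup key is always the bare name. For
example (illustrative values, not taken from the diff):

    >>> 'qipipe.pipeline.mask'.split('.')[-1]
    'mask'
    >>> 'mask'.split('.')[-1]    # an undotted name passes through unchanged
    'mask'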
@@ -213,6 +214,8 @@ def _load_configuration(self, name):
         def_cfg_files = self._configuration_files('default')
         # The workflow configuration files.
         wf_cfg_files = self._configuration_files(name)
+        self.logger.debug("Found %d %s workflow-specific configuration"
+                          " files." % (len(wf_cfg_files), name))
         # All path configuration files.
         cfg_files = def_cfg_files + wf_cfg_files
@@ -258,50 +261,51 @@ def _download_scans(self, xnat, subject, session, dest):
         """
         return xnat.download(self.project, subject, session, dest=dest)

-    def _run_workflow(self, workflow):
+    def _run_workflow(self):
         """
-        Executes the given workflow.
+        Executes the Nipype workflow.

-        :param workflow: the workflow to run
         :return: the workflow execution result graph
         """
         # If the workflow can be distributed, then get the plugin
         # arguments.
         is_dist_clause = 'is' if self.is_distributable else 'is not'
         self.logger.debug("The %s workflow %s distributable in a"
                           " cluster environment." %
-                          (workflow.name, is_dist_clause))
+                          (self.workflow.name, is_dist_clause))
         if self.is_distributable:
-            opts = self._configure_plugin(workflow)
+            opts = self._configure_plugin()
         else:
             opts = {}

         # Set the base directory to an absolute path.
-        if workflow.base_dir:
-            workflow.base_dir = os.path.abspath(workflow.base_dir)
+        if self.workflow.base_dir:
+            self.workflow.base_dir = os.path.abspath(self.workflow.base_dir)
         else:
-            workflow.base_dir = self.base_dir
+            self.workflow.base_dir = self.base_dir

         if self.dry_run:
             self.logger.debug("Skipped workflow %s execution,"
                               " since the dry run flag is set." %
-                              workflow.name)
+                              self.workflow.name)
         else:
             # Run the workflow.
             self.logger.debug("Executing the %s workflow in %s..." %
-                              (workflow.name, workflow.base_dir))
-            with qixnat.connect(cachedir=workflow.base_dir):
-                return workflow.run(**opts)
+                              (self.workflow.name, self.workflow.base_dir))
+            with qixnat.connect(cachedir=self.workflow.base_dir):
+                return self.workflow.run(**opts)

-    def _inspect_workflow_inputs(self, workflow):
+    def _inspect_workflow_inputs(self):
         """
         Collects the given workflow nodes' inputs for debugging.

         :return: a {node name: parameters} dictionary, where *parameters*
           is a node parameter {name: value} dictionary
         """
-        return {node_name: self._inspect_node_inputs(workflow.get_node(node_name))
-                for node_name in workflow.list_node_names()}
+        node_names = self.workflow.list_node_names()
+        node = self.workflow.get_node
+        inspect = lambda name: self._inspect_node_inputs(node(name))
+        return {name: inspect(name) for name in node_names}

     def _inspect_node_inputs(self, node):
         """
@@ -318,26 +322,27 @@ def _inspect_node_inputs(self, node):

         return param_dict

-    def _configure_plugin(self, workflow):
+    def _configure_plugin(self):
         """
         Sets the *execution* and plug-in parameters for the given workflow.
         See the ``conf`` directory files for examples.

-        :param workflow: the workflow to run
         :return: the workflow execution arguments
         """
         # The execution setting.
         if 'Execution' in self.configuration:
-            workflow.config['execution'] = self.configuration['Execution']
-            self.logger.debug("Workflow %s execution parameters: %s." %
-                              (workflow.name, workflow.config['execution']))
+            self.workflow.config['execution'] = self.configuration['Execution']
+            self.logger.debug(
+                "Workflow %s execution parameters: %s." %
+                (self.workflow.name, self.workflow.config['execution'])
+            )

         # The Nipype plug-in parameters.
         if self.plug_in and self.plug_in in self.configuration:
             plug_in_opts = self.configuration[self.plug_in]
             opts = dict(plugin=self.plug_in, **plug_in_opts)
             self.logger.debug("Workflow %s %s plug-in parameters: %s." %
-                              (workflow.name, self.plug_in, opts))
+                              (self.workflow.name, self.plug_in, opts))
         else:
             opts = {}
