Skip to content

Commit

Permalink
Switch _executePipelineTask to keyword-only args
Browse files Browse the repository at this point in the history
  • Loading branch information
andy-slac committed Sep 18, 2019
1 parent 11362ae commit feda286
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ def _executeQuantaInProcess(self, iterable, butler, taskFactory):
for qdata in iterable:
_LOG.debug("Executing %s", qdata)
taskDef = qdata.taskDef
self._executePipelineTask(taskDef.taskClass, taskDef.config, qdata.quantum,
butler, taskFactory, self.skipExisting, self.clobberOutput)
self._executePipelineTask(taskClass=taskDef.taskClass, config=taskDef.config,
quantum=qdata.quantum, butler=butler,
taskFactory=taskFactory, skipExisting=self.skipExisting,
clobberOutput=self.clobberOutput)

def _executeQuantaMP(self, iterable, butler, taskFactory):
"""Execute all Quanta in separate process pool.
Expand Down Expand Up @@ -133,9 +135,10 @@ def _executeQuantaMP(self, iterable, butler, taskFactory):

# Add it to the pool and remember its result
_LOG.debug("Sumbitting %s", qdata)
args = (taskDef.taskClass, taskDef.config, qdata.quantum, butler, taskFactory,
self.skipExisting, self.clobberOutput)
results[qdata.index] = pool.apply_async(self._executePipelineTask, args)
kwargs = dict(taskClass=taskDef.taskClass, config=taskDef.config,
quantum=qdata.quantum, butler=butler, taskFactory=taskFactory,
skipExisting=self.skipExisting, clobberOutput=self.clobberOutput)
results[qdata.index] = pool.apply_async(self._executePipelineTask, (), kwargs)

# Everything is submitted, wait until it's complete
_LOG.debug("Wait for all tasks")
Expand All @@ -147,7 +150,7 @@ def _executeQuantaMP(self, iterable, butler, taskFactory):
res.get(self.timeout)

@staticmethod
def _executePipelineTask(taskClass, config, quantum, butler, taskFactory, skipExisting, clobberOutput):
def _executePipelineTask(*, taskClass, config, quantum, butler, taskFactory, skipExisting, clobberOutput):
"""Execute PipelineTask on a single data item.
Parameters
Expand Down

0 comments on commit feda286

Please sign in to comment.