add BatchParallelTask to provide simple iteration #2

Merged: 1 commit, Apr 30, 2016
python/lsst/ctrl/pool/parallel.py: 93 changes (91 additions, 2 deletions)
@@ -12,8 +12,8 @@
import argparse
import traceback
import contextlib
-from lsst.pipe.base import CmdLineTask
-from .pool import startPool, NODE, abortOnError
+from lsst.pipe.base import CmdLineTask, TaskRunner
+from .pool import startPool, Pool, NODE, abortOnError
from . import log # register pickle functions for pex_logging

__all__ = ["Batch", "PbsBatch", "SlurmBatch", "SmpBatch", "BATCH_TYPES", "BatchArgumentParser",
@@ -465,6 +465,10 @@ def logOperation(self, operation, catch=False, trace=True):


class BatchPoolTask(BatchCmdLineTask):
    """Starts a BatchCmdLineTask with an MPI process pool

    Use this subclass of BatchCmdLineTask if you want to use the Pool directly.
    """
    @classmethod
    @abortOnError
    def parseAndRun(cls, *args, **kwargs):
@@ -473,3 +477,88 @@ def parseAndRun(cls, *args, **kwargs):
        super(BatchPoolTask, cls).parseAndRun(*args, **kwargs)
        pool.exit()


class BatchTaskRunner(TaskRunner):
    """Run a Task individually on a list of inputs using the MPI process pool"""
    def __init__(self, *args, **kwargs):
        """Constructor

        Warn if the user specified multiprocessing.
        """
        TaskRunner.__init__(self, *args, **kwargs)
        if self.numProcesses > 1:
            self.log.warn("Multiprocessing arguments (-j %d) ignored since using batch processing" %
                          self.numProcesses)
            self.numProcesses = 1

    def run(self, parsedCmd):
Author

All methods need parameter documentation

        """Run the task on all targets

        Sole input is the result of parsing the command-line with the ArgumentParser.

        Output is None if 'precall' failed; otherwise it is a list of the results of
        calling this runner on each element of the target list returned by 'getTargetList'.
        """
        resultList = None
Author

This can be either None or [] on return when no data is run, depending on what happens later. Shouldn't it be consistent, or do the two have different semantic meanings?

        self.prepareForMultiProcessing()
        pool = Pool()

        if self.precall(parsedCmd):
            targetList = self.getTargetList(parsedCmd)
            if len(targetList) > 0:
                parsedCmd.log.info("Processing %d targets with a pool of %d processes..." %
                                   (len(targetList), pool.size))
                # Run the task using self.__call__
                resultList = pool.map(self, targetList)
            else:
                parsedCmd.log.warn("Not running the task because there is no data to process; "
                                   "you may preview data using \"--show data\"")
                resultList = []

        return resultList

    @abortOnError
    def __call__(self, cache, args):
        """Run the Task on a single target

        Strips out the process pool 'cache' argument.

        'args' are those arguments provided by the getTargetList method.

        Brings down the entire job if an exception is not caught (i.e., --doraise).
        """
        return TaskRunner.__call__(self, args)
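As the `__call__` signature and docstring indicate, `pool.map(self, targetList)` invokes the runner once per target with the pool's `cache` as an extra first argument, which `__call__` strips before delegating to `TaskRunner.__call__`. The sketch below reproduces that calling convention in a single process, together with the return convention documented in `run` (`None` if `precall` fails, `[]` if there are no targets). It is a minimal illustration under stated assumptions: `ToyPool`, `ToyTaskRunner`, and `ToyBatchRunner` are hypothetical stand-ins, not the ctrl_pool or pipe_base classes, and the real pool distributes work over MPI processes rather than looping locally.

```python
# Hypothetical, single-process stand-ins illustrating the calling convention
# used by BatchTaskRunner; these are NOT the lsst.ctrl.pool / lsst.pipe.base APIs.


class ToyPool(object):
    """Mimics a pool whose map() calls func(cache, data) for each item."""

    def __init__(self):
        self.cache = {}  # per-process scratch space handed to every call

    def map(self, func, dataList):
        # A real pool would scatter dataList across worker processes;
        # here every item is processed sequentially in this process.
        return [func(self.cache, data) for data in dataList]


class ToyTaskRunner(object):
    """Stand-in for TaskRunner: __call__ takes only the per-target args."""

    def __call__(self, args):
        return args * 2  # pretend that "running the task" doubles the input


class ToyBatchRunner(ToyTaskRunner):
    def __init__(self, precallOk=True, targets=()):
        self.precallOk = precallOk
        self.targets = list(targets)

    def precall(self, parsedCmd):
        return self.precallOk

    def getTargetList(self, parsedCmd):
        return self.targets

    def run(self, parsedCmd):
        resultList = None  # stays None if precall fails
        pool = ToyPool()
        if self.precall(parsedCmd):
            targetList = self.getTargetList(parsedCmd)
            if len(targetList) > 0:
                resultList = pool.map(self, targetList)  # calls self(cache, target)
            else:
                resultList = []  # precall succeeded but there was nothing to do
        return resultList

    def __call__(self, cache, args):
        # Drop the pool's cache and delegate, as BatchTaskRunner.__call__ does.
        return ToyTaskRunner.__call__(self, args)


print(ToyBatchRunner(targets=[1, 2, 3]).run(None))  # [2, 4, 6]
print(ToyBatchRunner(targets=[]).run(None))         # []
print(ToyBatchRunner(precallOk=False).run(None))    # None
```

Keeping the extra `cache` parameter only on the subclass's `__call__` lets the base runner's per-target logic be reused unchanged.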


class BatchParallelTask(BatchCmdLineTask):
    """Runs the BatchCmdLineTask in parallel

    Use this subclass of BatchCmdLineTask if you don't need to use the Pool
    directly, but just want to iterate over many objects (like a multi-node
    version of the '-j' command-line argument).
    """
    RunnerClass = BatchTaskRunner

    @classmethod
    def _makeArgumentParser(cls, *args, **kwargs):
        """Build an ArgumentParser

        Removes the batch-specific parts in order to delegate to the parent classes.
        """
        kwargs.pop("doBatch", False)
        kwargs.pop("add_help", False)
        return super(BatchCmdLineTask, cls)._makeArgumentParser(*args, **kwargs)

    @classmethod
    def parseAndRun(cls, *args, **kwargs):
        """Parse an argument list and run the command

        This is the entry point when we run in earnest, so start the process pool
        so that the worker nodes don't go any further.
        """
        pool = startPool()
        results = super(BatchParallelTask, cls).parseAndRun(*args, **kwargs)
Author

I believe the coding style guideline suggests not using super, but since all the subclasses use it maybe it's o.k. in this case.

Contributor

I tried very hard not to use it! But I found that I must use it if I want to call the base class implementation of one method that calls another method I've implemented in my subclass. Consider the following simplified case:

class Foo(object):
    @staticmethod
    def name():
        print("Foo!")
    @classmethod
    def run(cls):
        cls.name()

class Bar(Foo):
    @staticmethod
    def name():
        print("Bar!")
    @classmethod
    def run(cls):
        Foo.run()

I have a base class (Foo) and a subclass (Bar). I want to call Bar.run(), which defers to the base class implementation (and presumably does something different with the result, but that's left out of this simplified case). That implementation, however, calls the name method on whatever class it is bound to. If I implement Bar.run as above, calling Foo.run directly instead of using super, I get:

>>> Bar.run()
Foo!

Foo.run receives cls = Foo, and so calls Foo.name(). If I want Foo.run to call Bar.name, then I have to use super, so that cls is still Bar while Foo.run executes. With this implementation:

class Bar(Foo):
    @staticmethod
    def name():
        print("Bar!")
    @classmethod
    def run(cls):
        super(Bar, cls).run()

then I get the desired result:

>>> Bar.run()
Bar!

In my case, I want BatchParallelTask.parseAndRun to call the base class BatchCmdLineTask.parseAndRun(), but that calls the _makeArgumentParser method and I want the implementation in the subclass to fire, not the one in the base class. So, as far as I can tell, I absolutely must use super.

        pool.exit()
        return results
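The review thread above turns on how `cls` is bound when a classmethod delegates to a base class. The minimal sketch below uses hypothetical toy classes (`Base`, `Middle`, `ExplicitLeaf`, `SuperLeaf`) standing in loosely for `CmdLineTask`, `BatchCmdLineTask`, and `BatchParallelTask`; it is not the pipe_base code. It shows both effects used in this diff: calling a base class explicitly rebinds `cls` to that base class, while `super()` keeps the subclass; and passing an intermediate class to `super()` (as `_makeArgumentParser` does with `BatchCmdLineTask`) skips that class's override.

```python
# Hypothetical toy classes mirroring the review discussion and the
# super(BatchCmdLineTask, cls) call in BatchParallelTask._makeArgumentParser.


class Base(object):
    @staticmethod
    def name():
        return "Base"

    @classmethod
    def makeParser(cls):
        return "Base.makeParser"

    @classmethod
    def run(cls):
        # Which name()/makeParser() fire depends on what cls is bound to,
        # like parseAndRun calling _makeArgumentParser in the thread above.
        return cls.name() + " via " + cls.makeParser()


class Middle(Base):
    @classmethod
    def makeParser(cls):
        return "Middle.makeParser"


class ExplicitLeaf(Middle):
    @staticmethod
    def name():
        return "Leaf"

    @classmethod
    def run(cls):
        return Middle.run()  # binds cls to Middle, so Leaf's overrides are lost


class SuperLeaf(Middle):
    @staticmethod
    def name():
        return "Leaf"

    @classmethod
    def makeParser(cls):
        # Skip Middle.makeParser and go straight to Base's implementation.
        return super(Middle, cls).makeParser()

    @classmethod
    def run(cls):
        return super(SuperLeaf, cls).run()  # Base.run runs with cls = SuperLeaf


print(ExplicitLeaf.run())  # Base via Middle.makeParser
print(SuperLeaf.run())     # Leaf via Base.makeParser
```

On Python 3, the zero-argument form `super().run()` inside the classmethod is equivalent to `super(SuperLeaf, cls).run()`; the explicit two-argument form shown here also works on Python 2.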