Skip to content

Commit

Permalink
Merge pull request #4 from lsst/tickets/DM-7264
Browse files Browse the repository at this point in the history
DM-7264: Port to Python 3
  • Loading branch information
fred3m committed Sep 14, 2016
2 parents 2290eda + 1839123 commit f0833ce
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 74 deletions.
37 changes: 23 additions & 14 deletions examples/demoPool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,27 @@
# I suggest running with n=1, 3, NUM nodes to cover all the operational modes.
#

NUM = 10 # Number of items in data list
from __future__ import print_function
from builtins import map
from builtins import range
NUM = 10 # Number of items in data list

import math


def test1(cache, data, *args, **kwargs):
result = math.sqrt(data)
print "Store: %s" % ("present" if hasattr(cache, "p") else "absent")
print("Store: %s" % ("present" if hasattr(cache, "p") else "absent"))
cache.result = result
cache.args = args
cache.kwargs = kwargs
return result


def test2(cache, data, *args, **kwargs):
result = math.sqrt(data)
print "%d: %f vs %f ; %s vs %s ; %s vs %s ; %s" % (cache.comm.rank, cache.result, result, cache.args,
args, cache.kwargs, kwargs, hasattr(cache, "p"))
print("%d: %f vs %f ; %s vs %s ; %s vs %s ; %s" % (cache.comm.rank, cache.result, result, cache.args,
args, cache.kwargs, kwargs, hasattr(cache, "p")))
return None

from lsst.ctrl.pool.pool import startPool, Pool, Debugger, Comm
Expand All @@ -31,26 +37,30 @@ def test2(cache, data, *args, **kwargs):

startPool()

dataList = map(float, range(NUM))
dataList = [float(i) for i in range(NUM)]


def context1(pool1):
pool1.storeSet(p=1)
print "Calculating [sqrt(x) for x in %s]" % dataList
print "And checking for 'p' in our pool"
print pool1.map(test1, dataList, "foo", foo="bar")
print("Calculating [sqrt(x) for x in %s]" % dataList)
print("And checking for 'p' in our pool")
print(pool1.map(test1, dataList, "foo", foo="bar"))

# Now let's say we're somewhere else and forgot to hold onto pool1


def context2(pool2):
# New context: should have no 'p'
fruit = ["tomato", "tomahtoe"]
veges = {"potato": "potahtoe"}
print pool2.mapNoBalance(test1, dataList, *fruit, **veges)
print pool2.mapToPrevious(test2, dataList, *fruit, **veges)
print(pool2.mapNoBalance(test1, dataList, *fruit, **veges))
print(pool2.mapToPrevious(test2, dataList, *fruit, **veges))


def context3(pool3):
# Check cache/store functionality
pool3.storeSet(p=1, q=2)
print pool1.map(test1, dataList, "foo", foo="bar")
print(pool1.map(test1, dataList, "foo", foo="bar"))
pool3.storeDel("p")
pool3.storeList()
pool1.cacheList()
Expand All @@ -65,8 +75,7 @@ def context3(pool3):
pool3 = Pool(3)
context3(pool3)

Pool().exit() # This is important, to bring everything down nicely; or the wheels will just keep turning
Pool().exit() # This is important, to bring everything down nicely; or the wheels will just keep turning
# Can do stuff here, just not use any MPI because the slaves have exited.
# If you want the slaves, then pass "killSlaves=False" to startPool(); they'll emerge after startPool().
print "Done."

print("Done.")
3 changes: 2 additions & 1 deletion python/lsst/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
import pkgutil, lsstimport
import pkgutil
import lsstimport
__path__ = pkgutil.extend_path(__path__, __name__)
3 changes: 2 additions & 1 deletion python/lsst/ctrl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
import pkgutil, lsstimport
import pkgutil
import lsstimport
__path__ = pkgutil.extend_path(__path__, __name__)
11 changes: 7 additions & 4 deletions python/lsst/ctrl/pool/log.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from future import standard_library
standard_library.install_aliases()
import os
import copy_reg
import copyreg

import lsst.pex.logging as pexLog


def pickleLog(log):
"""Pickle a log
Expand All @@ -12,13 +15,13 @@ def pickleLog(log):
"""
return pexLog.getDefaultLog, tuple()

copy_reg.pickle(pexLog.Log, pickleLog)
copy_reg.pickle(pexLog.ScreenLog, pickleLog)
copyreg.pickle(pexLog.Log, pickleLog)
copyreg.pickle(pexLog.ScreenLog, pickleLog)


def jobLog(job):
"""Add a job-specific log destination"""
if job is None or job == "None":
return
machine = os.uname()[1].split(".")[0]
pexLog.getDefaultLog().addDestination(job + ".%s.%d" % (machine, os.getpid()))

44 changes: 27 additions & 17 deletions python/lsst/ctrl/pool/parallel.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python

from __future__ import print_function
from builtins import object

import re
import os
Expand All @@ -17,13 +18,15 @@
from . import log # register pickle functions for pex_logging

__all__ = ["Batch", "PbsBatch", "SlurmBatch", "SmpBatch", "BATCH_TYPES", "BatchArgumentParser",
"BatchCmdLineTask", "BatchPoolTask",]
"BatchCmdLineTask", "BatchPoolTask", ]

UMASK = "002" # umask to set
UMASK = "002" # umask to set

# Functions to convert a list of arguments to a quoted shell command, provided by Dave Abrahams
# http://stackoverflow.com/questions/967443/python-module-to-shellquote-unshellquote
_quote_pos = re.compile('(?=[^-0-9a-zA-Z_./\n])')


def shQuote(arg):
r"""Quote the argument for the shell.
Expand All @@ -34,14 +37,16 @@ def shQuote(arg):
"""
# This is the logic emacs uses
if arg:
return _quote_pos.sub('\\\\', arg).replace('\n',"'\n'")
return _quote_pos.sub('\\\\', arg).replace('\n', "'\n'")
else:
return "''"


def shCommandFromArgs(args):
"""Convert a list of shell arguments to a shell command-line"""
return ' '.join([shQuote(a) for a in args])


def processStats():
"""Collect Linux-specific process statistics
Expand All @@ -55,11 +60,13 @@ def processStats():
result[key] = value.strip()
return result


def printProcessStats():
"""Print the process statistics to the log"""
from lsst.pex.logging import getDefaultLog
getDefaultLog().info("Process stats for %s: %s" % (NODE, processStats()))


class Batch(object):
"""Base class for batch submission"""

Expand Down Expand Up @@ -171,6 +178,7 @@ def run(self, command, walltime=None):

class PbsBatch(Batch):
"""Batch submission with PBS"""

def preamble(self, walltime=None):
if walltime is None:
walltime = self.walltime
Expand All @@ -181,19 +189,20 @@ def preamble(self, walltime=None):
if self.numCores > 0:
raise RuntimeError("PBS does not support setting the number of cores")
return "\n".join([
"#PBS %s" % self.options if self.options is not None else "",
"#PBS -l nodes=%d:ppn=%d" % (self.numNodes, self.numProcsPerNode),
"#PBS -l walltime=%d" % walltime if walltime is not None else "",
"#PBS -o %s" % self.outputDir if self.outputDir is not None else "",
"#PBS -N %s" % self.jobName if self.jobName is not None else "",
"#PBS -q %s" % self.queue if self.queue is not None else "",
"#PBS -j oe",
"#PBS -W umask=%s" % UMASK,
])
"#PBS %s" % self.options if self.options is not None else "",
"#PBS -l nodes=%d:ppn=%d" % (self.numNodes, self.numProcsPerNode),
"#PBS -l walltime=%d" % walltime if walltime is not None else "",
"#PBS -o %s" % self.outputDir if self.outputDir is not None else "",
"#PBS -N %s" % self.jobName if self.jobName is not None else "",
"#PBS -q %s" % self.queue if self.queue is not None else "",
"#PBS -j oe",
"#PBS -W umask=%s" % UMASK,
])

def submitCommand(self, scriptName):
return "qsub %s -V %s" % (self.submit if self.submit is not None else "", scriptName)


class SlurmBatch(Batch):
"""Batch submission with Slurm"""

Expand All @@ -211,7 +220,7 @@ def preamble(self, walltime=None):
filename = os.path.join(outputDir, (self.jobName if self.jobName is not None else "slurm") + ".o%j")
return "\n".join([("#SBATCH --nodes=%d" % self.numNodes) if self.numNodes > 0 else "",
("#SBATCH --ntasks-per-node=%d" % self.numProcsPerNode) if
self.numProcsPerNode > 0 else "",
self.numProcsPerNode > 0 else "",
("#SBATCH --ntasks=%d" % self.numCores) if self.numCores > 0 else "",
"#SBATCH --time=%d" % max(walltime/60.0 + 0.5, 1) if walltime is not None else "",
"#SBATCH --job-name=%s" % self.jobName if self.jobName is not None else "",
Expand Down Expand Up @@ -256,7 +265,7 @@ def submitCommand(self, scriptName):
BATCH_TYPES = {'pbs': PbsBatch,
'slurm': SlurmBatch,
'smp': SmpBatch,
} # Mapping batch type --> Batch class
} # Mapping batch type --> Batch class


class BatchArgumentParser(argparse.ArgumentParser):
Expand All @@ -281,7 +290,7 @@ def __init__(self, parent=None, *args, **kwargs):
group.add_argument("--cores", type=int, default=0, help="Number of cores (Slurm/SMP only)")
group.add_argument("--time", type=float, default=1000,
help="Expected execution time per element (sec)")
group.add_argument("--batch-type", dest="batchType", choices=BATCH_TYPES.keys(), default="smp",
group.add_argument("--batch-type", dest="batchType", choices=list(BATCH_TYPES.keys()), default="smp",
help="Batch system to use")
group.add_argument("--batch-output", dest="batchOutput", help="Output directory")
group.add_argument("--batch-submit", dest="batchSubmit", help="Batch submission command-line flags")
Expand Down Expand Up @@ -326,7 +335,7 @@ def makeBatch(self, args):
'options': 'batchOptions',
}
# kwargs is a dict that maps Batch init kwarg names to parsed arguments attribute *values*
kwargs = {k: getattr(args, v) for k, v in argMapping.iteritems()}
kwargs = {k: getattr(args, v) for k, v in argMapping.items()}
return BATCH_TYPES[args.batchType](**kwargs)

def format_help(self):
Expand Down Expand Up @@ -392,7 +401,7 @@ def parseAndSubmit(cls, args=None, **kwargs):
batchArgs = batchParser.parse_args(config=cls.ConfigClass(), args=args, override=cls.applyOverrides,
**kwargs)

if not cls.RunnerClass(cls, batchArgs.parent).precall(batchArgs.parent): # Write config, schema
if not cls.RunnerClass(cls, batchArgs.parent).precall(batchArgs.parent): # Write config, schema
taskParser.error("Error in task preparation")

numCores = batchArgs.cores if batchArgs.cores > 0 else batchArgs.nodes*batchArgs.procs
Expand Down Expand Up @@ -482,6 +491,7 @@ def parseAndRun(cls, *args, **kwargs):

class BatchTaskRunner(TaskRunner):
"""Run a Task individually on a list of inputs using the MPI process pool"""

def __init__(self, *args, **kwargs):
"""Constructor
Expand Down

0 comments on commit f0833ce

Please sign in to comment.