Skip to content

Commit

Permalink
better handling of asynchronous jobs, openlava now cleans up its own …
Browse files Browse the repository at this point in the history
…jobs on error
  • Loading branch information
mfiers committed Nov 9, 2012
1 parent 1650a31 commit c3ca7a0
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 29 deletions.
16 changes: 16 additions & 0 deletions moa/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@

l = moa.logger.getLogger(__name__)

def async(f):
"""
decorator designating an actor to be asynchronous
"""
f.category = 'async'
return f

def sync(f):
"""
decorator designating an actor to be synchronous
"""
f.category = 'sync'
return f

def getRunner():
actorId = getattr(sysConf.args, 'actorId', 'default')
Expand All @@ -32,7 +45,10 @@ def getRunner():
l.debug("Actor: %s" % actorId)
return sysConf.actor.actors[actorId]

#should use this
getActor = getRunner

@sync
def simpleRunner(wd, cl, conf={}, **kwargs):
"""
Don't think - just run - here & now
Expand Down
1 change: 1 addition & 0 deletions moa/backend/ruff/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def ruffusExecutor(input, output, script, jobData):
os.putenv(k, str(v))

runner = moa.actor.getRunner()
#print runner.category
rc = runner(jobData['wd'], [tf.name], jobData, command=jobData['command'])
if rc != 0:
raise ruffus.JobSignalledBreak
Expand Down
2 changes: 1 addition & 1 deletion moa/data/etc/config
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ plugins:
enable: true
order: 50
openLavaActor:
module: moa.plugin.job.openLavaActor
enabled: true
module: moa.plugin.job.openLavaActor
system:
moaGit:
module: moa.plugin.system.moaGit
Expand Down
17 changes: 16 additions & 1 deletion moa/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"""

import os
import sys
import glob
import tempfile

Expand All @@ -22,6 +23,7 @@

import moa.ui
import moa.args
import moa.actor
import moa.utils
import moa.logger
import moa.plugin
Expand Down Expand Up @@ -257,6 +259,7 @@ def execute(self, job, args, **kwargs):
Execute `command` in the context of this job. Execution is
alwasy deferred to the backend
#Note: this is the function that will be called from argparse
#Note: Uncertain how to test verbose & silent
:param verbose: output lots of data
Expand All @@ -275,6 +278,9 @@ def execute(self, job, args, **kwargs):
if not self.backend:
moa.ui.exitError("No backend loaded - cannot execute %s" % command)

actor = moa.actor.getActor()
moa.ui.message("loaded %s actor %s" % (actor.category, actor.__name__))

#figure out what we were really after
command = args.command

Expand All @@ -283,12 +289,13 @@ def execute(self, job, args, **kwargs):
#prepare for execution - i.e. prepare log dir, etc..
self.prepareExecute()

#unless command == 'run' - just execute it and return the RC
# unless command == 'run' - just execute it and return the RC
if command != 'run':
l.debug("Simple type execute of '%s'" % command)
rc = self.backend.execute(self, command, args)
sysConf.rc = rc
if rc != 0:
self.pluginHandler.run("post_error", job=self)
sysConf.pluginHandler.run("post_error")
moa.ui.exitError("Error running")
return rc
Expand All @@ -314,8 +321,16 @@ def execute(self, job, args, **kwargs):

self.finishExecute()

#if async - exit here
if actor.category == 'async':
self.pluginHandler.run("async_exit", job=self)
sysConf.pluginHandler.run("async_exit")
moa.ui.message("async run started - quiting now")
sys.exit(0)

return sysConf.rc


def prepareExecute(self):
"""
Give this job a chance to prepare for execution.
Expand Down
127 changes: 100 additions & 27 deletions moa/plugin/system/openLavaActor.py → moa/plugin/job/openLavaActor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,28 @@ def hook_defineCommandOptions(job, parser):
action='store_true',
help='Do not execute - just create a script to run')

parser.add_argument('--olm', default=1, dest='openlavaHost',
parser.add_argument('--olm', default="", dest='openlavaHost',
help='The host to use for openlava')


def _writeOlTmpFile(wd, _script):
#save the file
tmpdir = os.path.join(wd, '.moa', 'tmp')
if not os.path.exists(tmpdir):
os.makedirs(tmpdir)

tf = tempfile.NamedTemporaryFile(dir=tmpdir, prefix='openlava.',
delete=False, suffix='.sh')
if isinstance(_script, list):
tf.write("\n".join(_script))
else:
tf.write(str(_script))

tf.close()
os.chmod(tf.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
return tf.name

@moa.actor.async
def openlavaRunner(wd, cl, conf={}, **kwargs):
"""
Run the job using OPENLAVA
Expand All @@ -74,6 +92,8 @@ def openlavaRunner(wd, cl, conf={}, **kwargs):
outDir = outDir.rsplit('.moa', 1)[0] + '.moa' + \
os.path.realpath(outDir).rsplit('.moa', 1)[1]

sysConf.job.data.openlava.outDir = outDir

if not os.path.exists(outDir):
try:
os.makedirs(outDir)
Expand All @@ -84,6 +104,9 @@ def openlavaRunner(wd, cl, conf={}, **kwargs):
outfile = os.path.join(outDir, 'stdout')
errfile = os.path.join(outDir, 'stderr')

sysConf.job.data.openlava.outfile = outfile
sysConf.job.data.openlava.errfile = errfile

bsub_cl = ['bsub']

sc = []
Expand All @@ -96,7 +119,7 @@ def s(*cl):
s("#BSUB -e %s" % errfile)
s("#BSUB -q %s" % sysConf.args.openlavaQueue)

if '--oln' in sys.argv:
if '--olC' in sys.argv:
cores = sysConf.args.openlavaCores
else:
cores = sysConf.job.conf.get('threads', sysConf.args.openlavaCores)
Expand Down Expand Up @@ -126,8 +149,8 @@ def s(*cl):
#bsub_cl.extend(["-w", "'done(%d)'" % j])

#give it a reasonable name
jobname = ("moa %s in %s" % (command, wd)).replace("'", '"')
#bsub_cl.extend(["-J", jobname])
jobname = ("%s_%s" % (wd.split('/')[-1], command[0]))
bsub_cl.extend(['-J', jobname])
s("#BSUB -J '%s'" % jobname)

#dump the configuration in the environment
Expand Down Expand Up @@ -185,20 +208,10 @@ def s(*cl):
outFile.rsplit('/', 1)[1]))
return 0

#save the file
tmpdir = os.path.join(wd, '.moa', 'tmp')
if not os.path.exists(tmpdir):
os.makedirs(tmpdir)

tmpfile = tempfile.NamedTemporaryFile(dir=tmpdir, prefix='openlava.',
delete=False, suffix='.sh')

tmpfile.write("\n".join(sc))
tmpfile.close()
os.chmod(tmpfile.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

l.debug("executing bsub")

tmpfile = _writeOlTmpFile(wd, sc)

moa.ui.message("Running %s:" % " ".join(map(str, bsub_cl)))
moa.ui.message("(copy of) the bsub script: %s" % tmpfile)
p = sp.Popen(map(str, bsub_cl), cwd=wd, stdout=sp.PIPE, stdin=sp.PIPE)
o, e = p.communicate("\n".join(sc))

Expand All @@ -209,7 +222,7 @@ def s(*cl):
if not sysConf.job.data.openlava.jids.get(command):
sysConf.job.data.openlava.jids[command] = []

moa.ui.message("submitted job with openlava job id %s " % jid)
#moa.ui.message("submitted job with openlava job id %s " % jid)

#store the job id submitted
if not sysConf.job.data.openlava.jids.get(command):
Expand All @@ -221,25 +234,85 @@ def s(*cl):
l.debug("jids stored %s" % str(sysConf.job.data.openlava.jids))
return p.returncode

OnSuccessScript = """
#BSUB -w '({%- for j in jids -%}
OnSuccessScript = """#!/bin/bash
#BSUB -o {{ job.data.openlava.outfile }}
#BSUB -w {{ job.data.openlava.errfile }}
#BSUB -q {{ args.openlavaQueue }}
#BSUB -J "{{ job.data.openlava.uid }}_Ok"
{% if args.openlavaHost -%}
#BSUB -m {{ args.openlavaHost }}
{%- endif %}
#BSUB -w '({%- for j in job.data.openlava.alljids -%}
{%- if loop.index0 > 0 %}&&{% endif -%}
done({{j}})
{%- endfor -%})'
cd {{ job.wd }}
echo "Openlava OnSuccess Start"
echo "Killing the OnError job"
bkill -J "{{ job.data.openlava.uid }}_Err"
moasetstatus success
"""

OnErrorScript = """

OnErrorScript = """#!/bin/bash
## only run this job if there is a single job
#BSUB -o {{ job.data.openlava.outfile }}
#BSUB -w {{ job.data.openlava.errfile }}
#BSUB -q {{ args.openlavaQueue }}
#BSUB -J "{{ job.data.openlava.uid }}_Err"
{% if args.openlavaHost -%}
#BSUB -m {{ args.openlavaHost }}
{%- endif %}
#BSUB -w '({%- for j in job.data.openlava.alljids -%}
{%- if loop.index0 > 0 %}||{% endif -%}
exit({{j}},!=0)
{%- endfor -%}
)'
cd {{ job.wd }}
echo "Openlava OnError Start"
echo "Killing the all other jobs"
#killing all jobs
{% for j in job.data.openlava.alljids %}
bkill -s 9 {{ j }}
{% endfor %}
bkill -J "{{ job.data.openlava.uid }}_Ok"
moasetstatus error
"""

def hook_postRun():
def hook_async_exit(job):
"""
Need to exit here, and reconvene once all jobs have executed
"""
if sysConf.job.data.openlava.get('alljids'):
with open('jidlist', 'w') as F:
F.write("\n".join(
map(str, sysConf.job.data.openlava.get('alljids'))))

#make sure that this is the correct actor
actor = moa.actor.getActor()
if actor.__name__ != 'openlavaRunner':
return

jidlist = sysConf.job.data.openlava.get('alljids', [])
if len(jidlist) == 0:
return


uid = "%s.%s" % (job.wd.split('/')[-1],max(jidlist))
sysConf.job.data.openlava.uid = uid
onsuccess = jinja2.Template(OnSuccessScript).render(sysConf)
onerror = jinja2.Template(OnErrorScript).render(sysConf)

with open('succ', 'w') as F:
F.write(onsuccess)
with open('onerr', 'w') as F:
F.write(onerror)
P = sp.Popen('bsub', stdin=sp.PIPE)
P.communicate(onsuccess)
P = sp.Popen('bsub', stdin=sp.PIPE)
P.communicate(onerror)




#register this actor globally
Expand Down
7 changes: 7 additions & 0 deletions moa/plugin/system/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* waiting - not yet executed
* running - is currently being executed
* running_async - is currently being executed ansynchronously
* success - finished succesfully
* error - finished with an error
* interrupted - manual interruption
Expand Down Expand Up @@ -135,6 +136,7 @@ def _setStatus(job, status):
with open(statusFile, 'w') as F:
F.write("%s" % status)


def _setPid(job, pid):
l.debug("write PID file (%s)" % pid)
pidFile = os.path.join(job.wd, '.moa', 'pid')
Expand Down Expand Up @@ -202,11 +204,16 @@ def hook_postInterrupt():
_setStatus(sysConf.job, 'interrupted')
_removePid(sysConf.job)


def hook_postError():
if sysConf.job.isMoa():
_setStatus(sysConf.job, 'error')
_removePid(sysConf.job)

def hook_async_exit():
if sysConf.job.isMoa():
_setStatus(sysConf.job, 'running_async')

@moa.args.needsJob
@moa.args.command
def pause(job, args):
Expand Down

0 comments on commit c3ca7a0

Please sign in to comment.