Skip to content

Commit

Permalink
better openlava integration
Browse files Browse the repository at this point in the history
  • Loading branch information
mfiers committed May 30, 2012
1 parent 3bdfd69 commit 754785c
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 27 deletions.
22 changes: 14 additions & 8 deletions lib/python/moa/backend/ruff/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import moa.utils
import moa.template
import moa.actor
import moa.backend
import moa.logger as l
from moa.sysConf import sysConf
Expand Down Expand Up @@ -81,11 +80,18 @@ def writeScript(self):
Render & write the script to a tempfile
"""

tf = tempfile.NamedTemporaryFile(
delete = False, prefix='moa', mode='w')
self.scriptFile = tf.name

#save the file
tmpdir = os.path.join(self.job.wd, '.moa', 'tmp')
if not os.path.exists(tmpdir):
os.makedirs(tmpdir)

tmpfile = tempfile.NamedTemporaryFile(dir=tmpdir, prefix='moa.',
delete=False, suffix='.sh')
#tmpfile.write("\n".join(sc))
self.scriptFile = os.path.realpath(os.path.abspath(tmpfile.name))

script = self.commands.render(self.command, self.jobData)
tf.write(script + "\n")
tf.close()
os.chmod(tf.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
tmpfile.write(script + "\n\n")
tmpfile.close()
os.chmod(tmpfile.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

8 changes: 8 additions & 0 deletions lib/python/moa/backend/ruff/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,15 @@
from moa.backend.ruff.base import RuffBaseJob

def localMapExecutor(input, output, script, jobData):

wd = jobData['wd']
tmpdir = os.path.realpath(os.path.abspath(
os.path.join(wd, '.moa', 'tmp')))
if not os.path.exists(tmpdir):
os.makedirs(tmpdir)

tf = tempfile.NamedTemporaryFile( delete = False,
dir=tmpdir,
prefix='moa',
mode='w')

Expand Down
10 changes: 8 additions & 2 deletions lib/python/moa/backend/ruff/reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,16 @@
from moa.backend.ruff.base import RuffBaseJob

def localMapExecutor(input, output, script, jobData):
wd = jobData['wd']
tmpdir = os.path.realpath(os.path.abspath(
os.path.join(wd, '.moa', 'tmp')))
if not os.path.exists(tmpdir):
os.makedirs(tmpdir)

tf = tempfile.NamedTemporaryFile( delete = False,
dir=tmpdir,
prefix='moa',
mode='w')

mode='w')
tf.write(script)
tf.close()
os.chmod(tf.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
Expand Down
46 changes: 29 additions & 17 deletions lib/python/moa/plugin/system/openLavaActor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"""
import os
import sys
import stat
import time
import tempfile
import optparse
Expand Down Expand Up @@ -53,7 +54,7 @@ def openlavaRunner(wd, cl, conf={}, **kwargs):

l.debug("starting openlava actor for %s" % command)

outDir = os.path.join(wd, '.moa', 'log.latest')
outDir = os.path.realpath(os.path.abspath(os.path.join(wd, '.moa', 'log.latest')))
if not os.path.exists(outDir):
try:
os.makedirs(outDir)
Expand All @@ -63,24 +64,26 @@ def openlavaRunner(wd, cl, conf={}, **kwargs):
#expect the cl to be nothing more than a single script to execute
outfile = os.path.join(outDir, 'stdout')
errfile = os.path.join(outDir, 'stderr')

bsub_cl = ['bsub']

sc = []
def s(*cl):
sc.append(" ".join(map(str, cl)))

s("#!/bin/bash")
s("cd", wd)
s("#BSUB -o", outfile)
s("#BSUB -e", errfile)
s("#BSUB -q", sysConf.args.openlavaQueue)
bsub_cl.extend(["-o", outfile])
bsub_cl.extend(["-e", errfile])
bsub_cl.extend(["-q", sysConf.args.openlavaQueue])


if '--oln' in sys.argv:
slots = sysConf.args.openlavaSlots
else:
slots = sysConf.job.conf.get('threads', sysConf.args.openlavaSlots)

s("#BSUB -n", slots)
bsub_cl.extend(["-n", slots])

lastJids = []

Expand All @@ -92,19 +95,18 @@ def s(*cl):
#hold until the 'prepare' jobs are done
#l.critical("Prepare jids - wait for these! %s" % prep_jids)
for j in prep_jids:
s("#BSUB -w done(%d)" % j)
# qcl.append("-w '")
# qcl.append('&&'.join(['exit(%s)' % x for x in prep_jids])

bsub_cl.extend(["-w", "'done(%d)'" % j])
elif command == 'finish':
run_jids = sysConf.job.data.openlava.jids.get('run', [])
#hold until the 'prepare' jobs are done
for j in run_jids:
s("#BSUB -w done(%d)" % j)
bsub_cl.extend(["-w", "'done(%d)'" % j])

#give it a reasonable name
jobname = "%s/%s" % (str(conf['runid']), command)
if conf.get('runid', None):
s("#BSUB -J %s/%s" % (str(conf['runid']), command))
bsub_cl.extend(["-J", jobname])
#s("#BSUB-J %s" % jobname)

# make sure the environment is copied
#qcl.append('-V')
Expand All @@ -124,6 +126,11 @@ def s(*cl):
else:
outk = 'moa_' + k
v = conf[k]

#this should not happen:
if ' ' in outk:
continue

if isinstance(v, list):
s("%s='%s'" % (outk, " ".join(v)))
elif isinstance(v, dict):
Expand All @@ -134,23 +141,28 @@ def s(*cl):
s("")
s("## Run the command")
s("")
s(*(['/bin/bash'] + cl))

s(*cl)

#save the file
tmpdir = os.path.join(wd, '.moa', 'tmp')
tmpdir = os.path.join(wd, '.moa', 'tmp')
if not os.path.exists(tmpdir):
os.makedirs(tmpdir)

tmpfile = tempfile.NamedTemporaryFile(dir=tmpdir, prefix='openlava.',
delete=False, suffix='.sh')
tmpfile.write("\n".join(sc))
tmpfilename = tmpfile.name
tmpfilename = os.path.realpath(os.path.abspath(tmpfile.name))
tmpfile.close()

os.chmod(tmpfile.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

l.debug("executing bsub")
moa.ui.message("Submitting job to openlava")
p = sp.Popen(['bsub', tmpfilename], cwd = wd, stdout=sp.PIPE, stdin=sp.PIPE)
o,e = p.communicate("\n".join(sc))
bsub_cl.append(tmpfilename)
moa.ui.message("Executing")
moa.ui.message(" ".join(map(str, bsub_cl)))
p = sp.Popen(map(str, bsub_cl), cwd = wd, stdout=sp.PIPE)
o,e = p.communicate()

jid = int(o.split("<")[1].split(">")[0])

Expand Down

0 comments on commit 754785c

Please sign in to comment.