Skip to content

Commit

Permalink
Merge branch 'moa.0.11' of ssh.github.com:mfiers/Moa into moa.0.11
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Fiers committed Nov 9, 2012
2 parents 6cbef38 + c3ca7a0 commit cfa2e8b
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 96 deletions.
19 changes: 19 additions & 0 deletions moa/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@

l = moa.logger.getLogger(__name__)

def async(f):
"""
decorator designating an actor to be asynchronous
"""
f.category = 'async'
return f

def sync(f):
"""
decorator designating an actor to be synchronous
"""
f.category = 'sync'
return f

def getRunner():
actorId = getattr(sysConf.args, 'actorId', 'default')
Expand All @@ -32,7 +45,10 @@ def getRunner():
l.debug("Actor: %s" % actorId)
return sysConf.actor.actors[actorId]

#should use this
getActor = getRunner

@sync
def simpleRunner(wd, cl, conf={}, **kwargs):
"""
Don't think - just run - here & now
Expand All @@ -44,6 +60,9 @@ def simpleRunner(wd, cl, conf={}, **kwargs):
- return the rc
"""

#declare this as an synchronous runner
simpleRunner.category = 'sync'

outDir = os.path.join(wd, '.moa', 'log.latest')
if not os.path.exists(outDir):
try:
Expand Down
1 change: 1 addition & 0 deletions moa/backend/ruff/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def ruffusExecutor(input, output, script, jobData):
os.putenv(k, str(v))

runner = moa.actor.getRunner()
#print runner.category
rc = runner(jobData['wd'], [tf.name], jobData, command=jobData['command'])
if rc != 0:
raise ruffus.JobSignalledBreak
Expand Down
71 changes: 8 additions & 63 deletions moa/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,35 +114,6 @@ def run_3(wd, exitOnError=True):
sysConf.pluginHandler.run('finish', reverse=True)


def run_recursive(wd):
"""
Run through the subdirs (depth first) and execute all moa jobs
"""
sysConf.pluginHandler.run('prepare_recursive')
for path, dirs, files in os.walk(wd):
dirs.sort() # make sure the dirs are sorted
if '.moa' in dirs:
if os.path.exists(os.path.join(path, '.moa', 'template')):
clargs = " ".join(sys.argv)
moa.ui.message("Trying to run '%s' .." % clargs)
moa.ui.message(" .. in %s" % (path))
try:
rc = run_3(path, exitOnError=False)
if rc is False:
moa.ui.message("Error running %s" % clargs)

except moa.exceptions.MoaInvalidCommandLine:
moa.ui.warn("Cannot run moa %s in %s" % (
" ".join(sys.argv[1:]), path))

# remove all '.' directories from the list to recurse

drem = [x for x in dirs if x[0] in ['.', '_']]
[dirs.remove(t) for t in drem]

sysConf.pluginHandler.run('post_recursive')


def run_2(force_silent=False):
"""
Are we going to do a recursive run?: check if -r is in the arguments...
Expand All @@ -159,45 +130,19 @@ def run_2(force_silent=False):
"""

sysConf.pluginHandler.run('prepare_recursion')
# sysConf.pluginHandler.run('prepare_recursion')

wd = moa.utils.getCwd()
l.debug("starting run_2 in %s" % wd)
if not '-r' in sys.argv:
#not recursive - go directly to stage 3
try:
run_3(wd)
except moa.exceptions.MoaInvalidCommandLine, e:
parser = e.args[0]
parser.real_error()

else:
#default mode is global
recurseMode = 'global'

# check if the command allows for recursive execution
# therefore - a quick & dirty approach to getting the
# command name - we haven't properly parsed the
# arguments yet
command = sysConf.default_command
tas = [x for x in sys.argv[1:] if not x[0] == '-']

if len(tas) > 0:
command = tas[0]

if command in sysConf.commands:
recurseMode = sysConf.commands[command].get('recursive', 'global')
else:
recurseMode = 'global'

if recurseMode == 'global':
run_recursive(wd)
elif recurseMode == 'none':
moa.ui.exitError("Recursive execution is not allowed")
else:
run_3(wd)
# never recursive - jump directly into run_3
try:
run_3(wd)
except moa.exceptions.MoaInvalidCommandLine, e:
parser = e.args[0]
parser.real_error()

sysConf.pluginHandler.run('post_recursion')
# sysConf.pluginHandler.run('post_recursion')


def run_1():
Expand Down
3 changes: 2 additions & 1 deletion moa/cli/setstatus.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ def setstatus():
sys.stderr.write('invalid status: %s\n' % status)
sys.exit(-1)

if status != 'run':
if status not in ['running', 'running_async']:
if os.path.exists(pidFile):
os.unlink(pidFile)

print 'SETTING STATE', status
with open(statusFile, 'w') as F:
F.write(status)

Expand Down
2 changes: 1 addition & 1 deletion moa/data/etc/config
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ plugins:
enable: true
order: 50
openLavaActor:
module: moa.plugin.system.openLavaActor
enabled: true
module: moa.plugin.job.openLavaActor
system:
moaGit:
module: moa.plugin.system.moaGit
Expand Down
23 changes: 19 additions & 4 deletions moa/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"""

import os
import sys
import glob
import tempfile

Expand All @@ -22,6 +23,7 @@

import moa.ui
import moa.args
import moa.actor
import moa.utils
import moa.logger
import moa.plugin
Expand Down Expand Up @@ -257,6 +259,7 @@ def execute(self, job, args, **kwargs):
Execute `command` in the context of this job. Execution is
alwasy deferred to the backend
#Note: this is the function that will be called from argparse
#Note: Uncertain how to test verbose & silent
:param verbose: output lots of data
Expand All @@ -275,6 +278,9 @@ def execute(self, job, args, **kwargs):
if not self.backend:
moa.ui.exitError("No backend loaded - cannot execute %s" % command)

actor = moa.actor.getActor()
moa.ui.message("loaded %s actor %s" % (actor.category, actor.__name__))

#figure out what we were really after
command = args.command

Expand All @@ -283,12 +289,13 @@ def execute(self, job, args, **kwargs):
#prepare for execution - i.e. prepare log dir, etc..
self.prepareExecute()

#unless command == 'run' - just execute it and return the RC
# unless command == 'run' - just execute it and return the RC
if command != 'run':
l.debug("Simple type execute of '%s'" % command)
rc = self.backend.execute(self, command, args)
sysConf.rc = rc
if rc != 0:
self.pluginHandler.run("post_error", job=self)
sysConf.pluginHandler.run("post_error")
moa.ui.exitError("Error running")
return rc
Expand All @@ -314,8 +321,16 @@ def execute(self, job, args, **kwargs):

self.finishExecute()

#if async - exit here
if actor.category == 'async':
self.pluginHandler.run("async_exit", job=self)
sysConf.pluginHandler.run("async_exit")
moa.ui.message("async run started - quiting now")
sys.exit(0)

return sysConf.rc


def prepareExecute(self):
"""
Give this job a chance to prepare for execution.
Expand Down Expand Up @@ -406,9 +421,9 @@ def defineCommands(self, commandparser):
cp = cparser.add_parser(
str(c), help=hlp)

cp.add_argument(
"-r", "--recursive", dest="recursive", action="store_true",
default="false", help="Run this job recursively")
# cp.add_argument(
# "-r", "--recursive", dest="recursive", action="store_true",
# default="false", help="Run this job recursively")

cp.add_argument(
"-v", "--verbose", dest="verbose", action="store_true",
Expand Down

0 comments on commit cfa2e8b

Please sign in to comment.