moved some code to executor.py - tracking what actually happens now
Mark Fiers committed Jun 4, 2012
1 parent 50d2ca7 commit 37aae4a
Showing 3 changed files with 64 additions and 83 deletions.
44 changes: 44 additions & 0 deletions lib/python/moa/backend/ruff/executor.py
@@ -0,0 +1,44 @@
import os
import stat
import tempfile

import ruffus

import moa.actor

from moa.sysConf import sysConf


def ruffusExecutor(input, output, script, jobData):

    # track which files were actually processed in this run
    if not sysConf.has_key('files_processed'):
        sysConf.files_processed = []
    sysConf.files_processed.append((input, output))
    print input, '->', output

    # write the job script to an executable temp file under .moa/tmp
    wd = jobData['wd']
    tmpdir = os.path.realpath(os.path.abspath(
        os.path.join(wd, '.moa', 'tmp')))
    if not os.path.exists(tmpdir):
        os.makedirs(tmpdir)

    tf = tempfile.NamedTemporaryFile(delete=False,
                                     dir=tmpdir,
                                     prefix='moa',
                                     mode='w')
    tf.write(script)
    tf.close()
    os.chmod(tf.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    # export the job data as environment variables for the script;
    # lists become space-separated strings, dicts are skipped
    for k in jobData:
        v = jobData[k]
        if isinstance(v, list):
            os.putenv(k, " ".join(v))
        elif isinstance(v, dict):
            continue
        else:
            os.putenv(k, str(v))

    # hand the script to the configured runner; a non-zero return
    # code tells ruffus to break off the pipeline
    runner = moa.actor.getRunner()
    rc = runner(jobData['wd'], [tf.name], jobData, command=jobData['command'])
    if rc != 0:
        raise ruffus.JobSignalledBreak
    #l.debug("Executing %s" % tf.name)
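The new sysConf.files_processed list is what the commit message means by "tracking what actually happens now": every executed job records its (input, output) pair. A minimal sketch of how this executor is driven, modelled on the call sites in map.py below (the generate_data_map stub, the file names, and the thread count are illustrative stand-ins, not part of this commit):

    import ruffus

    from moa.backend.ruff.executor import ruffusExecutor
    from moa.sysConf import sysConf

    def generate_data_map():
        # hypothetical stand-in for RuffMapJob's generator: yield one
        # (input, output, script, jobData) tuple per job
        yield ('in.txt', 'out.txt', 'echo processing',
               {'wd': '.', 'command': 'run'})

    # decorate the shared executor with the parameter generator,
    # exactly as RuffMapJob.execute does below
    executor2 = ruffus.files(generate_data_map)(ruffusExecutor)

    # run the one-task pipeline; multiprocess mirrors self.args.threads
    ruffus.pipeline_run([executor2], verbose=0, multiprocess=1)

    # afterwards sysConf.files_processed holds one (input, output)
    # tuple per job that actually ran
    for inp, outp in sysConf.files_processed:
        print inp, '->', outp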

58 changes: 13 additions & 45 deletions lib/python/moa/backend/ruff/map.py
@@ -29,40 +29,7 @@

from moa.backend.ruff.commands import RuffCommands
from moa.backend.ruff.base import RuffBaseJob

def localMapExecutor(input, output, script, jobData):

    wd = jobData['wd']
    tmpdir = os.path.realpath(os.path.abspath(
        os.path.join(wd, '.moa', 'tmp')))
    if not os.path.exists(tmpdir):
        os.makedirs(tmpdir)

    tf = tempfile.NamedTemporaryFile(delete=False,
                                     dir=tmpdir,
                                     prefix='moa',
                                     mode='w')

    tf.write(script)
    tf.close()
    os.chmod(tf.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    import logging
    for k in jobData:
        v = jobData[k]
        if isinstance(v, list):
            os.putenv(k, " ".join(v))
        elif isinstance(v, dict):
            continue
        else:
            os.putenv(k, str(v))

    runner = moa.actor.getRunner()
    rc = runner(jobData['wd'], [tf.name], jobData, command=jobData['command'])
    if rc != 0:
        raise ruffus.JobSignalledBreak
    l.debug("Executing %s" % tf.name)

from moa.backend.ruff.executor import ruffusExecutor

class RuffMapJob(RuffBaseJob):

@@ -135,12 +102,12 @@ def generate_data_map():



        #this is because we're possibly reusing the this module (are we??)
        #function in multiple ruffus calls. In all cases it's to
        #be interpreted as a new, fresh call - so, remove all
        #metadata that might have stuck from the last time
        if hasattr(localMapExecutor, 'pipeline_task'):
            del localMapExecutor.pipeline_task
        # this function is possibly reused in multiple ruffus calls
        # (are we??). In all cases it's to be interpreted as a new,
        # fresh call - so remove all metadata that might have stuck
        # from the last time
        if hasattr(ruffusExecutor, 'pipeline_task'):
            del ruffusExecutor.pipeline_task

        #if there are no input & output files complain:
        if len(self.job.data.inputs) + len(self.job.data.outputs) == 0:
@@ -150,7 +117,7 @@ def generate_data_map():
        #here we're telling ruffus to proceed using the in & output files
        #we're generating
        l.debug("decorating executor")
        executor2 = ruffus.files(generate_data_map)(localMapExecutor)
        executor2 = ruffus.files(generate_data_map)(ruffusExecutor)
        l.debug("Start run (with %d thread(s))" %
                self.args.threads)

@@ -177,7 +144,8 @@ def generate_data_map():
"{{reset}}"
moa.ui.error("Caught a Ruffus error")
moa.ui.error(startOfError)
moa.ui.error(endOfError)
print endOfError
moa.ui.error(endOfError.replace('%', '%%'))
try:
#try to get some structured info & output that.
einfo = e[0][1].split('->')[0].split('=')[1].strip()
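A note on the escaping change above: moa.ui.error evidently runs its message through %-formatting at some point, and a ruffus traceback can contain literal percent signs, so passing the raw endOfError could crash the error reporter itself. Doubling the percent signs makes the string format-safe. A minimal illustration (the formatting step inside moa.ui.error is inferred, not shown in this diff):

    msg = "job stopped at 80% of the input"
    # a bare '%' in a string that is later %-formatted raises
    # "ValueError: unsupported format character"; doubling it
    # yields a literal '%' after formatting
    safe = msg.replace('%', '%%')
    print safe % ()        # prints the original message unharmed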
@@ -190,9 +158,9 @@ def generate_data_map():

        #the ruffus node name cache needs to be empty -
        #otherwise ruffus might think that we're rerunning jobs
        if hasattr(localMapExecutor, 'pipeline_task'):
            for k in localMapExecutor.pipeline_task._name_to_node.keys():
                del localMapExecutor.pipeline_task._name_to_node[k]
        if hasattr(ruffusExecutor, 'pipeline_task'):
            for k in ruffusExecutor.pipeline_task._name_to_node.keys():
                del ruffusExecutor.pipeline_task._name_to_node[k]
        return rc


45 changes: 7 additions & 38 deletions lib/python/moa/backend/ruff/reduce.py
@@ -29,38 +29,7 @@

from moa.backend.ruff.commands import RuffCommands
from moa.backend.ruff.base import RuffBaseJob

def localMapExecutor(input, output, script, jobData):
    wd = jobData['wd']
    tmpdir = os.path.realpath(os.path.abspath(
        os.path.join(wd, '.moa', 'tmp')))
    if not os.path.exists(tmpdir):
        os.makedirs(tmpdir)

    tf = tempfile.NamedTemporaryFile(delete=False,
                                     dir=tmpdir,
                                     prefix='moa',
                                     mode='w')
    tf.write(script)
    tf.close()
    os.chmod(tf.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    import logging
    for k in jobData:
        v = jobData[k]
        if isinstance(v, list):
            os.putenv(k, " ".join(v))
        elif isinstance(v, dict):
            continue
        else:
            os.putenv(k, str(v))

    runner = moa.actor.getRunner()
    rc = runner(jobData['wd'], [tf.name], jobData, command=jobData['command'])
    if rc != 0:
        raise ruffus.JobSignalledBreak
    l.debug("Executing %s" % tf.name)

from moa.backend.ruff.executor import ruffusExecutor

class RuffReduceJob(RuffBaseJob):

@@ -117,15 +86,15 @@ def execute(self):
l.debug("Executing %s" % script)


if hasattr(localMapExecutor, 'pipeline_task'):
del localMapExecutor.pipeline_task
if hasattr(ruffusExecutor, 'pipeline_task'):
del ruffusExecutor.pipeline_task

#here we're telling ruffus to proceed using the in & output files
#we're generating
l.debug("decorating executor")
executor2 = ruffus.files(
[inputs + prereqs], outputs, script, thisJobData
)(localMapExecutor)
)(ruffusExecutor)
l.debug("Start reduce run")

try:
Expand Down Expand Up @@ -163,9 +132,9 @@ def execute(self):

        #the ruffus node name cache needs to be empty -
        #otherwise ruffus might think that we're rerunning jobs
        if hasattr(localMapExecutor, 'pipeline_task'):
            for k in localMapExecutor.pipeline_task._name_to_node.keys():
                del localMapExecutor.pipeline_task._name_to_node[k]
        if hasattr(ruffusExecutor, 'pipeline_task'):
            for k in ruffusExecutor.pipeline_task._name_to_node.keys():
                del ruffusExecutor.pipeline_task._name_to_node[k]

        return rc
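Where the map backend hands ruffus a parameter generator, the reduce backend above passes a single explicit job: all inputs plus prerequisites as one input list, one output list, and the script and job data as extra parameters that ruffus forwards to ruffusExecutor's script and jobData arguments. A sketch of that call shape (the file names, script, and job data are invented for illustration):

    import ruffus

    from moa.backend.ruff.executor import ruffusExecutor

    inputs = ['part1.txt', 'part2.txt']   # hypothetical map outputs
    prereqs = []                          # hypothetical prerequisites
    outputs = ['combined.txt']
    script = 'cat part1.txt part2.txt > combined.txt'  # hypothetical
    thisJobData = {'wd': '.', 'command': 'run'}

    # one explicit job: ruffus invokes
    #   ruffusExecutor([inputs + prereqs], outputs, script, thisJobData)
    executor2 = ruffus.files(
        [inputs + prereqs], outputs, script, thisJobData
    )(ruffusExecutor)

    ruffus.pipeline_run([executor2], verbose=0)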
