Permalink
Browse files

added main (runners and starters) and [hadoops] config section

  • Loading branch information...
1 parent 92c4079 commit fc3deb9cfcdd888065c10c7c09d4b050279f61ac Klaas Bosteels committed Dec 7, 2008
Showing with 66 additions and 19 deletions.
  1. 0 examples/excludes.txt
  2. +11 −2 examples/oowordcount.py
  3. +55 −17 src/python/dumbo.py
View
0 examples/excludes.txt 100644 → 100755
No changes.
View
13 examples/oowordcount.py 100644 → 100755
@@ -2,7 +2,7 @@
Counts how many times each non-excluded word occurs:
>>> import dumbo
->>> opts = [('input','brian.txt'),('output','counts.txt'),('inputformat','text')]
+>>> opts = [('who','Brian'),('output','counts.txt')]
>>> logfile = open('log.txt','a')
>>> dumbo.start('oowordcount.py',opts,stdout=logfile,stderr=logfile)
0
@@ -23,6 +23,15 @@ def __call__(self,key,value):
def reducer(key,values):
yield key,sum(values)
+def runner(job):
+    """Register the word count iteration on the given job.
+
+    The reducer function is passed twice to job.additer().
+    NOTE(review): the third argument is presumably the combiner slot,
+    matching run(mapper,reducer,combiner) -- confirm against additer().
+    """
+    job.additer(Mapper,reducer,reducer)
+
+def starter(prog):
+    """Translate the 'who' option into the input options of the program.
+
+    Removes 'who' from the program's options and, when it was given, adds
+    an 'input' option naming "<who in lowercase>.txt" together with
+    'inputformat' set to "text".
+
+    Returns an error message when 'who' is missing, None otherwise.
+    """
+    who = prog.delopt("who")
+    if not who:
+        return "'who' not specified"
+    inputfile = who[0].lower() + ".txt"
+    prog.addopt("input",inputfile)
+    prog.addopt("inputformat","text")
+
if __name__ == "__main__":
import dumbo
- dumbo.run(Mapper,reducer,reducer)
+ dumbo.main(runner,starter)
View
@@ -19,14 +19,29 @@ def run(self):
newopts["outputformat"] = "sequencefile"
kwargs["iter"],kwargs["newopts"] = index,newopts
run(*args,**kwargs)
-
+
+class Program:
+    """A program together with the list of (key,value) options to start it with.
+
+    prog -- path of the program, handed to start() unchanged
+    opts -- list of (key,value) tuples; a list passed in by the caller is
+            used as is (not copied), so changes made through this object
+            are visible to the caller as well
+    """
+    def __init__(self,prog,opts=None):
+        # A fresh list per instance: a shared [] default would make every
+        # Program created without opts append to the very same list.
+        if opts is None: opts = []
+        self.prog,self.opts = prog,opts
+    def addopt(self,key,value):
+        """Append the option (key,value)."""
+        self.opts.append((key,value))
+    def delopt(self,key):
+        """Remove the options named key and return what getopts() found for key."""
+        return getopts(self.opts,[key],delete=True)[key]
+    def getopt(self,key):
+        """Return what getopts() finds for key, leaving the options untouched."""
+        return getopts(self.opts,[key],delete=False)[key]
+    def start(self):
+        """Start the program with the current options; returns start()'s result."""
+        return start(self.prog,self.opts)
+
+class Counter:
+    """A named counter inside a counter group (group defaults to "Program")."""
+    def __init__(self,name,group="Program"):
+        self.name,self.group = name,group
+    def incr(self,amount):
+        """Increment this counter by amount through incrcounter()."""
+        incrcounter(self.group,self.name,amount)
+
class Iteration:
def __init__(self,prog,opts):
self.prog,self.opts = prog,opts
self.opts += configopts("common",prog,self.opts)
def run(self):
addedopts = getopts(self.opts,["fake","debug","python",
- "iteration","hadoop"])
+ "iteration","hadoop","starter"])
if addedopts["fake"] and addedopts["fake"][0] == "yes":
def dummysystem(*args,**kwargs): return 0
global system
@@ -39,7 +54,7 @@ def dummysystem(*args,**kwargs): return 0
if not addedopts["iteration"]: iter = 0
else: iter = int(addedopts["iteration"][0])
if not addedopts["hadoop"]: progincmd = self.prog
- else:
+ else:
self.opts.append(("hadoop",addedopts["hadoop"][0]))
progincmd = self.prog.split("/")[-1]
self.opts.append(("mapper","%s %s map %i" % (python,progincmd,iter)))
@@ -103,7 +118,7 @@ def run(self):
"nummaptasks","numreducetasks","priority",
"cachefile","cachearchive","codewritable",
"addpath","python"])
- hadoop = addedopts["hadoop"][0]
+ hadoop = findhadoop(addedopts["hadoop"][0])
streamingjar,dumbojar = findjar(hadoop,"streaming"),findjar(hadoop,"dumbo")
if not streamingjar:
print >>sys.stderr,"ERROR: Streaming jar not found"
@@ -181,16 +196,29 @@ def run(self):
for key,value in self.opts:
if key == "input":
execute("%s/bin/hadoop dfs -rmr '%s'" % (hadoop,value))
- return retvalw
-
-class Counter:
- def __init__(self,name,group="Program"):
- self.group = group
- self.name = name
- def incr(self,amount):
- incrcounter(self.group,self.name,amount)
+ return retval
+def main(runner,starter=None):
+    """Entry point of a program: either start it or run its job.
+
+    runner  -- called with a new Job to set the job up; the job is run
+               afterwards
+    starter -- optional; called with a Program (this script plus the parsed
+               command line options) before the program gets started
+
+    Both callbacks may return an error message. If they do, the message is
+    printed to stderr and the process exits with status 1.
+
+    The starter branch is taken only when a starter is given, the 'starter'
+    option is not 'no', and the first command line argument (if any) begins
+    with "-". In every other case the runner branch is taken.
+    """
+    opts = parseargs(sys.argv[1:])
+    starteropt = getopts(opts,["starter"])["starter"]
+    # Passed on to the program started below, so that it goes straight to
+    # the runner branch instead of starting itself again.
+    opts.append(("starter","no"))
+    if starter and not (starteropt and starteropt[0] == 'no') and \
+       not (len(sys.argv) > 1 and sys.argv[1][0] != "-"):
+        program = Program(sys.argv[0],opts)
+        errormsg = starter(program)
+        if errormsg:
+            print >>sys.stderr,errormsg
+            sys.exit(1)
+        program.start()
+    else:
+        job = Job()
+        errormsg = runner(job)
+        if errormsg:
+            # was "sys.sdterr", which raised AttributeError instead of
+            # reporting the runner's error message
+            print >>sys.stderr,errormsg
+            sys.exit(1)
+        job.run()
+
def run(mapper,reducer=None,combiner=None,
mapconf=None,redconf=None,mapclose=None,redclose=None,
iter=0,newopts={}):
@@ -314,9 +342,10 @@ def getopts(opts,keys,delete=True):
for delindex in reversed(delindexes): del opts[delindex]
return askedopts
-def configopts(section,prog,opts=[]):
+def configopts(section,prog=None,opts=[]):
from ConfigParser import SafeConfigParser,NoSectionError
- defaults = {'prog': prog.split("/")[-1].split(".py",1)[0]}
+ if prog: defaults = {'prog': prog.split("/")[-1].split(".py",1)[0]}
+ else: defaults = {}
try: defaults.update([('user',os.environ["USER"]),('pwd',os.environ["PWD"])])
except KeyError: pass
for key,value in opts: defaults[key] = value
@@ -342,6 +371,15 @@ def system(cmd,stdout=sys.stdout,stderr=sys.stderr):
proc = subprocess.Popen(cmd,shell=True,stdout=stdout,stderr=stderr)
return os.waitpid(proc.pid,0)[1] / 256
+def findhadoop(optval):
+    """Resolve the value of the 'hadoop' option to a Hadoop path.
+
+    When the lowercased optval is a key of the "hadoops" config section,
+    the value configured for it is used; otherwise optval itself is used.
+    Prints an error and exits with status 1 when the resulting path does
+    not exist; returns the path otherwise.
+    """
+    shortcuts = dict(configopts("hadoops"))
+    hadoop = shortcuts.get(optval.lower(),optval)
+    if os.path.exists(hadoop):
+        return hadoop
+    print >>sys.stderr,"ERROR: directory %s does not exist" % hadoop
+    sys.exit(1)
+
def findjar(hadoop,name):
jardir = hadoop + "/build/contrib/" + name
if not os.path.exists(jardir): jardir = hadoop + "/contrib/" + name
@@ -366,7 +404,7 @@ def envdef(varname,files,optname=None,opts=None,commasep=False,shortcuts={}):
def start(prog,opts,stdout=sys.stdout,stderr=sys.stderr):
addedopts = getopts(opts,["libegg"],delete=False)
pyenv = envdef("PYTHONPATH",addedopts["libegg"])
- return execute("python '%s'" % prog,opts,pyenv,stdout=stdout,stderr=stderr)
+ return execute("python '%s'" % prog,opts,pyenv,stdout=stdout,stderr=stderr,printcmd=False)
def submit(*args,**kwargs):
print >>sys.stderr,"WARNING: submit() is deprecated, use start() instead"
@@ -375,12 +413,12 @@ def submit(*args,**kwargs):
def cat(path,opts):
addedopts = getopts(opts,["hadoop","type","libjar"])
if not addedopts["hadoop"]: return decodepipe(opts + [("file",path)])
- hadoop = addedopts["hadoop"][0]
+ hadoop = findhadoop(addedopts["hadoop"][0])
dumbojar = findjar(hadoop,"dumbo")
if not dumbojar:
print >>sys.stderr,"ERROR: Dumbo jar not found"
return 1
- if not addedopts["type"]: type = "sequencefile"
+ if not addedopts["type"]: type = "auto"
else: type = addedopts["type"][0]
hadenv = envdef("HADOOP_CLASSPATH",addedopts["libjar"])
try:

0 comments on commit fc3deb9

Please sign in to comment.