Skip to content

Commit

Permalink
Merge branch 'master' of git@github.com:klbostee/dumbo
Browse files Browse the repository at this point in the history
Conflicts:

	src/python/setup.py
  • Loading branch information
Klaas Bosteels authored and Klaas Bosteels committed Nov 25, 2008
2 parents 5e29b48 + 0db4709 commit 0f70ee0
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions src/python/dumbo.py
Expand Up @@ -152,7 +152,7 @@ def execute(cmd,opts=[],precmd="",printcmd=True,stdout=sys.stdout,stderr=sys.std
if precmd: cmd = " ".join((precmd,cmd))
args = " ".join("-%s '%s'" % (key,value) for key,value in opts)
if args: cmd = " ".join((cmd,args))
if printcmd: print >>stdout,"EXEC:",cmd
if printcmd: print >>stderr,"EXEC:",cmd
return system(cmd,stdout,stderr)

def system(cmd,stdout=sys.stdout,stderr=sys.stderr):
Expand Down Expand Up @@ -197,28 +197,33 @@ def dummysystem(*args,**kwargs): return 0
else: python = addedopts["python"][0]
if not addedopts["iteration"]: iter = 0
else: iter = int(addedopts["iteration"][0])
if addedopts["hadoop"]: prog = prog.split("/")[-1]
opts.append(("mapper","%s %s map %i" % (python,prog,iter)))
opts.append(("reducer","%s %s red %i" % (python,prog,iter)))
if not addedopts["hadoop"]: progincmd = prog
else: progincmd = prog.split("/")[-1]
opts.append(("mapper","%s %s map %i" % (python,progincmd,iter)))
opts.append(("reducer","%s %s red %i" % (python,progincmd,iter)))
if not addedopts["hadoop"]: return startonunix(prog,opts)
else: return startonstreaming(prog,opts,addedopts["hadoop"][0])

# Run a job locally as one shell pipeline: read the inputs, pipe them through
# the mapper command, sort, pipe through the reducer command, and redirect the
# result into the output file.
#
# prog -- program path; here it is only forwarded to configopts().
# opts -- list of (key, value) option pairs; extended in place by "+=" below.
#
# Returns 1 if no input or no output option was given, otherwise whatever
# execute() returned for the pipeline.
#
# NOTE(review): this listing is a commit diff shown without +/- markers (the
# page reports 13 additions and 8 deletions), so several statements appear
# twice: the earlier copy looks like the pre-commit line and the later copy
# its post-commit replacement. Confirm against the real file.
def startonunix(prog,opts):
# Merge in options configured for the "unix" backend; any failure inside
# configopts() is deliberately swallowed by the bare except.
try: opts += configopts("unix",prog,opts)
except: pass # ignore
# Pull the options this function handles out of opts. Judging by the [0]
# indexing below, getopts() presumably returns a mapping from option name to
# a list of values -- TODO confirm. The second closing line is the variant
# that additionally extracts "pv".
addedopts = getopts(opts,["input","output","mapper","reducer","libegg",
"delinputs","cmdenv"])
"delinputs","cmdenv","pv"])
mapper,reducer = addedopts["mapper"][0],addedopts["reducer"][0]
# At least one input and an output are required.
if (not addedopts["input"]) or (not addedopts["output"]):
print >>sys.stderr,"ERROR: input or output not specified"
return 1
# Two variants of the input list: the first wraps every path in single
# quotes, the second joins them bare, which leaves them subject to shell
# word splitting and glob expansion (presumably intentional -- confirm).
inputs = " ".join("'%s'" % input for input in addedopts["input"])
inputs = " ".join(addedopts["input"])
# Only the first "output" value is used.
output = addedopts["output"][0]
# presumably a PYTHONPATH=... prefix built from the libegg values -- envdef()
# is defined elsewhere; verify.
pyenv = envdef("PYTHONPATH",addedopts["libegg"])
# Turn each NAME=value argument into NAME='value' for the command prefix.
# NOTE(review): split("=") has no maxsplit, so an argument with zero or with
# more than one "=" yields a tuple of the wrong length and the % formatting
# raises TypeError.
cmdenv = " ".join("%s='%s'" % tuple(arg.split("=")) \
for arg in addedopts["cmdenv"])
# First variant of the pipeline: cat | mapper | sort | reducer > output.
# LC_ALL=C is set for the sort stage only.
retval = execute("cat %s | %s %s %s | LC_ALL=C sort | %s %s %s > '%s'" % \
(inputs,pyenv,cmdenv,mapper,pyenv,cmdenv,reducer,output))
# Optional progress display: when the "pv" option is "yes", pv replaces cat
# as the source and a named pv stage is inserted after the map, sort and
# reduce steps; otherwise those slots are empty strings.
if addedopts["pv"] and addedopts["pv"][0] == "yes":
cat = "pv -cN source"
mpv,spv,rpv = "| pv -cN map ","| pv -cN sort ","| pv -cN reduce "
else: cat,mpv,spv,rpv = "cat","","",""
# Second variant of the pipeline, with the pv slots spliced in. The pv
# fragments carry their own leading "| " and trailing space, which is why
# the template has no space before the following "|" and ">".
retval = execute("%s %s | %s %s %s %s| LC_ALL=C sort %s| %s %s %s %s> '%s'" % \
(cat,inputs,pyenv,cmdenv,mapper,mpv,spv,pyenv,cmdenv,reducer,rpv,output))
# When "delinputs" is "yes", remove every input file afterwards, regardless
# of retval. The path is appended to "rm " unquoted.
if addedopts["delinputs"] and addedopts["delinputs"][0] == "yes":
for file in addedopts["input"]: execute("rm " + file)
return retval
Expand Down

0 comments on commit 0f70ee0

Please sign in to comment.