diff --git a/dumbo.py b/dumbo.py index fc0083e..bb8237a 100755 --- a/dumbo.py +++ b/dumbo.py @@ -139,14 +139,14 @@ def stream(prog,opts): else: streamonhadoop(prog,opts,addedopts["hadoop"][0]) def streamlocally(prog,opts): - addedopts = delopts(opts,["input","output","mapper","reducer","libpy", + addedopts = delopts(opts,["input","output","mapper","reducer","libegg", "delinputs"]) mapper,reducer = addedopts["mapper"][0],addedopts["reducer"][0] if (not addedopts["input"]) or (not addedopts["output"]): print >>sys.stderr,"ERROR: input or output not specified" sys.exit(1) input,output = addedopts["input"][0],addedopts["output"][0] - pythonenv = envdef("PYTHONPATH",addedopts["libpy"],opts) + pythonenv = envdef("PYTHONPATH",addedopts["libegg"],opts) retval = execute("%s %s < '%s' | LC_ALL=C sort | %s %s > '%s'" % \ (pythonenv,mapper,input,pythonenv,reducer,output)) if addedopts["delinputs"] and addedopts["delinputs"][0] == "yes": @@ -154,7 +154,7 @@ def streamlocally(prog,opts): sys.exit(retval) def streamonhadoop(prog,opts,hadoop): - addedopts = delopts(opts,["name","delinputs","libpy","libjar","inputformat", + addedopts = delopts(opts,["name","delinputs","libegg","libjar","inputformat", "nummaptasks","numreducetasks"]) opts.append(("file",prog)) opts.append(("file",sys.argv[0])) @@ -182,7 +182,7 @@ def streamonhadoop(prog,opts,hadoop): inputformat_shortcuts[inputformat.lower()] addedopts["libjar"].append(dumbojar) opts.append(("inputformat",inputformat)) - pythonenv = envdef("PYTHONPATH",addedopts["libpy"],opts) + pythonenv = envdef("PYTHONPATH",addedopts["libegg"],opts) hadoopenv = envdef("HADOOP_CLASSPATH",addedopts["libjar"],opts) cmd = hadoop + "/bin/hadoop jar " + streamingjar retval = execute(cmd,opts," ".join((pythonenv,hadoopenv)))