Skip to content
Browse files

Options refactoring across all the codebase, the change is backward c…

…ompatible
  • Loading branch information...
1 parent ca76ffb commit 3eb93adb318b3ca48128f8cf5888f146f6bf0479 @andrix andrix committed Jan 25, 2012
Showing with 639 additions and 441 deletions.
  1. +2 −0 .gitignore
  2. +31 −43 dumbo/backends/common.py
  3. +141 −153 dumbo/backends/streaming.py
  4. +40 −42 dumbo/backends/unix.py
  5. +47 −43 dumbo/cmd.py
  6. +79 −94 dumbo/core.py
  7. +4 −3 dumbo/decor.py
  8. +16 −16 dumbo/lib/__init__.py
  9. +10 −10 dumbo/lib/rawreducer.py
  10. +112 −25 dumbo/util.py
  11. +13 −12 tests/testexamples.py
  12. +144 −0 tests/testutil.py
View
2 .gitignore
@@ -0,0 +1,2 @@
+*.pyc
+*.pyo
View
74 dumbo/backends/common.py
@@ -18,8 +18,7 @@
import os
import re
-from dumbo.util import incrcounter, setstatus, getopts, configopts
-
+from dumbo.util import incrcounter, setstatus, configopts
class Params(object):
"""
@@ -127,34 +126,25 @@ def __init__(self, prog, opts):
(self.prog, self.opts) = (prog, opts)
def run(self):
- addedopts = getopts(self.opts, ['fake',
- 'debug',
- 'python',
- 'iteration',
- 'itercount',
- 'hadoop',
- 'starter',
- 'name',
- 'memlimit',
- 'param',
- 'parser',
- 'record',
- 'joinkeys',
- 'hadoopconf',
- 'mapper',
- 'reducer'])
- if addedopts['fake'] and addedopts['fake'][0] == 'yes':
+ opts = self.opts
+ attrs = ['fake', 'debug', 'python', 'iteration', 'itercount', 'hadoop',
+ 'starter', 'name', 'memlimit', 'param', 'parser', 'record',
+ 'joinkeys', 'hadoopconf', 'mapper', 'reducer']
+ addedopts = opts.filter(attrs)
+ opts.remove(*attrs)
+
+ if 'yes' in addedopts['fake']:
def dummysystem(*args, **kwargs):
return 0
global system
system = dummysystem # not very clean, but it works...
- if addedopts['debug'] and addedopts['debug'][0] == 'yes':
- self.opts.append(('cmdenv', 'dumbo_debug=yes'))
+ if 'yes' in addedopts['debug']:
+ opts.add('cmdenv', 'dumbo_debug=yes')
if not addedopts['python']:
python = 'python'
else:
python = addedopts['python'][0]
- self.opts.append(('python', python))
+ opts.add('python', python)
if not addedopts['iteration']:
iter = 0
else:
@@ -167,13 +157,13 @@ def dummysystem(*args, **kwargs):
name = addedopts['name'][0]
else:
name = self.prog.split('/')[-1]
- self.opts.append(('name', '%s (%s/%s)' % (name, iter + 1,
- itercnt)))
+ opts.add('name', '%s (%s/%s)' % (name, iter + 1, itercnt))
if not addedopts['hadoop']:
pypath = '/'.join(self.prog.split('/')[:-1])
- if pypath: self.opts.append(('pypath', pypath))
+ if pypath:
+ opts.add('pypath', pypath)
else:
- self.opts.append(('hadoop', addedopts['hadoop'][0]))
+ opts.add('hadoop', addedopts['hadoop'][0])
progmod = self.prog.split('/')[-1]
progmod = progmod[:-3] if progmod.endswith('.py') else progmod
memlim = ' 262144000' # 250MB limit by default
@@ -193,38 +183,36 @@ def dummysystem(*args, **kwargs):
memlim = ' ' + addedopts['memlimit'][0]
if addedopts['mapper']:
- self.opts.append(('mapper', addedopts['mapper'][0]))
+ opts.add('mapper', addedopts['mapper'][0])
else:
- self.opts.append(('mapper', '%s -m %s map %i%s' % (python,
- progmod, iter, memlim)))
+ opts.add('mapper', '%s -m %s map %i%s' % (python, progmod, iter,
+ memlim))
if addedopts['reducer']:
- self.opts.append(('reducer', addedopts['reducer'][0]))
+ opts.add('reducer', addedopts['reducer'][0])
else:
- self.opts.append(('reducer', '%s -m %s red %i%s' % (python,
- progmod, iter, memlim)))
+ opts.add('reducer', '%s -m %s red %i%s' % (python, progmod,
+ iter, memlim))
for param in addedopts['param']:
- self.opts.append(('cmdenv', param))
+ opts.add('cmdenv', param)
if addedopts['parser'] and iter == 0:
parser = addedopts['parser'][0]
shortcuts = dict(configopts('parsers', self.prog))
if parser in shortcuts:
parser = shortcuts[parser]
- self.opts.append(('cmdenv', 'dumbo_parser=' + parser))
+ opts.add('cmdenv', 'dumbo_parser=' + parser)
if addedopts['record'] and iter == 0:
record = addedopts['record'][0]
shortcuts = dict(configopts('records', self.prog))
if record in shortcuts:
record = shortcuts[record]
- self.opts.append(('cmdenv', 'dumbo_record=' + record))
- if addedopts['joinkeys'] and addedopts['joinkeys'][0] == 'yes':
- self.opts.append(('cmdenv', 'dumbo_joinkeys=yes'))
- self.opts.append(('partitioner',
- 'org.apache.hadoop.mapred.lib.BinaryPartitioner'))
- self.opts.append(('jobconf',
- 'mapred.binary.partitioner.right.offset=-6'))
+ opts.add('cmdenv', 'dumbo_record=' + record)
+ if 'yes' in addedopts['joinkeys']:
+ opts.add('cmdenv', 'dumbo_joinkeys=yes')
+ opts.add('partitioner', 'org.apache.hadoop.mapred.lib.BinaryPartitioner')
+ opts.add('jobconf', 'mapred.binary.partitioner.right.offset=-6')
for hadoopconf in addedopts['hadoopconf']:
- self.opts.append(('jobconf', hadoopconf))
- self.opts.append(('libegg', re.sub('\.egg.*$', '.egg', __file__)))
+ opts.add('jobconf', hadoopconf)
+ opts.add('libegg', re.sub('\.egg.*$', '.egg', __file__))
return 0
View
294 dumbo/backends/streaming.py
@@ -19,22 +19,20 @@
import re
from dumbo.backends.common import Backend, Iteration, FileSystem, RunInfo
-from dumbo.util import getopt, getopts, configopts, envdef, execute
-from dumbo.util import findhadoop, findjar, dumpcode, dumptext
+from dumbo.util import (configopts, envdef, execute, findhadoop, findjar,
+ dumpcode, dumptext, Options)
class StreamingBackend(Backend):
def matches(self, opts):
- return bool(getopt(opts, 'hadoop', delete=False))
+ return bool(opts['hadoop'])
def create_iteration(self, opts):
- progopt = getopt(opts, 'prog')
- return StreamingIteration(progopt[0], opts)
+ return StreamingIteration(opts['prog'][0], opts)
def create_filesystem(self, opts):
- hadoopopt = getopt(opts, 'hadoop', delete=False)
- return StreamingFileSystem(findhadoop(hadoopopt[0]))
+ return StreamingFileSystem(findhadoop(opts['hadoop'][0]))
def get_runinfo_class(self, opts):
return StreamingRunInfo
@@ -44,193 +42,184 @@ class StreamingIteration(Iteration):
def __init__(self, prog, opts):
Iteration.__init__(self, prog, opts)
- self.opts += configopts('streaming', prog, self.opts)
- hadoop = getopt(self.opts, 'hadoop', delete=False)[0]
- self.opts += configopts('streaming_' + hadoop, prog, self.opts)
+ self.opts += Options(configopts('streaming', prog, self.opts))
+ hadoop_streaming = 'streaming_%s' % self.opts['hadoop'][0]
+ self.opts += Options(configopts(hadoop_streaming, prog, self.opts))
def run(self):
retval = Iteration.run(self)
if retval != 0:
return retval
+ opts = self.opts
if os.path.exists(self.prog):
- self.opts.append(('file', self.prog))
- addedopts = getopts(self.opts, ['hadoop',
- 'name',
- 'delinputs',
- 'libegg',
- 'libjar',
- 'inputformat',
- 'outputformat',
- 'nummaptasks',
- 'numreducetasks',
- 'priority',
- 'queue',
- 'cachefile',
- 'cachearchive',
- 'file',
- 'codewritable',
- 'addpath',
- 'getpath',
- 'python',
- 'streamoutput',
- 'pypath'])
+ opts.add('file', self.prog)
+
+ keys = ['hadoop', 'name', 'delinputs', 'libegg', 'libjar',
@erikvdp
erikvdp added a note Jun 25, 2014

Shouldn't 'preoutputs' also be included here? Otherwise it is being passed over to Hadoop which raises an unrecognized option exception.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ 'inputformat', 'outputformat', 'nummaptasks', 'numreducetasks',
+ 'priority', 'queue', 'cachefile', 'cachearchive', 'file',
+ 'codewritable', 'addpath', 'getpath', 'python', 'streamoutput',
+ 'pypath']
+ addedopts = opts.filter(keys)
+ opts.remove(*keys)
+
hadoop = findhadoop(addedopts['hadoop'][0])
streamingjar = findjar(hadoop, 'streaming')
if not streamingjar:
print >> sys.stderr, 'ERROR: Streaming jar not found'
return 1
- try:
+ try:
import typedbytes
except ImportError:
print >> sys.stderr, 'ERROR: "typedbytes" module not found'
return 1
modpath = re.sub('\.egg.*$', '.egg', typedbytes.__file__)
- if modpath.endswith('.egg'):
- addedopts['libegg'].append(modpath)
+ if modpath.endswith('.egg'):
+ addedopts.add('libegg', modpath)
else:
- self.opts.append(('file', modpath))
- self.opts.append(('jobconf', 'stream.map.input=typedbytes'))
- self.opts.append(('jobconf', 'stream.reduce.input=typedbytes'))
+ opts.add('file', modpath)
+ opts.add('jobconf', 'stream.map.input=typedbytes')
+ opts.add('jobconf', 'stream.reduce.input=typedbytes')
+
if addedopts['numreducetasks'] and addedopts['numreducetasks'][0] == '0':
- self.opts.append(('jobconf', 'stream.reduce.output=typedbytes'))
+ opts.add('jobconf', 'stream.reduce.output=typedbytes')
if addedopts['streamoutput']:
id_ = addedopts['streamoutput'][0]
- self.opts.append(('jobconf', 'stream.map.output=' + id_))
- else:
- self.opts.append(('jobconf', 'stream.map.output=typedbytes'))
+ opts.add('jobconf', 'stream.map.output=' + id_)
+ else:
+ opts.add('jobconf', 'stream.map.output=typedbytes')
else:
- self.opts.append(('jobconf', 'stream.map.output=typedbytes'))
+ opts.add('jobconf', 'stream.map.output=typedbytes')
if addedopts['streamoutput']:
id_ = addedopts['streamoutput'][0]
- self.opts.append(('jobconf', 'stream.reduce.output=' + id_))
+ opts.add('jobconf', 'stream.reduce.output=' + id_)
else:
- self.opts.append(('jobconf', 'stream.reduce.output=typedbytes'))
- if not addedopts['name']:
- self.opts.append(('jobconf', 'mapred.job.name='
- + self.prog.split('/')[-1]))
- else:
- self.opts.append(('jobconf', 'mapred.job.name=%s'
- % addedopts['name'][0]))
- if addedopts['nummaptasks']:
- self.opts.append(('jobconf', 'mapred.map.tasks=%s'
- % addedopts['nummaptasks'][0]))
- if addedopts['numreducetasks']:
- numreducetasks = int(addedopts['numreducetasks'][0])
- self.opts.append(('numReduceTasks', str(numreducetasks)))
+ opts.add('jobconf', 'stream.reduce.output=typedbytes')
+
+ progname = self.prog.split('/')[-1] if not addedopts['name'] \
+ else addedopts['name'][0]
+ opts.add('jobconf', 'mapred.job.name=%s' % progname)
+
+ nummaptasks = addedopts['nummaptasks']
+ numreducetasks = addedopts['numreducetasks']
+ if nummaptasks:
+ opts.add('jobconf', 'mapred.map.tasks=%s' % nummaptasks[0])
+ if numreducetasks:
+ opts.add('numReduceTasks', numreducetasks[0])
if addedopts['priority']:
- self.opts.append(('jobconf', 'mapred.job.priority=%s'
- % addedopts['priority'][0]))
+ opts.add('jobconf', 'mapred.job.priority=%s' % addedopts['priority'][0])
if addedopts['queue']:
- self.opts.append(('jobconf', 'mapred.job.queue.name=%s'
- % addedopts['queue'][0]))
- if addedopts['cachefile']:
- for cachefile in addedopts['cachefile']:
- self.opts.append(('cacheFile', cachefile))
- if addedopts['cachearchive']:
- for cachearchive in addedopts['cachearchive']:
- self.opts.append(('cacheArchive', cachearchive))
- if addedopts['file']:
- for file in addedopts['file']:
- if not '://' in file:
- if not os.path.exists(file):
- raise ValueError('file "' + file + '" does not exist')
- file = 'file://' + os.path.abspath(file)
- self.opts.append(('file', file))
+ opts.add('jobconf', 'mapred.job.queue.name=%s' % addedopts['queue'][0])
+
+ for cachefile in addedopts['cachefile']:
+ opts.add('cacheFile', cachefile)
+
+ for cachearchive in addedopts['cachearchive']:
+ opts.add('cacheArchive', cachearchive)
+
+ for _file in addedopts['file']:
+ if not '://' in _file:
+ if not os.path.exists(_file):
+ raise ValueError('file "%s" does not exist' % _file)
+ _file = 'file://%s' % os.path.abspath(file)
+ opts.add('file', _file)
+
if not addedopts['inputformat']:
- addedopts['inputformat'] = ['auto']
- inputformat_shortcuts = \
- {'code': 'org.apache.hadoop.streaming.AutoInputFormat',
- 'text': 'org.apache.hadoop.mapred.TextInputFormat',
- 'sequencefile': 'org.apache.hadoop.streaming.AutoInputFormat',
- 'auto': 'org.apache.hadoop.streaming.AutoInputFormat'}
+ addedopts.add('inputformat', 'auto')
+
+ inputformat_shortcuts = {
+ 'code': 'org.apache.hadoop.streaming.AutoInputFormat',
+ 'text': 'org.apache.hadoop.mapred.TextInputFormat',
+ 'sequencefile': 'org.apache.hadoop.streaming.AutoInputFormat',
+ 'auto': 'org.apache.hadoop.streaming.AutoInputFormat'
+ }
inputformat_shortcuts.update(configopts('inputformats', self.prog))
- inputformat = addedopts['inputformat'][0]
- if inputformat_shortcuts.has_key(inputformat.lower()):
- inputformat = inputformat_shortcuts[inputformat.lower()]
- self.opts.append(('inputformat', inputformat))
+
+ inputformat = addedopts['inputformat'][0].lower()
+ if inputformat in inputformat_shortcuts:
+ inputformat = inputformat_shortcuts[inputformat]
+ opts.add('inputformat', inputformat)
+
if not addedopts['outputformat']:
- addedopts['outputformat'] = ['sequencefile']
- if addedopts['getpath'] and addedopts['getpath'] != 'no':
- outputformat_shortcuts = \
- {'code': 'fm.last.feathers.output.MultipleSequenceFiles',
- 'text': 'fm.last.feathers.output.MultipleTextFiles',
- 'raw': 'fm.last.feathers.output.MultipleRawFileOutputFormat',
- 'sequencefile': 'fm.last.feathers.output.MultipleSequenceFiles'}
+ addedopts.add('outputformat', 'sequencefile')
+
+ if 'no' not in addedopts['getpath']:
+ outputformat_shortcuts = {
+ 'code': 'fm.last.feathers.output.MultipleSequenceFiles',
+ 'text': 'fm.last.feathers.output.MultipleTextFiles',
+ 'raw': 'fm.last.feathers.output.MultipleRawFileOutputFormat',
+ 'sequencefile': 'fm.last.feathers.output.MultipleSequenceFiles'
+ }
else:
- outputformat_shortcuts = \
- {'code': 'org.apache.hadoop.mapred.SequenceFileOutputFormat',
- 'text': 'org.apache.hadoop.mapred.TextOutputFormat',
- 'raw': 'fm.last.feathers.output.RawFileOutputFormat',
- 'sequencefile': 'org.apache.hadoop.mapred.SequenceFileOutputFormat'}
+ outputformat_shortcuts = {
+ 'code': 'org.apache.hadoop.mapred.SequenceFileOutputFormat',
+ 'text': 'org.apache.hadoop.mapred.TextOutputFormat',
+ 'raw': 'fm.last.feathers.output.RawFileOutputFormat',
+ 'sequencefile': 'org.apache.hadoop.mapred.SequenceFileOutputFormat'
+ }
outputformat_shortcuts.update(configopts('outputformats', self.prog))
- outputformat = addedopts['outputformat'][0]
- if outputformat_shortcuts.has_key(outputformat.lower()):
- outputformat = outputformat_shortcuts[outputformat.lower()]
- self.opts.append(('outputformat', outputformat))
- if addedopts['addpath'] and addedopts['addpath'][0] != 'no':
- self.opts.append(('cmdenv', 'dumbo_addpath=true'))
- pyenv = envdef('PYTHONPATH',
- addedopts['libegg'],
- 'file',
- self.opts,
- shortcuts=dict(configopts('eggs', self.prog)),
- quote=False,
- trim=True,
- extrapaths=addedopts['pypath'])
+
+ outputformat = addedopts['outputformat'][0].lower()
+ if outputformat in outputformat_shortcuts:
+ outputformat = outputformat_shortcuts[outputformat]
+ opts.add('outputformat', outputformat)
+
+ if 'no' not in addedopts['addpath']:
+ opts.add('cmdenv', 'dumbo_addpath=true')
+
+ pyenv = envdef('PYTHONPATH', addedopts['libegg'], 'file', self.opts,
+ shortcuts=dict(configopts('eggs', self.prog)), quote=False, trim=True,
+ extrapaths=addedopts['pypath'])
if pyenv:
- self.opts.append(('cmdenv', pyenv))
- hadenv = envdef('HADOOP_CLASSPATH', addedopts['libjar'], 'libjar',
- self.opts, shortcuts=dict(configopts('jars', self.prog)))
- fileopt = getopt(self.opts, 'file')
- if fileopt:
- tmpfiles = []
- for file in fileopt:
- if file.startswith('file://'):
- self.opts.append(('file', file[7:]))
- else:
- tmpfiles.append(file)
- if tmpfiles:
- self.opts.append(('jobconf', 'tmpfiles=' + ','.join(tmpfiles)))
- libjaropt = getopt(self.opts, 'libjar')
- if libjaropt:
- tmpjars = []
- for jar in libjaropt:
- if jar.startswith('file://'):
- self.opts.append(('file', jar[7:]))
- else:
- tmpjars.append(jar)
- if tmpjars:
- self.opts.append(('jobconf', 'tmpjars=' + ','.join(tmpjars)))
+ opts.add('cmdenv', pyenv)
+
+ hadenv = envdef('HADOOP_CLASSPATH', addedopts['libjar'], 'libjar',
+ self.opts, shortcuts=dict(configopts('jars', self.prog)))
+
+ tmpfiles = []
+ for _file in opts['file']:
+ if _file.startswith('file://'):
+ opts.add('file', _file[7:])
+ else:
+ tmpfiles.append(_file)
+ if tmpfiles:
+ opts.add('jobconf', 'tmpfiles=%s' % ','.join(tmpfiles))
+
+ tmpjars = []
+ for jar in opts['libjar']:
+ if jar.startswith('file://'):
+ opts.add('file', jar[7:])
+ else:
+ tmpjars.append(jar)
+ if tmpjars:
+ opts.add('jobconf', 'tmpjars=%s' % ','.join(tmpjars))
+
cmd = hadoop + '/bin/hadoop jar ' + streamingjar
- retval = execute(cmd, self.opts, hadenv)
- if addedopts['delinputs'] and addedopts['delinputs'][0] == 'yes':
- for (key, value) in self.opts:
- if key == 'input':
- if os.path.exists(hadoop + "/bin/hdfs"):
- hdfs = hadoop + "/bin/hdfs"
- else:
- hdfs = hadoop + "/bin/hadoop"
- execute("%s dfs -rmr '%s'" % (hdfs, value))
+ retval = execute(cmd, opts, hadenv)
+
+ if 'yes' in addedopts['delinputs']:
+ inputs = opts['input']
+ hdfs = _hdfspath(hadoop)
+ for path in inputs:
+ execute("%s dfs -rmr '%s'" % (hdfs, path))
return retval
+def _hdfspath(hadoop):
+ return hadoop + ("/bin/hdfs" if os.path.exists(hadoop + "/bin/hdfs")
+ else "/bin/hadoop")
class StreamingFileSystem(FileSystem):
def __init__(self, hadoop):
self.hadoop = hadoop
- if os.path.exists(hadoop + "/bin/hdfs"):
- self.hdfs = hadoop + "/bin/hdfs"
- else:
- self.hdfs = hadoop + "/bin/hadoop"
+ self.hdfs = _hdfspath(hadoop)
def cat(self, path, opts):
- addedopts = getopts(opts, ['libjar'], delete=False)
streamingjar = findjar(self.hadoop, 'streaming')
if not streamingjar:
print >> sys.stderr, 'ERROR: Streaming jar not found'
return 1
- hadenv = envdef('HADOOP_CLASSPATH', addedopts['libjar'],
- shortcuts=dict(configopts('jars')))
+ hadenv = envdef('HADOOP_CLASSPATH', opts['libjar'],
+ shortcuts=dict(configopts('jars')))
try:
import typedbytes
ls = os.popen('%s %s dfs -ls %s' % (hadenv, self.hdfs, path))
@@ -247,11 +236,10 @@ def cat(self, path, opts):
continue
dumptb = os.popen('%s %s/bin/hadoop jar %s dumptb %s 2> /dev/null'
% (hadenv, self.hadoop, streamingjar, subpath))
- ascodeopt = getopt(opts, 'ascode')
- if ascodeopt and ascodeopt[0] == 'yes':
- outputs = dumpcode(typedbytes.PairedInput(dumptb))
- else:
- outputs = dumptext(typedbytes.PairedInput(dumptb))
+
+ dump = dumpcode if 'yes' in opts['ascode'] else dumptext
+ outputs = dump(typedbytes.PairedInput(dumptb))
+
for output in outputs:
print '\t'.join(output)
dumptb.close()
View
82 dumbo/backends/unix.py
@@ -8,18 +8,17 @@
import operator
from dumbo.backends.common import Backend, Iteration, FileSystem
-from dumbo.util import getopt, getopts, configopts, envdef, execute
+from dumbo.util import configopts, envdef, execute, Options
from dumbo.cmd import decodepipe
class UnixBackend(Backend):
-
+
def matches(self, opts):
return True # always matches, but it's last in the list
-
+
def create_iteration(self, opts):
- progopt = getopt(opts, 'prog')
- return UnixIteration(progopt[0], opts)
+ return UnixIteration(opts['prog'][0], opts)
def create_filesystem(self, opts):
return UnixFileSystem()
@@ -29,47 +28,44 @@ class UnixIteration(Iteration):
def __init__(self, prog, opts):
Iteration.__init__(self, prog, opts)
- self.opts += configopts('unix', prog, self.opts)
+ self.opts += Options(configopts('unix', prog, self.opts))
def run(self):
retval = Iteration.run(self)
if retval != 0:
return retval
- addedopts = getopts(self.opts, ['input',
- 'output',
- 'mapper',
- 'reducer',
- 'libegg',
- 'delinputs',
- 'cmdenv',
- 'pv',
- 'addpath',
- 'inputformat',
- 'outputformat',
- 'numreducetasks',
- 'python',
- 'pypath',
- 'sorttmpdir',
- 'sortbufsize'])
- (mapper, reducer) = (addedopts['mapper'][0], addedopts['reducer'][0])
+
+ opts = self.opts
+ keys = ['input', 'output', 'mapper', 'reducer', 'libegg', 'delinputs',
+ 'cmdenv', 'pv', 'addpath', 'inputformat', 'outputformat',
+ 'numreducetasks', 'python', 'pypath', 'sorttmpdir', 'sortbufsize']
+ addedopts = opts.filter(keys)
+ opts.remove(*keys)
+
+ mapper, reducer = addedopts['mapper'][0], addedopts['reducer'][0]
if not addedopts['input'] or not addedopts['output']:
print >> sys.stderr, 'ERROR: input or output not specified'
return 1
- inputs = reduce(operator.concat, (input.split(' ') for input in
- addedopts['input']))
- output = addedopts['output'][0]
+
+ _inputs = addedopts['input']
+ _output = addedopts['output']
+
+ inputs = reduce(operator.concat, (inp.split(' ') for inp in _inputs))
+ output = _output[0]
+
pyenv = envdef('PYTHONPATH', addedopts['libegg'],
- shortcuts=dict(configopts('eggs', self.prog)),
- extrapaths=addedopts['pypath'])
+ shortcuts=dict(configopts('eggs', self.prog)),
+ extrapaths=addedopts['pypath'])
cmdenv = ' '.join("%s='%s'" % tuple(arg.split('=')) for arg in
addedopts['cmdenv'])
- if addedopts['pv'] and addedopts['pv'][0] == 'yes':
+
+ if 'yes' in addedopts['pv']:
mpv = '| pv -s `du -b %s | cut -f 1` -cN map ' % ' '.join(inputs)
(spv, rpv) = ('| pv -cN sort ', '| pv -cN reduce ')
else:
(mpv, spv, rpv) = ('', '', '')
- (sorttmpdir, sortbufsize) = ('', '')
+ sorttmpdir, sortbufsize = '', ''
if addedopts['sorttmpdir']:
sorttmpdir = "-T %s" % addedopts['sorttmpdir'][0]
if addedopts['sortbufsize']:
@@ -78,9 +74,10 @@ def run(self):
python = addedopts['python'][0]
encodepipe = pyenv + ' ' + python + \
' -m dumbo.cmd encodepipe -file ' + ' -file '.join(inputs)
- if addedopts['inputformat'] and addedopts['inputformat'][0] == 'code':
+
+ if 'code' in addedopts['inputformat']:
encodepipe += ' -alreadycoded yes'
- if addedopts['addpath'] and addedopts['addpath'][0] != 'no':
+ if 'no' not in addedopts['addpath']:
encodepipe += ' -addpath yes'
if addedopts['numreducetasks'] and addedopts['numreducetasks'][0] == '0':
retval = execute("%s | %s %s %s %s > '%s'" % (encodepipe,
@@ -104,28 +101,29 @@ def run(self):
reducer,
rpv,
output))
- if addedopts['delinputs'] and addedopts['delinputs'][0] == 'yes':
- for file in addedopts['input']:
- execute('rm ' + file)
+
+ if 'yes' in addedopts['delinputs']:
+ for _file in addedopts['input']:
+ execute('rm ' + _file)
return retval
-
+
class UnixFileSystem(FileSystem):
-
+
def cat(self, path, opts):
return decodepipe(opts + [('file', path)])
-
+
def ls(self, path, opts):
return execute("ls -l '%s'" % path, printcmd=False)
-
+
def exists(self, path, opts):
return execute("test -e '%s'" % path, printcmd=False)
-
+
def rm(self, path, opts):
return execute("rm -rf '%s'" % path, printcmd=False)
-
+
def put(self, path1, path2, opts):
return execute("cp '%s' '%s'" % (path1, path2), printcmd=False)
-
+
def get(self, path1, path2, opts):
return execute("cp '%s' '%s'" % (path1, path2), printcmd=False)
View
90 dumbo/cmd.py
@@ -17,7 +17,8 @@
import sys
import os
-from dumbo.util import *
+from dumbo.util import (dumpcode, Options, loadcode, dumptext, loadtext,
+ configopts, parseargs, execute, envdef)
from dumbo.backends import create_filesystem
@@ -67,14 +68,15 @@ def start(prog,
opts,
stdout=sys.stdout,
stderr=sys.stderr):
- opts += configopts('common')
- opts += configopts('start')
- addedopts = getopts(opts, ['libegg'], delete=False)
- pyenv = envdef('PYTHONPATH', addedopts['libegg'],
+ opts += Options(configopts('common'))
+ opts += Options(configopts('start'))
+
+ pyenv = envdef('PYTHONPATH', opts['libegg'],
shortcuts=dict(configopts('eggs', prog)),
extrapaths=sys.path)
- if not getopt(opts, 'prog', delete=False):
- opts.append(('prog', prog))
+ if not opts['prog']:
+ opts.add('prog', prog)
+
if not os.path.exists(prog):
if prog.endswith(".py"):
print >> sys.stderr, 'ERROR:', prog, 'does not exist'
@@ -89,72 +91,74 @@ def start(prog,
def cat(path, opts):
- opts += configopts('common')
- opts += configopts('cat')
+ opts += Options(configopts('common'))
+ opts += Options(configopts('cat'))
return create_filesystem(opts).cat(path, opts)
def ls(path, opts):
- opts += configopts('common')
- opts += configopts('ls')
+ opts += Options(configopts('common'))
+ opts += Options(configopts('ls'))
return create_filesystem(opts).ls(path, opts)
def exists(path, opts):
- opts += configopts('common')
- opts += configopts('exists')
+ opts += Options(configopts('common'))
+ opts += Options(configopts('exists'))
return create_filesystem(opts).exists(path, opts)
def rm(path, opts):
- opts += configopts('common')
- opts += configopts('rm')
+ opts += Options(configopts('common'))
+ opts += Options(configopts('rm'))
return create_filesystem(opts).rm(path, opts)
def put(path1, path2, opts):
- opts += configopts('common')
- opts += configopts('put')
+ opts += Options(configopts('common'))
+ opts += Options(configopts('put'))
return create_filesystem(opts).put(path1, path2, opts)
def get(path1, path2, opts):
- opts += configopts('common')
- opts += configopts('get')
+ opts += Options(configopts('common'))
+ opts += Options(configopts('get'))
return create_filesystem(opts).get(path1, path2, opts)
-def encodepipe(opts=[]):
- addedopts = getopts(opts, ['addpath', 'file', 'alreadycoded'])
- if addedopts['file']:
- files = (open(f) for f in addedopts['file'])
- else:
- files = [sys.stdin]
- for file in files:
- outputs = (line[:-1] for line in file)
- if addedopts['alreadycoded']:
- outputs = loadcode(outputs)
- else:
- outputs = loadtext(outputs)
- if addedopts['addpath']:
- outputs = (((file.name, key), value) for (key, value) in outputs)
+def encodepipe(opts=None):
+ opts = opts or Options()
+ keys = ['addpath', 'file', 'alreadycoded']
+ addedopts = opts.filter(keys)
+ opts.remove(*keys)
+
+ ofiles = addedopts['file']
+ files = map(open, ofiles) if ofiles else [sys.stdin]
+
+ loadfun = loadcode if addedopts['alreadycoded'] else loadtext
+ addpath = addedopts['addpath']
+
+ for _file in files:
+ outputs = loadfun(line[:-1] for line in _file)
+ if addpath:
+ outputs = (((_file.name, key), value) for (key, value) in outputs)
for output in dumpcode(outputs):
print '\t'.join(output)
- file.close()
+ _file.close()
return 0
-def decodepipe(opts=[]):
- addedopts = getopts(opts, ['file'])
- if addedopts['file']:
- files = (open(f) for f in addedopts['file'])
- else:
- files = [sys.stdin]
- for file in files:
- outputs = loadcode(line[:-1] for line in file)
+def decodepipe(opts=None):
+ opts = opts or Options()
+ ofiles = opts['file']
+ opts.remove('file')
+ files = map(open, ofiles) if ofiles else [sys.stdin]
+
+ for _file in files:
+ outputs = loadcode(line[:-1] for line in _file)
for output in dumptext(outputs):
print '\t'.join(output)
- file.close()
+ _file.close()
return 0
View
173 dumbo/core.py
@@ -16,7 +16,6 @@
import sys
import os
-import re
import types
import resource
import copy
@@ -54,94 +53,88 @@ def additer(self, *args, **kwargs):
return iter
def run(self):
- for (iter, (args, kwargs)) in enumerate(self.iters):
- kwargs['iter'] = iter
+ for _iter, (args, kwargs) in enumerate(self.iters):
+ kwargs['iter'] = _iter
if len(sys.argv) > 1 and not sys.argv[1][0] == '-':
run(*args, **kwargs)
else:
- opts = kwargs.get('opts', [])
+ opts = Options(kwargs.get('opts', []))
opts += parseargs(sys.argv[1:])
# this has to be done early, while all the opts are still there
backend = get_backend(opts)
fs = backend.create_filesystem(opts)
- preoutputsopt = getopt(opts, 'preoutputs')
- delinputsopt = getopt(opts, 'delinputs')
- addpathopt = getopt(opts, 'addpath', delete=False)
- getpathopt = getopt(opts, 'getpath', delete=False)
+ preoutputsopt = opts.pop('preoutputs')
+ delinputsopt = opts.pop('delinputs')
- job_inputs = getopt(opts, 'input', delete=False)
+ job_inputs = opts['input']
if not job_inputs:
print >> sys.stderr, 'ERROR: No input path specified'
sys.exit(1)
- outputopt = getopt(opts, 'output', delete=False)
+ outputopt = opts['output']
if not outputopt:
print >> sys.stderr, 'ERROR: No output path specified'
sys.exit(1)
+
job_output = outputopt[0]
- newopts = {}
- newopts['iteration'] = str(iter)
- newopts['itercount'] = str(len(self.iters))
+ newopts = Options()
+ newopts.add('iteration', str(_iter))
+ newopts.add('itercount', str(len(self.iters)))
- input = kwargs['input']
- if type(input) == int:
- input = [input]
- if input == [-1]:
+ _input = kwargs['input']
+ if type(_input) == int:
+ _input = [_input]
+ if _input == [-1]:
kwargs['input'] = job_inputs
- if delinputsopt and delinputsopt[0] == 'yes' and iter == self.deps[-1]:
- newopts['delinputs'] = 'yes'
- else:
- newopts['delinputs'] = 'no'
+ delinputs = 'yes' if 'yes' in delinputsopt and _iter == self.deps[-1] else 'no'
+ newopts.add('delinputs', delinputs)
else:
- if -1 in input:
+ if -1 in _input:
print >> sys.stderr, 'ERROR: Cannot mix job input with intermediate results'
sys.exit(1)
- kwargs['input'] = [job_output + "_pre" + str(initer + 1) for initer in input]
- newopts['inputformat'] = 'code'
- if addpathopt and addpathopt[0] == 'yes': # not when == 'iter'
- newopts['addpath'] = 'no'
- newopts['delinputs'] = 'no' # we'll take care of it ourselves
+ kwargs['input'] = [job_output + "_pre" + str(initer + 1) for initer in _input]
+ newopts.add('inputformat', 'code')
+ if 'yes' in opts['addpath']: # not when == 'iter'
+ newopts.add('addpath', 'no')
+ newopts.add('delinputs', 'no')
- if iter == len(self.iters) - 1:
+ if _iter == len(self.iters) - 1:
kwargs['output'] = job_output
else:
- kwargs['output'] = job_output + "_pre" + str(iter + 1)
- newopts['outputformat'] = 'code'
- if getpathopt and getpathopt[0] == 'yes': # not when == 'iter'
- newopts['getpath'] = 'no'
-
- (key, delindexes) = (None, [])
- for (index, (key, value)) in enumerate(opts):
- if newopts.has_key(key):
- delindexes.append(index)
- for delindex in reversed(delindexes):
- del opts[delindex]
- opts += newopts.iteritems()
+ kwargs['output'] = job_output + "_pre" + str(_iter + 1)
+ newopts.add('outputformat', 'code')
+ if 'yes' in opts['getpath']: # not when == 'iter'
+ newopts.add('getpath', 'no')
+
+ keys = [k for k, _ in opts if k in newopts]
+ opts.remove(*keys)
+ opts += newopts
+
kwargs['opts'] = opts
run(*args, **kwargs)
- if not (preoutputsopt and preoutputsopt[0] == 'yes') and input != [-1]:
- for initer in input:
- if iter == self.deps[initer]:
+ if 'yes' not in preoutputsopt and _input != [-1]:
+ for initer in _input:
+ if _iter == self.deps[initer]:
fs.rm(job_output + "_pre" + str(initer + 1), opts)
class Program(object):
- def __init__(self, prog, opts=[]):
- (self.prog, self.opts) = (prog, opts)
+ def __init__(self, prog, opts=None):
+ self.prog, self.opts = prog, opts or Options()
self.started = False
def addopt(self, key, value):
- self.opts.append((key, value))
+ self.opts.add(key, value)
def delopts(self, key):
- return getopts(self.opts, [key], delete=True)[key]
+ return self.opts.pop(key)
def delopt(self, key):
try:
@@ -150,7 +143,7 @@ def delopt(self, key):
return None
def getopts(self, key):
- return getopts(self.opts, [key], delete=False)[key]
+ return self.opts[key]
def getopt(self, key):
try:
@@ -169,16 +162,15 @@ def start(self):
def main(runner, starter=None, variator=None):
- opts = parseargs(sys.argv[1:])
- starteropt = getopts(opts, ['starter'])['starter']
- opts.append(('starter', 'no'))
- if starter and not (starteropt and starteropt[0] == 'no') \
- and not (len(sys.argv) > 1 and sys.argv[1][0] != '-'):
- progopt = getopt(opts, 'prog')
- if not progopt:
- program = Program(sys.argv[0], opts)
- else:
- program = Program(progopt[0], opts)
+ opts = Options(parseargs(sys.argv[1:]))
+ starteropt = opts.pop('starter')
+ opts.add('starter', 'no')
+ if starter and 'no' not in starteropt and \
+ not (len(sys.argv) > 1 and sys.argv[1][0] != '-'):
+ progopt = opts.pop('prog')
+ progname = progopt[0] if progopt else sys.argv[0]
+ program = Program(progname, opts)
+
try:
if variator:
programs = variator(program)
@@ -236,11 +228,11 @@ def run(mapper,
if len(sys.argv) > 3:
memlim = int(sys.argv[3])
resource.setrlimit(resource.RLIMIT_AS, (memlim, memlim))
-
+
mrbase_class = loadclassname(os.environ['dumbo_mrbase_class'])
jk_class = loadclassname(os.environ['dumbo_jk_class'])
runinfo = loadclassname(os.environ['dumbo_runinfo_class'])()
-
+
if iterarg == iter:
if sys.argv[1].startswith('map'):
if type(mapper) in (types.ClassType, type):
@@ -285,8 +277,8 @@ def run(mapper,
inputs = ((jk_class(k), v) for (k, v) in inputs)
if os.environ.has_key('dumbo_parser'):
parser = os.environ['dumbo_parser']
- clsname = parser.split('.')[-1]
- modname = '.'.join(parser.split('.')[:-1])
+ clsname = parser.split('.')[-1]
+ modname = '.'.join(parser.split('.')[:-1])
if not modname:
raise ImportError(parser)
module = __import__(modname, fromlist=[clsname])
@@ -369,64 +361,57 @@ def run(mapper,
for output in dumpcode(inputs):
print '\t'.join(output)
else:
- if not opts:
- opts = []
+ opts = opts or Options()
if type(mapper) == str:
- opts.append(('mapper', mapper))
+ opts.add('mapper', mapper)
elif hasattr(mapper, 'opts'):
opts += mapper.opts
if type(reducer) == str:
- opts.append(('reducer', reducer))
+ opts.add('reducer', reducer)
elif hasattr(reducer, 'opts'):
opts += reducer.opts
if type(combiner) == str:
- opts.append(('combiner', combiner))
+ opts.add('combiner', combiner)
opts += parseargs(sys.argv[1:])
if input is not None:
- getopt(opts, 'input', delete=True) # delete -input opts
+ opts.remove('input')
for infile in input:
- opts.append(('input', infile))
-
+ opts.add('input', infile)
+
if output is None:
- outputopt = getopt(opts, 'output', delete=False)
+ outputopt = opts['output']
if not outputopt:
print >> sys.stderr, 'ERROR: No output path specified'
sys.exit(1)
output = outputopt[0]
- newopts = {}
-
- newopts['output'] = output
+ newopts = Options()
+ newopts.add('output', output)
if not reducer:
- newopts['numreducetasks'] = '0'
-
- (key, delindexes) = (None, [])
- for (index, (key, value)) in enumerate(opts):
- if newopts.has_key(key):
- delindexes.append(index)
- for delindex in reversed(delindexes):
- del opts[delindex]
- opts += newopts.iteritems()
-
+ newopts.add('numreducetasks', '0')
+
+ keys = [k for k, _ in opts if k in newopts]
+ opts.remove(*keys)
+ opts += newopts
+
backend = get_backend(opts)
- overwriteopt = getopt(opts, 'overwrite')
- checkoutopt = getopt(opts, 'checkoutput')
- checkoutput = not (checkoutopt and checkoutopt[0] == 'no')
+ overwriteopt = opts.pop('overwrite')
+ checkoutput = 'no' not in opts.pop('checkoutput')
fs = backend.create_filesystem(opts)
- if overwriteopt and overwriteopt[0] == 'yes':
+ if 'yes' in overwriteopt:
fs.rm(output, opts)
elif checkoutput and fs.exists(output, opts) == 0:
print >> sys.stderr, 'ERROR: Output path exists already: %s' % output
sys.exit(1)
- opts.append(('cmdenv', 'dumbo_mrbase_class=' + \
- getclassname(backend.get_mapredbase_class(opts))))
- opts.append(('cmdenv', 'dumbo_jk_class=' + \
- getclassname(backend.get_joinkey_class(opts))))
- opts.append(('cmdenv', 'dumbo_runinfo_class=' + \
- getclassname(backend.get_runinfo_class(opts))))
+ opts.add('cmdenv', 'dumbo_mrbase_class=' + \
+ getclassname(backend.get_mapredbase_class(opts)))
+ opts.add('cmdenv', 'dumbo_jk_class=' + \
+ getclassname(backend.get_joinkey_class(opts)))
+ opts.add('cmdenv', 'dumbo_runinfo_class=' + \
+ getclassname(backend.get_runinfo_class(opts)))
retval = backend.create_iteration(opts).run()
if retval == 127:
print >> sys.stderr, 'ERROR: Are you sure that "python" is on your path?'
View
7 dumbo/decor.py
@@ -15,7 +15,7 @@
# limitations under the License.
from dumbo.lib import PrimaryMapper, SecondaryMapper
-
+from dumbo.util import Options
class opt(object):
@@ -24,9 +24,10 @@ def __init__(self, name, value):
def __call__(self, func):
if hasattr(func, 'opts'):
- func.opts.append(self.opt)
+ key, value = self.opt
+ func.opts.add(key, value)
else:
- func.opts = [self.opt]
+ func.opts = Options([self.opt])
return func
View
32 dumbo/lib/__init__.py
@@ -21,36 +21,36 @@
from math import sqrt
from copy import copy
-from dumbo.util import loadclassname
+from dumbo.util import loadclassname, Options
-def identitymapper(key, value):
+def identitymapper(key, value):
yield (key, value)
def identityreducer(key, values):
- for value in values:
- yield (key, value)
+ for value in values:
+ yield (key, value)
def sumreducer(key, values):
- yield (key, sum(values))
+ yield (key, sum(values))
def sumsreducer(key, values):
- yield (key, tuple(imap(sum, izip(*values))))
+ yield (key, tuple(imap(sum, izip(*values))))
-def nlargestreducer(n, key=None):
+def nlargestreducer(n, key=None):
def reducer(key_, values):
yield (key_, heapq.nlargest(n, chain(*values), key=key))
- return reducer
+ return reducer
def nlargestcombiner(n, key=None):
def combiner(key_, values):
- yield (key_, heapq.nlargest(n, values, key=key))
- return combiner
+ yield (key_, heapq.nlargest(n, values, key=key))
+ return combiner
def nsmallestreducer(n, key=None):
@@ -59,7 +59,7 @@ def reducer(key_, values):
return reducer
def nsmallestcombiner(n, key=None):
- def combiner(key_, values):
+ def combiner(key_, values):
yield (key_, heapq.nsmallest(n, values, key=key))
return combiner
@@ -96,10 +96,10 @@ def __new__(cls):
else:
cls.__call__ = cls.__call__normalkey
return object.__new__(cls)
-
+
def __init__(self):
self.mappers = []
- self.opts = [("addpath", "iter")]
+ self.opts = Options([("addpath", "iter")])
def configure(self):
mrbase_class = loadclassname(os.environ['dumbo_mrbase_class'])
@@ -152,7 +152,7 @@ class JoinMapper(object):
def __init__(self, mapper, isprimary=False):
self.mapper = mapper
self.isprimary = isprimary
- self.opts = [('joinkeys', 'yes')]
+ self.opts = Options([('joinkeys', 'yes')])
if hasattr(mapper, 'opts'):
self.opts += self.mapper.opts
self.closefunc = None
@@ -197,7 +197,7 @@ def __init__(self, mapper):
class JoinCombiner(object):
- opts = [("joinkeys", "yes")]
+ opts = Options([("joinkeys", "yes")])
def __call__(self, key, values):
if key.isprimary:
@@ -213,7 +213,7 @@ def __call__(self, key, values):
jk = copy(key)
jk.body = k
yield jk, v
-
+
def secondary_blocked(self, key_body):
'''Determines if the secondary method should be blocked or not.'''
return False
View
20 dumbo/lib/rawreducer.py
@@ -1,20 +1,20 @@
"""A reducer base class to output one or multiple files in its raw fileformat"""
from itertools import groupby
-
+from dumbo.util import Options
class RawReducer(object):
"""Reducer to generate outputs in raw file format"""
multipleoutput = False
- singleopts = [
- ('outputformat', 'raw'),
- ]
- multipleopts = [
- ('getpath', 'yes'),
- ('outputformat', 'raw'),
- ('partitioner', 'fm.last.feathers.partition.Prefix'),
- ('jobconf', 'feathers.output.filename.strippart=true'),
- ]
+ singleopts = Options([
+ ('outputformat', 'raw'),
+ ])
+ multipleopts = Options([
+ ('getpath', 'yes'),
+ ('outputformat', 'raw'),
+ ('partitioner', 'fm.last.feathers.partition.Prefix'),
+ ('jobconf', 'feathers.output.filename.strippart=true'),
+ ])
def __init__(self, factory=None, multipleoutput=None):
if factory:
View
137 dumbo/util.py
@@ -18,7 +18,8 @@
import os
import re
import subprocess
-
+import warnings
+from collections import defaultdict
def sorted(iterable, piecesize=None, key=None, reverse=False):
if not piecesize:
@@ -42,8 +43,8 @@ def incrcounter(group, counter, amount):
def setstatus(message):
print >> sys.stderr, 'reporter:status:%s' % message
-
-
+
+
def dumpcode(outputs):
for output in outputs:
yield map(repr, output)
@@ -78,40 +79,124 @@ def loadtext(inputs):
yield (offset, input)
offset += len(input)
+class Options(object):
+ """
+ Class that represent a a set of options. A key can hold
+ more than a value and key are stored in lowercase.
+ """
+
+ def __init__(self, seq=None, **kwargs):
+ """
+ Initialize the option object
+
+ Args:
+ - seq: a list of (key, value) pairs
+ """
+ self._opts = defaultdict(set)
+ options = (seq or []) + kwargs.items()
+ for k, v in options:
+ self.add(k, v)
+
+ def add(self, key, value):
+ self._opts[key.lower()].add(value)
+
+ def get(self, key):
+ if key not in self._opts:
+ return []
+ return list(self._opts[key])
+
+ def __getitem__(self, key):
+ return self.get(key)
+
+ def __delitem__(self, key):
+ return self.remove(key)
+
+ def __iadd__(self, opts):
+ if isinstance(opts, Options):
+ for k, vs in opts._opts.items():
+ self._opts[k].update(vs)
+ return self
+ elif isinstance(opts, (list, tuple, set)):
+ for k, v in opts:
+ self.add(k, v)
+ return self
+ else:
+ raise ValueError('Invalid opts type. Must be an iterable of (key, value)')
+
+ def __iter__(self):
+ return iter(self.allopts())
+
+ def __contains__(self, key):
+ return key in self._opts
+
+ def __len__(self):
+ return len(self.allopts())
+
+ def __bool__(self):
+ return bool(self._opts)
+
+ def filter(self, keys):
+ return Options(seq=[(k, v) for k, v in self.allopts() if k in keys])
+
+ def allopts(self):
+ """Return a list with all the options in the form of (key, value)"""
+ return [(k, v) for k, vs in self._opts.items() for v in sorted(vs)]
+
+ def to_dict(self):
+ return dict((k, list(sorted(vs))) for k, vs in self._opts.items())
+
+ def __str__(self):
+ ps = self.allopts()
+ return "Options(%s)" % (', '.join('%s="%s"' % (k, v) for k, v in ps))
+ __repr__ = __str__
+
+ def remove(self, *keys):
+ for k in keys:
+ del self._opts[k]
+
+ def pop(self, key, default=None):
+ return list(self._opts.pop(key, default))
def parseargs(args):
- (opts, key, values) = ([], None, [])
+ (opts, key, values) = (Options(), None, [])
for arg in args:
if arg[0] == '-' and len(arg) > 1:
if key:
- opts.append((key, ' '.join(values)))
+ opts.add(key, ' '.join(values))
(key, values) = (arg[1:], [])
else:
values.append(arg)
if key:
- opts.append((key, ' '.join(values)))
+ opts.add(key, ' '.join(values))
return opts
-
def getopts(opts, keys, delete=True):
- askedopts = dict((key, []) for key in keys)
- (key, delindexes) = (None, [])
- for (index, (key, value)) in enumerate(opts):
- key = key.lower()
- if askedopts.has_key(key):
- askedopts[key].append(value)
- delindexes.append(index)
+ warnings.warn("getopts will be deprecated. use dumbo.util.Options",
+ DeprecationWarning)
+ o = Options(opts)
+ result = o.filter(keys).to_dict()
if delete:
- for delindex in reversed(delindexes):
- del opts[delindex]
- return askedopts
-
+ for k in keys:
+ for v in o.get(k):
+ opts.remove((k, v))
+ if k in o:
+ o.remove(k)
+ return result
def getopt(opts, key, delete=True):
- return getopts(opts, [key], delete)[key]
-
+ warnings.warn("getopts will be deprecated. use dumbo.util.Options",
+ DeprecationWarning)
+ o = Options(opts)
+ if key not in o:
+ return []
+ values = o.get(key)
+ if delete:
+ for val in values:
+ opts.remove((key, val))
+ o.remove(key)
+ return values
-def configopts(section, prog=None, opts=[]):
+def configopts(section, prog=None, opts=None):
from ConfigParser import SafeConfigParser, NoSectionError
if prog:
prog = prog.split('/')[-1]
@@ -124,7 +209,7 @@ def configopts(section, prog=None, opts=[]):
os.environ['PWD'])])
except KeyError:
pass
- for (key, value) in opts:
+ for (key, value) in opts or Options():
defaults[key.lower()] = value
parser = SafeConfigParser(defaults)
parser.read(['/etc/dumbo.conf', os.environ['HOME'] + '/.dumborc'])
@@ -139,13 +224,14 @@ def configopts(section, prog=None, opts=[]):
def execute(cmd,
- opts=[],
+ opts=None,
precmd='',
printcmd=True,
stdout=sys.stdout,
stderr=sys.stderr):
if precmd:
cmd = ' '.join((precmd, cmd))
+ opts = opts or Options()
args = ' '.join("-%s '%s'" % (key, value) for (key, value) in opts)
if args:
cmd = ' '.join((cmd, args))
@@ -225,11 +311,12 @@ def envdef(varname,
pathvals.extend(extrapaths)
path = ':'.join(pathvals)
if optname and optvals:
+ opts = opts or Options()
if not commasep:
for optval in optvals:
- opts.append((optname, optval))
+ opts.add(optname, optval)
else:
- opts.append((optname, ','.join(optvals)))
+ opts.add(optname, ','.join(optvals))
if not quote:
return '%s=%s' % (varname, path)
else:
View
25 tests/testexamples.py
@@ -2,6 +2,7 @@
import sys
import unittest
from dumbo import cmd, util
+from dumbo.util import Options
class TestExamples(unittest.TestCase):
@@ -18,15 +19,15 @@ def setUp(self):
self.tstdir = "./"
self.logfile = open(self.tstdir+"log.txt", "w")
self.outfile = self.tstdir + "output.code"
- self.common_opts = [('checkoutput', 'no')]
+ self.common_opts = Options([('checkoutput', 'no')])
def tearDown(self):
self.logfile.close()
os.remove(self.outfile)
def testwordcount(self):
opts = self.common_opts
- opts += [('input', self.exdir+'brian.txt'), ('output', self.outfile)]
+ opts += Options([('input', self.exdir+'brian.txt'), ('output', self.outfile)])
retval = cmd.start(self.exdir+'wordcount.py', opts,
stdout=self.logfile, stderr=self.logfile)
self.assertEqual(0, retval)
@@ -35,8 +36,8 @@ def testwordcount(self):
def testoowordcount(self):
opts = self.common_opts
- opts += [('excludes', self.exdir+'excludes.txt'),
- ('input', self.exdir+'brian.txt'), ('output', self.outfile)]
+ opts += Options([('excludes', self.exdir+'excludes.txt'),
+ ('input', self.exdir+'brian.txt'), ('output', self.outfile)])
retval = cmd.start(self.exdir+'oowordcount.py', opts,
stdout=self.logfile, stderr=self.logfile)
self.assertEquals(0, retval)
@@ -45,7 +46,7 @@ def testoowordcount(self):
def testaltwordcount(self):
opts = self.common_opts
- opts += [('input', self.exdir+'brian.txt'), ('output', self.outfile)]
+ opts += Options([('input', self.exdir+'brian.txt'), ('output', self.outfile)])
retval = cmd.start(self.exdir+'altwordcount.py', opts,
stdout=self.logfile, stderr=self.logfile)
self.assertEqual(0, retval)
@@ -54,7 +55,7 @@ def testaltwordcount(self):
def testitertwice(self):
opts = self.common_opts
- opts += [('input', self.exdir+'brian.txt'), ('output', self.outfile)]
+ opts += Options([('input', self.exdir+'brian.txt'), ('output', self.outfile)])
retval = cmd.start(self.exdir+'itertwice.py', opts,
stdout=self.logfile, stderr=self.logfile)
self.assertEqual(0, retval)
@@ -63,9 +64,9 @@ def testitertwice(self):
def testjoin(self):
opts = self.common_opts
- opts += [('input', self.exdir+'hostnames.txt'),
+ opts += Options([('input', self.exdir+'hostnames.txt'),
('input', self.exdir+'logs.txt'),
- ('output', self.outfile)]
+ ('output', self.outfile)])
retval = cmd.start(self.exdir+'join.py', opts,
stdout=self.logfile, stderr=self.logfile)
self.assertEqual(0, retval)
@@ -74,16 +75,16 @@ def testjoin(self):
def testmulticount(self):
opts = self.common_opts
- opts += [('input', self.exdir+'brian.txt'),
+ opts += Options([('input', self.exdir+'brian.txt'),
('input', self.exdir+'eno.txt'),
- ('output', self.outfile)]
+ ('output', self.outfile)])
retval = cmd.start(self.exdir+'multicount.py', opts,
stdout=self.logfile, stderr=self.logfile)
self.assertEqual(0, retval)
output = dict(util.loadcode(open(self.outfile)))
self.assertEqual(6, int(output[('A', 'Brian')]))
- self.assertEqual(6, int(output[('B', 'Eno')]))
-
+ self.assertEqual(6, int(output[('B', 'Eno')]))
+
if __name__ == "__main__":
suite = unittest.TestLoader().loadTestsFromTestCase(TestExamples)
View
144 tests/testutil.py
@@ -0,0 +1,144 @@
+import unittest
+from dumbo.util import getopt, getopts, Options
+
+class TestUtil(unittest.TestCase):
+
+ def test_getopt(self):
+ # Test for backward compatibility
+ opts = []
+ values = getopt(opts, 'input')
+ self.assertEquals(values, [])
+ self.assertEquals(opts, [])
+
+ opts = [('param', 'p1'), ('param', 'p2'), ('input', '/dev/path')]
+ values = getopt(opts, 'param')
+ expected = ['p2', 'p1']
+ self.assertEquals(set(values), set(expected))
+ self.assertEquals(set(opts), set([('input', '/dev/path')]))
+
+ opts = [('output', '/prod/path')]
+ values = getopt(opts, 'output', delete=False)
+ self.assertEquals(values, ['/prod/path'])
+ self.assertEquals(opts, [('output', '/prod/path')])
+
+ values = getopt(opts, 'output')
+ self.assertEquals(values, ['/prod/path'])
+ self.assertEquals(opts, [])
+
+ def test_getopts(self):
+ # Test for backward compatibility
+ opts = []
+ values = getopts(opts, ['input'])
+ self.assertEquals(values, {})
+ self.assertEquals(opts, [])
+
+ opts = [('param', 'p1'), ('param', 'p2'), ('input', '/dev/path'),
+ ('output', '/prod/path')]
+ values = getopts(opts, ['param', 'input'])
+ expected = {'input': ['/dev/path'], 'param': ['p2', 'p1']}
+ settize = lambda _dict: set([(k, tuple(sorted(v))) for k, v in _dict.items()])
+ self.assertEquals(settize(values), settize(expected))
+ self.assertEquals(set(opts), set([('output', '/prod/path')]))
+
+ opts = [('output', '/prod/path')]
+ values = getopts(opts, ['output'], delete=False)
+ self.assertEquals(values, {'output': ['/prod/path']})
+ self.assertEquals(opts, [('output', '/prod/path')])
+
+ values = getopts(opts, ['output'])
+ self.assertEquals(values, {'output': ['/prod/path']})
+ self.assertEquals(opts, [])
+
+ def test_Options(self):
+ o = Options([('param', 'p1')])
+ # test add / get
+ o.add('param', 'p2')
+
+ # test repeat add same parameter
+ o.add('param', 'p2')
+ o.add('input', '/dev/path')
+ o.add('output', '/dev/out')
+ self.assertEquals(set(o.get('param')), set(['p1', 'p2']))
+ self.assertEquals(o.get('input'), ['/dev/path'])
+ self.assertEquals(o.get('notexist'), [])
+
+ # test __getitem__
+ self.assertEquals(set(o['param']), set(['p1', 'p2']))
+ self.assertEquals(o['input'], ['/dev/path'])
+ self.assertEquals(o['notexist'], [])
+
+ # test __delitem__
+ self.assertEquals(o['output'], ['/dev/out'])
+ del o['output']
+ self.assertEquals(o['output'], [])
+
+ # test __iadd__
+ # adding Options objects
+ o += Options([('output', '/dev/out2'), ('jar', 'my.jar')])
+ self.assertEquals(o['output'], ['/dev/out2'])
+ self.assertEquals(o['jar'], ['my.jar'])
+ # adding a list & set
+ o += [('param', 'p3'), ('egg', 'lib.egg')]
+ self.assertEquals(set(o['param']), set(['p1', 'p2', 'p3']))
+ self.assertEquals(o['egg'], ['lib.egg'])
+
+ o += set([('cmdenv', 'p=2')])
+ self.assertEquals(o['cmdenv'], ['p=2'])
+
+ # testing iter / allopts
+ o2 = Options([('param', 'p1')])
+ o2.add('param', 'p2')
+ o2.add('input', '/dev/path')
+ self.assertEquals(set(o2), set([('param', 'p1'), ('param', 'p2'), ('input', '/dev/path')]))
+ self.assertEquals(set(o2.allopts()), set([('param', 'p1'), ('param', 'p2'), ('input', '/dev/path')]))
+
+
+ # testing len
+ self.assertEquals(len(o), 8)
+ self.assertEquals(len(o2), 3)
+ self.assertEquals(len(Options()), 0)
+
+ # testing boolean
+ self.assertTrue(o)
+ self.assertTrue(o2)
+ self.assertFalse(Options())
+
+ # testing filter
+ self.assertEquals(set(o2.filter(['param'])['param']), set(['p1', 'p2']))
+ self.assertEquals(o2.filter(['input'])['input'], ['/dev/path'])
+
+ nop = o.filter(['param', 'jar', 'egg'])
+ self.assertEquals(len(nop), 5)
+ self.assertEquals(set(nop['param']), set(['p1', 'p2', 'p3']))
+ self.assertEquals(nop['jar'], ['my.jar'])
+ self.assertEquals(nop['egg'], ['lib.egg'])
+
+ # testing to_dict
+ expected = {
+ 'param': ['p1', 'p2', 'p3'],
+ 'egg': ['lib.egg'],
+ 'jar': ['my.jar']
+ }
+ self.assertEquals(nop.to_dict(), expected)
+
+ # testing remove
+ nop.remove('param', 'jar')
+ self.assertEquals(len(nop), 1)
+ self.assertEquals(nop['param'], [])
+ self.assertEquals(nop['jar'], [])
+ self.assertEquals(nop['egg'], ['lib.egg'])
+
+ # testing pop
+ self.assertEquals(nop.pop('egg'), ['lib.egg'])
+ self.assertEquals(len(nop), 0)
+ self.assertEquals(nop['egg'], [])
+
+
+
+
+
+
+
+if __name__ == "__main__":
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestUtil)
+ unittest.TextTestRunner(verbosity=2).run(suite)

0 comments on commit 3eb93ad

Please sign in to comment.
Something went wrong with that request. Please try again.