Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Closes #38

  • Loading branch information...
commit b1dd50ce9c06e4c276c1cdd8c0860ba45ea582d2 1 parent ad33096
@klbostee authored
Showing with 72 additions and 37 deletions.
  1. +32 −17 dumbo/core.py
  2. +40 −20 dumbo/lib.py
View
49 dumbo/core.py
@@ -460,8 +460,10 @@ def run(mapper,
buffersize=None,
mapconf=None,
redconf=None,
+ combconf=None,
mapclose=None,
redclose=None,
+ combclose=None,
opts=None,
iter=0,
itercnt=1):
@@ -474,25 +476,25 @@ def run(mapper,
memlim = int(sys.argv[3])
resource.setrlimit(resource.RLIMIT_AS, (memlim, memlim))
if iterarg == iter:
- if type(mapper) in (types.ClassType, type):
- mappercls = type('DumboMapper', (mapper, MapRedBase), {})
- if hasattr(mappercls, 'map'):
- mapper = mappercls().map
- else:
+ if sys.argv[1].startswith('map'):
+ if type(mapper) in (types.ClassType, type):
+ mappercls = type('DumboMapper', (mapper, MapRedBase), {})
mapper = mappercls()
- if type(reducer) in (types.ClassType, type):
- reducercls = type('DumboReducer', (reducer, MapRedBase), {})
- if hasattr(reducercls, 'reduce'):
- reducer = reducercls().reduce
- else:
- reducer = reducercls()
- if type(combiner) in (types.ClassType, type):
- combinercls = type('DumboCombiner', (combiner, MapRedBase), {})
- if hasattr(combinercls, 'reduce'):
- combiner = combinercls().reduce
- else:
+ if hasattr(mapper, 'map'):
+ mapper = mapper.map
+ if hasattr(mapper, 'configure'):
+ mapconf = mapper.configure
+ if hasattr(mapper, 'close'):
+ mapclose = mapper.close
+ if type(combiner) in (types.ClassType, type):
+ combinercls = type('DumboCombiner', (combiner, MapRedBase), {})
combiner = combinercls()
- if sys.argv[1].startswith('map'):
+ if hasattr(combiner, 'reduce'):
+ combiner = combiner.reduce
+ if hasattr(combiner, 'configure'):
+ combconf = combiner.configure
+ if hasattr(combiner, 'close'):
+ combclose = combiner.close
try:
print >> sys.stderr, "INFO: consuming %s" % \
os.environ['map_input_file']
@@ -534,6 +536,8 @@ def run(mapper,
else:
outputs = itermap(inputs, mapper)
if combiner:
+ if combconf:
+ combconf()
if (not buffersize) and memlim:
buffersize = int(memlim * 0.33) / 512 # educated guess
print >> sys.stderr, 'INFO: buffersize =', buffersize
@@ -543,6 +547,8 @@ def run(mapper,
keyfunc=JoinKey.fromjoinkey)
else:
outputs = iterreduce(inputs, combiner)
+ if combclose:
+ combclose()
if os.environ.has_key('dumbo_joinkeys'):
outputs = ((jk.dump(), v) for (jk, v) in outputs)
if mapclose:
@@ -557,6 +563,15 @@ def run(mapper,
for output in dumpcode(outputs):
print '\t'.join(output)
elif reducer:
+ if type(reducer) in (types.ClassType, type):
+ reducercls = type('DumboReducer', (reducer, MapRedBase), {})
+ reducer = reducercls()
+ if hasattr(reducer, 'reduce'):
+ reducer = reducer.reduce
+ if hasattr(reducer, 'configure'):
+ redconf = reducer.configure
+ if hasattr(reducer, 'close'):
+ redclose = reducer.close
if os.environ.has_key('stream_reduce_input') and \
os.environ['stream_reduce_input'].lower() == 'typedbytes':
print >> sys.stderr, "INFO: inputting typed bytes"
View
60 dumbo/lib.py
@@ -98,22 +98,31 @@ def __new__(cls):
return object.__new__(cls)
def __init__(self):
- self._mappers = []
+ self.mappers = []
self.opts = [("addpath", "iter")]
- def itermappers(self):
- for pattern, mapper in self._mappers:
+ def configure(self):
+ mappers, closefuncs = [], []
+ for pattern, mapper in self.mappers:
if type(mapper) in (types.ClassType, type):
mappercls = type('DumboMapper', (mapper, MapRedBase), {})
+ mapper = mappercls()
if hasattr(mappercls, 'map'):
- yield (pattern, mappercls().map)
- else:
- yield (pattern, mappercls())
- else:
- yield (pattern, mapper)
+ mapper = mapper.map
+ if hasattr(mapper, 'configure'):
+ mapper.configure()
+ if hasattr(mapper, 'close'):
+ closefuncs.append(mapper.close)
+ mappers.append((pattern, mapper))
+ self.mappers = mappers
+ self.closefuncs = closefuncs
+
+ def close(self):
+ for closefunc in self.closefuncs:
+ closefunc()
def __call__normalkey(self, data):
- mappers = list(self.itermappers())
+ mappers = self.mappers
for key, value in data:
path, key = key
for pattern, mapper in mappers:
@@ -122,7 +131,7 @@ def __call__normalkey(self, data):
yield output
def __call__joinkey(self, data):
- mappers = list(self.itermappers())
+ mappers = self.mappers
for key, value in data:
path = key.body[0]
key.body = key.body[1]
@@ -132,7 +141,7 @@ def __call__joinkey(self, data):
yield output
def add(self, pattern, mapper):
- self._mappers.append((pattern, mapper))
+ self.mappers.append((pattern, mapper))
if hasattr(mapper, 'opts'):
self.opts += mapper.opts
@@ -140,18 +149,29 @@ def add(self, pattern, mapper):
class JoinMapper(object):
def __init__(self, mapper, isprimary=False):
- if type(mapper) in (types.ClassType, type):
- mappercls = type('DumboMapper', (mapper, MapRedBase), {})
- if hasattr(mappercls, 'map'):
- self.mapper = mappercls().map
- else:
- self.mapper = mappercls()
- else:
- self.mapper = mapper
+ self.mapper = mapper
self.isprimary = isprimary
self.opts = [('joinkeys', 'yes')]
- if hasattr(self.mapper, 'opts'):
+ if hasattr(mapper, 'opts'):
self.opts += self.mapper.opts
+ self.closefunc = None
+
+ def configure(self):
+ mapper = self.mapper
+ if type(mapper) in (types.ClassType, type):
+ mappercls = type('DumboMapper', (mapper, MapRedBase), {})
+ mapper = mappercls()
+ if hasattr(mapper, 'map'):
+ mapper = mapper.map
+ if hasattr(mapper, 'configure'):
+ mapper.configure()
+ if hasattr(mapper, 'close'):
+ self.closefunc = mapper.close
+ self.mapper = mapper
+
+ def close(self):
+ if self.closefunc:
+ self.closefunc()
def __call__(self, key, value):
key.isprimary = self.isprimary
Please sign in to comment.
Something went wrong with that request. Please try again.