Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

Merge pull request #61 from mshevelev/cleanup

Fix #60 Added cleanup functionality
  • Loading branch information...
commit beab85c5226853e8af237230a4717edd2bf3c617 2 parents 82b0dd2 + 4f7b037
@klbostee authored
Showing 1 changed file with 22 additions and 1 deletion.
  1. +22 −1 dumbo/core.py
View
23 dumbo/core.py
@@ -19,7 +19,7 @@
import types
import resource
import copy
-from itertools import groupby
+from itertools import groupby, chain
from operator import itemgetter
from dumbo.backends import get_backend
@@ -221,6 +221,9 @@ def run(mapper,
mapclose=None,
redclose=None,
combclose=None,
+ mapcleanup=None,
+ redcleanup=None,
+ combcleanup=None,
opts=None,
input=None,
output=None,
@@ -249,6 +252,8 @@ def run(mapper,
mapclose = mapper.close
if hasattr(mapper, 'map'):
mapper = mapper.map
+ if hasattr(mapper, 'cleanup'):
+ mapcleanup = mapper.cleanup
if type(combiner) in (types.ClassType, type):
combinercls = type('DumboCombiner', (combiner, mrbase_class), {})
combiner = combinercls()
@@ -258,6 +263,8 @@ def run(mapper,
combclose = combiner.close
if hasattr(combiner, 'reduce'):
combiner = combiner.reduce
+ if hasattr(combiner, 'cleanup'):
+ combcleanup = combiner.cleanup
try:
print >> sys.stderr, "INFO: consuming %s" % \
os.environ['map_input_file']
@@ -280,6 +287,7 @@ def run(mapper,
inputs = (((path, k), v) for (k, v) in inputs)
if os.environ.has_key('dumbo_joinkeys'):
inputs = ((jk_class(k), v) for (k, v) in inputs)
+
if os.environ.has_key('dumbo_parser'):
parser = os.environ['dumbo_parser']
clsname = parser.split('.')[-1]
@@ -300,6 +308,10 @@ def run(mapper,
outputs = itermap(inputs, mapper, lambda v: set(*v))
else:
outputs = itermap(inputs, mapper)
+ if mapcleanup:
+ outputs = chain(outputs, mapcleanup())
+
+ # Combiner
if combiner and type(combiner) != str:
if (not buffersize) and memlim:
buffersize = int(memlim * 0.33) / 512 # educated guess
@@ -312,6 +324,9 @@ def run(mapper,
outputs = iterreduce(inputs, combiner)
if os.environ.has_key('dumbo_joinkeys'):
outputs = ((jk.dump(), v) for (jk, v) in outputs)
+ if combcleanup:
+ outputs = chain(outputs, combcleanup())
+
if os.environ.has_key('stream_map_output') and \
os.environ['stream_map_output'].lower() == 'typedbytes':
print >> sys.stderr, "INFO: outputting typed bytes"
@@ -325,7 +340,9 @@ def run(mapper,
combclose()
if mapclose:
mapclose()
+
elif reducer:
+ # Reducer
if type(reducer) in (types.ClassType, type):
reducercls = type('DumboReducer', (reducer, mrbase_class), {})
reducer = reducercls()
@@ -335,6 +352,8 @@ def run(mapper,
redclose = reducer.close
if hasattr(reducer, 'reduce'):
reducer = reducer.reduce
+ if hasattr(reducer, 'cleanup'):
+ redcleanup = reducer.cleanup
if os.environ.has_key('stream_reduce_input') and \
os.environ['stream_reduce_input'].lower() == 'typedbytes':
print >> sys.stderr, "INFO: inputting typed bytes"
@@ -351,6 +370,8 @@ def run(mapper,
outputs = ((jk.body, v) for (jk, v) in outputs)
else:
outputs = iterreduce(inputs, reducer)
+ if redcleanup:
+ outputs = chain(outputs, redcleanup())
if os.environ.has_key('stream_reduce_output') and \
os.environ['stream_reduce_output'].lower() == 'typedbytes':
print >> sys.stderr, "INFO: outputting typed bytes"
Please sign in to comment.
Something went wrong with that request. Please try again.