Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Closes #31

  • Loading branch information...
commit c3648cbdb3f9b6f0b39af66116ff2164aee277eb 1 parent d5551db
@klbostee authored
Showing with 65 additions and 23 deletions.
  1. +30 −0 dumbo/decor.py
  2. +19 −3 dumbo/lib.py
  3. +16 −20 examples/join.py
View
30 dumbo/decor.py
@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from copy import copy
+
class opt(object):
@@ -26,3 +28,31 @@ def __call__(self, func):
else:
func.opts = [self.opt]
return func
+
+
+class joinmapper(object):
+
+ def __init__(self, isprimary):
+ self.isprimary = isprimary
+
+ def __call__(self, mapper):
+ if len(mapper.func_code.co_varnames) != 2:
+ raise TypeError('joinmapper has to take two arguments')
+ isprimary = self.isprimary # avoid additional lookups
+ def wrapper(key, value):
+ key.isprimary = isprimary
+ for k, v in mapper(key.body, value):
+ jk = copy(key)
+ jk.body = k
+ yield jk, v
+ wrapper.opts = [('joinkeys', 'yes')]
+ if hasattr(mapper, 'opts'):
+ wrapper.opts += mapper.opts
+ return wrapper
+
+
+def primary(mapper):
+ return joinmapper(isprimary=True)(mapper)
+
+def secondary(mapper):
+ return joinmapper(isprimary=False)(mapper)
View
22 dumbo/lib.py
@@ -19,6 +19,7 @@
import types
from itertools import chain, imap, izip
from math import sqrt
+from copy import copy
from dumbo.core import MapRedBase
@@ -89,8 +90,6 @@ def statscombiner(key, values):
class MultiMapper(object):
- opts = [("addpath", "yes")]
-
def __new__(cls):
if os.environ.get("dumbo_joinkeys", "no") == "yes":
cls.__call__ = cls.__call__joinkey
@@ -100,6 +99,7 @@ def __new__(cls):
def __init__(self):
self._mappers = []
+ self.opts = [("addpath", "yes")]
def itermappers(self):
for pattern, mapper in self._mappers:
@@ -132,4 +132,20 @@ def __call__joinkey(self, data):
yield output
def add(self, pattern, mapper):
- self._mappers.append((pattern, mapper))
+ self._mappers.append((pattern, mapper))
+ if hasattr(mapper, 'opts'):
+ self.opts += mapper.opts
+
+
+class JoinReducer(object):
+
+ opts = [("joinkeys", "yes")]
+
+ def __call__(self, key, values):
+ if key.isprimary:
+ self.primary(key.body, values)
+ else:
+ for k, v in self.secondary(key.body, values):
+ jk = copy(key)
+ jk.body = k
+ yield jk, v
View
36 examples/join.py
@@ -2,22 +2,20 @@
Joins hostnames with logs and counts number of logs per host.
"""
-def mapper1(key, value):
- key.isprimary = "hostnames" in key.body[0]
- key.body = key.body[1]
- yield key, value
-
-class Reducer1:
+import dumbo
+from dumbo.lib import identitymapper, JoinReducer
+from dumbo.decor import primary, secondary
+
+class Reducer1(JoinReducer):
def __init__(self):
self.hostname = "unknown"
- def __call__(self, key, values):
- if key.isprimary:
- self.hostname = values.next()
- else:
- key.body = self.hostname
- for value in values:
- yield key, value
- self.hostname = "unknown"
+ def primary(self, key, values):
+ self.hostname = values.next()
+ def secondary(self, key, values):
+ key = self.hostname
+ self.hostname = "unknown"
+ for value in values:
+ yield key, value
def mapper2(key, value):
yield key, 1
@@ -26,13 +24,11 @@ def reducer2(key, values):
yield key, sum(values)
def runner(job):
+ mapper1 = dumbo.MultiMapper()
+ mapper1.add("hostnames", primary(identitymapper))
+ mapper1.add("logs", secondary(identitymapper))
job.additer(mapper1, Reducer1)
job.additer(mapper2, reducer2, combiner=reducer2)
-
-def starter(prog):
- prog.addopt("addpath", "yes")
- prog.addopt("joinkeys", "yes")
if __name__ == "__main__":
- import dumbo
- dumbo.main(runner, starter)
+ dumbo.main(runner)
Please sign in to comment.
Something went wrong with that request. Please try again.