Permalink
Browse files

GH-32: Add generic reducer to output raw files

- It is named RawReducer
- Handles outputting to single or multiple files (requires feathers)
- Includes working examples to generate tokyo-cabinets, constant dbs, and jsonlines
- Last but not least, unittests!
  • Loading branch information...
1 parent 1ff5fef commit 4feb7d73c2e669117779790632ad0aa92ea2ffb0 @dangra dangra committed Feb 21, 2011
View
@@ -0,0 +1,32 @@
+import os
+from tempfile import mkstemp
+import cdb
+
+from .rawreducer import RawReducer, chunkedread, CHUNKSIZE
+
+
class CDBFactory(object):
    """A RawReducer factory suitable to generate constant dbs from dumbo jobs

    For more info on constant dbs see http://cr.yp.to/cdb.html
    """

    # Maximum size of each chunk yielded by close().
    chunksize = CHUNKSIZE

    def __init__(self):
        # Reserve a unique filename in the working directory, then let
        # cdbmake build the database through its own '.tmp' staging file.
        handle, self.fn = mkstemp('.cdb', dir=os.getcwd())
        os.close(handle)
        self.maker = cdb.cdbmake(self.fn, self.fn + '.tmp')

    def __call__(self, key, values):
        # Store every value under the key (cdb allows duplicate keys).
        for item in values:
            self.maker.add(key, item)

    def close(self):
        # Finalize the database, stream it back in chunks, then delete it.
        self.maker.finish()
        for piece in chunkedread(self.fn, chunksize=self.chunksize):
            yield piece
        os.unlink(self.fn)
+
+
class CDBReducer(RawReducer):
    # RawReducer specialization whose raw output is a constant db (cdb),
    # one db per output path when multipleoutput is enabled.
    factory = CDBFactory
@@ -0,0 +1,12 @@
+try:
+ import json
+except ImportError:
+ import simplejson as json
+
+from .rawreducer import RawReducer
+
+
class JsonLinesReducer(RawReducer):
    """RawReducer that emits each value as one JSON document per line."""

    def factory(self):
        # Processor: serialize every value and terminate it with a newline.
        def process(key, values):
            return (json.dumps(value) + '\n' for value in values)
        return process
View
@@ -0,0 +1,72 @@
+"""A reducer base class to output one or multiple files in its raw fileformat"""
+from itertools import groupby
+
+
class RawReducer(object):
    """Reducer to generate outputs in raw file format.

    Consumes (key, values) pairs — or ((path, key), values) pairs when
    ``multipleoutput`` is enabled — and yields (path, chunk) tuples,
    where each chunk is a raw string produced by a processor obtained
    from ``factory``.
    """

    # When True, keys carry an output path prefix and output is routed
    # to one file per path (requires the feathers output format).
    multipleoutput = False
    singleopts = [
        ('outputformat', 'raw'),
    ]
    multipleopts = [
        ('getpath', 'yes'),
        ('outputformat', 'raw'),
        ('partitioner', 'fm.last.feathers.partition.Prefix'),
        ('jobconf', 'feathers.output.filename.strippart=true'),
    ]

    def __init__(self, factory=None, multipleoutput=None):
        # Allow both the factory and the output mode to be overridden
        # per instance without subclassing.
        if factory:
            self.factory = factory
        if multipleoutput is not None:
            self.multipleoutput = multipleoutput
        self.opts = self.multipleopts if self.multipleoutput else self.singleopts

    def __call__(self, data):
        # Normalize single-output input to the ((path, key), values)
        # shape used by the multiple-output case, with a None path.
        if not self.multipleoutput:
            data = (((None, key), values) for key, values in data)

        # One processor per path group.  Fix: the previous version also
        # instantiated a throwaway processor before the loop; it was
        # never closed, leaking any resources the factory acquired in
        # __init__ (e.g. the temp files of CDBFactory and
        # TokyoCabinetFactory).
        for path, group in groupby(data, lambda x: x[0][0]):
            proc = self.factory()
            for (_, key), values in group:
                for chk in proc(key, values) or ():
                    yield path, chk

            # Flush the processor if it supports close(); ``tuple`` is a
            # no-op stand-in that returns an empty iterable.
            close = getattr(proc, 'close', tuple)
            for chk in close() or ():
                yield path, chk

    def factory(self):
        """Processor factory used to consume reducer input (one per path on multiple outputs)

        Must return a callable (aka processor) that accepts two parameters
        "key" and "values", and returns an iterable of strings or None.

        The processor may have a close() method that returns an iterable of
        strings or None. This method is called when the last key-values pair
        for a path is seen.

        """
        return lambda key, values: values
+
CHUNKSIZE = 2*1024*1024  # default chunk size to read a file


def chunkedread(filename_or_fileobj, chunksize=CHUNKSIZE):
    """Yield the contents of a file as a sequence of chunks.

    Accepts either an open file-like object (anything with a ``read``
    method) or a filename.  A filename is opened in binary mode and
    closed when the generator finishes; a caller-supplied file object is
    left open for the caller to manage.
    """
    opened_here = not hasattr(filename_or_fileobj, 'read')
    if opened_here:
        fileobj = open(filename_or_fileobj, 'rb')
    else:
        fileobj = filename_or_fileobj

    try:
        while True:
            chunk = fileobj.read(chunksize)
            if not chunk:  # empty read signals EOF
                break
            yield chunk
    finally:
        # Only close what we opened ourselves.
        if opened_here:
            fileobj.close()
+
@@ -0,0 +1,36 @@
+import os
+from tempfile import mkstemp
+from tokyo.cabinet import HDB, HDBOWRITER, HDBOCREAT
+
+from .rawreducer import RawReducer, chunkedread, CHUNKSIZE
+
+
class TokyoCabinetFactory(object):
    """A RawReducer factory suitable to generate tokyocabinets from dumbo jobs"""

    # Subclasses may override any of these to tune the database class,
    # open flags, insertion method name or streaming chunk size.
    dbcls = HDB
    flags = HDBOWRITER | HDBOCREAT
    methodname = 'putasync'
    chunksize = CHUNKSIZE

    def __init__(self):
        # Reserve a unique db filename in the working directory.
        handle, self.fn = mkstemp('.db', 'tc-', os.getcwd())
        os.close(handle)
        self.db = self.dbcls()
        self.db.setxmsiz(0)
        self.db.open(self.fn, self.flags)
        # Bind the configured insertion method once.
        self.add = getattr(self.db, self.methodname)

    def __call__(self, key, values):
        for item in values:
            self.add(key, item)

    def close(self):
        # Close the db, stream the file back in chunks, then delete it.
        self.db.close()
        for piece in chunkedread(self.fn, chunksize=self.chunksize):
            yield piece
        os.unlink(self.fn)
+
+
class TokyoCabinetReducer(RawReducer):
    # RawReducer specialization whose raw output is a tokyocabinet db,
    # one db per output path when multipleoutput is enabled.
    factory = TokyoCabinetFactory
View
@@ -0,0 +1,41 @@
+import os
+import unittest
+from tempfile import mkstemp
+import cdb
+
+from dumbo.lib.cdbreducer import CDBReducer, CDBFactory
+
+
class CDBTestCase(unittest.TestCase):
    # Exercises CDBFactory directly and CDBReducer end-to-end; both write
    # a constant db and stream it back as raw chunks.

    def test_default(self):
        # The processor returns None on each call; output only appears
        # when close() streams the finished db.
        proc = CDBFactory()
        self.assertEqual(proc('k1', ['v1']), None)
        self.assertEqual(proc('k2', ['v2', 'v3']), None)
        chunks = proc.close()
        fn = mkstemp()[1]
        fo = open(fn, 'wb')
        for chk in chunks:
            # Every chunk must respect the configured maximum size.
            self.assertTrue(len(chk) <= proc.chunksize)
            fo.write(chk)
        fo.close()

        db = cdb.init(fn)
        # Only 'v2' is expected for 'k2': cdb's dict-style lookup
        # evidently returns the first value stored under a key, even
        # though 'v3' was also added.
        self.assertEqual([(k, db[k]) for k in db.keys()],
                         [('k1', 'v1'), ('k2', 'v2')])
        os.remove(fn)

    def test_reducer(self):
        # End-to-end: concatenating the reducer's raw output must yield
        # a readable constant db.
        red = CDBReducer()
        output = red(zip('abcde', '12345'))

        fn = mkstemp()[1]
        fo = open(fn, 'wb')
        fo.writelines(v for k, v in output)
        fo.close()

        db = cdb.init(fn)
        self.assertEqual([(k, db[k]) for k in db.keys()],
                         [('a', '1'), ('b', '2'), ('c', '3'), ('d', '4'), ('e', '5')])
        os.remove(fn)
+
View
@@ -0,0 +1,109 @@
+import os
+import unittest
+from tempfile import mkstemp
+from cStringIO import StringIO
+from dumbo.lib.rawreducer import RawReducer, chunkedread
+
+
# Single-output fixture: (key, values) pairs as fed to a reducer.
DATA = [('k1', ['v1a', 'v1b']), ('k2', ['v2c']), ('k3', ['v3d', 'v3e', 'v3f'])]
# Multiple-output fixture: ((path, key), [value]) pairs, one per value,
# using each value's index within its key as the path; sorted so entries
# arrive grouped by path, as RawReducer's groupby expects.
MULTIDATA = sorted(((str(i), k), [v]) for k, vs in DATA for i, v in enumerate(vs))
+
+
class RawReducerTestCase(unittest.TestCase):
    """Behavioural tests for RawReducer in single and multiple output modes."""

    def _run(self, reducer, data):
        # Drive the reducer over a fresh iterator and collect its output.
        return list(reducer(iter(data)))

    def test_default_factory(self):
        reducer = RawReducer()
        self.assertEqual(self._run(reducer, DATA),
                         [(None, 'v1a'), (None, 'v1b'), (None, 'v2c'),
                          (None, 'v3d'), (None, 'v3e'), (None, 'v3f')])

        reducer = RawReducer(multipleoutput=True)
        self.assertEqual(self._run(reducer, MULTIDATA),
                         [('0', 'v1a'), ('0', 'v2c'), ('0', 'v3d'),
                          ('1', 'v1b'), ('1', 'v3e'), ('2', 'v3f')])

    def test_custom_factory(self):
        # A factory whose processor keeps only the first value per key.
        def first_value_factory():
            return lambda k, v: [v[0]]

        reducer = RawReducer(first_value_factory)
        self.assertEqual(self._run(reducer, DATA),
                         [(None, 'v1a'), (None, 'v2c'), (None, 'v3d')])

        reducer = RawReducer(first_value_factory, multipleoutput=True)
        self.assertEqual(self._run(reducer, MULTIDATA),
                         [('0', 'v1a'), ('0', 'v2c'), ('0', 'v3d'),
                          ('1', 'v1b'), ('1', 'v3e'), ('2', 'v3f')])

    def test_custom_factory_with_close(self):
        # A processor that buffers everything and flushes it on close().
        class CloseFactory(object):
            def __init__(self):
                self.items = []

            def __call__(self, key, values):
                self.items.extend(values)

            def close(self):
                return self.items

        reducer = RawReducer(CloseFactory)
        self.assertEqual(self._run(reducer, DATA),
                         [(None, 'v1a'), (None, 'v1b'), (None, 'v2c'),
                          (None, 'v3d'), (None, 'v3e'), (None, 'v3f')])

        reducer = RawReducer(CloseFactory, multipleoutput=True)
        self.assertEqual(self._run(reducer, MULTIDATA),
                         [('0', 'v1a'), ('0', 'v2c'), ('0', 'v3d'),
                          ('1', 'v1b'), ('1', 'v3e'), ('2', 'v3f')])

    def test_extending_rawreducer_class(self):
        # Factories may also be set declaratively on a subclass.
        class DummyFactory(object):
            def __call__(self, key, values):
                yield key

        class DummyReducer(RawReducer):
            factory = DummyFactory

        reducer = DummyReducer()
        self.assertEqual(self._run(reducer, DATA),
                         [(None, 'k1'), (None, 'k2'), (None, 'k3')])

        reducer = DummyReducer(multipleoutput=True)
        self.assertEqual(self._run(reducer, MULTIDATA),
                         [('0', 'k1'), ('0', 'k2'), ('0', 'k3'),
                          ('1', 'k1'), ('1', 'k3'), ('2', 'k3')])

        class MultiDummyReducer(RawReducer):
            factory = DummyFactory
            multipleoutput = True

        reducer = MultiDummyReducer()
        self.assertEqual(self._run(reducer, MULTIDATA),
                         [('0', 'k1'), ('0', 'k2'), ('0', 'k3'),
                          ('1', 'k1'), ('1', 'k3'), ('2', 'k3')])
+
+
class ChunkedReadTestCase(unittest.TestCase):
    """chunkedread() must split content into fixed-size pieces for both
    file objects and filenames."""

    def _assert_chunks(self, chunks):
        # The 22-byte payload splits into two full 10-byte chunks and a tail.
        self.assertEqual(chunks.next(), 'one\nbig\nch')
        self.assertEqual(chunks.next(), 'unk\nof\ndat')
        self.assertEqual(chunks.next(), 'a\n')
        self.assertRaises(StopIteration, chunks.next)

    def test_chunkedread_on_fileobject(self):
        fo = StringIO('one\nbig\nchunk\nof\ndata\n')
        self._assert_chunks(chunkedread(fo, chunksize=10))
        fo.close()

    def test_chunkedread_on_filename(self):
        fn = mkstemp()[1]
        try:
            fo = open(fn, 'wb')
            fo.write('one\nbig\nchunk\nof\ndata\n')
            fo.close()
            self._assert_chunks(chunkedread(fn, chunksize=10))
        finally:
            os.unlink(fn)
@@ -0,0 +1,68 @@
+import unittest
+import os
+from tempfile import mkstemp
+from tokyo.cabinet import HDB, HDBOREADER, BDB, BDBOREADER, BDBOWRITER, BDBOCREAT
+
+from dumbo.lib.tokyocabinetreducer import TokyoCabinetReducer, TokyoCabinetFactory
+
+
class TokyoCabinetTestCase(unittest.TestCase):
    # Exercises TokyoCabinetFactory (default and subclassed) and
    # TokyoCabinetReducer end-to-end.

    def test_default(self):
        # The processor returns None per call; the db is streamed back
        # only when close() runs.
        proc = TokyoCabinetFactory()
        self.assertEqual(proc('k1', ['v1']), None)
        self.assertEqual(proc('k2', ['v2', 'v3']), None)
        chunks = proc.close()
        fn = mkstemp()[1]
        fo = open(fn, 'wb')
        for chk in chunks:
            # Every chunk must respect the configured maximum size.
            self.assertTrue(len(chk) <= proc.chunksize)
            fo.write(chk)
        fo.close()

        db = HDB()
        db.open(fn, HDBOREADER)
        # Only 'v3' is expected for 'k2': the hash db evidently keeps a
        # single value per key, so the later put wins.
        self.assertEqual(list(db.iteritems()), [('k1', 'v1'), ('k2', 'v3')])
        db.close()
        os.remove(fn)

    def test_extended(self):
        # Subclassing hooks: B-tree db class, integer-add method, and a
        # deliberately tiny chunk size to force many chunks.
        class BDBFactory(TokyoCabinetFactory):
            dbcls = BDB
            flags = BDBOWRITER | BDBOCREAT
            methodname = 'addint'
            chunksize = 10 # very small

        proc = BDBFactory()
        self.assertEqual(proc('k1', [2]), None)
        self.assertEqual(proc('k2', [3, 6]), None)
        chunks = proc.close()
        fn = mkstemp()[1]
        fo = open(fn, 'wb')
        for chk in chunks:
            self.assertTrue(len(chk) <= 10)
            fo.write(chk)
        fo.close()

        db = BDB()
        db.open(fn, BDBOWRITER)
        self.assertEqual(len(db), 2)
        # addint with a delta of 0 reads back the stored counters:
        # 2 for 'k1' and 3+6=9 for 'k2'.
        self.assertEqual(db.addint('k1', 0), 2)
        self.assertEqual(db.addint('k2', 0), 9)
        db.close()
        os.remove(fn)

    def test_reducer(self):
        # End-to-end: concatenating the reducer's raw output must yield
        # a readable hash db.
        red = TokyoCabinetReducer()
        output = red(zip('abcde', '12345'))

        fn = mkstemp()[1]
        fo = open(fn, 'wb')
        fo.writelines(v for k, v in output)
        fo.close()
        db = HDB()
        db.open(fn, HDBOREADER)
        self.assertEqual(list(db.iteritems()),
                         [('a', '1'), ('b', '2'), ('c', '3'), ('d', '4'), ('e', '5')])
        db.close()
        os.remove(fn)

0 comments on commit 4feb7d7

Please sign in to comment.