Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Catch too many or too few outputs with mapredtest. #73

Merged
merged 1 commit into from
Sep 4, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 18 additions & 27 deletions dumbo/mapredtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,26 @@
import sys
import os
import inspect
from itertools import imap
from itertools import imap, izip_longest, ifilter

from dumbo.core import itermap, iterreduce, itermapred
from dumbo.backends.common import MapRedBase

__all__ = ['MapDriver', 'ReduceDriver', 'MapReduceDriver']

def assert_iters_equal(expected, actual):
    """Assert that two iterables yield equal elements in the same order.

    Both arguments are consumed in lockstep (one element from `expected`,
    then one from `actual`) and checking stops at the first discrepancy.

    :param expected: iterable of the elements that should be produced.
    :param actual: iterable of the elements that were produced.
    :raise AssertionError: if a pair of elements differs, or if one
        iterable runs out of elements before the other.
    """
    # Unique "this side is exhausted" marker. It is only ever tested by
    # identity, so an element's own __eq__ can never be mistaken for it.
    sentinel = object()
    exp_iter, act_iter = iter(expected), iter(actual)
    while True:
        exp_item = next(exp_iter, sentinel)
        act_item = next(act_iter, sentinel)
        if exp_item is sentinel:
            if act_item is sentinel:
                # Both ran out on the same step: the sequences match.
                return
            raise AssertionError("expected sequence exhausted before actual at element {0}".format(act_item))
        if act_item is sentinel:
            raise AssertionError("actual sequence exhausted before expected at element {0}".format(exp_item))
        # Plain inequality rather than cmp(): cmp() requires an ordering and
        # raises TypeError for values such as sets or complex numbers.
        if exp_item != act_item:
            raise AssertionError("Element {0} did not match expected output: {1}".format(act_item, exp_item))


class BaseDriver(object):
"""A Generic test driver that passes
input stream through a callable and
Expand Down Expand Up @@ -82,12 +95,7 @@ def with_output(self, output_source):

def run(self):
"""Run test"""
for output in imap(self._func, self._input_source):
exp_out = self._output_source.next()
assert output == exp_out, \
"Output {0} did not match expected output: {1}".format(\
output, exp_out)

assert_iters_equal(self._output_source, imap(self._func, self._input_source))

def _instrument_class(self, cls):
"""Instrument a class for use with dumbo mapreduce tests"""
Expand All @@ -104,12 +112,7 @@ def mapper(self):

def run(self):
"""Run test"""
it = itermap(self._input_source, self._callable)
for output in it:
exp_out = self._output_source.next()
assert output == exp_out, \
"Output {0} did not match expected output: {1}".format(\
output, exp_out)
assert_iters_equal(self._output_source, itermap(self._input_source, self._callable))


class ReduceDriver(BaseDriver):
Expand All @@ -121,12 +124,7 @@ def reducer(self):

def run(self):
"""Run test"""
it = iterreduce(self._input_source, self._callable)
for output in it:
exp_out = self._output_source.next()
assert output == exp_out, \
"Output {0} did not match expected output: {1}".format(\
output, exp_out)
assert_iters_equal(self._output_source, iterreduce(self._input_source, self._callable))


class MapReduceDriver(BaseDriver):
Expand Down Expand Up @@ -157,11 +155,4 @@ def reducer(self):

def run(self):
"""Run test"""
it = itermapred(self._input_source, self._mapper, self._reducer)
for output in it:
exp_out = self._output_source.next()
assert output == exp_out, \
"Output {0} did not match expected output: {1}".format(\
output, exp_out)


assert_iters_equal(self._output_source, itermapred(self._input_source, self._mapper, self._reducer))
15 changes: 9 additions & 6 deletions tests/testmapredtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,8 @@ def testreducer_with_params(self):
(1, "hello")
]
#each 3 map calls will yield with both params
output = [('foo', 'bar'),
('one', '1'),('foo', 'bar'),
('one', '1'),('foo', 'bar'),
('one', '1')]
params = [('foo', 'bar'),
('one', '1')]
output = [('foo', 'bar'), ('one', '1')] * 2
params = [('foo', 'bar'), ('one', '1')]
ReduceDriver(reducer_with_params).with_params(params).with_input(input).with_output(output).run()

def testreducer(self):
Expand All @@ -66,6 +62,13 @@ def testmapreduce(self):
output = [('hello', 1), ('me', 1), ('test', 2)]
MapReduceDriver(mapper, reducer).with_input(input).with_output(output).run()

def test_toomany(self):
    """Running a MapDriver must raise AssertionError for both expected
    outputs below: a two-element list and a four-element list, each
    checked against the single input record (0, 'a b c')."""
    input_ = [(0, 'a b c')]
    shorter = [('a', 1), ('b', 1)]
    longer = [('a', 1), ('b', 1), ('c', 1), ('d', 1)]
    # Same order as before: the shorter expectation is checked first.
    for expected in (shorter, longer):
        driver = MapDriver(mapper).with_input(input_).with_output(expected)
        self.assertRaises(AssertionError, driver.run)

if __name__ == "__main__":
suite = unittest.TestLoader().loadTestsFromTestCase(MRTestCase)
unittest.TextTestRunner(verbosity=2).run(suite)
Expand Down