diff --git a/setup.py b/setup.py index edd7301..b2b0c93 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='wimpy', - version='0.3', + version='0.4', description='Anti-copy-pasta', url='https://github.com/wimglenn/wimpy', author='Wim Glenn', diff --git a/tests/test_util.py b/tests/test_util.py index c1d96ff..0b41b6f 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -5,9 +5,11 @@ from wimpy import cached_property from wimpy import ceiling_division +from wimpy import chunks from wimpy import grouper from wimpy import strip_prefix from wimpy import strip_suffix +from wimpy import WimpyError from wimpy import working_directory @@ -76,3 +78,33 @@ def test_ceiling_div(numerator, denominator, result): ]) def test_grouper(iterable, n, fillvalue, result): assert list(grouper(iterable, n, fillvalue)) == result + + +@pytest.mark.parametrize('iterable,chunk_size,overlap,result', [ + ('1234567', 3, 0, [('1', '2', '3'), ('4', '5', '6'), ('7',)]), + ('123456', 3, 0, [('1', '2', '3'), ('4', '5', '6')]), + ('123456', 4, 2, [('1', '2', '3', '4'), ('3', '4', '5', '6')]), + ('', 3, 0, []), +]) +def test_chunks(iterable, chunk_size, overlap, result): + assert list(chunks(iterable, chunk_size, overlap)) == result + + +def test_chunks_doesnt_get_stuck_due_to_small_chunk_size(): + gen = chunks('123456', chunk_size=0) + with pytest.raises(WimpyError, match='chunk size too small'): + next(gen) + + +def test_chunks_doesnt_get_stuck_due_to_big_overlap(): + gen = chunks('123456', chunk_size=3, overlap=3) + with pytest.raises(WimpyError, match='overlap too large'): + next(gen) + + +def test_chunks_from_infinite_generator(): + gen = iter(int, 1) + g = chunks(gen, chunk_size=5) + assert next(g) == (0, 0, 0, 0, 0) + assert next(g) == (0, 0, 0, 0, 0) + assert next(g) == (0, 0, 0, 0, 0) diff --git a/wimpy/__init__.py b/wimpy/__init__.py index 86e0c29..8409461 100644 --- a/wimpy/__init__.py +++ b/wimpy/__init__.py @@ -1 +1,4 @@ +from wimpy.exceptions import WimpyError from 
def chunks(iterable, chunk_size=3, overlap=0):
    """Yield successive tuples of up to ``chunk_size`` items from ``iterable``.

    Each chunk after the first begins with the last ``overlap`` items of the
    chunk before it.  A negative ``overlap`` leaves a gap instead: ``-overlap``
    items are dropped between consecutive chunks.  Items are pulled from the
    iterable one chunk at a time, so infinite iterators can be used.

    The final chunk is shorter than ``chunk_size`` when the input runs out
    part-way through it.  Nothing extra is yielded when the input ends exactly
    on a chunk boundary, or when only gap items were left over.

    :param iterable: any iterable; it is consumed once.
    :param chunk_size: maximum length of each yielded tuple, must be >= 1.
    :param overlap: number of items shared by consecutive chunks, must be
        smaller than ``chunk_size``.
    :raises WimpyError: if ``chunk_size`` is below 1 or ``overlap`` is not
        smaller than ``chunk_size``.  Because this is a generator, the check
        runs on the first ``next()`` call, not when ``chunks`` is called.
    """
    if chunk_size < 1:
        raise WimpyError("chunk size too small")
    if overlap >= chunk_size:
        # the step between chunks would be zero or negative: no progress
        raise WimpyError("overlap too large")

    # a bounded deque discards its oldest item on every append once full, so
    # after reading ``step`` new items it holds exactly the next chunk
    window = deque(maxlen=chunk_size)
    source = iter(iterable)

    try:
        for _ in range(chunk_size):
            window.append(next(source))
    except StopIteration:
        # input is shorter than one chunk: whatever was read is the only
        # chunk (there is no previous chunk, so overlap plays no part here)
        if window:
            yield tuple(window)
        return

    step = chunk_size - overlap
    while True:
        yield tuple(window)
        fresh = 0
        try:
            # ``fresh`` is the number of items read so far in this step at
            # the moment ``next`` fails
            for fresh in range(step):
                window.append(next(source))
        except StopIteration:
            # positive overlap: the tail is the carried items plus the fresh
            # ones; negative overlap: the first ``-overlap`` fresh items
            # belong to the gap and must not be emitted
            tail = fresh + overlap
            if fresh > 0 and tail > 0:
                yield tuple(window)[-tail:]
            return