Skip to content

Commit

Permalink
Add until operator
Browse files Browse the repository at this point in the history
  • Loading branch information
vxgmichel committed Oct 4, 2019
1 parent 9d5a7c6 commit 5f68368
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 4 deletions.
5 changes: 3 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ The `stream operators`_ are separated in 7 categories:
+--------------------+---------------------------------------------------------------------------------------+
| **transformation** | map_, enumerate_, starmap_, cycle_, chunks_ |
+--------------------+---------------------------------------------------------------------------------------+
| **selection** | take_, takelast_, skip_, skiplast_, getitem_, filter_, takewhile_, dropwhile_ |
| **selection** | take_, takelast_, skip_, skiplast_, getitem_, filter_, until_, takewhile_, dropwhile_ |
+--------------------+---------------------------------------------------------------------------------------+
| **combination** | map_, zip_, merge_, chain_, ziplatest_ |
+--------------------+---------------------------------------------------------------------------------------+
Expand Down Expand Up @@ -165,8 +165,9 @@ Vincent Michel: vxgmichel@gmail.com
.. _skiplast: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.skiplast
.. _getitem: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.getitem
.. _filter: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.filter
.. _dropwhile: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.dropwhile
.. _until: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.until
.. _takewhile: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.takewhile
.. _dropwhile: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.dropwhile

.. _chain: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.chain
.. _zip: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.zip
Expand Down
27 changes: 26 additions & 1 deletion aiostream/stream/select.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ..core import operator, streamcontext

__all__ = ['take', 'takelast', 'skip', 'skiplast',
'getitem', 'filter', 'dropwhile', 'takewhile']
'getitem', 'filter', 'until', 'dropwhile', 'takewhile']


@operator(pipable=True)
Expand Down Expand Up @@ -190,10 +190,35 @@ async def filter(source, func):
yield item


@operator(pipable=True)
async def until(source, func):
"""Forward an asynchronous sequence until a condition is met.
Contrary to the ``takewhile`` operator, the last tested element is included
in the sequence.
The given function takes the item as an argument and returns a boolean
corresponding to the condition to meet. The function can either be
synchronous or asynchronous.
"""
iscorofunc = asyncio.iscoroutinefunction(func)
async with streamcontext(source) as streamer:
async for item in streamer:
result = func(item)
if iscorofunc:
result = await result
yield item
if result:
return


@operator(pipable=True)
async def takewhile(source, func):
"""Forward an asynchronous sequence while a condition is met.
Contrary to the ``until`` operator, the last tested element is not included
in the sequence.
The given function takes the item as an argument and returns a boolean
corresponding to the condition to meet. The function can either be
synchronous or asynchronous.
Expand Down
2 changes: 2 additions & 0 deletions docs/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ Selection operators

.. autoclass:: filter

.. autoclass:: until

.. autoclass:: takewhile

.. autoclass:: dropwhile
Expand Down
2 changes: 1 addition & 1 deletion docs/table.rst.inc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| **transformation** | :class:`map`, :class:`enumerate`, :class:`starmap`, :class:`cycle`, :class:`chunks` |
+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| **selection** | :class:`take`, :class:`takelast`, :class:`skip`, :class:`skiplast`, :class:`getitem`, :class:`filter`, :class:`takewhile`, :class:`dropwhile` |
| **selection** | :class:`take`, :class:`takelast`, :class:`skip`, :class:`skiplast`, :class:`getitem`, :class:`filter`, :class:`until`, :class:`takewhile`, :class:`dropwhile` |
+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| **combination** | :class:`map`, :class:`zip`, :class:`merge`, :class:`chain`, :class:`ziplatest` |
+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Expand Down
20 changes: 20 additions & 0 deletions tests/test_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,26 @@ async def afunc(x):
assert event_loop.steps == [1]*10


@pytest.mark.asyncio
async def test_until(assert_run, event_loop):
with event_loop.assert_cleanup():
xs = (stream.range(1, 10)
| add_resource.pipe(1)
| pipe.until(lambda x: x == 3))
await assert_run(xs, [1, 2, 3])

async def afunc(x):
await asyncio.sleep(1)
return x == 3

with event_loop.assert_cleanup():
xs = (stream.range(1, 10)
| add_resource.pipe(1)
| pipe.until(afunc))
await assert_run(xs, [1, 2, 3])
assert event_loop.steps == [1] * 4


@pytest.mark.asyncio
async def test_takewhile(assert_run, event_loop):
with event_loop.assert_cleanup():
Expand Down

0 comments on commit 5f68368

Please sign in to comment.