Skip to content

Commit

Permalink
Add support for read limit to esque IO (#197)
Browse files Browse the repository at this point in the history
* add support for read limit

* fix wrong help string

* fix unit test
  • Loading branch information
swenzel committed Sep 9, 2021
1 parent a008888 commit 20d97ae
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 17 deletions.
17 changes: 15 additions & 2 deletions esque/cli/commands.py
Expand Up @@ -8,7 +8,7 @@
from pathlib import Path
from shutil import copyfile
from time import sleep
from typing import List, Tuple
from typing import List, Optional, Tuple

import click
import yaml
Expand Down Expand Up @@ -37,6 +37,7 @@
from esque.controller.consumergroup_controller import ConsumerGroupController
from esque.errors import TopicAlreadyExistsException, TopicDoesNotExistException, ValidationException
from esque.io.pipeline import PipelineBuilder
from esque.io.stream_decorators import stop_after_nth_message
from esque.resources.broker import Broker
from esque.resources.consumergroup import ConsumerGroup
from esque.resources.topic import Topic, copy_to_local
Expand Down Expand Up @@ -1131,7 +1132,16 @@ def ping(state: State, times: int, wait: int):
metavar="<output_uri>",
default="pipe+json://stdout?kv__indent=2&h__skip_marker=1",
)
def io(input_uri: str, output_uri: str):
@click.option(
"-l",
"--limit",
help="Run until <limit> messages have been read. Will continue reading if the end of topic was reached."
"Stop with Ctrl-C. If not given, will read forever.",
metavar="<limit>",
default=None,
type=int,
)
def io(input_uri: str, output_uri: str, limit: Optional[int]):
"""Run a message pipeline.
Read all messages from the input configured by <input_uri> and write them to the output configured by <output_uri>.
Expand Down Expand Up @@ -1255,6 +1265,9 @@ def io(input_uri: str, output_uri: str):
builder.with_input_from_uri(input_uri)
builder.with_output_from_uri(output_uri)

if limit is not None:
builder.with_stream_decorator(stop_after_nth_message(limit))

pipeline = builder.build()

pipeline.run_pipeline()
Expand Down
32 changes: 18 additions & 14 deletions esque/io/stream_decorators.py
@@ -1,4 +1,4 @@
from typing import Iterable, TypeVar, Union
from typing import Callable, Iterable, TypeVar, Union

from esque.io.stream_events import EndOfStream, NthMessageRead, StreamEvent

Expand Down Expand Up @@ -28,22 +28,26 @@ def stop_at_temporary_end_of_stream(iterable: Iterable[Union[T, StreamEvent]]) -
break


def stop_after_nth_message(iterable: Iterable[Union[T, StreamEvent]], n: int) -> Iterable[Union[T, StreamEvent]]:
def stop_after_nth_message(n: int) -> Callable[[Iterable[Union[T, StreamEvent]]], Iterable[Union[T, StreamEvent]]]:
"""
Enables an iterator to be consumed until n messages have been read. Meant to be used with :meth:`BaseHandler.message_stream()`.
The method ignores any :class:StreamEvent objects that it encounters and only counts proper messages.
:param iterable: The iterable to be decorated
Creates a decorator that enables an iterator to be consumed until n messages have been read.
Meant to be used with :meth:`BaseHandler.message_stream()`.
The decorator ignores any :class:StreamEvent objects that it encounters and only counts proper messages.
:param n: The number of messages to consume before stopping
:return: The iterable that stops after the nth consumed message
:return: The iterable decorator which stops after the nth consumed message
"""
i = 0
for elem in iterable:
yield elem
if not isinstance(elem, StreamEvent):
i += 1
if i == n:
yield NthMessageRead(f"{n} messages have been read.")
break

def _stop_after_nth_message(iterable: Iterable[Union[T, StreamEvent]]):
i = 0
for elem in iterable:
yield elem
if not isinstance(elem, StreamEvent):
i += 1
if i == n:
yield NthMessageRead(f"{n} messages have been read.")
break

return _stop_after_nth_message


# def stop_at_message_timeout(iterable: Iterable[Union[T, StreamEvent]], message_timeout: int) -> Iterable[Union[T, StreamEvent]]:
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/io/test_stream_decorators.py
Expand Up @@ -25,5 +25,5 @@ def test_stop_at_temporary_end_of_stream_with_permanent_end(
def test_reading_until_count_reached(binary_messages: List[BinaryMessage], dummy_handler: DummyHandler):
dummy_handler.set_messages(messages=binary_messages)
dummy_handler.insert_temporary_end_of_stream(1)
limit_ended_stream = stop_after_nth_message(dummy_handler.binary_message_stream(), 2)
limit_ended_stream = stop_after_nth_message(2)(dummy_handler.binary_message_stream())
assert list(skip_stream_events(limit_ended_stream)) == binary_messages[:2]

0 comments on commit 20d97ae

Please sign in to comment.