From c4da0cc3c277579e5909edd396747578e915c952 Mon Sep 17 00:00:00 2001 From: Will Mayner Date: Mon, 20 Aug 2018 18:58:39 -0500 Subject: [PATCH] Add `from_end` kwarg to `Stream.from_textfile()` Add a new keyword argument `from_end` to `Stream.from_textfile()` that allows streaming from the end of the file. This allows for streams that only emit lines that are appended to the file after `start()` is called. Addresses #199. Uses `file.seek(0, 2)` to seek to the end of the file before polling for new lines. --- streamz/sources.py | 8 +++++++- streamz/tests/test_core.py | 20 ++++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/streamz/sources.py b/streamz/sources.py index 3f3b4eaa..06a7771b 100644 --- a/streamz/sources.py +++ b/streamz/sources.py @@ -51,6 +51,9 @@ class from_textfile(Source): start: bool (False) Whether to start running immediately; otherwise call stream.start() explicitly. + from_end: bool (False) + Whether to begin streaming from the end of the file (i.e., only emit + lines appended after the stream starts). Example ------- @@ -64,11 +67,12 @@ class from_textfile(Source): Stream """ def __init__(self, f, poll_interval=0.100, delimiter='\n', start=False, - **kwargs): + from_end=False, **kwargs): if isinstance(f, str): f = open(f) self.file = f self.delimiter = delimiter + self.from_end = from_end self.poll_interval = poll_interval super(from_textfile, self).__init__(ensure_io_loop=True, **kwargs) @@ -78,6 +82,8 @@ def __init__(self, f, poll_interval=0.100, delimiter='\n', start=False, def start(self): self.stopped = False + if self.from_end: + self.file.seek(0, 2) self.loop.add_callback(self.do_poll) @gen.coroutine diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index adb69c3d..39181d0e 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -785,6 +785,26 @@ def test_from_file(): yield gen.sleep(0.01) assert time() < start + 2 # reads within 2s + source = Stream.from_textfile(fn, poll_interval=0.010, + asynchronous=True, start=False, + from_end=True) + L = source.map(json.loads).pluck('x').sink_to_list() + + source.start() + + yield gen.sleep(0.10) + + assert L == [] + + f.write('{"x": 6, "y": 2}\n') + f.write('{"x": 7, "y": 2}\n') + f.flush() + + yield await_for(lambda: len(L) == 2, timeout=5) + + assert L == [6, 7] + + @gen_test() def test_filenames():