From dc0a7a3d51193a9ffccc630214f94ccc0edf670a Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 20 Mar 2019 17:25:45 -0400 Subject: [PATCH 1/2] Reimplement #201 Same effect as #201, but should now pass tests --- streamz/sources.py | 9 ++++++++- streamz/tests/test_core.py | 19 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/streamz/sources.py b/streamz/sources.py index 2cd47545..22486e57 100644 --- a/streamz/sources.py +++ b/streamz/sources.py @@ -50,6 +50,9 @@ class from_textfile(Source): start: bool (False) Whether to start running immediately; otherwise call stream.start() explicitly. + from_end: bool (False) + Whether to begin streaming from the end of the file (i.e., only emit + lines appended after the stream starts). Example ------- @@ -63,10 +66,11 @@ class from_textfile(Source): Stream """ def __init__(self, f, poll_interval=0.100, delimiter='\n', start=False, - **kwargs): + from_end=False, **kwargs): if isinstance(f, str): f = open(f) self.file = f + self.from_end = from_end self.delimiter = delimiter self.poll_interval = poll_interval @@ -82,6 +86,9 @@ def start(self): @gen.coroutine def do_poll(self): buffer = '' + if self.from_end: + # this only happens when we are ready to read + self.file.seek(0, 2) while True: line = self.file.read() if line: diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index 59e7f96d..11db1ca7 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -787,6 +787,25 @@ def test_from_file(): assert time() < start + 2 # reads within 2s +@gen_test() +def test_from_file_end(): + with tmpfile() as fn: + with open(fn, 'wt') as f: + f.write('data1\n') + f.flush() + + source = Stream.from_textfile(fn, poll_interval=0.010, + start=False, from_end=True) + out = source.sink_to_list() + source.start() + assert out == [] + yield gen.sleep(0.01) + + f.write('data2\n') + f.flush() + yield await_for(lambda: out == ['data2\n'], timeout=5, period=0.1) + + @gen_test() def 
test_filenames(): with tmpfile() as fn: From af2a8002c6e645aa0d157a7b2548509ad6a11242 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 21 Mar 2019 10:08:00 -0400 Subject: [PATCH 2/2] await_for instead of sleep in test_from_file_end --- streamz/sources.py | 7 ++++--- streamz/tests/test_core.py | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/streamz/sources.py b/streamz/sources.py index 22486e57..b467f3d1 100644 --- a/streamz/sources.py +++ b/streamz/sources.py @@ -76,11 +76,13 @@ def __init__(self, f, poll_interval=0.100, delimiter='\n', start=False, self.poll_interval = poll_interval super(from_textfile, self).__init__(ensure_io_loop=True, **kwargs) self.stopped = True + self.started = False if start: self.start() def start(self): self.stopped = False + self.started = False self.loop.add_callback(self.do_poll) @gen.coroutine @@ -89,7 +91,8 @@ def do_poll(self): if self.from_end: # this only happens when we are ready to read self.file.seek(0, 2) - while True: + while not self.stopped: + self.started = True line = self.file.read() if line: buffer = buffer + line @@ -100,8 +103,6 @@ def do_poll(self): yield self._emit(part + self.delimiter) else: yield gen.sleep(self.poll_interval) - if self.stopped: - break @Stream.register_api(staticmethod) diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index 11db1ca7..3845de60 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -799,7 +799,7 @@ def test_from_file_end(): out = source.sink_to_list() source.start() assert out == [] - yield gen.sleep(0.01) + yield await_for(lambda: source.started, 2, period=0.02) f.write('data2\n') f.flush()