diff --git a/streamz/sources.py b/streamz/sources.py index 2cd47545..b467f3d1 100644 --- a/streamz/sources.py +++ b/streamz/sources.py @@ -50,6 +50,9 @@ class from_textfile(Source): start: bool (False) Whether to start running immediately; otherwise call stream.start() explicitly. + from_end: bool (False) + Whether to begin streaming from the end of the file (i.e., only emit + lines appended after the stream starts). Example ------- @@ -63,26 +66,33 @@ class from_textfile(Source): Stream """ def __init__(self, f, poll_interval=0.100, delimiter='\n', start=False, - **kwargs): + from_end=False, **kwargs): if isinstance(f, str): f = open(f) self.file = f + self.from_end = from_end self.delimiter = delimiter self.poll_interval = poll_interval super(from_textfile, self).__init__(ensure_io_loop=True, **kwargs) self.stopped = True + self.started = False if start: self.start() def start(self): self.stopped = False + self.started = False self.loop.add_callback(self.do_poll) @gen.coroutine def do_poll(self): buffer = '' - while True: + if self.from_end: + # this only happens when we are ready to read + self.file.seek(0, 2) + while not self.stopped: + self.started = True line = self.file.read() if line: buffer = buffer + line @@ -93,8 +103,6 @@ def do_poll(self): yield self._emit(part + self.delimiter) else: yield gen.sleep(self.poll_interval) - if self.stopped: - break @Stream.register_api(staticmethod) diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index 59e7f96d..3845de60 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -787,6 +787,25 @@ def test_from_file(): assert time() < start + 2 # reads within 2s +@gen_test() +def test_from_file_end(): + with tmpfile() as fn: + with open(fn, 'wt') as f: + f.write('data1\n') + f.flush() + + source = Stream.from_textfile(fn, poll_interval=0.010, + start=False, from_end=True) + out = source.sink_to_list() + source.start() + assert out == [] + yield await_for(lambda: source.started, 2, period=0.02) + + f.write('data2\n') + f.flush() + yield await_for(lambda: out == ['data2\n'], timeout=5, period=0.1) + + @gen_test() def test_filenames(): with tmpfile() as fn: