diff --git a/docs/source/api.rst b/docs/source/api.rst index 2452e278..fc9d9f6c 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -41,6 +41,7 @@ Sources filenames from_kafka from_textfile + from_socket DaskStream ---------- diff --git a/streamz/sources.py b/streamz/sources.py index 2cd47545..d72fd4f5 100644 --- a/streamz/sources.py +++ b/streamz/sources.py @@ -147,6 +147,142 @@ def do_poll(self): break +@Stream.register_api(staticmethod) +class from_tcp(Source): + """ + Creates events by reading from a socket using tornado TCPServer + + The stream of incoming bytes is split on a given delimiter, and the parts + become the emitted events. + + Parameters + ---------- + port : int + The port to open and listen on. It only gets opened when the source + is started, and closed upon ``stop()`` + delimiter : bytes + The incoming data will be split on this value. The resulting events + will still have the delimiter at the end. + start : bool + Whether to immediately initiate the source. You probably want to + set up downstream nodes first. + server_kwargs : dict or None + If given, additional arguments to pass to TCPServer + + Example + ------- + + >>> source = Source.from_tcp(4567) # doctest: +SKIP + """ + def __init__(self, port, delimiter=b'\n', start=False, + server_kwargs=None): + super(from_tcp, self).__init__(ensure_io_loop=True) + self.stopped = True + self.server_kwargs = server_kwargs or {} + self.port = port + self.server = None + self.delimiter = delimiter + if start: # pragma: no cover + self.start() + + @gen.coroutine + def _start_server(self): + from tornado.tcpserver import TCPServer + from tornado.iostream import StreamClosedError + + class EmitServer(TCPServer): + source = self + + @gen.coroutine + def handle_stream(self, stream, address): + while True: + try: + data = yield stream.read_until(self.source.delimiter) + yield self.source._emit(data) + except StreamClosedError: + break + + self.server = EmitServer(**self.server_kwargs) + self.server.listen(self.port) + + def start(self): + if self.stopped: + self.loop.add_callback(self._start_server) + self.stopped = False + + def stop(self): + if not self.stopped: + self.server.stop() + self.server = None + self.stopped = True + + +@Stream.register_api(staticmethod) +class from_http_server(Source): + """Listen for HTTP POSTs on given port + + Each connection will emit one event, containing the body data of + the request + + Parameters + ---------- + port : int + The port to listen on + path : str + Specific path to listen on. Can be regex, but content is not used. + start : bool + Whether to immediately startup the server. Usually you want to connect + downstream nodes first, and then call ``.start()``. + server_kwargs : dict or None + If given, set of further parameters to pass on to HTTPServer + + Example + ------- + >>> source = Source.from_http_server(4567) # doctest: +SKIP + """ + + def __init__(self, port, path='/.*', start=False, server_kwargs=None): + self.port = port + self.path = path + self.server_kwargs = server_kwargs or {} + super(from_http_server, self).__init__(ensure_io_loop=True) + self.stopped = True + self.server = None + if start: # pragma: no cover + self.start() + + def _start_server(self): + from tornado.web import Application, RequestHandler + from tornado.httpserver import HTTPServer + + class Handler(RequestHandler): + source = self + + @gen.coroutine + def post(self): + yield self.source._emit(self.request.body) + self.write('OK') + + application = Application([ + (self.path, Handler), + ]) + self.server = HTTPServer(application, **self.server_kwargs) + self.server.listen(self.port) + + def start(self): + """Start HTTP server and listen""" + if self.stopped: + self.loop.add_callback(self._start_server) + self.stopped = False + + def stop(self): + """Shutdown HTTP server""" + if not self.stopped: + self.server.stop() + self.server = None + self.stopped = True + + @Stream.register_api(staticmethod) class from_kafka(Source): """ Accepts messages from Kafka diff --git a/streamz/tests/test_sources.py b/streamz/tests/test_sources.py new file mode 100644 index 00000000..f51dc38a --- /dev/null +++ b/streamz/tests/test_sources.py @@ -0,0 +1,83 @@ +import pytest +from streamz import Source +from streamz.utils_test import wait_for, await_for, gen_test +import socket + + +def test_tcp(): + port = 9876 + s = Source.from_tcp(port) + out = s.sink_to_list() + s.start() + wait_for(lambda: s.server is not None, 2, period=0.02) + + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(("localhost", port)) + sock.send(b'data\n') + sock.close() + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(("localhost", port)) + sock.send(b'data\n') + + sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock2.connect(("localhost", port)) + sock2.send(b'data2\n') + wait_for(lambda: out == [b'data\n', b'data\n', b'data2\n'], 2, + period=0.01) + finally: + s.stop() + sock.close() + sock2.close() + + +@gen_test(timeout=60) +def test_tcp_async(): + port = 9876 + s = Source.from_tcp(port) + out = s.sink_to_list() + s.start() + yield await_for(lambda: s.server is not None, 2, period=0.02) + + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(("localhost", port)) + sock.send(b'data\n') + sock.close() + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(("localhost", port)) + sock.send(b'data\n') + + sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock2.connect(("localhost", port)) + sock2.send(b'data2\n') + yield await_for(lambda: out == [b'data\n', b'data\n', b'data2\n'], 2, + period=0.01) + finally: + s.stop() + sock.close() + sock2.close() + + +def test_http(): + requests = pytest.importorskip('requests') + port = 9875 + s = Source.from_http_server(port) + out = s.sink_to_list() + s.start() + wait_for(lambda: s.server is not None, 2, period=0.02) + + r = requests.post('http://localhost:%i/' % port, data=b'data') + wait_for(lambda: out == [b'data'], 2, period=0.01) + assert r.ok + + r = requests.post('http://localhost:%i/other' % port, data=b'data2') + wait_for(lambda: out == [b'data', b'data2'], 2, period=0.01) + assert r.ok + + s.stop() + + with pytest.raises(requests.exceptions.RequestException): + requests.post('http://localhost:%i/other' % port, data=b'data2')