Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Sources
filenames
from_kafka
from_textfile
from_socket

DaskStream
----------
Expand Down
136 changes: 136 additions & 0 deletions streamz/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,142 @@ def do_poll(self):
break


@Stream.register_api(staticmethod)
class from_tcp(Source):
"""
Creates events by reading from a socket using tornado TCPServer

The stream of incoming bytes is split on a given delimiter, and the parts
become the emitted events.

Parameters
----------
port : int
The port to open and listen on. It only gets opened when the source
is started, and closed upon ``stop()``
delimiter : bytes
The incoming data will be split on this value. The resulting events
will still have the delimiter at the end.
start : bool
Whether to immediately initiate the source. You probably want to
set up downstream nodes first.
server_kwargs : dict or None
If given, additional arguments to pass to TCPServer

Example
-------

>>> source = Source.from_tcp(4567) # doctest: +SKIP
"""
def __init__(self, port, delimiter=b'\n', start=False,
server_kwargs=None):
super(from_tcp, self).__init__(ensure_io_loop=True)
self.stopped = True
self.server_kwargs = server_kwargs or {}
self.port = port
self.server = None
self.delimiter = delimiter
if start: # pragma: no cover
self.start()

@gen.coroutine
def _start_server(self):
from tornado.tcpserver import TCPServer
from tornado.iostream import StreamClosedError

class EmitServer(TCPServer):
source = self

@gen.coroutine
def handle_stream(self, stream, address):
while True:
try:
data = yield stream.read_until(self.source.delimiter)
yield self.source._emit(data)
except StreamClosedError:
break

self.server = EmitServer(**self.server_kwargs)
self.server.listen(self.port)

def start(self):
if self.stopped:
self.loop.add_callback(self._start_server)
self.stopped = False

def stop(self):
if not self.stopped:
self.server.stop()
self.server = None
self.stopped = True


@Stream.register_api(staticmethod)
class from_http_server(Source):
"""Listen for HTTP POSTs on given port

Each connection will emit one event, containing the body data of
the request

Parameters
----------
port : int
The port to listen on
path : str
Specific path to listen on. Can be regex, but content is not used.
start : bool
Whether to immediately startup the server. Usually you want to connect
downstream nodes first, and then call ``.start()``.
server_kwargs : dict or None
If given, set of further parameters to pass on to HTTPServer

Example
-------
>>> source = Source.from_http_server(4567) # doctest: +SKIP
"""

def __init__(self, port, path='/.*', start=False, server_kwargs=None):
self.port = port
self.path = path
self.server_kwargs = server_kwargs or {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In one case you use tcp_kwargs and in the other server_kwargs. Maybe use server_kwargs in both places for consistency of the API?

super(from_http_server, self).__init__(ensure_io_loop=True)
self.stopped = True
self.server = None
if start: # pragma: no cover
self.start()

def _start_server(self):
from tornado.web import Application, RequestHandler
from tornado.httpserver import HTTPServer

class Handler(RequestHandler):
source = self

@gen.coroutine
def post(self):
yield self.source._emit(self.request.body)
self.write('OK')

application = Application([
(self.path, Handler),
])
self.server = HTTPServer(application, **self.server_kwargs)
self.server.listen(self.port)

def start(self):
"""Start HTTP server and listen"""
if self.stopped:
self.loop.add_callback(self._start_server)
self.stopped = False

def stop(self):
"""Shutdown HTTP server"""
if not self.stopped:
self.server.stop()
self.server = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You remove the server here, but not in the TCP solution. Is there a reason?

self.stopped = True


@Stream.register_api(staticmethod)
class from_kafka(Source):
""" Accepts messages from Kafka
Expand Down
83 changes: 83 additions & 0 deletions streamz/tests/test_sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import pytest
from streamz import Source
from streamz.utils_test import wait_for, await_for, gen_test
import socket


def test_tcp():
port = 9876
s = Source.from_tcp(port)
out = s.sink_to_list()
s.start()
wait_for(lambda: s.server is not None, 2, period=0.02)

try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("localhost", port))
sock.send(b'data\n')
sock.close()

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("localhost", port))
sock.send(b'data\n')

sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock2.connect(("localhost", port))
sock2.send(b'data2\n')
wait_for(lambda: out == [b'data\n', b'data\n', b'data2\n'], 2,
period=0.01)
finally:
s.stop()
sock.close()
sock2.close()


@gen_test(timeout=60)
def test_tcp_async():
port = 9876
s = Source.from_tcp(port)
out = s.sink_to_list()
s.start()
yield await_for(lambda: s.server is not None, 2, period=0.02)

try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("localhost", port))
sock.send(b'data\n')
sock.close()

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("localhost", port))
sock.send(b'data\n')

sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock2.connect(("localhost", port))
sock2.send(b'data2\n')
yield await_for(lambda: out == [b'data\n', b'data\n', b'data2\n'], 2,
period=0.01)
finally:
s.stop()
sock.close()
sock2.close()


def test_http():
requests = pytest.importorskip('requests')
port = 9875
s = Source.from_http_server(port)
out = s.sink_to_list()
s.start()
wait_for(lambda: s.server is not None, 2, period=0.02)

r = requests.post('http://localhost:%i/' % port, data=b'data')
wait_for(lambda: out == [b'data'], 2, period=0.01)
assert r.ok

r = requests.post('http://localhost:%i/other' % port, data=b'data2')
wait_for(lambda: out == [b'data', b'data2'], 2, period=0.01)
assert r.ok

s.stop()

with pytest.raises(requests.exceptions.RequestException):
requests.post('http://localhost:%i/other' % port, data=b'data2')