Add socket source #227
|
I would also like to make an HTTP server source, i.e., one event for each incoming GET, say, containing the URL parameters and data. How, though, do you tell a tornado Application or Server to start on a given event loop, or otherwise get it to run in the right place? |
|
I'm not certain. I'd like this as well, since I've been looking at making a ZMQ source. |
|
@CJ-Wright OK, solved the tornado server too. |
|
@mrocklin, can you review? We should also come to an agreement on how to move ahead regarding the stalling kafka tests. |
|
I think unbatched kafka source failures could be fixed by this commit from #216. |
mrocklin
left a comment
Some comments/questions
streamz/tests/test_sources.py
Outdated
| time.sleep(0.02) | ||
| assert l == [] | ||
| sock.send(b'\n') | ||
| time.sleep(0.02) |
I've found that fixed time sleeps like this can be brittle, especially on travis-ci, where a stray GC call may take more than 20ms (or a full second if you're unlucky).
Typically rather than sleeps I do something like
start = time.time()
while not l:
    time.sleep(0.01)
    assert time.time() < start + 2
Or, even better, you could find some way to trigger when things show up.
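The polling pattern suggested here can be wrapped in a small helper so tests do not repeat the loop. This is a minimal sketch using only the standard library; the name `wait_for` and the `timeout` and `period` parameters are assumptions for illustration and may differ from whatever helper the test suite actually uses.

```python
import time


def wait_for(predicate, timeout, period=0.01):
    """Poll ``predicate`` until it is truthy, failing after ``timeout`` seconds."""
    deadline = time.monotonic() + timeout
    while not predicate():
        if time.monotonic() > deadline:
            raise AssertionError("condition not met within %s seconds" % timeout)
        time.sleep(period)


# Usage in a test: wait on the collected results rather than sleeping blindly.
results = []
results.append(b"data\n")  # stands in for the source emitting an event
wait_for(lambda: results == [b"data\n"], timeout=2)
```

The test finishes as soon as the condition holds, and the generous timeout only matters when something is actually broken.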
streamz/tests/test_sources.py
Outdated
| sock2.close() | ||
| sock.close() | ||
|
|
||
| assert l == [b'data\n', b'data\n', b'data2\n'] |
It might be nice also to have an async test, if you're comfortable with the async API
streamz/sources.py
Outdated
| self.sock.bind(("", self.port)) | ||
| self.sock.listen(128) | ||
| self.loop.add_handler(self.sock.fileno(), self.connection_ready, | ||
| self.loop.READ) |
I'm somewhat surprised to see the use of raw sockets here. I'm curious, was there a reason to go this route rather than create a Tornado IOStream here directly? I see that you use them later on.
I don't know a ton about managing nonblocking sockets manually, I'm probably just missing something.
This code is adapted from the tornado IOLoop documentation: as far as I understand it, you do not have any IOStream until you accept an incoming connection on the open socket.
Ah, maybe the right abstraction here then is a Tornado TCPServer?
I didn't remove the socket server version yet, so that the two could be compared
|
@mrocklin , I added a tornado.TCPServer version of the same. I could just remove the direct socket version, but I am not convinced that either is better than the other. |
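To illustrate what a server abstraction buys over managing the listening socket by hand, here is a rough standard-library analogue. It uses `asyncio.start_server` rather than Tornado's `TCPServer`, so it is not the code in this PR; the function names `serve_lines` and `demo` and the `emit` callback are hypothetical.

```python
import asyncio


async def serve_lines(emit, host="127.0.0.1", port=0):
    """Start a TCP server that calls ``emit`` with each newline-terminated chunk."""

    async def handle(reader, writer):
        try:
            while True:
                line = await reader.readline()
                if not line:  # empty bytes: the client closed the connection
                    break
                emit(line)
        finally:
            writer.close()

    # port=0 asks the OS for any free port; accepting connections is
    # handled by the framework, with no manual add_handler/accept calls.
    return await asyncio.start_server(handle, host, port)


async def demo():
    received = []
    server = await serve_lines(received.append)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"data\ndata2\n")
    await writer.drain()
    writer.close()
    await writer.wait_closed()

    # Poll briefly for the events instead of sleeping a fixed interval.
    for _ in range(200):
        if len(received) == 2:
            break
        await asyncio.sleep(0.01)

    server.close()
    await server.wait_closed()
    return received
```

Running `asyncio.run(demo())` sends two lines over one connection and returns the chunks passed to `emit`.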
Codecov Report
@@ Coverage Diff @@
## master #227 +/- ##
==========================================
+ Coverage 93.38% 93.53% +0.14%
==========================================
Files 13 13
Lines 1481 1546 +65
==========================================
+ Hits 1383 1446 +63
- Misses 98 100 +2
|
Yeah, I don't know either. I don't know non-blocking sockets that well. I tend to just let Tornado/asyncio handle it. |
I strongly suspect that the two forms are actually identical, so it might just be a matter of choice - or leave both. |
I recommend that we stick with TCPServer if that's ok. Any comments on the async test and timing questions above? |
Sorry, missed that. I added an async test, and changed sleeps to (a)wait_for. |
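For the async test, the same idea applies with a coroutine that yields to the event loop between checks. The sketch below uses plain asyncio rather than Tornado's coroutine tools; the name `await_for` and its parameters are assumptions for illustration.

```python
import asyncio
import time


async def await_for(predicate, timeout, period=0.01):
    """Yield to the event loop until ``predicate()`` is truthy or ``timeout`` expires."""
    deadline = time.monotonic() + timeout
    while not predicate():
        if time.monotonic() > deadline:
            raise AssertionError("condition not met within %s seconds" % timeout)
        await asyncio.sleep(period)


async def example():
    out = []
    loop = asyncio.get_running_loop()
    # Stand-in for a source emitting an event a little later.
    loop.call_later(0.05, out.append, b"data\n")
    await await_for(lambda: out == [b"data\n"], timeout=2)
    return out
```

Because the helper awaits `asyncio.sleep` instead of blocking, the loop keeps servicing sockets while the test waits.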
mrocklin
left a comment
Thanks for working on this. A few minor comments/requests. Hopefully nothing major though.
| """Shutdown HTTP server""" | ||
| if not self.stopped: | ||
| self.server.stop() | ||
| self.server = None |
You remove the server here, but not in the TCP solution. Is there a reason?
streamz/sources.py
Outdated
| application = Application([ | ||
| (self.path, Handler), | ||
| ]) | ||
| self.server = HTTPServer(application) |
I can imagine wanting to pass in keyword arguments to the constructor and have them percolate to here (same with TCPServer). I'm not sure though.
streamz/tests/test_sources.py
Outdated
| finally: | ||
| s.stop() | ||
|
|
||
| wait_for(lambda: out == [b'data\n', b'data\n', b'data2\n'], 2, period=0.01) |
Do we need to clean up the sockets here? Does this leak file descriptors?
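One way to make sure test sockets never leak is to close them in a `finally` block, so the descriptor is released even when the send fails. A minimal sketch, assuming a hypothetical helper `send_line` in the test module:

```python
import socket


def send_line(port, payload, host="127.0.0.1"):
    """Connect, send one payload, and always release the file descriptor."""
    sock = socket.create_connection((host, port), timeout=2)
    try:
        sock.sendall(payload)
    finally:
        sock.close()  # runs even if sendall raises
    return sock  # returned only so a caller can verify it was closed


def is_closed(sock):
    """A closed socket reports a file descriptor of -1."""
    return sock.fileno() == -1
```

Socket objects also work as context managers, so `with socket.create_connection(...) as sock:` achieves the same cleanup with less code.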
streamz/tests/test_sources.py
Outdated
| s = Source.from_http_server(port) | ||
| out = s.sink_to_list() | ||
| s.start() | ||
| time.sleep(0.02) # allow loop to run |
Is this necessary? If so, why this time?
In general, I'll probably challenge any sleep like this. If it's necessary to make things run then it's likely too short in some cases (GC on travis can take arbitrarily long times) and so results in intermittent failures. I spend a non-trivial amount of time tracking these things down in the dask/distributed codebase, so I've grown fairly allergic to them.
- Removed sleeps
- Closed sockets at test end
- Allowed for kwargs to pass to tornado servers (not tested - don't know what parameters may be reasonable here)
- remove tcp server instance on stop; not necessary, but reasonable
| def __init__(self, port, path='/.*', start=False, server_kwargs=None): | ||
| self.port = port | ||
| self.path = path | ||
| self.server_kwargs = server_kwargs or {} |
In one case you use tcp_kwargs and in the other server_kwargs. Maybe use server_kwargs in both places for consistency of the API?
|
Thanks for this, @martindurant. Should be fun to play with! |
Could be used, for example, with python logging-to-socket
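As a rough illustration of the logging idea: the standard `logging.handlers.SocketHandler` sends length-prefixed pickled records, so feeding a line-oriented socket source would need plain text instead. The sketch below overrides `makePickle` to send one newline-terminated line per record. It assumes the receiving source splits events on `\n`, as the tests in this PR suggest; the names `LineSocketHandler` and `make_logger` are hypothetical.

```python
import logging
import logging.handlers


class LineSocketHandler(logging.handlers.SocketHandler):
    """Send each record as one newline-terminated line of formatted text."""

    def makePickle(self, record):
        # SocketHandler normally builds a length-prefixed pickle here;
        # returning formatted text swaps that payload for a plain line.
        return (self.format(record) + "\n").encode("utf-8")


def make_logger(port, host="127.0.0.1", name="socket-demo"):
    """Return a logger (and its handler) that writes lines to host:port."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep demo output off the root logger
    handler = LineSocketHandler(host, port)
    handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
    logger.addHandler(handler)
    return logger, handler
```

With a socket source listening on `port`, each `logger.info(...)` call would then arrive as one event; the handler connects lazily on the first record it sends.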