From 724413b412dfbefda5823b6b712d91b314006c3c Mon Sep 17 00:00:00 2001 From: Bryce Lampe Date: Wed, 23 Sep 2015 17:30:07 -0700 Subject: [PATCH] use add_future instead of add_done_callback to help with interleaving --- tchannel/tornado/connection.py | 17 +++++++++++++---- tchannel/tornado/stream.py | 3 ++- tchannel/tornado/util.py | 5 ++++- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/tchannel/tornado/connection.py b/tchannel/tornado/connection.py index a2722135..4c8b0bdb 100644 --- a/tchannel/tornado/connection.py +++ b/tchannel/tornado/connection.py @@ -182,9 +182,12 @@ def on_read_size(read_size_future): size_bytes = read_size_future.result() size = frame.frame_rw.size_rw.read(BytesIO(size_bytes)) read_body_future = self.connection.read_bytes(size - size_width) - read_body_future.add_done_callback( + + tornado.ioloop.IOLoop.current().add_future( + read_body_future, lambda future: on_body(future, size) ) + return read_body_future def on_error(future): @@ -194,7 +197,12 @@ def on_error(future): self.close() size_width = frame.frame_rw.size_rw.width() - self.connection.read_bytes(size_width).add_done_callback(on_read_size) + read_bytes_future = self.connection.read_bytes(size_width) + + tornado.ioloop.IOLoop.current().add_future( + read_bytes_future, + on_read_size, + ) return message_future @@ -577,8 +585,9 @@ def stream_request(self, request): stream_future = self._stream(request, self.request_message_factory) - stream_future.add_done_callback( - lambda f: request.close_argstreams(force=True), + tornado.ioloop.IOLoop.current().add_future( + stream_future, + lambda f: request.close_argstreams(force=True) ) return stream_future diff --git a/tchannel/tornado/stream.py b/tchannel/tornado/stream.py index 506557cd..b7c9d12f 100644 --- a/tchannel/tornado/stream.py +++ b/tchannel/tornado/stream.py @@ -140,7 +140,8 @@ def read_chunk(future): # We're not ready yet if self.state != StreamState.completed and not len(self._stream): wait_future = self._condition.wait() - wait_future.add_done_callback( + tornado.ioloop.IOLoop.current().add_future( + wait_future, lambda f: f.exception() or read_chunk(read_future) ) return read_future diff --git a/tchannel/tornado/util.py b/tchannel/tornado/util.py index 6674b6a8..0078234f 100644 --- a/tchannel/tornado/util.py +++ b/tchannel/tornado/util.py @@ -85,7 +85,10 @@ def go(): except StopIteration: all_done_future.set_result(None) else: - func(arg).add_done_callback(handle) + tornado.ioloop.IOLoop.current().add_future( + func(arg), + handle, + ) go()