Skip to content
This repository has been archived by the owner on Jan 5, 2024. It is now read-only.

Commit

Permalink
use add_future instead of add_done_callback to help with interleaving
Browse files Browse the repository at this point in the history
  • Loading branch information
blampe committed Sep 24, 2015
1 parent bcecb95 commit 724413b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 6 deletions.
17 changes: 13 additions & 4 deletions tchannel/tornado/connection.py
Expand Up @@ -182,9 +182,12 @@ def on_read_size(read_size_future):
size_bytes = read_size_future.result()
size = frame.frame_rw.size_rw.read(BytesIO(size_bytes))
read_body_future = self.connection.read_bytes(size - size_width)
read_body_future.add_done_callback(

tornado.ioloop.IOLoop.current().add_future(
read_body_future,
lambda future: on_body(future, size)
)

return read_body_future

def on_error(future):
Expand All @@ -194,7 +197,12 @@ def on_error(future):
self.close()

size_width = frame.frame_rw.size_rw.width()
self.connection.read_bytes(size_width).add_done_callback(on_read_size)
read_bytes_future = self.connection.read_bytes(size_width)

tornado.ioloop.IOLoop.current().add_future(
read_bytes_future,
on_read_size,
)

return message_future

Expand Down Expand Up @@ -577,8 +585,9 @@ def stream_request(self, request):

stream_future = self._stream(request, self.request_message_factory)

stream_future.add_done_callback(
lambda f: request.close_argstreams(force=True),
tornado.ioloop.IOLoop.current().add_future(
stream_future,
lambda f: request.close_argstreams(force=True)
)

return stream_future
Expand Down
3 changes: 2 additions & 1 deletion tchannel/tornado/stream.py
Expand Up @@ -140,7 +140,8 @@ def read_chunk(future):
# We're not ready yet
if self.state != StreamState.completed and not len(self._stream):
wait_future = self._condition.wait()
wait_future.add_done_callback(
tornado.ioloop.IOLoop.current().add_future(
wait_future,
lambda f: f.exception() or read_chunk(read_future)
)
return read_future
Expand Down
5 changes: 4 additions & 1 deletion tchannel/tornado/util.py
Expand Up @@ -85,7 +85,10 @@ def go():
except StopIteration:
all_done_future.set_result(None)
else:
func(arg).add_done_callback(handle)
tornado.ioloop.IOLoop.current().add_future(
func(arg),
handle,
)

go()

Expand Down

0 comments on commit 724413b

Please sign in to comment.