Skip to content
This repository has been archived by the owner on Jan 5, 2024. It is now read-only.

Commit

Permalink
connection: read_message should propagate StreamClosedError
Browse files Browse the repository at this point in the history
If the IOStream has already been closed by the time read_message is called, it
will fail with a synchronous StreamClosedError which read_message needs to
catch.
  • Loading branch information
abhinav committed Mar 14, 2016
1 parent a9dce49 commit 9420020
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
11 changes: 8 additions & 3 deletions tchannel/tornado/connection.py
Expand Up @@ -24,7 +24,6 @@
import os
import socket
import sys
from functools import partial

import tornado.gen
import tornado.iostream
Expand Down Expand Up @@ -816,8 +815,14 @@ def on_read_size(future):
size = frame.frame_rw.size_rw.read(BytesIO(size_bytes))
io_loop.add_future(
stream.read_bytes(size - FRAME_SIZE_WIDTH),
partial(on_body, size),
lambda f: on_body(size, f)
)

io_loop.add_future(stream.read_bytes(FRAME_SIZE_WIDTH), on_read_size)
try:
# read_bytes may fail if the stream has already been closed
read_size_future = stream.read_bytes(FRAME_SIZE_WIDTH)
except Exception:
answer.set_exc_info(sys.exc_info())
else:
read_size_future.add_done_callback(on_read_size)
return answer
20 changes: 20 additions & 0 deletions tests/tornado/test_connection.py
Expand Up @@ -164,3 +164,23 @@ def test_writer_write_error():
writer.io_stream.close()
with pytest.raises(StreamClosedError):
yield writer.put(messages.PingResponseMessage())


@pytest.mark.gen_test
def test_reader_read_error():
server, client = socket.socketpair()
reader = connection.Reader(IOStream(server))
writer = connection.Writer(IOStream(client))

yield writer.put(messages.PingRequestMessage())
ping = yield reader.get()
assert isinstance(ping, messages.PingRequestMessage)

reader.io_stream.close()
future = reader.get()
with pytest.raises(gen.TimeoutError):
yield gen.with_timeout(timedelta(milliseconds=100), future)

# TODO(abg): Need to fix Reader.get to actually raise StreamClosedError
# rather than freezing forever. Then this can be changed to assert that a
# StreamClosedError is raised.

0 comments on commit 9420020

Please sign in to comment.