Skip to content
This repository has been archived by the owner on Jan 5, 2024. It is now read-only.

Commit

Permalink
Merge pull request #291 from uber/closed
Browse files Browse the repository at this point in the history
Remove assertion for connection close instead throwing StreamClosedError
  • Loading branch information
jc-fireball committed Nov 7, 2015
2 parents 86c764d + 4012b26 commit 9a88a00
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 18 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ Changes by Version
planned in 0.18.
- Reduced Zipkin submission failures to warnings.
- Limit the size of arg1 to 16KB.
- Fix bug which prevented requests from being retried if the candidate
connection was previously terminated.


0.18.3 (unreleased)
Expand Down
19 changes: 7 additions & 12 deletions tchannel/tornado/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import tornado.queues as queues

from tornado import stack_context
from tornado.iostream import StreamClosedError


from .. import errors
Expand Down Expand Up @@ -163,6 +164,9 @@ def _on_close(self):
except queues.QueueEmpty:
pass

if self._close_cb:
self._close_cb()

def await(self):
"""Get the next call to this TChannel."""
if self._loop_running:
Expand Down Expand Up @@ -208,10 +212,7 @@ def on_read_size(read_size_future):
return read_body_future

def on_error(future):
exception = future.exception()

if isinstance(exception, tornado.iostream.StreamClosedError):
self.close()
log.info("Failed to read data: %s", future.exception())

size_width = frame.frame_rw.size_rw.width()
read_bytes_future = self.connection.read_bytes(size_width)
Expand Down Expand Up @@ -290,7 +291,6 @@ def send(self, message):
:returns:
A Future containing the response for the message
"""
assert not self.closed
assert self._loop_running, "Perform a handshake first."
assert message.message_type in self.CALL_REQ_TYPES, (
"Message '%s' can't use send" % repr(message)
Expand All @@ -314,8 +314,6 @@ def write(self, message):
:param message:
Message to write.
"""
assert not self.closed

message.id = message.id or self.next_message_id()

if message.message_type in self.CALL_REQ_TYPES:
Expand Down Expand Up @@ -350,10 +348,8 @@ def _write(self, message):
return self.connection.write(body)

def close(self):
if not self.connection.closed():
if not self.closed:
self.connection.close()
if self._close_cb:
self._close_cb()

@tornado.gen.coroutine
def initiate_handshake(self, headers):
Expand Down Expand Up @@ -453,7 +449,7 @@ def outgoing(cls, hostport, process_name=None, serve_hostport=None,
log.debug("Connecting to %s", hostport)
try:
yield stream.connect((host, int(port)))
except socket.error as e:
except (StreamClosedError, socket.error) as e:
log.warn("Couldn't connect to %s", hostport)
raise NetworkError(
"Couldn't connect to %s" % hostport, e
Expand Down Expand Up @@ -623,7 +619,6 @@ def send_request(self, request):
:returns:
A Future containing the response for the request
"""
assert not self.closed
assert self._loop_running, "Perform a handshake first."

assert request.id not in self._outstanding, (
Expand Down
16 changes: 14 additions & 2 deletions tchannel/tornado/peer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@
from collections import deque
from itertools import takewhile, dropwhile
from tornado import gen
from tornado.iostream import StreamClosedError

from ..schemes import DEFAULT as DEFAULT_SCHEME
from ..retry import (
DEFAULT as DEFAULT_RETRY, DEFAULT_RETRY_LIMIT
)
from tchannel.event import EventType

from ..context import get_current_context
from ..errors import BadRequestError
from ..errors import NoAvailablePeerError
from ..errors import TChannelError
from ..errors import NetworkError
from ..event import EventType
from ..glossary import DEFAULT_TIMEOUT
from ..glossary import MAX_SIZE_OF_ARG1
from ..zipkin.annotation import Endpoint
Expand Down Expand Up @@ -409,6 +411,17 @@ def _send(self, connection, req):
with timeout(response_future, req.ttl):
try:
response = yield response_future
except StreamClosedError as error:
network_error = NetworkError(
id=req.id,
description=error.message,
tracing=req.tracing,
)
# event: after_receive_error
self.tchannel.event_emitter.fire(
EventType.after_receive_error, req, error,
)
raise network_error
except TChannelError as error:
# event: after_receive_error
self.tchannel.event_emitter.fire(
Expand All @@ -429,7 +442,6 @@ def send_with_retry(self, request, peer, retry_limit, connection):
try:
response = yield self._send(connection, request)
raise gen.Return(response)
# Why are we retying on all errors????
except TChannelError as error:
blacklist.add(peer.hostport)
(peer, connection) = yield self._prepare_for_retry(
Expand Down
4 changes: 4 additions & 0 deletions tchannel/tornado/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import tornado
import tornado.gen
from tornado.iostream import StreamClosedError

from tchannel import retry

Expand Down Expand Up @@ -177,6 +178,9 @@ def should_retry_on_error(self, error):
if retry_flag == retry.NEVER:
return False

if isinstance(error, StreamClosedError):
return True

if error.code in [ErrorCode.bad_request, ErrorCode.cancelled,
ErrorCode.unhealthy]:
return False
Expand Down
25 changes: 24 additions & 1 deletion tests/integration/test_client_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import pytest

from tchannel import tcurl
from tchannel import TChannel
from tchannel.errors import NetworkError
from tchannel.errors import BadRequestError
from tchannel import TChannel
from tchannel.tornado.connection import StreamConnection
from tests.util import big_arg

Expand Down Expand Up @@ -109,3 +109,26 @@ def test_endpoint_not_found(mock_server):
endpoint='fooo',
hostport=mock_server.hostport,
)


@pytest.mark.gen_test
def test_connection_close(mock_server):
tchannel = TChannel(name='test')

# use a bad call to finish the hand shake and build the connection.
with pytest.raises(BadRequestError):
yield tchannel.raw(
service='test-service',
hostport=mock_server.hostport,
endpoint='testg',
)

# close the server and close the connection.
mock_server.tchannel._dep_tchannel.close()

with pytest.raises(NetworkError):
yield tchannel.raw(
service='test-service',
hostport=mock_server.hostport,
endpoint='testg',
)
7 changes: 4 additions & 3 deletions tests/tornado/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ def test_close_callback_is_called():
server = TChannel('server')
server.listen()

close_cb = mock.Mock()
cb_future = tornado.gen.Future()

conn = yield StreamConnection.outgoing(
server.hostport, tchannel=mock.MagicMock()
)
conn.set_close_callback(close_cb)
conn.set_close_callback(lambda: cb_future.set_result(True))

conn.close()
close_cb.assert_called_once_with()

assert (yield cb_future)

0 comments on commit 9a88a00

Please sign in to comment.