Skip to content

Commit

Permalink
Merge pull request #1300 from sekogan/fix_1298_case_2
Browse files Browse the repository at this point in the history
Fix case 2 from #1298
  • Loading branch information
michaelklishin committed Jan 26, 2021
2 parents adf89f3 + a439571 commit 8d894d4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 0 deletions.
12 changes: 12 additions & 0 deletions pika/adapters/twisted_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,7 @@ def __init__(self, parameters=None, custom_reactor=None):
on_close_callback=self._on_connection_closed,
custom_reactor=custom_reactor,
)
self._calls = set()

def channel(self, channel_number=None): # pylint: disable=W0221
"""Create a new channel with the next available channel number or pass
Expand All @@ -1190,6 +1191,8 @@ def channel(self, channel_number=None): # pylint: disable=W0221
"""
d = defer.Deferred()
self._impl.channel(channel_number, d.callback)
self._calls.add(d)
d.addCallback(self._clear_call, d)
return d.addCallback(TwistedChannel)

@property
Expand Down Expand Up @@ -1239,6 +1242,11 @@ def _on_connection_failed(self, _connection, _error_message=None):
d.errback(exc)

def _on_connection_closed(self, _connection, exception):
# errback all pending calls
for d in self._calls:
d.errback(exception)
self._calls = set()

d, self.closed = self.closed, None
if d:
if isinstance(exception, Failure):
Expand All @@ -1247,6 +1255,10 @@ def _on_connection_closed(self, _connection, exception):
exception = exception.value
d.callback(exception)

def _clear_call(self, ret, d):
self._calls.discard(d)
return ret


class _TimerHandle(nbio_interface.AbstractTimerReference):
"""This module's adaptation of `nbio_interface.AbstractTimerReference`.
Expand Down
13 changes: 13 additions & 0 deletions tests/acceptance/twisted_adapter_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,19 @@ def check(result):
d.addCallback(check)
return d

@deferred(timeout=5.0)
def test_channel_errback_if_connection_closed(self):
# Verify calls to channel() that haven't had their callback invoked
# errback when the connection closes.
self.conn._on_connection_ready("dummy")

d = self.conn.channel()

self.conn._on_connection_closed("test conn", RuntimeError("testing"))

self.assertEqual(len(self.conn._calls), 0)
return self.assertFailure(d, RuntimeError)

def test_dataReceived(self):
# Verify that the data is transmitted to the callback method.
self.conn.dataReceived("testdata")
Expand Down

0 comments on commit 8d894d4

Please sign in to comment.