diff --git a/docs/changelog.rst b/docs/changelog.rst index 77e9da0de..5f22a06eb 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -59,6 +59,9 @@ Also: * Improved support for sending fragmented messages by accepting asynchronous iterators in :meth:`~protocol.WebSocketCommonProtocol.send`. +* Prevented spurious log messages about :exc:`~exceptions.ConnectionClosed` + exceptions in keepalive ping task. + * Avoided a crash of a ``extra_headers`` callable returns ``None``. * Enabled readline in the interactive client. diff --git a/src/websockets/protocol.py b/src/websockets/protocol.py index c07aef99f..d6462cc16 100644 --- a/src/websockets/protocol.py +++ b/src/websockets/protocol.py @@ -887,7 +887,8 @@ async def read_data_frame(self, max_size: int) -> Optional[Frame]: while ping_id != frame.data: ping_id, pong_waiter = self.pings.popitem(last=False) ping_ids.append(ping_id) - pong_waiter.set_result(None) + if not pong_waiter.done(): + pong_waiter.set_result(None) pong_hex = binascii.hexlify(frame.data).decode() or "[empty]" logger.debug( "%s - received solicited pong: %s", self.side, pong_hex @@ -980,6 +981,7 @@ async def keepalive_ping(self) -> None: This coroutine exits when the connection terminates and one of the following happens: + - :meth:`ping` raises :exc:`ConnectionClosed`, or - :meth:`close_connection` cancels :attr:`keepalive_ping_task`. @@ -991,11 +993,12 @@ async def keepalive_ping(self) -> None: while True: await asyncio.sleep(self.ping_interval, loop=self.loop) - # ping() cannot raise ConnectionClosed, only CancelledError: - # - If the connection is CLOSING, keepalive_ping_task will be - # canceled by close_connection() before ping() returns. - # - If the connection is CLOSED, keepalive_ping_task must be - # canceled already. + # ping() raises CancelledError if the connection is closed, + # when close_connection() cancels self.keepalive_ping_task. + + # ping() raises ConnectionClosed if the connection is lost, + # when connection_lost() calls abort_keepalive_pings(). + ping_waiter = await self.ping() if self.ping_timeout is not None: @@ -1011,6 +1014,9 @@ async def keepalive_ping(self) -> None: except asyncio.CancelledError: raise + except ConnectionClosed: + pass + except Exception: logger.warning("Unexpected exception in keepalive ping task", exc_info=True) diff --git a/tests/test_protocol.py b/tests/test_protocol.py index 938e54d8d..57cef89e0 100644 --- a/tests/test_protocol.py +++ b/tests/test_protocol.py @@ -864,6 +864,35 @@ def test_acknowledge_previous_pings(self): self.assertTrue(pings[1][0].done()) self.assertFalse(pings[2][0].done()) + def test_acknowledge_aborted_ping(self): + ping = self.loop.run_until_complete(self.protocol.ping()) + ping_frame = self.last_sent_frame() + # Clog incoming queue. This lets connection_lost() abort pending pings + # with a ConnectionClosed exception before transfer_data_task + # terminates and close_connection cancels keepalive_ping_task. + self.protocol.max_queue = 1 + self.receive_frame(Frame(True, OP_TEXT, b"1")) + self.receive_frame(Frame(True, OP_TEXT, b"2")) + # Add pong frame to the queue. + pong_frame = Frame(True, OP_PONG, ping_frame.data) + self.receive_frame(pong_frame) + # Connection drops. + self.receive_eof() + self.loop.run_until_complete(self.protocol.wait_closed()) + # Ping receives a ConnectionClosed exception. + with self.assertRaises(ConnectionClosed): + ping.result() + + with self.assertLogs("websockets", level=logging.ERROR) as logs: + # We want to test that no error log is emitted. + # Unfortunately assertLogs expects at least one log message. + logging.getLogger("websockets").error("dummy") + # Unclog incoming queue. + self.loop.run_until_complete(self.protocol.recv()) + self.loop.run_until_complete(self.protocol.recv()) + # transfer_data doesn't crash, which would be logged. + self.assertEqual(logs.output[1:], []) + def test_canceled_ping(self): ping = self.loop.run_until_complete(self.protocol.ping()) ping_frame = self.last_sent_frame() @@ -1074,6 +1103,31 @@ def test_keepalive_ping_stops_when_connection_closed(self): # The keepalive ping task terminated. self.assertTrue(self.protocol.keepalive_ping_task.cancelled()) + def test_keepalive_ping_does_not_crash_when_connection_lost(self): + self.restart_protocol_with_keepalive_ping() + # Clog incoming queue. This lets connection_lost() abort pending pings + # with a ConnectionClosed exception before transfer_data_task + # terminates and close_connection cancels keepalive_ping_task. + self.protocol.max_queue = 1 + self.receive_frame(Frame(True, OP_TEXT, b"1")) + self.receive_frame(Frame(True, OP_TEXT, b"2")) + # Ping is sent at 3ms. + self.loop.run_until_complete(asyncio.sleep(4 * MS)) + ping_waiter, = tuple(self.protocol.pings.values()) + # Connection drops. + self.receive_eof() + self.loop.run_until_complete(self.protocol.wait_closed()) + + # The ping waiter receives a ConnectionClosed exception. + with self.assertRaises(ConnectionClosed): + ping_waiter.result() + # The keepalive ping task terminated properly. + self.assertIsNone(self.protocol.keepalive_ping_task.result()) + + # Unclog incoming queue to terminate the test quickly. + self.loop.run_until_complete(self.protocol.recv()) + self.loop.run_until_complete(self.protocol.recv()) + def test_keepalive_ping_with_no_ping_interval(self): self.restart_protocol_with_keepalive_ping(ping_interval=None)