Skip to content

Commit

Permalink
Suppress task cancel traceback (#2812)
Browse files Browse the repository at this point in the history
  • Loading branch information
ahopkins committed Aug 30, 2023
1 parent 4dde457 commit 38ff906
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions sanic/server/websockets/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,12 +532,11 @@ async def recv(self, timeout: Optional[float] = None) -> Optional[Data]:
raise WebsocketClosed(
"Cannot receive from websocket interface after it is closed."
)
assembler_get: Optional[asyncio.Task] = None
try:
self.recv_cancel = asyncio.Future()
tasks = (
self.recv_cancel,
asyncio.ensure_future(self.assembler.get(timeout)),
)
assembler_get = asyncio.create_task(self.assembler.get(timeout))
tasks = (self.recv_cancel, assembler_get)
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED,
Expand All @@ -551,6 +550,11 @@ async def recv(self, timeout: Optional[float] = None) -> Optional[Data]:
else:
self.recv_cancel.cancel()
return done_task.result()
except asyncio.CancelledError:
# recv was cancelled
if assembler_get:
assembler_get.cancel()
raise
finally:
self.recv_cancel = None
self.recv_lock.release()
Expand Down Expand Up @@ -584,16 +588,15 @@ async def recv_burst(self, max_recv=256) -> Sequence[Data]:
"Cannot receive from websocket interface after it is closed."
)
messages = []
assembler_get: Optional[asyncio.Task] = None
try:
# Prevent pausing the transport when we're
# receiving a burst of messages
self.can_pause = False
self.recv_cancel = asyncio.Future()
while True:
tasks = (
self.recv_cancel,
asyncio.ensure_future(self.assembler.get(timeout=0)),
)
assembler_get = asyncio.create_task(self.assembler.get(0))
tasks = (self.recv_cancel, assembler_get)
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED,
Expand All @@ -616,6 +619,11 @@ async def recv_burst(self, max_recv=256) -> Sequence[Data]:
# next message to pass into the Assembler
await asyncio.sleep(0)
self.recv_cancel.cancel()
except asyncio.CancelledError:
# recv_burst was cancelled
if assembler_get:
assembler_get.cancel()
raise
finally:
self.recv_cancel = None
self.can_pause = True
Expand Down

0 comments on commit 38ff906

Please sign in to comment.