From bdcaeb00566813bfc4d6d8971e843095dd9f8189 Mon Sep 17 00:00:00 2001 From: Janus Date: Fri, 7 Sep 2018 14:28:09 +0200 Subject: [PATCH] aiorpcx: detect lost connection --- electrum/interface.py | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/electrum/interface.py b/electrum/interface.py index d0e63f2f9d5c..284e38ddfb25 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -31,7 +31,6 @@ import traceback import aiorpcx import asyncio -import concurrent.futures from aiorpcx import ClientSession, Notification import requests @@ -159,7 +158,10 @@ async def run(self): try: await self.open_session(sslc, exit_early=False) except (asyncio.CancelledError, ConnectionRefusedError, socket.gaierror, ssl.SSLError, TimeoutError) as e: - self.print_error('disconnecting due to: {}'.format(e)) + if str(e): + self.print_error('disconnecting due to: {}'.format(e)) + else: + self.print_error('disconnecting due to exception of type: {}'.format(type(e))) self.exception = e return # should never get here (can only exit via exception) @@ -232,14 +234,32 @@ async def open_session(self, sslc, exit_early): self.mark_ready() copy_header_queue = asyncio.Queue() block_retriever = asyncio.get_event_loop().create_task(self.run_fetch_blocks(subscription_res, copy_header_queue)) - while True: + # make event such that we can cancel waiting + # for the event without cancelling pm_task + connection_lost_evt = asyncio.Event() + session.pm_task.add_done_callback(lambda *args: connection_lost_evt.set()) + while not connection_lost_evt.is_set(): try: - new_header = await asyncio.wait_for(header_queue.get(), 300) - self.tip_header = new_header - self.tip = new_header['block_height'] - await copy_header_queue.put(new_header) - except concurrent.futures.TimeoutError: + async with aiorpcx.curio.TimeoutAfter(300) as deadline: + async with aiorpcx.TaskGroup(wait=any) as tg: + qtask = await tg.spawn(header_queue.get()) + await tg.spawn(connection_lost_evt.wait()) + except asyncio.CancelledError: + if not deadline.expired: + # if it wasn't because of the deadline, we are + # trying to shut down, and we shouldn't ping + raise await asyncio.wait_for(session.send_request('server.ping'), 5) + else: + try: + if qtask.done() and not qtask.exception(): + new_header = qtask.result() + self.tip_header = new_header + self.tip = new_header['block_height'] + await copy_header_queue.put(new_header) + except asyncio.CancelledError: + pass + raise GracefulDisconnect("connection loop exited") def close(self): self.fut.cancel()