Skip to content

Commit

Permalink
Merge bdcaeb0 into 52b877a
Browse files Browse the repository at this point in the history
  • Loading branch information
ysangkok committed Sep 7, 2018
2 parents 52b877a + bdcaeb0 commit 4cb31dc
Showing 1 changed file with 28 additions and 8 deletions.
36 changes: 28 additions & 8 deletions electrum/interface.py
Expand Up @@ -31,7 +31,6 @@
import traceback
import aiorpcx
import asyncio
import concurrent.futures
from aiorpcx import ClientSession, Notification

import requests
Expand Down Expand Up @@ -159,7 +158,10 @@ async def run(self):
try:
await self.open_session(sslc, exit_early=False)
except (asyncio.CancelledError, ConnectionRefusedError, socket.gaierror, ssl.SSLError, TimeoutError) as e:
self.print_error('disconnecting due to: {}'.format(e))
if str(e):
self.print_error('disconnecting due to: {}'.format(e))
else:
self.print_error('disconnecting due to exception of type: {}'.format(type(e)))
self.exception = e
return
# should never get here (can only exit via exception)
Expand Down Expand Up @@ -232,14 +234,32 @@ async def open_session(self, sslc, exit_early):
self.mark_ready()
copy_header_queue = asyncio.Queue()
block_retriever = asyncio.get_event_loop().create_task(self.run_fetch_blocks(subscription_res, copy_header_queue))
while True:
# make event such that we can cancel waiting
# for the event without cancelling pm_task
connection_lost_evt = asyncio.Event()
session.pm_task.add_done_callback(lambda *args: connection_lost_evt.set())
while not connection_lost_evt.is_set():
try:
new_header = await asyncio.wait_for(header_queue.get(), 300)
self.tip_header = new_header
self.tip = new_header['block_height']
await copy_header_queue.put(new_header)
except concurrent.futures.TimeoutError:
async with aiorpcx.curio.TimeoutAfter(300) as deadline:
async with aiorpcx.TaskGroup(wait=any) as tg:
qtask = await tg.spawn(header_queue.get())
await tg.spawn(connection_lost_evt.wait())
except asyncio.CancelledError:
if not deadline.expired:
# if it wasn't because of the deadline, we are
# trying to shut down, and we shouldn't ping
raise
await asyncio.wait_for(session.send_request('server.ping'), 5)
else:
try:
if qtask.done() and not qtask.exception():
new_header = qtask.result()
self.tip_header = new_header
self.tip = new_header['block_height']
await copy_header_queue.put(new_header)
except asyncio.CancelledError:
pass
raise GracefulDisconnect("connection loop exited")

def close(self):
self.fut.cancel()
Expand Down

0 comments on commit 4cb31dc

Please sign in to comment.