Skip to content

Commit

Permalink
Add heartbeat handling in websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
Insoleet committed Mar 16, 2016
1 parent 5d33c72 commit 5f2563a
Showing 1 changed file with 39 additions and 1 deletion.
40 changes: 39 additions & 1 deletion src/sakia/core/net/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,41 @@ def refresh(self, manual=False):
else:
self._refresh_counter += 1

async def heartbeat(self, ws):
"""
:param asyncio.WebsocketClientResponse ws: the ws to do heartbeat with
:return:
"""
future_pong = None

def pong_callback():
if future_pong:
future_pong.set_result(True)

async def ping_loop():
nonlocal future_pong
future_pong = asyncio.Future()
missed_heartbeats = 0
while not ws.closed:
await asyncio.sleep(15)
if not ws.closed:
try:
future_pong = asyncio.Future()
ws.ping()
await asyncio.wait_for(future_pong, 15)
missed_heartbeats = 0
except asyncio.TimeoutError:
logging.debug("{0} - Missed a pong...".format(self.pubkey[:5]))
missed_heartbeats += 1
finally:
if missed_heartbeats > 3:
logging.debug("{0} - Too many heartbeats missed. Closing ws".format(self.pubkey[:5]))
await ws.close()

asyncio.ensure_future(ping_loop())
return pong_callback

async def connect_current_block(self):
"""
Connects to the websocket entry point of the node
Expand All @@ -344,8 +379,9 @@ async def connect_current_block(self):
try:
conn_handler = self.endpoint.conn_handler()
block_websocket = bma.ws.Block(conn_handler)
ws_connection = block_websocket.connect()
ws_connection = block_websocket.connect(autoping=False)
async with ws_connection as ws:
heartbeat = await self.heartbeat(ws)
self._connected['block'] = True
logging.debug("Connected successfully to block ws : {0}".format(self.pubkey[:5]))
async for msg in ws:
Expand All @@ -357,6 +393,8 @@ async def connect_current_block(self):
break
elif msg.tp == aiohttp.MsgType.error:
break
elif msg.tp == aiohttp.MsgType.pong:
heartbeat()
except ValueError as e:
logging.debug("Websocket block {0} : {1} - {2}".format(type(e).__name__, str(e), self.pubkey[:5]))
asyncio.ensure_future(self.request_current_block())
Expand Down

0 comments on commit 5f2563a

Please sign in to comment.