Skip to content

Commit

Permalink
Fix websockets errors on close #355
Browse files Browse the repository at this point in the history
  • Loading branch information
Insoleet committed Feb 8, 2016
1 parent ba17117 commit 8333313
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 10 deletions.
6 changes: 5 additions & 1 deletion src/sakia/core/net/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,12 @@ async def stop_coroutines(self):
Stop network nodes crawling.
"""
self._must_crawl = False
close_tasks = []
for node in self.nodes:
await node.close_ws()
close_tasks.append(asyncio.ensure_future(node.close_ws()))
await asyncio.gather(*close_tasks)
logging.debug("Closed")


def continue_crawling(self):
return self._must_crawl
Expand Down
11 changes: 4 additions & 7 deletions src/sakia/core/net/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ def jsonify(self):
async def close_ws(self):
for ws in self._ws_connection.values():
if ws:
await ws.close()
ws.cancel()
await asyncio.sleep(0)

@property
def pubkey(self):
Expand Down Expand Up @@ -303,8 +304,8 @@ def refresh(self, manual=False):
Refresh all data of this node
:param bool manual: True if the refresh was manually initiated
"""
self.connect_current_block()
self.connect_peers()
self._ws_connection['block'] = asyncio.ensure_future(self.connect_current_block())
self._ws_connection['peer'] = asyncio.ensure_future(self.connect_peers())

if self._refresh_counter % 20 == 0 or manual:
self.refresh_informations()
Expand All @@ -314,7 +315,6 @@ def refresh(self, manual=False):
else:
self._refresh_counter += 1

@asyncify
async def connect_current_block(self):
"""
Connects to the websocket entry point of the node
Expand All @@ -326,7 +326,6 @@ async def connect_current_block(self):
block_websocket = bma.ws.Block(conn_handler)
ws_connection = block_websocket.connect()
async with ws_connection as ws:
self._ws_connection['block'] = ws
logging.debug("Connected successfully to block ws : {0}".format(self.pubkey[:5]))
async for msg in ws:
if msg.tp == aiohttp.MsgType.text:
Expand Down Expand Up @@ -516,7 +515,6 @@ async def refresh_uid(self):
logging.debug("Validation error : {0}".format(self.pubkey[:5]))
self.state = Node.CORRUPTED

@asyncify
async def connect_peers(self):
"""
Connects to the peer websocket entry point
Expand All @@ -528,7 +526,6 @@ async def connect_peers(self):
peer_websocket = bma.ws.Peer(conn_handler)
ws_connection = peer_websocket.connect()
async with ws_connection as ws:
self._ws_connection['peer'] = ws
logging.debug("Connected successfully to peer ws : {0}".format(self.pubkey[:5]))
async for msg in ws:
if msg.tp == aiohttp.MsgType.text:
Expand Down
2 changes: 1 addition & 1 deletion src/sakia/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

def async_exception_handler(loop, context):
"""
An exception handler which exists the program if the exception
An exception handler which exits the program if the exception
was not catch
:param loop: the asyncio loop
:param context: the exception context
Expand Down
1 change: 0 additions & 1 deletion src/sakia/tools/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def start_task():
def asyncify(fn):
@functools.wraps(fn)
def wrapper(*args, **kwargs):
logging.debug("Sheduling {0}".format(fn.__name__))
return asyncio.ensure_future(asyncio.coroutine(fn)(*args, **kwargs))

return wrapper

0 comments on commit 8333313

Please sign in to comment.