Skip to content

Commit

Permalink
Fix bug #368 and reopen #355
Browse files Browse the repository at this point in the history
  • Loading branch information
Insoleet committed Feb 10, 2016
1 parent 548a8ac commit 8e84045
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions src/sakia/core/net/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@ def __init__(self, peer, uid, pubkey, block,
self._version = version
self._fork_window = fork_window
self._refresh_counter = 19
self._ws_connection = {'block': None,
self._ws_tasks = {'block': None,
'peer': None}
self._connected = {'block': False,
'peer': False}


@classmethod
async def from_address(cls, currency, address, port):
Expand Down Expand Up @@ -201,10 +204,10 @@ def jsonify(self):
return data

async def close_ws(self):
for ws in self._ws_connection.values():
for ws in self._ws_tasks.values():
if ws:
ws.cancel()
await asyncio.sleep(0)
await asyncio.sleep(0)

@property
def pubkey(self):
Expand Down Expand Up @@ -304,8 +307,8 @@ def refresh(self, manual=False):
Refresh all data of this node
:param bool manual: True if the refresh was manually initiated
"""
self._ws_connection['block'] = asyncio.ensure_future(self.connect_current_block())
self._ws_connection['peer'] = asyncio.ensure_future(self.connect_peers())
self._ws_tasks['block'] = asyncio.ensure_future(self.connect_current_block())
self._ws_tasks['peer'] = asyncio.ensure_future(self.connect_peers())

if self._refresh_counter % 20 == 0 or manual:
self.refresh_informations()
Expand All @@ -320,12 +323,13 @@ async def connect_current_block(self):
Connects to the websocket entry point of the node
If the connection fails, it tries the fallback mode on HTTP GET
"""
if not self._ws_connection['block']:
if not self._connected['block']:
try:
conn_handler = self.endpoint.conn_handler()
block_websocket = bma.ws.Block(conn_handler)
ws_connection = block_websocket.connect()
async with ws_connection as ws:
self._connected['block'] = True
logging.debug("Connected successfully to block ws : {0}".format(self.pubkey[:5]))
async for msg in ws:
if msg.tp == aiohttp.MsgType.text:
Expand All @@ -347,7 +351,8 @@ async def connect_current_block(self):
logging.debug("Validation error : {0}".format(self.pubkey[:5]))
self.state = Node.CORRUPTED
finally:
self._ws_connection['block'] = None
self._connected['block'] = False
self._ws_tasks['block'] = None

async def request_current_block(self):
"""
Expand Down Expand Up @@ -520,18 +525,19 @@ async def connect_peers(self):
Connects to the peer websocket entry point
If the connection fails, it tries the fallback mode on HTTP GET
"""
if not self._ws_connection['peer']:
if not self._connected['peer']:
try:
conn_handler = self.endpoint.conn_handler()
peer_websocket = bma.ws.Peer(conn_handler)
ws_connection = peer_websocket.connect()
async with ws_connection as ws:
self._connected['peer'] = True
logging.debug("Connected successfully to peer ws : {0}".format(self.pubkey[:5]))
async for msg in ws:
if msg.tp == aiohttp.MsgType.text:
logging.debug("Received a peer : {0}".format(self.pubkey[:5]))
peer_data = peer_websocket.parse_text(msg.data)
self.refresh_peer_data(peer_data)
await self.refresh_peer_data(peer_data)
elif msg.tp == aiohttp.MsgType.closed:
break
elif msg.tp == aiohttp.MsgType.error:
Expand All @@ -547,7 +553,8 @@ async def connect_peers(self):
logging.debug("Validation error : {0}".format(self.pubkey[:5]))
self.state = Node.CORRUPTED
finally:
self._ws_connection['peer'] = None
self._connected['peer'] = False
self._ws_tasks['peer'] = None

async def request_peers(self):
"""
Expand Down

0 comments on commit 8e84045

Please sign in to comment.