Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
sammchardy committed Apr 24, 2019
2 parents db4e09b + ebabb8a commit c616e40
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 33 deletions.
6 changes: 4 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,8 @@ Node RPC Websockets

See `API <https://python-binance-chain.readthedocs.io/en/latest/binance-chain.html#module-binance_chain.node_rpc.websockets>`_ docs for more information.

For subscribe query examples see the `documentation here <https://docs.binance.org/api-reference/node-rpc.html#631-subscribe>`_

.. code:: python
import asyncio
Expand Down Expand Up @@ -630,7 +632,7 @@ See `API <https://python-binance-chain.readthedocs.io/en/latest/binance-chain.ht
wrc = await WebsocketRpcClient.create(loop, handle_evt, env=node_env)
await wrc.subscribe('NewBlock')
await wrc.subscribe("tm.event = 'NewBlock'")
await wrc.abci_info()
while True:
Expand All @@ -649,7 +651,7 @@ See `API <https://python-binance-chain.readthedocs.io/en/latest/binance-chain.ht
# with an existing WebsocketRpcClient instance
await wrc.unsubscribe('NewBlock')
await wrc.unsubscribe("tm.event = 'NewBlock'")
**Unsubscribe All**

Expand Down
62 changes: 31 additions & 31 deletions binance_chain/websockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,44 +36,42 @@ def _connect(self):
def _get_ws_endpoint_url(self):
return f"{self._env.wss_url}ws"

def _is_ssl_connection(self):
if self._get_ws_endpoint_url().startswith('wss'):
return True
return False

async def _run(self):

keep_waiting: bool = True

logging.info(f"connecting to {self._get_ws_endpoint_url()}")
async with ws.connect(self._get_ws_endpoint_url(), ssl=self._is_ssl_connection()) as socket:
self._on_connect(socket)

try:
while keep_waiting:
try:
evt = await asyncio.wait_for(self._socket.recv(), timeout=self._ping_timeout)
except asyncio.TimeoutError:
self._log.debug("no message in {} seconds".format(self._ping_timeout))
await self.send_keepalive()
except asyncio.CancelledError:
self._log.debug("cancelled error")
await self.ping()
else:
try:
async with ws.connect(self._get_ws_endpoint_url(), loop=self._loop) as socket:
self._on_connect(socket)

try:
while keep_waiting:
try:
evt_obj = json.loads(evt)
except ValueError:
pass
evt = await asyncio.wait_for(self._socket.recv(), timeout=self._ping_timeout)
except asyncio.TimeoutError:
self._log.debug("no message in {} seconds".format(self._ping_timeout))
await self.send_keepalive()
except asyncio.CancelledError:
self._log.debug("cancelled error")
await self.ping()
else:
await self._coro(evt_obj)
except ws.ConnectionClosed as e:
self._log.debug('conn closed:{}'.format(e))
keep_waiting = False
await self._reconnect()
except Exception as e:
self._log.debug('ws exception:{}'.format(e))
keep_waiting = False
await self._reconnect()
try:
evt_obj = json.loads(evt)
except ValueError:
pass
else:
await self._coro(evt_obj)
except ws.ConnectionClosed as e:
self._log.debug('conn closed:{}'.format(e))
keep_waiting = False
await self._reconnect()
except Exception as e:
self._log.debug('ws exception:{}'.format(e))
keep_waiting = False
await self._reconnect()
except Exception as e:
logging.info(f"websocket error: {e}")

def _on_connect(self, socket):
self._socket = socket
Expand Down Expand Up @@ -107,6 +105,8 @@ async def send_message(self, msg, retry_count=0):
if retry_count < 5:
await asyncio.sleep(1)
await self.send_message(msg, retry_count + 1)
else:
logging.info("Unable to send, not connected")
else:
await self._socket.send(json.dumps(msg, separators=(',', ':'), ensure_ascii=False))

Expand Down

0 comments on commit c616e40

Please sign in to comment.