Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 67 additions & 44 deletions binance/ws/reconnecting_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import logging
from socket import gaierror
from typing import Optional
from typing import Optional, Union
from asyncio import sleep
from random import random

Expand All @@ -14,23 +14,7 @@
except ImportError:
pass

try:
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK # type: ignore
except ImportError:
from websockets import ConnectionClosedError, ConnectionClosedOK # type: ignore


Proxy = None
proxy_connect = None
try:
from websockets_proxy import Proxy as w_Proxy, proxy_connect as w_proxy_connect

Proxy = w_Proxy
proxy_connect = w_proxy_connect
except ImportError:
pass

import websockets as ws
import picows

from binance.exceptions import (
BinanceWebsocketClosed,
Expand All @@ -42,6 +26,57 @@
from binance.ws.constants import WSListenerState


_DISCONNECT_SENTINEL = object()


class _PicowsWebSocket(picows.WSListener):
def __init__(self):
self._transport = None
self._queue = asyncio.Queue()
self.closed = False

def on_ws_connected(self, transport):
self._transport = transport

def on_ws_frame(self, transport: picows.WSTransport, frame: picows.WSFrame) -> None:
if frame.msg_type == picows.WSMsgType.TEXT:
payload: Union[str, bytes] = frame.get_payload_as_utf8_text()
elif frame.msg_type == picows.WSMsgType.BINARY:
payload = frame.get_payload_as_bytes()
else:
return
self._queue.put_nowait(payload)

def on_ws_disconnected(self, transport: picows.WSTransport) -> None:
self.closed = True
self._queue.put_nowait(_DISCONNECT_SENTINEL)

async def recv(self):
if self.closed:
raise ConnectionError("WebSocket is closed")
msg = await self._queue.get()
self._queue.task_done()
if msg is _DISCONNECT_SENTINEL:
self.closed = True
raise ConnectionError("WebSocket disconnected")
return msg

async def send(self, payload: Union[str, bytes]) -> None:
if self.closed:
raise ConnectionError("WebSocket is closed")
if isinstance(payload, bytes):
self._transport.send(picows.WSMsgType.BINARY, payload)
else:
self._transport.send(picows.WSMsgType.TEXT, payload.encode("utf-8"))

async def close(self) -> None:
if self.closed:
return
self._transport.send_close()
self._transport.disconnect()
await self._transport.wait_disconnected()


class ReconnectingWebsocket:
MAX_RECONNECTS = 5
MAX_RECONNECT_SECONDS = 60
Expand Down Expand Up @@ -70,7 +105,7 @@ def __init__(
self._is_binary = is_binary
self._conn = None
self._socket = None
self.ws: Optional[ws.WebSocketClientProtocol] = None # type: ignore
self.ws: Optional[_PicowsWebSocket] = None
self.ws_state = WSListenerState.INITIALISING
self._queue = asyncio.Queue()
self._handle_read_loop = None
Expand Down Expand Up @@ -103,8 +138,6 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._exit_coro(self._path)
if self.ws:
await self.ws.close()
if self._conn and hasattr(self._conn, "protocol"):
await self._conn.__aexit__(exc_type, exc_val, exc_tb)
self.ws = None

async def connect(self):
Expand All @@ -116,21 +149,17 @@ async def connect(self):
f"{self._url}{getattr(self, '_prefix', '')}{getattr(self, '_path', '')}"
)

# handle https_proxy
if self._https_proxy:
if not Proxy or not proxy_connect:
raise ImportError(
"websockets_proxy is not installed, please install it to use a websockets proxy (pip install websockets_proxy)"
)
proxy = Proxy.from_url(self._https_proxy) # type: ignore
self._conn = proxy_connect(
ws_url, close_timeout=0.1, proxy=proxy, **self._ws_kwargs
) # type: ignore
else:
self._conn = ws.connect(ws_url, close_timeout=0.1, **self._ws_kwargs) # type: ignore

try:
self.ws = await self._conn.__aenter__()
if self._https_proxy and self._https_proxy.lower().startswith("https://"):
raise ValueError(
"picows does not support https:// proxy URLs; use http://, socks4://, or socks5://"
)
_, self.ws = await picows.ws_connect(
_PicowsWebSocket,
ws_url,
proxy=self._https_proxy,
**self._ws_kwargs,
)
except Exception as e: # noqa
self._log.error(f"Failed to connect to websocket: {e}")
self.ws_state = WSListenerState.RECONNECTING
Expand Down Expand Up @@ -189,10 +218,7 @@ async def _read_loop(self):
f"_read_loop {self._path} break for {self.ws_state}"
)
break
elif self.ws.state == ws.protocol.State.CLOSING: # type: ignore
await asyncio.sleep(0.1)
continue
elif self.ws.state == ws.protocol.State.CLOSED: # type: ignore
elif self.ws and self.ws.closed:
self._reconnect()
raise BinanceWebsocketClosed(
"Connection closed. Reconnecting..."
Expand Down Expand Up @@ -225,8 +251,7 @@ async def _read_loop(self):
except (
asyncio.IncompleteReadError,
gaierror,
ConnectionClosedError,
ConnectionClosedOK,
ConnectionError,
BinanceWebsocketClosed,
) as e:
# reports errors and continue loop
Expand Down Expand Up @@ -299,11 +324,9 @@ def _get_reconnect_wait(self, attempts: int) -> int:

async def before_reconnect(self):
if self.ws:
await self.ws.close()
self.ws = None

if self._conn and hasattr(self._conn, "protocol"):
await self._conn.__aexit__(None, None, None)

self._reconnects += 1

def _reconnect(self):
Expand Down
4 changes: 1 addition & 3 deletions binance/ws/websocket_api.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from typing import Dict, Optional
import asyncio

from websockets import WebSocketClientProtocol # type: ignore

from .constants import WSListenerState
from .reconnecting_websocket import ReconnectingWebsocket
from binance.exceptions import BinanceAPIException, BinanceWebsocketUnableToConnect
Expand Down Expand Up @@ -92,7 +90,7 @@ async def _ensure_ws_connection(self) -> None:
try:
if (
self.ws is None
or (isinstance(self.ws, WebSocketClientProtocol) and self.ws.closed)
or self.ws.closed
or self.ws_state != WSListenerState.STREAMING
):
await self.connect()
Expand Down
3 changes: 1 addition & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@ aiohttp
dateparser
pycryptodome
requests
websockets
websockets_proxy; python_version >= '3.8'
picows
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"six",
"dateparser",
"aiohttp",
"websockets",
"picows",
"pycryptodome",
],
keywords="binance exchange rest api bitcoin ethereum btc eth neo",
Expand Down
Loading