Skip to content

Commit

Permalink
Merge branch 'feature/websocket_multiplex' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
sammchardy committed Dec 8, 2017
2 parents 568274b + c741572 commit 5daa9c3
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 9 deletions.
2 changes: 1 addition & 1 deletion PYPIREADME.rst
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ Quick Start
# start trade websocket
def process_message(msg):
print("message type:" + msg['e'])
print("message type: {}".format(msg['e']))
print(msg)
# do something
Expand Down
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ Quick Start
# start trade websocket
def process_message(msg):
print("message type:" + msg['e'])
print("message type: {}".format(msg['e']))
print(msg)
# do something
Expand Down
44 changes: 38 additions & 6 deletions binance/websockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
from .enums import KLINE_INTERVAL_1MINUTE, WEBSOCKET_DEPTH_1
from .exceptions import BinanceAPIException

BINANCE_STREAM_URL = 'wss://stream.binance.com:9443/ws/'


class BinanceClientProtocol(WebSocketClientProtocol):

Expand Down Expand Up @@ -58,6 +56,8 @@ def clientConnectionLost(self, connector, reason):

class BinanceSocketManager(threading.Thread):

STREAM_URL = 'wss://stream.binance.com:9443/'

_user_timeout = 50 * 60 # 50 minutes

def __init__(self, client):
Expand All @@ -74,11 +74,12 @@ def __init__(self, client):
self._user_callback = None
self._client = client

def _start_socket(self, path, callback):
def _start_socket(self, path, callback, prefix='ws/'):
if path in self._conns:
return False

factory = BinanceClientFactory(BINANCE_STREAM_URL + path)
factory_url = self.STREAM_URL + prefix + path
factory = BinanceClientFactory(factory_url)
factory.protocol = BinanceClientProtocol
factory.callback = callback
context_factory = ssl.ClientContextFactory()
Expand All @@ -89,7 +90,7 @@ def _start_socket(self, path, callback):
def start_depth_socket(self, symbol, callback, depth=WEBSOCKET_DEPTH_1):
"""Start a websocket for symbol market depth
https://www.binance.com/restapipub.html#depth-wss-endpoint
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#partial-book-depth-streams
:param symbol: required
:type symbol: str
Expand Down Expand Up @@ -143,7 +144,7 @@ def start_depth_socket(self, symbol, callback, depth=WEBSOCKET_DEPTH_1):
def start_kline_socket(self, symbol, callback, interval=KLINE_INTERVAL_1MINUTE):
"""Start a websocket for symbol kline data
https://www.binance.com/restapipub.html#kline-wss-endpoint
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#klinecandlestick-streams
:param symbol: required
:type symbol: str
Expand Down Expand Up @@ -189,6 +190,8 @@ def start_kline_socket(self, symbol, callback, interval=KLINE_INTERVAL_1MINUTE):
def start_trade_socket(self, symbol, callback):
"""Start a websocket for symbol trade data
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#trade-streams
:param symbol: required
:type symbol: str
:param callback: callback function to handle messages
Expand Down Expand Up @@ -220,6 +223,8 @@ def start_trade_socket(self, symbol, callback):
def start_aggtrade_socket(self, symbol, callback):
"""Start a websocket for symbol trade data
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#aggregate-trade-streams
:param symbol: required
:type symbol: str
:param callback: callback function to handle messages
Expand Down Expand Up @@ -251,6 +256,8 @@ def start_aggtrade_socket(self, symbol, callback):
def start_symbol_ticker_socket(self, symbol, callback):
"""Start a websocket for a symbol's ticker data
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#individual-symbol-ticker-streams
:param symbol: required
:type symbol: str
:param callback: callback function to handle messages
Expand Down Expand Up @@ -296,6 +303,8 @@ def start_ticker_socket(self, callback):
By default all markets are included in an array.
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#all-market-tickers-stream
:param callback: callback function to handle messages
:type callback: function
Expand Down Expand Up @@ -333,6 +342,29 @@ def start_ticker_socket(self, callback):
"""
return self._start_socket('!ticker@arr', callback)

def start_multiplex_socket(self, streams, callback):
"""Start a multiplexed socket using a list of socket names.
User stream sockets can not be included.
Symbols in socket name must be lowercase i.e bnbbtc@aggTrade, neobtc@ticker
Combined stream events are wrapped as follows: {"stream":"<streamName>","data":<rawPayload>}
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md
:param streams: list of stream names in lower case
:type streams: list
:param callback: callback function to handle messages
:type callback: function
:returns: connection key string if successful, False otherwise
Message Format - see Binance API docs for all types
"""
stream_path = 'streams={}'.format('/'.join(streams))
return self._start_socket(stream_path, callback, 'stream?')

def start_user_socket(self, callback):
"""Start a websocket for user data
Expand Down
21 changes: 20 additions & 1 deletion docs/websockets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,30 @@ A callback to process messages would take the format
.. code:: python
def process_message(msg):
print("message type:" + msg['e'])
print("message type: {}".format(msg['e']))
print(msg)
# do something
`Multiplex Socket <binance.html#binance.websockets.BinanceSocketManager.start_multiplex_socket>`_
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Create a socket combining multiple streams.

These streams can include the depth, kline, ticker and trade streams but not the user stream which requires extra authentication.

Symbols in socket name must be lowercase i.e bnbbtc@aggTrade, neobtc@ticker

See the `Binance Websocket Streams API documentation <https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md>`_ for details on socket names.

.. code:: python
def process_m_message(msg):
print("stream: {} data: {}".format(msg['stream'], msg['data']))
# pass a list of stream names
conn_key = bm.start_multiplex_socket(['bnbbtc@aggTrade', 'neobtc@ticker'], process_m_message)
`Depth Socket <binance.html#binance.websockets.BinanceSocketManager.start_depth_socket>`_
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Expand Down

0 comments on commit 5daa9c3

Please sign in to comment.