diff --git a/polygon/websocket/__init__.py b/polygon/websocket/__init__.py index 6d505e1b..e8f1712a 100644 --- a/polygon/websocket/__init__.py +++ b/polygon/websocket/__init__.py @@ -1,5 +1,5 @@ from .base import WebsocketBaseClient + class WebSocketClient(WebsocketBaseClient): pass - diff --git a/polygon/websocket/base.py b/polygon/websocket/base.py index 3e61d6e7..59da6462 100644 --- a/polygon/websocket/base.py +++ b/polygon/websocket/base.py @@ -1,7 +1,21 @@ import os from enum import Enum from typing import Optional, Union, List -from .models import Feed, Market + +from .models import ( + Feed, + Market, + EquityAgg, + CurrencyAgg, + EquityTrade, + CryptoTrade, + EquityQuote, + ForexQuote, + CryptoQuote, + Imbalance, + LimitUpLimitDown, + Level2Book, +) import websockets import json import inspect @@ -10,6 +24,46 @@ env_key = "POLYGON_API_KEY" +def parse_single(data): + event_type = data["ev"] + if event_type == "A": + return EquityAgg.from_dict(data) + elif event_type == "AM": + return EquityAgg.from_dict(data) + elif event_type == "CA": + return CurrencyAgg.from_dict(data) + elif event_type == "XA": + return CurrencyAgg.from_dict(data) + elif event_type == "T": + return EquityTrade.from_dict(data) + elif event_type == "XT": + return CryptoTrade.from_dict(data) + elif event_type == "Q": + return EquityQuote.from_dict(data) + elif event_type == "C": + return ForexQuote.from_dict(data) + elif event_type == "XQ": + return CryptoQuote.from_dict(data) + elif event_type == "NOI": + return Imbalance.from_dict(data) + elif event_type == "LULD": + return LimitUpLimitDown.from_dict(data) + elif event_type == "XL2": + return Level2Book.from_dict(data) + return None + + +def parse(msg): + res = [] + for m in msg: + parsed = parse_single(m) + if parsed is None: + print("bad message", m) + else: + res.append(parsed) + return res + + class WebsocketBaseClient: def __init__( self, @@ -20,7 +74,7 @@ def __init__( verbose: bool = False, subscriptions: List[str] = [], max_reconnects: Optional[int] = 5, - **kwargs + **kwargs, ): if api_key is None: raise Exception( @@ -43,42 +97,44 @@ def __init__( self.websocket = None self.scheduled_subs = set(subscriptions) self.schedule_resub = True - print('subscriptions', subscriptions) - print('scheduled_subs', subscriptions) + print("subscriptions", subscriptions) + print("scheduled_subs", subscriptions) # https://websockets.readthedocs.io/en/stable/reference/client.html#opening-a-connection async def connect(self, processor, close_timeout=1, **kwargs): reconnects = 0 isasync = inspect.iscoroutinefunction(processor) if self.verbose: - print('connect', self.url) - async for s in websockets.connect(self.url, close_timeout=close_timeout, **kwargs): + print("connect", self.url) + async for s in websockets.connect( + self.url, close_timeout=close_timeout, **kwargs + ): self.websocket = s try: msg = await s.recv() if self.verbose: - print('connected', msg) + print("connected", msg) if self.verbose: - print('authing') - await s.send(json.dumps({"action":"auth","params":self.api_key})) + print("authing") + await s.send(json.dumps({"action": "auth", "params": self.api_key})) msg = await s.recv() if self.verbose: - print('authed', msg) + print("authed", msg) while True: if self.schedule_resub: - print('reconciling', self.subs, self.scheduled_subs) + print("reconciling", self.subs, self.scheduled_subs) new_subs = self.scheduled_subs.difference(self.subs) await self._subscribe(new_subs) old_subs = self.subs.difference(self.scheduled_subs) await self._unsubscribe(old_subs) + self.subs = self.scheduled_subs self.subs = set(self.scheduled_subs) - print('reconciled') self.schedule_resub = False msg = await s.recv() msgJson = json.loads(msg) - if msgJson[0]['ev'] == 'status' and self.verbose: - print('status', msgJson[0]['message']) + if msgJson[0]["ev"] == "status" and self.verbose: + print("status", msgJson[0]["message"]) continue if not self.raw: msg = parse(msgJson) @@ -102,7 +158,7 @@ async def connect(self, processor, close_timeout=1, **kwargs): async def _subscribe(self, topics): if self.websocket is None or len(topics) == 0: return - topics = ','.join(topics) + topics = ",".join(topics) if self.verbose: print('subbing', topics) await self.websocket.send(json.dumps({"action":"subscribe","params":topics})) @@ -110,7 +166,7 @@ async def _subscribe(self, topics): async def _unsubscribe(self, topics): if self.websocket is None or len(topics) == 0: return - subs = ','.join(topics) + subs = ",".join(topics) if self.verbose: print('unsubbing', topics) await self.websocket.send(json.dumps({"action":"unsubscribe","params":subs})) @@ -165,10 +221,10 @@ def unsubscribe_all(self): async def close(self): if self.verbose: - print('closing') + print("closing") if self.websocket: await self.websocket.close() self.websocket = None else: - print('no websocket open to close') + print("no websocket open to close") diff --git a/polygon/websocket/models/__init__.py b/polygon/websocket/models/__init__.py index 55e5f844..1775e516 100644 --- a/polygon/websocket/models/__init__.py +++ b/polygon/websocket/models/__init__.py @@ -1 +1,2 @@ from .common import * +from .models import * diff --git a/polygon/websocket/models/common.py b/polygon/websocket/models/common.py index b49ad498..3c94d7a5 100644 --- a/polygon/websocket/models/common.py +++ b/polygon/websocket/models/common.py @@ -1,15 +1,31 @@ from enum import Enum + class Feed(Enum): - Delayed = "delayed.polygon.io" - RealTime = "socket.polygon.io" - Nasdaq = "nasdaqfeed.polygon.io" - PolyFeed = "polyfeed.polygon.io" - PolyFeedPlus = "polyfeedplus.polygon.io" + Delayed = "delayed.polygon.io" + RealTime = "socket.polygon.io" + Nasdaq = "nasdaqfeed.polygon.io" + PolyFeed = "polyfeed.polygon.io" + PolyFeedPlus = "polyfeedplus.polygon.io" + class Market(Enum): - Stocks = "stocks" - Options = "options" - Forex = "forex" - Crypto = "crypto" + Stocks = "stocks" + Options = "options" + Forex = "forex" + Crypto = "crypto" + +class EventType(Enum): + A = "A" + AM = "AM" + CA = "CA" + XA = "XA" + T = "T" + XT = "XT" + Q = "Q" + C = "C" + XQ = "XQ" + NOI = "NOI" + LULD = "LULD" + XL2 = "XL2" diff --git a/polygon/websocket/models/models.py b/polygon/websocket/models/models.py new file mode 100644 index 00000000..a8d25f3d --- /dev/null +++ b/polygon/websocket/models/models.py @@ -0,0 +1,295 @@ +from typing import Optional, List +from .common import EventType +from dataclasses import dataclass + + +@dataclass +class EquityAgg: + "EquityAgg contains aggregate data for either stock tickers or option contracts." + event_type: Optional[EventType] = None + symbol: Optional[str] = None + volume: Optional[float] = None + accumulated_volume: Optional[float] = None + official_open_price: Optional[float] = None + vwap: Optional[float] = None + open: Optional[float] = None + close: Optional[float] = None + high: Optional[float] = None + low: Optional[float] = None + aggregate_vwap: Optional[float] = None + average_size: Optional[float] = None + start_timestamp: Optional[int] = None + end_timestamp: Optional[int] = None + + @staticmethod + def from_dict(d): + return EquityAgg( + d.get("ev", None), + d.get("sym", None), + d.get("v", None), + d.get("av", None), + d.get("op", None), + d.get("vw", None), + d.get("o", None), + d.get("c", None), + d.get("h", None), + d.get("l", None), + d.get("a", None), + d.get("z", None), + d.get("s", None), + d.get("e", None), + ) + + +@dataclass +class CurrencyAgg: + "CurrencyAgg contains aggregate data for either forex currency pairs or crypto pairs." + event_type: Optional[EventType] = None + pair: Optional[str] = None + open: Optional[float] = None + close: Optional[float] = None + high: Optional[float] = None + low: Optional[float] = None + volume: Optional[float] = None + vwap: Optional[float] = None + start_timestamp: Optional[int] = None + end_timestamp: Optional[int] = None + avg_trade_size: Optional[int] = None + + @staticmethod + def from_dict(d): + return CurrencyAgg( + d.get("ev", None), + d.get("pair", None), + d.get("o", None), + d.get("c", None), + d.get("h", None), + d.get("l", None), + d.get("v", None), + d.get("vw", None), + d.get("s", None), + d.get("e", None), + d.get("z", None), + ) + + +@dataclass +class EquityTrade: + "EquityTrade contains trade data for either stock tickers or option contracts." + event_type: Optional[EventType] = None + symbol: Optional[str] = None + exchange: Optional[int] = None + id: Optional[str] = None + tape: Optional[int] = None + price: Optional[float] = None + size: Optional[int] = None + conditions: Optional[List[int]] = None + timestamp: Optional[int] = None + sequence_number: Optional[int] = None + + @staticmethod + def from_dict(d): + return EquityTrade( + d.get("ev", None), + d.get("sym", None), + d.get("x", None), + d.get("i", None), + d.get("z", None), + d.get("p", None), + d.get("s", None), + d.get("c", None), + d.get("t", None), + d.get("q", None), + ) + + +@dataclass +class CryptoTrade: + "CryptoTrade contains trade data for a crypto pair." + event_type: Optional[EventType] = None + symbol: Optional[str] = None + exchange: Optional[int] = None + id: Optional[str] = None + price: Optional[float] = None + size: Optional[float] = None + conditions: Optional[List[int]] = None + timestamp: Optional[int] = None + received_timestamp: Optional[int] = None + + @staticmethod + def from_dict(d): + return CryptoTrade( + d.get("ev", None), + d.get("sym", None), + d.get("x", None), + d.get("i", None), + d.get("p", None), + d.get("s", None), + d.get("c", None), + d.get("t", None), + d.get("r", None), + ) + + +@dataclass +class EquityQuote: + "EquityQuote contains quote data for either stock tickers or option contracts." + event_type: Optional[EventType] = None + symbol: Optional[str] = None + bid_exchange_id: Optional[int] = None + bid_price: Optional[float] = None + bid_size: Optional[int] = None + ask_exchange_id: Optional[int] = None + ask_price: Optional[float] = None + ask_size: Optional[int] = None + condition: Optional[int] = None + timestamp: Optional[int] = None + tape: Optional[int] = None + sequence_number: Optional[int] = None + + @staticmethod + def from_dict(d): + return EquityQuote( + d.get("ev", None), + d.get("sym", None), + d.get("bx", None), + d.get("bp", None), + d.get("bs", None), + d.get("ax", None), + d.get("ap", None), + d.get("as", None), + d.get("c", None), + d.get("t", None), + d.get("z", None), + d.get("q", None), + ) + + +@dataclass +class ForexQuote: + "ForexQuote contains quote data for a forex currency pair." + event_type: Optional[EventType] = None + pair: Optional[str] = None + exchange_id: Optional[int] = None + ask_price: Optional[float] = None + bid_price: Optional[float] = None + timestamp: Optional[int] = None + + @staticmethod + def from_dict(d): + return ForexQuote( + d.get("ev", None), + d.get("p", None), + d.get("x", None), + d.get("a", None), + d.get("b", None), + d.get("t", None), + ) + + +@dataclass +class CryptoQuote: + "CryptoQuote contains quote data for a crypto pair." + event_type: Optional[EventType] = None + pair: Optional[str] = None + bid_price: Optional[int] = None + bid_size: Optional[float] = None + ask_price: Optional[int] = None + ask_size: Optional[int] = None + timestamp: Optional[float] = None + exchange_id: Optional[int] = None + received_timestamp: Optional[int] = None + + @staticmethod + def from_dict(d): + return CryptoQuote( + d.get("ev", None), + d.get("pair", None), + d.get("bp", None), + d.get("bs", None), + d.get("ap", None), + d.get("as", None), + d.get("t", None), + d.get("x", None), + d.get("r", None), + ) + + +@dataclass +class Imbalance: + "Imbalance contains imbalance event data for a given stock ticker symbol." + event_type: Optional[EventType] = None + symbol: Optional[str] = None + time_stamp: Optional[int] = None + auction_time: Optional[int] = None + auction_type: Optional[str] = None + symbol_sequence: Optional[int] = None + exchange_id: Optional[int] = None + imbalance_quantity: Optional[int] = None + paired_quantity: Optional[int] = None + book_clearing_price: Optional[float] = None + + @staticmethod + def from_dict(d): + return Imbalance( + d.get("ev", None), + d.get("T", None), + d.get("t", None), + d.get("at", None), + d.get("a", None), + d.get("i", None), + d.get("x", None), + d.get("o", None), + d.get("p", None), + d.get("b", None), + ) + + +@dataclass +class LimitUpLimitDown: + "LimitUpLimitDown contains LULD event data for a given stock ticker symbol." + event_type: Optional[EventType] = None + symbol: Optional[str] = None + high_price: Optional[float] = None + low_price: Optional[float] = None + indicators: Optional[List[int]] = None + tape: Optional[int] = None + timestamp: Optional[int] = None + sequence_number: Optional[int] = None + + @staticmethod + def from_dict(d): + return LimitUpLimitDown( + d.get("ev", None), + d.get("T", None), + d.get("h", None), + d.get("l", None), + d.get("i", None), + d.get("z", None), + d.get("t", None), + d.get("q", None), + ) + + +@dataclass +class Level2Book: + "Level2Book contains level 2 book data for a given crypto pair." + event_type: Optional[EventType] = None + pair: Optional[str] = None + bid_prices: Optional[float] = None + ask_prices: Optional[float] = None + timestamp: Optional[int] = None + exchange_id: Optional[int] = None + received_timestamp: Optional[int] = None + + @staticmethod + def from_dict(d): + return Level2Book( + d.get("ev", None), + d.get("pair", None), + d.get("b", None), + d.get("a", None), + d.get("t", None), + d.get("x", None), + d.get("r", None), + )