Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion polygon/websocket/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .base import WebsocketBaseClient


class WebSocketClient(WebsocketBaseClient):
pass

92 changes: 74 additions & 18 deletions polygon/websocket/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
import os
from enum import Enum
from typing import Optional, Union, List
from .models import Feed, Market

from .models import (
Feed,
Market,
EquityAgg,
CurrencyAgg,
EquityTrade,
CryptoTrade,
EquityQuote,
ForexQuote,
CryptoQuote,
Imbalance,
LimitUpLimitDown,
Level2Book,
)
import websockets
import json
import inspect
Expand All @@ -10,6 +24,46 @@
env_key = "POLYGON_API_KEY"


def parse_single(data):
event_type = data["ev"]
if event_type == "A":
return EquityAgg.from_dict(data)
elif event_type == "AM":
return EquityAgg.from_dict(data)
elif event_type == "CA":
return CurrencyAgg.from_dict(data)
elif event_type == "XA":
return CurrencyAgg.from_dict(data)
elif event_type == "T":
return EquityTrade.from_dict(data)
elif event_type == "XT":
return CryptoTrade.from_dict(data)
elif event_type == "Q":
return EquityQuote.from_dict(data)
elif event_type == "C":
return ForexQuote.from_dict(data)
elif event_type == "XQ":
return CryptoQuote.from_dict(data)
elif event_type == "NOI":
return Imbalance.from_dict(data)
elif event_type == "LULD":
return LimitUpLimitDown.from_dict(data)
elif event_type == "XL2":
return Level2Book.from_dict(data)
return None


def parse(msg):
res = []
for m in msg:
parsed = parse_single(m)
if parsed is None:
print("bad message", m)
else:
res.append(parsed)
return res


class WebsocketBaseClient:
def __init__(
self,
Expand All @@ -20,7 +74,7 @@ def __init__(
verbose: bool = False,
subscriptions: List[str] = [],
max_reconnects: Optional[int] = 5,
**kwargs
**kwargs,
):
if api_key is None:
raise Exception(
Expand All @@ -43,42 +97,44 @@ def __init__(
self.websocket = None
self.scheduled_subs = set(subscriptions)
self.schedule_resub = True
print('subscriptions', subscriptions)
print('scheduled_subs', subscriptions)
print("subscriptions", subscriptions)
print("scheduled_subs", subscriptions)

# https://websockets.readthedocs.io/en/stable/reference/client.html#opening-a-connection
async def connect(self, processor, close_timeout=1, **kwargs):
reconnects = 0
isasync = inspect.iscoroutinefunction(processor)
if self.verbose:
print('connect', self.url)
async for s in websockets.connect(self.url, close_timeout=close_timeout, **kwargs):
print("connect", self.url)
async for s in websockets.connect(
self.url, close_timeout=close_timeout, **kwargs
):
self.websocket = s
try:
msg = await s.recv()
if self.verbose:
print('connected', msg)
print("connected", msg)
if self.verbose:
print('authing')
await s.send(json.dumps({"action":"auth","params":self.api_key}))
print("authing")
await s.send(json.dumps({"action": "auth", "params": self.api_key}))
msg = await s.recv()
if self.verbose:
print('authed', msg)
print("authed", msg)
while True:
if self.schedule_resub:
print('reconciling', self.subs, self.scheduled_subs)
print("reconciling", self.subs, self.scheduled_subs)
new_subs = self.scheduled_subs.difference(self.subs)
await self._subscribe(new_subs)
old_subs = self.subs.difference(self.scheduled_subs)
await self._unsubscribe(old_subs)
self.subs = self.scheduled_subs
self.subs = set(self.scheduled_subs)
print('reconciled')
self.schedule_resub = False

msg = await s.recv()
msgJson = json.loads(msg)
if msgJson[0]['ev'] == 'status' and self.verbose:
print('status', msgJson[0]['message'])
if msgJson[0]["ev"] == "status" and self.verbose:
print("status", msgJson[0]["message"])
continue
if not self.raw:
msg = parse(msgJson)
Expand All @@ -102,15 +158,15 @@ async def connect(self, processor, close_timeout=1, **kwargs):
async def _subscribe(self, topics):
if self.websocket is None or len(topics) == 0:
return
topics = ','.join(topics)
topics = ",".join(topics)
if self.verbose:
print('subbing', topics)
await self.websocket.send(json.dumps({"action":"subscribe","params":topics}))

async def _unsubscribe(self, topics):
if self.websocket is None or len(topics) == 0:
return
subs = ','.join(topics)
subs = ",".join(topics)
if self.verbose:
print('unsubbing', topics)
await self.websocket.send(json.dumps({"action":"unsubscribe","params":subs}))
Expand Down Expand Up @@ -165,10 +221,10 @@ def unsubscribe_all(self):

async def close(self):
if self.verbose:
print('closing')
print("closing")

if self.websocket:
await self.websocket.close()
self.websocket = None
else:
print('no websocket open to close')
print("no websocket open to close")
1 change: 1 addition & 0 deletions polygon/websocket/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .common import *
from .models import *
34 changes: 25 additions & 9 deletions polygon/websocket/models/common.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,31 @@
from enum import Enum


class Feed(Enum):
Delayed = "delayed.polygon.io"
RealTime = "socket.polygon.io"
Nasdaq = "nasdaqfeed.polygon.io"
PolyFeed = "polyfeed.polygon.io"
PolyFeedPlus = "polyfeedplus.polygon.io"
Delayed = "delayed.polygon.io"
RealTime = "socket.polygon.io"
Nasdaq = "nasdaqfeed.polygon.io"
PolyFeed = "polyfeed.polygon.io"
PolyFeedPlus = "polyfeedplus.polygon.io"


class Market(Enum):
Stocks = "stocks"
Options = "options"
Forex = "forex"
Crypto = "crypto"
Stocks = "stocks"
Options = "options"
Forex = "forex"
Crypto = "crypto"


class EventType(Enum):
A = "A"
AM = "AM"
CA = "CA"
XA = "XA"
T = "T"
XT = "XT"
Q = "Q"
C = "C"
XQ = "XQ"
NOI = "NOI"
LULD = "LULD"
XL2 = "XL2"
Loading