forked from dxFeed/dxfeed-python-api
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathendpoint.py
108 lines (88 loc) · 3.68 KB
/
endpoint.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from dxfeed.core import DXFeedPy as dxp
from dxfeed.wrappers.subscription import Subscription
from datetime import datetime
from typing import Union
class Endpoint(object):
    """
    Class for connection management. After successful creation the instance will be connected to the server
    with the provided credentials.

    Attributes
    ----------
    connection_status: str
        Status of current connection
    address: str
        Current connection endpoint address
    """

    # Class-level default so that `__del__` can tell a half-built instance (where `__init__` raised before
    # the connection object was created) from a fully initialised one.
    __connection = None

    def __init__(self, connection_address: str = 'demo.dxfeed.com:7300', connect: bool = True):
        """
        Parameters
        ----------
        connection_address: str
            One of possible connection addresses:

            - the single address: `host:port` or just `host`
            - address with credentials: `host:port[username=xxx,password=yyy]`
            - multiple addresses: `(host1:port1)(host2)(host3:port3[username=xxx,password=yyy])`

            Default: demo.dxfeed.com:7300
        connect: bool
            When True `connect` method is called during instance creation. Default - True
        """
        self.__con_address = connection_address
        self.__connection = dxp.ConnectionClass()
        if connect:
            self.connect()

    def __del__(self):
        # The finalizer also runs for instances whose `__init__` failed part-way; in that case there is
        # no connection object to close, and touching it would only produce "Exception ignored" noise.
        if self.__connection is None:
            return
        self.close_connection()

    @property
    def connection_status(self):
        """Current connection status as returned by `dxf_get_current_connection_status` with `return_str=True`."""
        return dxp.dxf_get_current_connection_status(self.__connection, return_str=True)

    @property
    def address(self):
        """Connection address this endpoint was created with."""
        return self.__con_address

    def connect(self, reconnect: bool = True) -> 'Endpoint':
        """
        Creates connection. If connection status differs from "Not connected" and `reconnect` is False, does nothing

        Parameters
        ----------
        reconnect: bool
            When True closes previous connection. Default - True

        Returns
        -------
        self: Endpoint
        """
        if reconnect:
            dxp.dxf_close_connection(self.__connection)

        # The status is re-read after the optional close, so a new connection is only created
        # when nothing is currently connected.
        con_status = dxp.dxf_get_current_connection_status(self.__connection, return_str=True)
        if con_status == 'Not connected':
            self.__connection = dxp.dxf_create_connection(self.address)

        return self

    def create_subscription(self, event_type: str, date_time: Union[str, datetime, None] = None) -> 'Subscription':
        """
        Method creates certain event type subscription and returns Subscription class

        Parameters
        ----------
        event_type: str
            One of possible event types: 'Trade', 'Quote', 'Summary', 'Profile', 'Order', 'TimeAndSale', 'Candle',
            'TradeETH', 'SpreadOrder', 'Greeks', 'TheoPrice', 'Underlying', 'Series', 'Configuration' or ''
        date_time: str or datetime.datetime
            If present timed subscription will be created (conflated stream). For string date format is following:
            %Y-%m-%d %H:%M:%S.%f. If None - stream subscription will be created. Default - None.

        Note
        ----
        Some event types (e.g. Candle) support only timed subscription.

        Returns
        -------
        subscription: Subscription
            Subscription class related to current connection

        Raises
        ------
        ValueError
            If the numeric connection status is 0 or 2, i.e. the connection is not established
        """
        con_status = dxp.dxf_get_current_connection_status(self.__connection, return_str=False)
        # NOTE(review): 0 and 2 are presumably the "not connected" and "login required" codes of the
        # underlying dxFeed connection status enum - confirm against the C API.
        if con_status in (0, 2):
            raise ValueError('Connection is not established')

        return Subscription(connection=self.__connection,
                            event_type=event_type,
                            date_time=date_time)

    def close_connection(self):
        """
        Method to close connections and all related subscriptions.
        """
        dxp.dxf_close_connection(self.__connection)