-
Notifications
You must be signed in to change notification settings - Fork 3
/
client.py
203 lines (172 loc) · 5.44 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
'''Client for a cobra server.
Copyright (c) 2018-2019 Machine Zone, Inc. All rights reserved.
'''
import logging
import asyncio
import functools
import rapidjson as json
import websockets
from cobras.client.connection import (
Connection,
AuthException,
HandshakeException,
ActionException,
)
DEFAULT_CLIENT_WAIT_TIME = 1
async def client(url, creds, clientCallback, waitTime=None):
'''Main client. Does authenticate then invoke the clientCallback which
takes control.
'''
# Wait N seconds by default before retrying to connect after an error
if waitTime is None:
waitTime = DEFAULT_CLIENT_WAIT_TIME
while True:
try:
connection = Connection(url, creds)
await connection.connect()
return await clientCallback(connection)
except TimeoutError as e:
logging.error(e)
await asyncio.sleep(waitTime)
pass
except ConnectionRefusedError as e:
logging.error(e)
await asyncio.sleep(waitTime)
pass
except ConnectionResetError as e:
logging.error(e)
await asyncio.sleep(waitTime)
pass
except websockets.exceptions.ConnectionClosedOK as e:
logging.error(e)
await asyncio.sleep(waitTime)
pass
except websockets.exceptions.ConnectionClosedError as e:
logging.error(e)
await asyncio.sleep(waitTime)
pass
except websockets.exceptions.InvalidMessage as e:
logging.error(e)
await asyncio.sleep(waitTime)
pass
except OSError as e:
logging.error(e)
await asyncio.sleep(waitTime)
pass
except EOFError as e:
logging.error(e)
await asyncio.sleep(waitTime)
pass
except HandshakeException as e:
logging.error(e)
await asyncio.sleep(waitTime)
pass
except AuthException as e:
logging.error(e)
await asyncio.sleep(waitTime)
pass
except ActionException as e:
logging.error(e)
await asyncio.sleep(waitTime)
pass
async def subscribeHandler(connection, **args):
channel = args['channel']
position = args['position']
fsqlFilter = args['fsqlFilter']
messageHandlerClass = args['messageHandlerClass']
messageHandlerArgs = args['messageHandlerArgs']
subscriptionId = args.get('subscription_id', channel)
messageHandlerArgs['subscription_id'] = subscriptionId
resumeFromLastPosition = args['resumeFromLastPosition']
resumeFromLastPositionId = args['resumeFromLastPositionId']
batchSize = args['batchSize']
return await connection.subscribe(
channel,
position,
fsqlFilter,
messageHandlerClass,
messageHandlerArgs,
subscriptionId,
resumeFromLastPosition,
resumeFromLastPositionId,
batchSize,
)
async def subscribeClient(
url,
credentials,
channel,
position,
fsqlFilter,
messageHandlerClass,
messageHandlerArgs,
waitTime=None,
resumeFromLastPosition=False,
resumeFromLastPositionId=None,
batchSize=1,
):
subscribeHandlerPartial = functools.partial(
subscribeHandler,
channel=channel,
position=position,
fsqlFilter=fsqlFilter,
messageHandlerClass=messageHandlerClass,
messageHandlerArgs=messageHandlerArgs,
resumeFromLastPosition=resumeFromLastPosition,
resumeFromLastPositionId=resumeFromLastPositionId,
batchSize=batchSize,
)
ret = await client(url, credentials, subscribeHandlerPartial, waitTime)
return ret
async def unsafeSubcribeClient(
url,
credentials,
channel,
position,
fsqlFilter,
messageHandlerClass,
messageHandlerArgs,
resumeFromLastPosition=False,
resumeFromLastPositionId=None,
batchSize=1,
):
'''
No retry or exception handling
Used by the health check, where we want to die hard and fast if there's a problem
'''
connection = Connection(url, credentials)
await connection.connect()
try:
message = await connection.subscribe(
channel,
position,
fsqlFilter,
messageHandlerClass,
messageHandlerArgs,
subscriptionId=channel,
resumeFromLastPosition=resumeFromLastPosition,
resumeFromLastPositionId=resumeFromLastPositionId,
batchSize=batchSize,
)
except Exception:
message = None
return message
async def readHandler(websocket, **args):
position = args.get('position')
channel = args.get('channel')
handler = args.get('handler')
readPdu = {"action": "rtm/read", "body": {"channel": channel}, "id": 3} # FIXME
if position is not None:
readPdu['body']['position'] = position
print(f"> {readPdu}")
await websocket.send(json.dumps(readPdu))
readResponse = await websocket.recv()
print(f"< {readResponse}")
data = json.loads(readResponse)
msg = data['body']['message'] # FIXME data missing
await handler(msg)
async def readClient(url, credentials, channel, position, handler):
readHandlerPartial = functools.partial(
readHandler, channel=channel, position=position, handler=handler
)
ret = await client(url, credentials, readHandlerPartial)
return ret