-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
245 lines (200 loc) · 8.52 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# standard
from sys import stdout
from asyncio import run, Future, Protocol, sleep, get_event_loop, new_event_loop, set_event_loop
from threading import Thread
from json import loads, dumps
from logging import getLogger, DEBUG, StreamHandler, Formatter, LoggerAdapter
from logging.handlers import TimedRotatingFileHandler
# pypi
from websockets import connect, ConnectionClosed
from websockets.server import serve
from serial_asyncio import create_serial_connection
class ReaderClosed(Exception): pass
log = getLogger('tm-csv-connector')
log.setLevel(DEBUG)
handler = StreamHandler(stdout)
handler.setLevel(DEBUG)
formatter = Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)
# # this should be configurable
# getLogger('websockets').setLevel(DEBUG)
backenduri = 'ws://tm.localhost:8080/tm_reader'
PRIMARY = b'\x17'
SELECT = b'\x14'
# connection status is global -- is there any way to get a class status from an async protocol?
connected = False
# stop_reader flag
stop_reader = False
# queue messages from input protocol for sending to backend
queued_msgs = []
# save latest raceid
raceid = 0
class LoggerAdapter(LoggerAdapter):
"""Add connection ID and client IP address to websockets logs."""
def process(self, msg, kwargs):
try:
websocket = kwargs["extra"]["websocket"]
except KeyError:
return msg, kwargs
xff = websocket.request_headers.get("X-Forwarded-For")
return f"{websocket.id} {xff} {msg}", kwargs
class InputChunkProtocol(Protocol):
"""adapted from https://pyserial-asyncio.readthedocs.io/en/latest/shortintro.html#serial-transports-protocols-and-streams
handle serial protocol input
Args:
Protocol (asyncio.Protocol):
"""
def __init__(self):
super().__init__()
self.log_handler = None
global connected
connected = False
'''read from time machine, adapted from
https://pyserial-asyncio.readthedocs.io/en/latest/shortintro.html#reading-data-in-chunks'''
def connection_made(self, transport):
self.transport = transport
self.residual = b''
global connected
connected = True
# with connect(backenduri) as websocket:
# await self.send_to_backend(websocket, {'opcode': 'connected'})
def connection_lost(self, exc: Exception | None) -> None:
global connected
connected = False
# with connect(backenduri) as websocket:
# await self.send_to_backend(websocket, {'opcode': 'disconnected'})
return super().connection_lost(exc)
def data_received(self, data):
log.debug(f'data received: {data}')
msgs = data.split(b'\r\n')
# update first part of data with residual, making sure there's at least one item in msgs
if len(msgs) > 0:
msgs[0] = self.residual + msgs[0]
else:
msgs = [self.residual]
# last part is saved for later, may be empty, don't send to back end
# note the residual may be the only item in msgs
self.residual = msgs.pop()
if self.residual:
log.debug(f'residual: {self.residual}')
# don't connect if nothing to send
if msgs:
# connect to websocket, send each relevant message to the back end
# relevant message format on p3-1 of Time Machine User Manual
# assumes cross-country mode is used
for msg in msgs:
try:
log.debug(f'msg processed: {msg}')
# split message into parts
control = msg[0:1]
if control in [PRIMARY, SELECT]:
pos = int(msg[8:13])
time = msg[13:24].decode()
# check control character; queue message for handling in the background
global queued_msgs
if control == PRIMARY:
queued_msgs.append({'opcode': 'primary', 'raceid': raceid, 'pos': pos, 'time': time})
elif control == SELECT:
bib = int(msg[27:32].decode())
queued_msgs.append({'opcode': 'select', 'raceid': raceid, 'pos': pos, 'time': time, 'bibno': bib})
except ValueError:
log.error(f'could not decode message: {msg}')
# stop callbacks again immediately
self.pause_reading()
def pause_reading(self):
# This will stop the callbacks to data_received
self.transport.pause_reading()
def resume_reading(self):
# This will start the callbacks to data_received again with all data that has been received in the meantime.
self.transport.resume_reading()
def set_logging_path(self, logging_path):
self.logging_path = logging_path
log.error(f'need to set logging path in logger')
async def send_to_backend(websocket, data):
"""send data to backend
Args:
websocket (websocket): websocket to send to
data (dict): dict to serialize and send
"""
sending = dumps(data)
log.debug(f'sending: {sending}')
await websocket.send(sending)
async def reader(port, logging_path):
log.debug(f'async reader started with port {port}')
readloop = get_event_loop()
transport, protocol = await create_serial_connection(readloop, InputChunkProtocol, port)
protocol.set_logging_path(logging_path)
# ref https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html
# ref https://websockets.readthedocs.io/en/stable/topics/logging.html
async for websocket in connect(backenduri, logger=LoggerAdapter(getLogger("websockets.client"))):
log.debug(f'websocket to backend connected')
try:
while True:
global stop_reader
if stop_reader:
log.debug('reader stopped')
stop_reader = False
transport.close()
await websocket.close()
raise ReaderClosed
await sleep(0.3)
# send any queued messages
global queued_msgs
if queued_msgs:
while len(queued_msgs) > 0:
msg = queued_msgs.pop(0)
await send_to_backend(websocket, msg)
protocol.resume_reading()
except ConnectionClosed:
log.debug(f'websocket to backend closed, reconnecting')
continue
except ReaderClosed:
return
def reader_thread(port, logging_path):
log.debug(f'in reader_thread')
readloop = new_event_loop()
set_event_loop(readloop)
# readloop.run_until_complete(reader(port, logging_path))
# readloop.close()
run(reader(port, logging_path))
log.debug('exiting reader_thread()')
async def controller(websocket):
"""server for control commands
Args:
websocket (websocket): websocket from backend client
"""
async for message in websocket:
event = loads(message)
opcode = event['opcode']
# just wanna know what's going on
if opcode in ['open', 'close', 'raceid']:
log.debug(f'received {event}')
# backend opened the connection
if opcode == 'open':
port = event['port']
logging_path = event['loggingpath']
readloop_threadid = Thread(target=reader_thread, args=(port, logging_path)).start()
# readloop = get_event_loop()
# readloop.run_until_complete(reader(port, logging_path))
log.debug('returned from Thread')
# backend closed the connection
elif opcode == 'close':
# readloop = get_event_loop()
# readloop.stop()
# readloop.close()
# readloop = None
global stop_reader
stop_reader = True
# raceid updated from backend
elif opcode == 'raceid':
global raceid
raceid = event['raceid']
# backend wants to know if we're connected to time machine
elif opcode == 'is_connected':
await websocket.send(dumps({'connected': connected}))
async def main():
async with serve(controller, host="localhost", port=8081):
await Future() # run forever
if __name__ == "__main__":
run(main())