-
-
Notifications
You must be signed in to change notification settings - Fork 27
/
server.py
577 lines (474 loc) · 19.4 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
"""Server API.
This module currently only implements `OSCThreadServer`, a thread based server.
"""
from threading import Thread
import os
import re
import inspect
from sys import platform
from time import sleep, time
from functools import partial
from select import select
import socket
from oscpy import __version__
from oscpy.parser import read_packet, UNICODE
from oscpy.client import send_bundle, send_message
from oscpy.stats import Stats
def ServerClass(cls):
"""Decorate classes with for methods implementing OSC endpoints.
This decorator is necessary on your class if you want to use the
`address_method` decorator on its methods, see
`:meth:OSCThreadServer.address_method`'s documentation.
"""
cls_init = cls.__init__
def __init__(self, *args, **kwargs):
cls_init(self, *args, **kwargs)
for m in dir(self):
meth = getattr(self, m)
if hasattr(meth, '_address'):
server, address, sock, get_address = meth._address
server.bind(address, meth, sock, get_address=get_address)
cls.__init__ = __init__
return cls
class OSCThreadServer(object):
"""A thread-based OSC server.
Listens for osc messages in a thread, and dispatches the messages
values to callbacks from there.
The '/_oscpy/' namespace is reserved for metadata about the OSCPy
internals, please see package documentation for further details.
"""
def __init__(
self, drop_late_bundles=False, timeout=0.01, advanced_matching=False,
encoding='', encoding_errors='strict', default_handler=None
):
"""Create an OSCThreadServer.
- `timeout` is a number of seconds used as a time limit for
select() calls in the listening thread, optiomal, defaults to
0.01.
- `drop_late_bundles` instruct the server not to dispatch calls
from bundles that arrived after their timetag value.
(optional, defaults to False)
- `advanced_matching` (defaults to False), setting this to True
activates the pattern matching part of the specification, let
this to False if you don't need it, as it triggers a lot more
computation for each received message.
- `encoding` if defined, will be used to encode/decode all
strings sent/received to/from unicode/string objects, if left
empty, the interface will only accept bytes and return bytes
to callback functions.
- `encoding_errors` if `encoding` is set, this value will be
used as `errors` parameter in encode/decode calls.
- `default_handler` if defined, will be used to handle any
message that no configured address matched, the received
arguments will be (address, *values).
"""
self.addresses = {}
self.sockets = []
self.timeout = timeout
self.default_socket = None
self.drop_late_bundles = drop_late_bundles
self.advanced_matching = advanced_matching
self.encoding = encoding
self.encoding_errors = encoding_errors
self.default_handler = default_handler
self.stats_received = Stats()
self.stats_sent = Stats()
t = Thread(target=self._listen)
t.daemon = True
t.start()
self._smart_address_cache = {}
self._smart_part_cache = {}
def bind(self, address, callback, sock=None, get_address=False):
"""Bind a callback to an osc address.
A socket in the list of existing sockets of the server can be
given. If no socket is provided, the default socket of the
server is used, if no default socket has been defined, a
RuntimeError is raised.
Multiple callbacks can be bound to the same address.
"""
if not sock and self.default_socket:
sock = self.default_socket
elif not sock:
raise RuntimeError('no default socket yet and no socket provided')
if isinstance(address, UNICODE) and self.encoding:
address = address.encode(
self.encoding, errors=self.encoding_errors)
if self.advanced_matching:
address = self.create_smart_address(address)
callbacks = self.addresses.get((sock, address), [])
cb = (callback, get_address)
if cb not in callbacks:
callbacks.append(cb)
self.addresses[(sock, address)] = callbacks
def create_smart_address(self, address):
"""Create an advanced matching address from a string.
The address will be split by '/' and each part will be converted
into a regexp, using the rules defined in the OSC specification.
"""
cache = self._smart_address_cache
if address in cache:
return cache[address]
else:
parts = address.split(b'/')
smart_parts = tuple(
re.compile(self._convert_part_to_regex(part)) for part in parts
)
cache[address] = smart_parts
return smart_parts
def _convert_part_to_regex(self, part):
cache = self._smart_part_cache
if part in cache:
return cache[part]
else:
r = [b'^']
for i, _ in enumerate(part):
# getting a 1 char byte string instead of an int in
# python3
c = part[i:i + 1]
if c == b'?':
r.append(b'.')
elif c == b'*':
r.append(b'.*')
elif c == b'[':
r.append(b'[')
elif c == b'!' and r and r[-1] == b'[':
r.append(b'^')
elif c == b']':
r.append(b']')
elif c == b'{':
r.append(b'(')
elif c == b',':
r.append(b'|')
elif c == b'}':
r.append(b')')
else:
r.append(c)
r.append(b'$')
smart_part = re.compile(b''.join(r))
cache[part] = smart_part
return smart_part
def unbind(self, address, callback, sock=None):
"""Unbind a callback from an OSC address.
See `bind` for `sock` documentation.
"""
if not sock and self.default_socket:
sock = self.default_socket
elif not sock:
raise RuntimeError('no default socket yet and no socket provided')
callbacks = self.addresses.get((sock, address), [])
to_remove = []
for cb in callbacks:
if cb[0] == callback:
to_remove.append(cb)
while to_remove:
callbacks.remove(to_remove.pop())
self.addresses[(sock, address)] = callbacks
def listen(
self, address='localhost', port=0, default=False, family='inet'
):
"""Start listening on an (address, port).
- if `port` is 0, the system will allocate a free port
- if `default` is True, the instance will save this socket as the
default one for subsequent calls to methods with an optional socket
- `family` accepts the 'unix' and 'inet' values, a socket of the
corresponding type will be created.
If family is 'unix', then the address must be a filename, the
`port` value won't be used. 'unix' sockets are not defined on
Windows.
The socket created to listen is returned, and can be used later
with methods accepting the `sock` parameter.
"""
if family == 'unix':
family_ = socket.AF_UNIX
elif family == 'inet':
family_ = socket.AF_INET
else:
raise ValueError(
"Unknown socket family, accepted values are 'unix' and 'inet'"
)
sock = socket.socket(family_, socket.SOCK_DGRAM)
if family == 'unix':
addr = address
else:
addr = (address, port)
sock.bind(addr)
# sock.setblocking(0)
self.sockets.append(sock)
if default and not self.default_socket:
self.default_socket = sock
elif default:
raise RuntimeError(
'Only one default socket authorized! Please set '
'default=False to other calls to listen()'
)
self.bind_meta_routes(sock)
return sock
def close(self, sock=None):
"""Close a socket opened by the server."""
if not sock and self.default_socket:
sock = self.default_socket
elif not sock:
raise RuntimeError('no default socket yet and no socket provided')
if platform != 'win32' and sock.family == socket.AF_UNIX:
os.unlink(sock.getsockname())
else:
sock.close()
if sock == self.default_socket:
self.default_socket = None
def getaddress(self, sock=None):
"""Wrap call to getsockname.
If `sock` is None, uses the default socket for the server.
Returns (ip, port) for an inet socket, or filename for an unix
socket.
"""
if not sock and self.default_socket:
sock = self.default_socket
elif not sock:
raise RuntimeError('no default socket yet and no socket provided')
return sock.getsockname()
def stop(self, s=None):
"""Close and remove a socket from the server's sockets.
If `sock` is None, uses the default socket for the server.
"""
if not s and self.default_socket:
s = self.default_socket
if s in self.sockets:
read = select([s], [], [], 0)
s.close()
if s in read:
s.recvfrom(65535)
self.sockets.remove(s)
else:
raise RuntimeError('{} is not one of my sockets!'.format(s))
def stop_all(self):
"""Call stop on all the existing sockets."""
for s in self.sockets[:]:
self.stop(s)
def _listen(self):
"""(internal) Busy loop to listen for events.
This method is called in a thread by the `listen` method, and
will be the one actually listening for messages on the server's
sockets, and calling the callbacks when messages are received.
"""
match = self._match_address
advanced_matching = self.advanced_matching
addresses = self.addresses
stats = self.stats_received
while True:
drop_late = self.drop_late_bundles
if not self.sockets:
sleep(.01)
continue
else:
try:
read, write, error = select(self.sockets, [], [], self.timeout)
except (ValueError, socket.error):
continue
for sender_socket in read:
data, sender = sender_socket.recvfrom(65535)
for address, tags, values, offset in read_packet(
data, drop_late=drop_late, encoding=self.encoding,
encoding_errors=self.encoding_errors
):
stats.calls += 1
stats.bytes += offset
stats.params += len(values)
stats.types.update(tags)
matched = False
if advanced_matching:
for sock, addr in addresses:
if sock == sender_socket and match(addr, address):
matched = True
for cb, get_address in addresses[(sock, addr)]:
if get_address:
cb(address, *values)
else:
cb(*values)
else:
if (sender_socket, address) in addresses:
matched = True
for cb, get_address in addresses.get(
(sender_socket, address), []
):
if get_address:
cb(address, *values)
else:
cb(*values)
if not matched and self.default_handler:
self.default_handler(address, *values)
@staticmethod
def _match_address(smart_address, target_address):
"""(internal) Check if provided `smart_address` matches address.
A `smart_address` is a list of regexps to match
against the parts of the `target_address`.
"""
target_parts = target_address.split(b'/')
if len(target_parts) != len(smart_address):
return False
return all(
model.match(part)
for model, part in
zip(smart_address, target_parts)
)
def send_message(
self, osc_address, values, ip_address, port, sock=None, safer=False
):
"""Shortcut to the client's `send_message` method.
Use the default_socket of the server by default.
See `client.send_message` for more info about the parameters.
"""
if not sock and self.default_socket:
sock = self.default_socket
elif not sock:
raise RuntimeError('no default socket yet and no socket provided')
stats = send_message(
osc_address,
values,
ip_address,
port,
sock=sock,
safer=safer,
encoding=self.encoding,
encoding_errors=self.encoding_errors
)
self.stats_sent += stats
return stats
def send_bundle(
self, messages, ip_address, port, timetag=None, sock=None, safer=False
):
"""Shortcut to the client's `send_bundle` method.
Use the `default_socket` of the server by default.
See `client.send_bundle` for more info about the parameters.
"""
if not sock and self.default_socket:
sock = self.default_socket
elif not sock:
raise RuntimeError('no default socket yet and no socket provided')
stats = send_bundle(
messages,
ip_address,
port,
sock=sock,
safer=safer,
encoding=self.encoding,
encoding_errors=self.encoding_errors
)
self.stats_sent += stats
return stats
def answer(
self, address=None, values=None, bundle=None, timetag=None,
safer=False, port=None
):
"""Answers a message or bundle to a client.
This method can only be called from a callback, it will lookup
the sender of the packet that triggered the callback, and send
the given message or bundle to it.
`timetag` is only used if `bundle` is True.
See `send_message` and `send_bundle` for info about the parameters.
Only one of `values` or `bundle` should be defined, if `values`
is defined, `send_message` is used with it, if `bundle` is
defined, `send_bundle` is used with its value.
"""
if not values:
values = []
frames = inspect.getouterframes(inspect.currentframe())
for frame, filename, line, function, lines, index in frames:
if function == '_listen' and __file__.startswith(filename):
break
else:
raise RuntimeError('answer() not called from a callback')
ip_address, response_port = frame.f_locals.get('sender')
if port is not None:
response_port = port
sock = frame.f_locals.get('sender_socket')
if bundle:
return self.send_bundle(
bundle, ip_address, response_port, timetag=timetag, sock=sock,
safer=safer
)
else:
return self.send_message(
address, values, ip_address, response_port, sock=sock
)
def address(self, address, sock=None, get_address=False):
"""Decorate functions to bind them from their definition.
`address` is the osc address to bind to the callback.
if `get_address` is set to True, the first parameter the
callback will receive will be the address that matched (useful
with advanced matching).
example:
server = OSCThreadServer()
server.listen('localhost', 8000, default=True)
@server.address(b'/printer')
def printer(values):
print(values)
send_message(b'/printer', [b'hello world'])
note:
This won't work on methods as it'll call them as normal
functions, and the callback won't get a `self` argument.
To bind a method use the `address_method` decorator.
"""
def decorator(callback):
self.bind(address, callback, sock, get_address=get_address)
return callback
return decorator
def address_method(self, address, sock=None, get_address=False):
"""Decorate methods to bind them from their definition.
The class defining the method must itself be decorated with the
`ServerClass` decorator, the methods will be bound to the
address when the class is instantiated.
See `address` for more information about the parameters.
example:
osc = OSCThreadServer()
osc.listen(default=True)
@ServerClass
class MyServer(object):
@osc.address_method(b'/test')
def success(self, *args):
print("success!", args)
"""
def decorator(decorated):
decorated._address = (self, address, sock, get_address)
return decorated
return decorator
def bind_meta_routes(self, sock=None):
"""This module implements osc routes to probe the internal state of a
live OSCPy server. These routes are placed in the /_oscpy/ namespace,
and provide information such as the version, the existing routes, and
usage statistics of the server over time.
These requests will be sent back to the client's address/port that sent
them, with the osc address suffixed with '/answer'.
examples:
'/_oscpy/version' -> '/_oscpy/version/answer'
'/_oscpy/stats/received' -> '/_oscpy/stats/received/answer'
messages to these routes require a port number as argument, to
know to which port to send to.
"""
self.bind(b'/_oscpy/version', self._get_version, sock=sock)
self.bind(b'/_oscpy/routes', self._get_routes, sock=sock)
self.bind(b'/_oscpy/stats/received', self._get_stats_received, sock=sock)
self.bind(b'/_oscpy/stats/sent', self._get_stats_sent, sock=sock)
def _get_version(self, port, *args):
self.answer(
b'/_oscpy/version/answer',
(__version__, ),
port=port
)
def _get_routes(self, port, *args):
self.answer(
b'/_oscpy/routes/answer',
[a[1] for a in self.addresses],
port=port
)
def _get_stats_received(self, port, *args):
self.answer(
b'/_oscpy/stats/received/answer',
self.stats_received.to_tuple(),
port=port
)
def _get_stats_sent(self, port, *args):
self.answer(
b'/_oscpy/stats/sent/answer',
self.stats_sent.to_tuple(),
port=port
)