-
Notifications
You must be signed in to change notification settings - Fork 3
/
websockets.py
100 lines (73 loc) · 2.59 KB
/
websockets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from argparse import ArgumentParser
from functools import partial
import socket
from pyramid.path import DottedNameResolver
import tornado.ioloop
import tornado.web
import tornado.websocket
import zmq
from zmq.eventloop import ioloop, zmqstream
from mozsvc.config import load_into_settings
# Registry of live websocket handlers, keyed by the token each client sent.
SOCKETS = {}
# Number of websocket connections currently counted as open on this node;
# this is the value report_status() hands to the storage backend.
CONNECTIONS = 0
class SocketHandler(tornado.websocket.WebSocketHandler):
    """Websocket endpoint that registers itself in ``SOCKETS`` by token.

    A client identifies itself by sending a message of the form
    ``token: <value>``; the handler is then stored in the module-level
    ``SOCKETS`` registry under ``<value>``.  The module-level
    ``CONNECTIONS`` counter is incremented on open and decremented on close.
    """

    # Token this connection is registered under; None until one is received.
    token = None

    def open(self):
        """Count the newly opened connection."""
        global CONNECTIONS
        CONNECTIONS += 1

    def on_message(self, message):
        """Register this handler under the token carried by ``message``.

        Messages that do not start with ``'token: '`` are ignored.
        """
        if not message.startswith('token: '):
            return
        # The connection may announce a different token than before; release
        # the previous entry so it does not outlive this socket.
        self._release_token()
        self.token = message.split(' ', 1)[-1]
        SOCKETS[self.token] = self

    def on_close(self):
        """Uncount the connection and release its registry entry."""
        global CONNECTIONS
        # Clamp at zero so the reported count can never go negative.
        CONNECTIONS = max(CONNECTIONS - 1, 0)
        self._release_token()

    def _release_token(self):
        """Remove this handler from ``SOCKETS`` if it still owns its token.

        The identity check matters: another connection may have registered
        the same token in the meantime, and that newer entry must survive
        this handler going away.  It also covers the empty-string token,
        which a plain truthiness test would leave behind.
        """
        if self.token is not None and SOCKETS.get(self.token) is self:
            del SOCKETS[self.token]
class Push(object):
    """Relay messages from a ZeroMQ stream to registered websockets."""

    def __init__(self, stream):
        """Install ``recv`` as the receive callback of ``stream``."""
        stream.on_recv(self.recv)

    def recv(self, msg):
        """Unpack a three-part message and push its payload to its token.

        NOTE(review): a message that does not have exactly three parts
        raises ValueError on the unpacking below.
        """
        _key, token, data = msg
        self.send(token, data)

    def send(self, token, data):
        """Write ``data`` to the socket registered under ``token``, if any.

        Tokens with no registered socket are silently ignored.  A socket
        whose write fails is dropped from ``SOCKETS`` so later messages are
        not sent to it.
        """
        handler = SOCKETS.get(token)
        if handler is None:
            return
        try:
            handler.write_message(data)
        except Exception:
            # Best effort: forget the socket, but leave CONNECTIONS alone.
            # The handler's own on_close() decrements the counter for this
            # connection; decrementing here as well counted it twice.
            SOCKETS.pop(token, None)
def report_status(storage, ip):
    """Publish this node's current connection count to ``storage``.

    Calls ``storage.add_edge_node`` with ``ip`` and the value of the
    module-level ``CONNECTIONS`` counter read at call time.
    """
    open_connections = CONNECTIONS
    storage.add_edge_node(ip, open_connections)
# Single catch-all route: the '.*' pattern sends every path to SocketHandler.
application = tornado.web.Application(handlers=[('.*', SocketHandler)])
def main():  # pragma: no cover
    """Run the pubsub-to-websocket bridge until the IO loop stops.

    Reads the config file named on the command line, subscribes to the
    ``PUSH`` topic on the configured ZeroMQ endpoint, serves websockets on
    the configured port, and periodically reports this node's connection
    count to the configured storage backend.
    """
    parser = ArgumentParser('Pubsub listener pushing to websockets.')
    parser.add_argument('config', help='path to the config file')
    args, settings = parser.parse_args(), {}
    load_into_settings(args.config, settings)
    config = settings['config']

    # Make pyzmq's IOLoop the default one before any loop instance is used.
    ioloop.install()
    sub_endpoint = config.get('zeromq', 'sub')
    sub_socket = zmq.Context().socket(zmq.SUB)
    sub_socket.connect(sub_endpoint)
    sub_socket.setsockopt(zmq.SUBSCRIBE, 'PUSH')
    # Single-argument print(...) emits the same text under the Python 2
    # print statement and the Python 3 print function.
    print('SUB sub_socket on %s' % (sub_endpoint,))

    loop = ioloop.IOLoop.instance()
    port = config.get('websockets', 'port')
    Push(zmqstream.ZMQStream(sub_socket, loop))
    application.listen(port)
    print('websockets on :%s' % (port,))

    # Send a status report every `period` seconds (from the [monitor]
    # section of the config).
    cfg = config.get_map('storage')
    storage = DottedNameResolver(None).resolve(cfg.pop('backend'))(**cfg)
    ip = '%s:%s' % (socket.gethostbyname(socket.getfqdn()), port)
    callback = partial(report_status, storage, ip)
    # Coerce before scaling to milliseconds: multiplying a textual value by
    # 1000 would repeat the string instead of scaling the number.
    period = float(config.get('monitor', 'period'))
    ioloop.PeriodicCallback(callback, period * 1000).start()

    # Get in the pool right away.
    callback()
    loop.start()
# Script entry point: start the server only when run directly, not on import.
if __name__ == '__main__':
    main()  # pragma: no cover