-
-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
controller.py
139 lines (111 loc) · 3.98 KB
/
controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python
"""A script to launch a controller with all its queues and connect it to a logger"""
import time
import logging
import zmq
from zmq.devices import ProcessMonitoredQueue, ThreadMonitoredQueue
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
from zmq.log import handlers
from IPython.zmq import log
from IPython.zmq.parallel import controller, heartmonitor, streamsession as session
def setup():
    """Assemble a basic controller and return the IOLoop that drives it.

    Executes ``config.py`` to obtain the interface and port numbers, hooks a
    PUB log handler onto the IPython zmq logger, binds the engine-facing
    (registrar, heartbeat) and client-facing (query, notifier) sockets,
    launches the multiplexer, control and task queues as daemon processes,
    and passes everything to ``controller.Controller``.

    Returns the IOLoop instance; the loop is not started here.
    """
    context = zmq.Context()
    io_loop = ioloop.IOLoop.instance()

    # config.py is executed into this module's globals; the lookups below
    # rely on it defining a `config` mapping with these keys.
    # NOTE(review): this runs whatever code config.py contains.
    execfile('config.py', globals())
    iface = config['interface']
    log_port = config['logport']
    registrar_port = config['regport']
    client_port = config['clientport']
    client_queue_port = config['cqueueport']
    engine_queue_port = config['equeueport']
    client_task_port = config['ctaskport']
    engine_task_port = config['etaskport']
    client_control_port = config['ccontrolport']
    engine_control_port = config['econtrolport']
    heart_port = config['heartport']
    notifier_port = config['notifierport']

    def url(port):
        # Address of `port` on the configured interface.
        return "%s:%i" % (iface, port)

    # --- logging: publish controller log records over a PUB socket ---
    log_socket = context.socket(zmq.PUB)
    log_socket.connect(url(log_port))
    pub_handler = handlers.PUBHandler(log_socket)
    pub_handler.setLevel(logging.DEBUG)
    pub_handler.root_topic = "controller"
    log.logger.addHandler(pub_handler)
    # presumably gives the log socket time to connect before anything is
    # logged -- TODO confirm
    time.sleep(.5)

    # --- engine connections ---
    # Registrar socket.
    registrar = ZMQStream(context.socket(zmq.XREP), io_loop)
    registrar.bind(url(registrar_port))

    # Heartbeat: PUB on heart_port, XREP on the port right after it.
    heart_pub = context.socket(zmq.PUB)
    heart_pub.bind(url(heart_port))
    heart_rep = context.socket(zmq.XREP)
    heart_rep.bind(url(heart_port + 1))
    heart_monitor = heartmonitor.HeartMonitor(
        io_loop,
        ZMQStream(heart_pub, io_loop),
        ZMQStream(heart_rep, io_loop),
        2500,
    )
    heart_monitor.start()

    # --- client connections ---
    client_stream = ZMQStream(context.socket(zmq.XREP), io_loop)
    client_stream.bind(url(client_port))
    notifier = ZMQStream(context.socket(zmq.PUB), io_loop)
    notifier.bind(url(notifier_port))

    controller_session = session.StreamSession(username="controller")

    # --- monitor socket: every queue below connects its monitor side here ---
    monitor_socket = context.socket(zmq.SUB)
    monitor_socket.setsockopt(zmq.SUBSCRIBE, "")
    # The port must be taken from the raw socket before wrapping it in a stream.
    monitor_port = monitor_socket.bind_to_random_port(iface)
    monitor = ZMQStream(monitor_socket, io_loop)

    # --- queues, each in its own daemon process ---
    # (in type, out type, in prefix, out prefix, client-side port, engine-side port)
    queue_specs = (
        # Multiplexer queue
        (zmq.XREP, zmq.XREP, 'in', 'out', client_queue_port, engine_queue_port),
        # Control queue
        (zmq.XREP, zmq.XREP, 'incontrol', 'outcontrol',
         client_control_port, engine_control_port),
        # Task queue (note the XREQ outbound socket)
        (zmq.XREP, zmq.XREQ, 'intask', 'outtask', client_task_port, engine_task_port),
    )
    for in_type, out_type, in_prefix, out_prefix, in_port, out_port in queue_specs:
        queue = ProcessMonitoredQueue(in_type, out_type, zmq.PUB, in_prefix, out_prefix)
        queue.bind_in(url(in_port))
        queue.bind_out(url(out_port))
        queue.connect_mon(url(monitor_port))
        queue.daemon = True
        queue.start()
    # presumably lets the queue processes come up before continuing -- TODO confirm
    time.sleep(.25)

    # --- address tables handed to the Controller ---
    engine_addrs = {
        'control': url(engine_control_port),
        'queue': url(engine_queue_port),
        'heartbeat': (url(heart_port), url(heart_port + 1)),
        'task': url(engine_task_port),
        'monitor': url(monitor_port),
    }
    client_addrs = {
        'control': url(client_control_port),
        'query': url(client_port),
        'queue': url(client_queue_port),
        'task': url(client_task_port),
        'notification': url(notifier_port),
    }

    # The Controller is only bound locally and is not returned.
    hub = controller.Controller(
        io_loop, controller_session, monitor, registrar, heart_monitor,
        client_stream, notifier, None, engine_addrs, client_addrs,
    )
    return io_loop
if __name__ == '__main__':
    # Script entry point: build the controller, then run its event loop.
    io_loop = setup()
    io_loop.start()