/
supervisors.py
216 lines (167 loc) · 6.65 KB
/
supervisors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
"""
Services to ensure that all instances and load balancers are running and
discoverable.
"""
import json
import xmlrpclib

from twisted.web import xmlrpc, server
from twisted.internet import reactor, defer, task
from twisted.internet.protocol import Factory
from twisted.protocols.portforward import ProxyFactory
from twisted.python import log
from treq import get, post
from treq import json_content

from stretch import utils#, models
from stretch.agent import objects
# Local TCP port of the load balancer supervisor's XML-RPC interface
# (served by run_lb_supervisor, reached through lb_supervisor_client).
LB_SUPERVISOR_PORT = 24226

# Local TCP port of the endpoint supervisor's XML-RPC interface
# (served by run_endpoint_supervisor, reached through
# endpoint_supervisor_client).
ENDPOINT_SUPERVISOR_PORT = 24227
class TCPLoadBalancerFactory(Factory):
    """
    Round-robin TCP load balancer.

    Backends are kept in ``self.endpoints`` as ``((host, port), ProxyFactory)``
    pairs. Each incoming connection is proxied to the first backend in the
    list, which is then moved to the back of the list.
    """

    def __init__(self):
        # Per-instance list: a class-level list would be shared by every load
        # balancer that TCPLoadBalancerServer creates.
        self.endpoints = []

    def buildProtocol(self, addr):
        """
        Build a proxy protocol for the next backend in round-robin order.

        :param addr: address of the incoming connection.
        :returns: the backend ProxyFactory's protocol, or None when no
            backends are registered (Twisted then drops the connection).
        """
        if not self.endpoints:
            return None
        entry = self.endpoints.pop(0)
        # Rotate: the backend just chosen goes to the back of the queue.
        self.endpoints.append(entry)
        _, factory = entry
        return factory.buildProtocol(addr)

    def add_endpoint(self, host, port):
        """
        Register ``host:port`` as a backend.

        :raises ObjectExists: if the endpoint is already registered.
        """
        endpoint = (host, port)
        if any(existing == endpoint for existing, _ in self.endpoints):
            raise ObjectExists('endpoint already exists in load balancer')
        self.endpoints.append((endpoint, ProxyFactory(host, port)))

    def remove_endpoint(self, endpoint, port=None):
        """
        Unregister a backend.

        Accepts either a single ``(host, port)`` sequence, or ``host`` and
        ``port`` as two separate arguments.

        :raises ObjectDoesNotExist: if the endpoint is not registered.
        """
        if port is not None:
            endpoint = (endpoint, port)
        endpoint = tuple(endpoint)
        for position, (existing, _) in enumerate(self.endpoints):
            if existing == endpoint:
                del self.endpoints[position]
                return
        raise ObjectDoesNotExist('endpoint does not exist in load '
                                 'balancer')
class TCPLoadBalancerServer(xmlrpc.XMLRPC):
    """
    XML-RPC service that starts, configures and stops TCP load balancers.

    Running load balancers live in ``self.load_balancers``, keyed by
    ``lb_id``. Each value is ``dict(port=<listening port>,
    factory=<TCPLoadBalancerFactory>)``.
    """

    def __init__(self, *args, **kwargs):
        xmlrpc.XMLRPC.__init__(self, *args, **kwargs)
        # Per-instance registry; a class-level dict would be shared by every
        # server instance.
        self.load_balancers = {}

    def xmlrpc_start_lb(self, lb_id):
        """
        Start a load balancer on an OS-chosen TCP port.

        :returns: the port number the new load balancer listens on.
        :raises LoadBalancerException: if ``lb_id`` is already running.
        """
        if lb_id in self.load_balancers:
            raise LoadBalancerException('load balancer with id "%s" is '
                                        'already running' % lb_id)
        factory = TCPLoadBalancerFactory()
        port = reactor.listenTCP(0, factory)
        self.load_balancers[lb_id] = dict(port=port, factory=factory)
        return port.getHost().port

    def xmlrpc_add_endpoint(self, lb_id, host, port):
        """Add backend ``host:port`` to load balancer ``lb_id``."""
        self._get_lb(lb_id)['factory'].add_endpoint(host, port)
        return True

    def xmlrpc_remove_endpoint(self, lb_id, host, port):
        """Remove backend ``host:port`` from load balancer ``lb_id``."""
        # Pass a single (host, port) pair, matching remove_endpoint(endpoint).
        self._get_lb(lb_id)['factory'].remove_endpoint((host, port))
        return True

    def xmlrpc_stop_lb(self, lb_id):
        """
        Stop load balancer ``lb_id`` and forget it, so the id can be reused.

        :returns: a Deferred firing with True once the port has stopped
            listening.
        :raises ObjectDoesNotExist: if no such load balancer is running.
        """
        lb = self._get_lb(lb_id)
        del self.load_balancers[lb_id]
        deferred = defer.maybeDeferred(lb['port'].stopListening)
        # Reply only after the port is closed; XML-RPC cannot marshal None.
        deferred.addCallback(lambda _: True)
        return deferred

    def _get_lb(self, lb_id):
        """
        Return the registry entry for ``lb_id``.

        :raises ObjectDoesNotExist: if ``lb_id`` is unknown.
        """
        try:
            return self.load_balancers[lb_id]
        except KeyError:
            raise ObjectDoesNotExist('load balancer with id "%s" does not '
                                     'exist' % lb_id)
class LoadBalancerException(Exception):
    """Base error for load balancer supervisor operations."""


class ObjectDoesNotExist(LoadBalancerException):
    """A requested load balancer or endpoint is not registered."""


class ObjectExists(LoadBalancerException):
    """An endpoint being added is already registered."""
def run_lb_supervisor():
    """
    Start a load balancer for every LoadBalancer model, serve the XML-RPC
    control interface on LB_SUPERVISOR_PORT and run the reactor.

    Blocks until the reactor stops.
    """
    # Imported here because the module-level ``models`` import is commented
    # out; without this, ``models`` is an undefined name.
    from stretch import models

    lb_server = TCPLoadBalancerServer()
    for lb in models.LoadBalancer.objects.all():
        port = lb_server.xmlrpc_start_lb(lb.pk)
        # TODO: set lb endpoint ('127.0.0.1', port) in etcd
    reactor.listenTCP(LB_SUPERVISOR_PORT, server.Site(lb_server))
    reactor.run()
@utils.memoized
def lb_supervisor_client():
    """Return a (memoized) XML-RPC proxy to the local load balancer supervisor."""
    url = 'http://127.0.0.1:%s/' % (LB_SUPERVISOR_PORT,)
    return xmlrpclib.ServerProxy(url)
class EndpointSupervisor(xmlrpc.XMLRPC):
    """
    XML-RPC service that watches etcd for each supervised group and mirrors
    endpoint additions/removals onto that group's load balancer.

    Values under a group's ``config_key`` are decoded with ``json.loads``.
    The ``lb`` key and keys naming blocked instances are ignored.
    """

    # Seconds to wait before re-issuing an etcd watch request that failed.
    watch_retry_delay = 5.0

    def __init__(self, groups):
        """
        :param groups: iterable of objects exposing ``pk`` and
            ``config_key``; a watch is started for each one.
        """
        xmlrpc.XMLRPC.__init__(self)
        # Per-instance state; class-level lists would be shared between
        # instances.
        self.groups = []
        self.blocked_instances = []
        for group in groups:
            self.xmlrpc_add_group(group.pk, group.config_key)

    def xmlrpc_add_group(self, group_id, config_key):
        """Start supervising ``group_id`` (no-op if already supervised)."""
        if group_id not in self.groups:
            self.groups.append(group_id)
            self.watch(group_id, config_key)
        return True

    def xmlrpc_remove_group(self, group_id):
        """Stop supervising ``group_id`` (no-op if not supervised)."""
        if group_id in self.groups:
            self.groups.remove(group_id)
        return True

    def xmlrpc_block_instance(self, instance_id):
        """Ignore etcd changes for ``instance_id`` until unblocked."""
        if instance_id not in self.blocked_instances:
            self.blocked_instances.append(instance_id)
        return True

    def xmlrpc_unblock_instance(self, instance_id):
        """Resume handling etcd changes for ``instance_id``."""
        if instance_id in self.blocked_instances:
            self.blocked_instances.remove(instance_id)
        return True

    def watch(self, group_id, config_key, index=None):
        """
        Issue one etcd v1 watch on ``config_key``, apply the reported change,
        then re-arm the watch from the following index.

        The chain of watches ends once ``group_id`` is no longer supervised.
        Failed requests are logged and retried after ``watch_retry_delay``.

        :param index: etcd index to watch from; None watches for the next
            change.
        """
        def key_changed(result):
            # Stop re-arming once the group has been removed.
            if group_id not in self.groups:
                return
            try:
                self._apply_change(group_id, config_key, result)
            except Exception:
                # Log and move on: retrying this event would fail the same
                # way forever.
                log.err(None, 'failed to apply etcd change %r' % (result,))
            # Resume *after* the event just handled; watching from the same
            # index would return this event again.
            self.watch(group_id, config_key, result['index'] + 1)

        def watch_failed(failure):
            log.err(failure, 'etcd watch on %s failed' % (config_key,))
            if group_id in self.groups:
                reactor.callLater(self.watch_retry_delay, self.watch,
                                  group_id, config_key, index)

        url = 'http://127.0.0.1:4001/v1/watch%s' % config_key
        if index is not None:
            deferred = post(url, data={'index': index})
        else:
            deferred = get(url)
        # treq fires with a Response object; decode its JSON body first.
        deferred.addCallback(json_content)
        deferred.addCallback(key_changed)
        deferred.addErrback(watch_failed)
        return True

    def _apply_change(self, group_id, config_key, result):
        """Apply one decoded etcd watch event to the group's load balancer."""
        key = result['key']
        # Remove the group's key prefix. str.lstrip would strip a *set of
        # characters* and mangle ids sharing characters with config_key.
        if key.startswith(config_key):
            key = key[len(config_key):]
        key = key.lstrip('/')
        if key == 'lb' or key in self.blocked_instances:
            return
        if result.get('newKey'):
            self.add_endpoint(group_id, json.loads(result['value']))
        elif result['action'] == 'DELETE':
            self.remove_endpoint(group_id, json.loads(result['prevValue']))

    def add_endpoint(self, group_id, endpoint):
        """Add ``endpoint`` to the load balancer of group ``group_id``."""
        # Lazy import: the module-level ``models`` import is commented out.
        from stretch import models
        group = models.Group.objects.get(pk=group_id)
        group.load_balancer.add_endpoint(endpoint)

    def remove_endpoint(self, group_id, endpoint):
        """Remove ``endpoint`` from the load balancer of group ``group_id``."""
        # Lazy import: the module-level ``models`` import is commented out.
        from stretch import models
        group = models.Group.objects.get(pk=group_id)
        group.load_balancer.remove_endpoint(endpoint)
def run_endpoint_supervisor():
    """
    Listen for changes over etcd and add/remove load balancer endpoints
    where necessary.

    The group of every LoadBalancer is supervised from the start. The XML-RPC
    interface on ENDPOINT_SUPERVISOR_PORT allows groups and instances to be
    added, removed, blocked or unblocked at runtime. Blocks until the reactor
    stops.
    """
    # Imported here because the module-level ``models`` import is commented
    # out; without this, ``models`` is an undefined name.
    from stretch import models

    # Load all groups that use a load balancer
    # TODO: that uses a Docker backend
    groups = [lb.group for lb in models.LoadBalancer.objects.all()]
    reactor.listenTCP(ENDPOINT_SUPERVISOR_PORT,
                      server.Site(EndpointSupervisor(groups)))
    reactor.run()
@utils.memoized
def endpoint_supervisor_client():
    """Return a (memoized) XML-RPC proxy to the local endpoint supervisor."""
    url = 'http://127.0.0.1:%s/' % (ENDPOINT_SUPERVISOR_PORT,)
    return xmlrpclib.ServerProxy(url)
def check_instances():
    """
    Reconcile local instances with the containers Docker reports as running.

    Instances whose container is running get ``set_endpoint()`` called.
    Instances whose container is gone have ``cid`` and ``endpoint`` cleared,
    are saved, and are started again. An error on one instance is logged and
    does not stop the remaining instances from being checked.
    """
    # ``docker ps -q`` prints truncated (12-character) container IDs, while
    # the stored cid may be the full ID, so compare on the short form.
    output = utils.run_cmd(['docker', 'ps', '-q'])[0]
    running = set(line.strip()[:12] for line in output.splitlines())
    for instance in objects.Instance.get_instances():
        try:
            cid = instance.data['cid']
            if cid and cid[:12] in running:
                # Instance is running: refresh its endpoint key
                instance.set_endpoint()
            else:
                # Instance is down: clear stale state and restart it
                instance.data['cid'] = None
                instance.data['endpoint'] = None
                instance.save()
                instance.start()
        except Exception:
            # Without this, one failing instance would raise out of the
            # LoopingCall and end supervision of every instance.
            log.err(None, 'failed to check instance %r' % (instance,))
def run_instance_supervisor():
    """
    Run check_instances every 10 seconds and block in the reactor.
    """
    loop = task.LoopingCall(check_instances)
    finished = loop.start(10.0)
    # LoopingCall stops and errbacks this Deferred if check_instances raises;
    # log it so the supervisor does not stop silently.
    finished.addErrback(log.err, 'instance supervisor loop stopped')
    reactor.run()