This repository has been archived by the owner on Jun 26, 2020. It is now read-only.
/
control_bus.py
173 lines (144 loc) · 6.39 KB
/
control_bus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# Copyright (c) 2016 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import functools
import json
import time
import eventlet
from oslo_log import log as logging
from congress.dse2 import data_service
# Module-level logger. Passing __name__ attributes records to this module
# rather than to the unnamed default logger.
LOG = logging.getLogger(__name__)
def drop_cast_echos(wrapped):
    """Decorate an RPC endpoint method so a node ignores its own messages.

    The decorated method is only invoked when the ``node_id`` found in the
    message context differs from the ID of the node owning the endpoint's
    bus. Otherwise the message is logged as an echo and None is returned.
    """
    @functools.wraps(wrapped)
    def _skip_own_messages(rpc_endpoint, message_context, *args, **kwargs):
        local_node = rpc_endpoint.dse_bus.node
        sender_id = message_context['node_id']
        if sender_id != local_node.node_id:
            # Message came from another node: pass it straight through.
            return wrapped(rpc_endpoint, message_context, *args, **kwargs)
        LOG.trace("<%s> Ignoring my echo", local_node.node_id)
        return None
    return _skip_own_messages
class HeartbeatEncoder(json.JSONEncoder):
    """JSON encoder for heartbeat payloads that serializes sets as lists."""

    def default(self, obj):
        """Return a JSON-serializable stand-in for ``obj``.

        Sets are converted to lists; every other type is delegated to the
        base encoder's ``default`` method.
        """
        if not isinstance(obj, set):
            # Not a set: let the base class default method handle it
            return super(HeartbeatEncoder, self).default(obj)
        return list(obj)
class _DseControlBusEndpoint(object):
    """RPC endpoint through which peers talk to a node's control bus.

    Both RPC methods are wrapped with ``drop_cast_echos`` so that messages
    originating from this very node are ignored.
    """

    def __init__(self, dse_bus):
        """Remember the control bus this endpoint serves."""
        self.dse_bus = dse_bus

    @drop_cast_echos
    def accept_heartbeat(self, client_ctxt, args):
        """Store the status a peer advertised in its heartbeat.

        ``args`` is a JSON string holding the peer's ``services`` and
        ``subscribed_tables``; ``client_ctxt`` supplies the peer's
        ``node_id`` and ``instance``. The resulting status is saved in the
        bus's ``peers`` mapping together with the local receive time.
        """
        bus = self.dse_bus
        LOG.debug("<%s> Accepted heartbeat: context=%s, args='%s'",
                  bus.node.node_id, client_ctxt, args)
        heartbeat = json.loads(args)
        # JSON carries the per-target values as lists; turn them into sets
        subscribed = {
            target: set(members)
            for target, members in heartbeat['subscribed_tables'].items()
        }
        peer_id = client_ctxt['node_id']
        status = {
            'node_id': peer_id,
            'instance': client_ctxt['instance'],
            'services': heartbeat['services'],
            'subscribed_tables': subscribed,
        }
        previous = bus.peers.get(peer_id)
        service_ids = [svc['service_id'] for svc in status['services']]
        if not previous:
            LOG.info("<%s> New peer '%s' with services %s",
                     bus.node.node_id, peer_id, service_ids)
        else:
            # TODO(pballand): validate instance, services
            LOG.trace("<%s> Refreshed peer '%s' with services %s",
                      bus.node.node_id, peer_id, service_ids)
        bus.peers[peer_id] = status
        # TODO(pballand): handle time going backwards
        bus.peers[peer_id]['last_hb_time'] = time.time()

    # Note(thread-safety): blocking function
    @drop_cast_echos
    def list_services(self, client_ctxt):
        """Answer a peer's refresh request by publishing a heartbeat now."""
        requester = client_ctxt['node_id']
        LOG.debug("<%s> Peer '%s' requested updated service list",
                  self.dse_bus.node.node_id, requester)
        # Note(thread-safety): blocking call
        self.dse_bus._publish_heartbeat()
class DseNodeControlBus(data_service.DataService):
    """Maintain DSE connection for a DseNode.

    The DSE maintains a common directory of data services and their
    corresponding exported tables and RPCs. This control bus maintains
    this view using oslo.messaging RPC primitives.
    """
    # Delay passed to eventlet.sleep() between heartbeat loop iterations.
    HEARTBEAT_INTERVAL = 1

    def __init__(self, node):
        """Create the control bus service for ``node``.

        :param node: the node this bus publishes heartbeats for; it is
            used for its ``node_id``, services, subscriptions and
            broadcast RPC facility.
        """
        self.node = node
        self.control_bus_ep = _DseControlBusEndpoint(self)
        # Latest known status of every peer, keyed by peer node ID
        self.peers = {}
        # Greenthread running _heartbeat_loop; None until start() spawns it
        self._heartbeat_thread = None
        super(DseNodeControlBus, self).__init__('_control_bus')

    def rpc_endpoints(self):
        """Return the list of RPC endpoint objects served by this bus."""
        return [self.control_bus_ep]

    # Note(thread-safety): blocking function
    def _publish_heartbeat(self):
        """Broadcast this node's services and subscriptions to all peers."""
        args = json.dumps(
            {'services': [s.info.to_dict()
                          for s in self.node.get_services(True)],
             # FIXME(ekcs): suppress subscriber details for each subscribed
             # table to avoid unnecessary network traffic. Only binary
             # information needed over HB.
             'subscribed_tables': self.node.subscriptions},
            cls=HeartbeatEncoder)
        # Note(thread-safety): blocking call
        self.node.broadcast_service_rpc(self.service_id, 'accept_heartbeat',
                                        {'args': args})

    def _call_heartbeat_callbacks(self):
        """Invoke the heartbeat callbacks of every service on the node.

        Callbacks of a service stop being invoked as soon as that service
        is no longer running.
        """
        for service in self.node.get_services():
            # Iterate over a snapshot: a callback may block, and the
            # registry changing size meanwhile would otherwise raise
            # RuntimeError on a live dict view.
            heartbeat_callbacks = list(service.heartbeat_callbacks.values())
            for f in heartbeat_callbacks:
                if not service._running:
                    break
                # Note(thread-safety): potentially blocking call
                f()

    # Note(thread-safety): blocking function
    def _heartbeat_loop(self):
        """Publish heartbeats and run callbacks until the bus stops."""
        while self._running:
            self._publish_heartbeat()
            self.node._update_tables_with_subscriber()
            self._call_heartbeat_callbacks()
            eventlet.sleep(self.HEARTBEAT_INTERVAL)

    # Note(thread-safety): blocking function
    def _refresh_peers(self):
        """Ask every peer to publish its service list immediately."""
        # Request immediate status refresh from peers
        LOG.debug("<%s> Requesting service list from all peers",
                  self.node.node_id)
        self.node.broadcast_service_rpc(self.service_id, 'list_services')

    # Note(thread-safety): blocking function
    def start(self):
        """Start the bus and spawn the heartbeat loop.

        Does nothing besides logging when the bus is already running.
        """
        if self._running:
            # Lazy logger arguments: only formatted if debug is enabled
            LOG.debug('control bus on %s already started.',
                      self.node.node_id)
            return

        LOG.debug("<%s> Starting DSE control bus", self.node.node_id)
        super(DseNodeControlBus, self).start()

        # TODO(pballand): ensure I am not currently running
        #                 Add an instance UUID to the node status, have
        #                 timeout on nodes
        self._refresh_peers()

        # TODO(pballand): before enabling self, check if my node ID is
        #                 already present (no consensus service, so use
        #                 timeout heuristic)
        self._heartbeat_thread = eventlet.spawn(self._heartbeat_loop)

    def stop(self):
        """Stop the bus and kill the heartbeat greenthread, if any.

        Safe to call when start() was never called or stop() already ran.
        """
        LOG.debug("<%s> Stopping DSE control bus", self.node.node_id)
        super(DseNodeControlBus, self).stop()
        heartbeat_thread = getattr(self, '_heartbeat_thread', None)
        if heartbeat_thread is not None:
            eventlet.greenthread.kill(heartbeat_thread)
            self._heartbeat_thread = None

    def dse_status(self):
        """Return latest observation of DSE status."""
        # TODO(pballand): include node status [JOINING, JOINED]
        return {'peers': self.peers}