# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import socket
import threading
import time
import uuid

import msgpack
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_service import loopingcall
import six

from dragonflow.common import exceptions
from dragonflow.common import utils as df_utils
from dragonflow.db import db_common
from dragonflow.db.models import core

LOG = logging.getLogger(__name__)

MONITOR_TABLES = [core.Chassis.table_name, core.Publisher.table_name]


def pack_message(message):
    data = None
    try:
        data = msgpack.packb(message, encoding='utf-8')
    except Exception:
        LOG.exception("Error in pack_message: ")
    return data


def unpack_message(message):
    entry = None
    try:
        entry = msgpack.unpackb(message, encoding='utf-8')
    except Exception:
        LOG.exception("Error in unpack_message: ")
    return entry
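
# A minimal round-trip sketch of the two helpers above (illustrative only;
# the dict below is a made-up payload, not a real Dragonflow update):
#
#     data = pack_message({'table': 'chassis', 'action': 'set'})
#     assert unpack_message(data) == {'table': 'chassis', 'action': 'set'}
#
# Both helpers swallow serialization errors and return None, so callers
# must be prepared for a None result.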


def generate_publisher_uuid():
    """
    Generate a non-random uuid based on the fully qualified domain name.
    This UUID is supposed to remain the same across service restarts.
    """
    fqdn = socket.getfqdn()
    process_name = df_utils.get_process_name()
    return str(uuid.uuid5(uuid.NAMESPACE_DNS,
                          "{0}.{1}".format(process_name, fqdn)))


@six.add_metaclass(abc.ABCMeta)
class PubSubApi(object):
    """
    API class providing the publisher and subscriber used by the controller
    and the neutron plugin.
    """

    @abc.abstractmethod
    def get_publisher(self):
        """Return a publisher driver object

        :returns: a PublisherApi object
        """

    @abc.abstractmethod
    def get_subscriber(self):
        """Return a subscriber driver object

        :returns: a SubscriberApi object. May return None if is_local is
                  true and the local and non-local publishers are the same.
        """


@six.add_metaclass(abc.ABCMeta)
class PublisherApi(object):

    @abc.abstractmethod
    def initialize(self):
        """Initialize the publisher

        :returns: None
        """

    @abc.abstractmethod
    def _send_event(self, data, topic):
        """Publish data to a topic

        :param data: Stream of data to publish
        :type data: bytes
        :param topic: topic to send event to
        :type topic: bytes
        :returns: None
        """

    @abc.abstractmethod
    def close(self):
        """Close the publisher. Release all used resources"""

    # No-op hooks; drivers that support publisher failover / HA override
    # these.
    def set_publisher_for_failover(self, pub, callback):
        pass

    def start_detect_for_failover(self):
        pass

    def process_ha(self):
        pass


class PublisherAgentBase(PublisherApi):
    def send_event(self, update, topic=None):
        """Publish the update

        :param update: Encapsulates a Publisher update
        :type update: DbUpdate object
        :param topic: topic to send event to
        :type topic: string
        :returns: None
        """
        if topic is None:
            topic = update.topic or db_common.SEND_ALL_TOPIC
        topic = topic.encode('utf8', 'ignore')
        LOG.debug("Sending %s to %s", update, topic)
        data = pack_message(update.to_dict())
        self._send_event(data, topic)
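
# A send-side sketch. `my_publisher` stands for any concrete
# PublisherAgentBase subclass; the DbUpdate arguments follow the same
# (table, key, action, value) order used by TableMonitor._send_event below:
#
#     update = db_common.DbUpdate('chassis', 'chassis-1', 'set',
#                                 {'ip': '10.0.0.1'})
#     my_publisher.send_event(update, topic='tenant-a')
#
# With no explicit topic, send_event falls back to update.topic and then
# to db_common.SEND_ALL_TOPIC.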


@six.add_metaclass(abc.ABCMeta)
class SubscriberApi(object):

    @abc.abstractmethod
    def initialize(self, callback):
        """Initialize the subscriber

        :param callback: callback method to call for every db change
        :type callback: callback method of type:
                        callback(table, key, action, value, topic)
                        table - table name
                        key - object key
                        action - 'create' / 'set' / 'delete' / 'sync'
                        value - new object value
                        topic - the topic with which the event was received
        :returns: None
        """

    @abc.abstractmethod
    def register_listen_address(self, uri):
        """Register a publisher address to listen on

        NOTE: Must be called prior to calling daemonize
        :param uri: uri to connect to
        :type uri: string '<protocol>:address:port;....'
        :returns: Boolean True if the uri is new
        """

    @abc.abstractmethod
    def unregister_listen_address(self, uri):
        """Unregister a publisher address to stop listening on it

        NOTE: Must be called prior to calling daemonize
        :param uri: uri to disconnect from
        :type uri: string '<protocol>:address:port;....'
        """

    @abc.abstractmethod
    def daemonize(self):
        """Start the subscriber thread"""

    @property
    @abc.abstractmethod
    def is_running(self):
        """Return True if the subscriber is running, False otherwise"""

    @abc.abstractmethod
    def close(self):
        """Close the subscriber. Release all used resources"""

    @abc.abstractmethod
    def register_topic(self, topic):
        """Add a topic to the subscriber listening list

        :param topic: topic to listen to
        :type topic: string
        :returns: Boolean True if the topic is new
        """

    @abc.abstractmethod
    def unregister_topic(self, topic):
        """Remove a topic from the subscriber listening list

        :param topic: topic to remove
        :type topic: string
        """


class SubscriberAgentBase(SubscriberApi):
    def __init__(self):
        super(SubscriberAgentBase, self).__init__()
        self.db_changes_callback = None
        self.daemon = None
        self.topic_list = []
        self.uri_list = []

    def initialize(self, callback):
        self.db_changes_callback = callback
        # Subclasses are expected to provide the run() loop executed by
        # this thread.
        self.daemon = threading.Thread(target=self.run)
        self.daemon.daemon = True

    def register_listen_address(self, uri):
        if uri not in self.uri_list:
            self.uri_list.append(uri)
            return True
        return False

    def unregister_listen_address(self, uri):
        self.uri_list.remove(uri)

    def daemonize(self):
        self.daemon.start()

    @property
    def is_running(self):
        return self.daemon is not None and self.daemon.is_alive()

    def register_topic(self, topic):
        LOG.info('Register topic %s', topic)
        if topic not in self.topic_list:
            self.topic_list.append(topic)
            return True
        return False

    def unregister_topic(self, topic):
        LOG.info('Unregister topic %s', topic)
        self.topic_list.remove(topic)

    # No-op hooks; drivers that support subscriber failover / HA override
    # these.
    def set_subscriber_for_failover(self, sub, callback):
        pass

    def register_hamsg_for_db(self):
        pass

    def process_ha(self):
        pass

    def _handle_incoming_event(self, data):
        message = unpack_message(data)
        self.db_changes_callback(
            message['table'],
            message['key'],
            message['action'],
            message['value'],
            message['topic'],
        )
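
# A minimal driver sketch, assuming a hypothetical blocking
# receive_from_bus() transport helper; real drivers implement run() with
# their transport of choice and release it in close():
#
#     class MySubscriber(SubscriberAgentBase):
#         def run(self):
#             while True:
#                 data = receive_from_bus()  # hypothetical transport call
#                 self._handle_incoming_event(data)
#
#         def close(self):
#             pass  # release transport resources here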


class TableMonitor(object):
    def __init__(self, table_name, driver, publisher, polling_time=10):
        self._driver = driver
        self._publisher = publisher
        self._polling_time = polling_time
        self._loopingcall = loopingcall.FixedIntervalLoopingCall(self.run)
        self._table_name = table_name
        self._cache = {}

    def daemonize(self):
        return self._loopingcall.start(self._polling_time,
                                       initial_delay=self._polling_time)

    def stop(self):
        return self._loopingcall.stop()

    def run(self):
        self._cache = self._poll_once(self._cache)

    def _poll_once(self, old_cache):
        """Create a new cache and send events for changes from the old cache"""
        new_cache = {}
        for entry_key in self._driver.get_all_keys(self._table_name):
            entry_value = self._driver.get_key(
                self._table_name,
                entry_key)
            if entry_value is None:
                continue
            old_value = old_cache.pop(entry_key, None)
            if old_value is None:
                # Key not seen in the previous poll: a new entry
                self._send_event('create', entry_key, entry_value)
            elif old_value != entry_value:
                # Key present before, but its value changed: an update
                self._send_event('set', entry_key, entry_value)
            new_cache[entry_key] = entry_value
        # Keys left in the old cache no longer exist in the table
        for entry_key in old_cache:
            self._send_event('delete', entry_key, None)
        return new_cache

    def _send_event(self, action, entry_id, entry_value):
        db_update = db_common.DbUpdate(
            self._table_name,
            entry_id,
            action,
            entry_value,
        )
        self._publisher.send_event(db_update)
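
# A usage sketch, assuming an initialized NB DB driver and a running
# publisher (nb_driver and my_publisher are hypothetical handles):
#
#     monitor = TableMonitor(core.Chassis.table_name, nb_driver,
#                            my_publisher, polling_time=10)
#     monitor.daemonize()  # diff the table every 10 seconds
#     ...
#     monitor.stop()
#
# Each poll publishes 'create' / 'set' / 'delete' events only for keys
# that changed since the previous poll.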


class StalePublisherMonitor(TableMonitor):
    def __init__(self, driver, publisher, timeout, polling_time=10):
        super(StalePublisherMonitor, self).__init__(
            core.Publisher.table_name,
            driver,
            publisher,
            polling_time
        )
        self._timeout = timeout
        self._uuid = generate_publisher_uuid()

    def _poll_once(self, old_cache):
        """Scan for stale entries of other publishers"""
        for entry_key in self._driver.get_all_keys(self._table_name):
            publisher_json = self._driver.get_key(self._table_name, entry_key)
            if publisher_json is None:
                continue
            publisher = jsonutils.loads(publisher_json)
            # Never expire this process's own publisher entry
            if publisher['id'] == self._uuid:
                continue
            last_activity_timestamp = publisher['last_activity_timestamp']
            if last_activity_timestamp < time.time() - self._timeout:
                LOG.info('Removing publisher %s', publisher_json)
                try:
                    self._driver.delete_key(self._table_name, entry_key)
                except exceptions.DBKeyNotFound:
                    # Publisher already deleted. Ignore.
                    pass
        # Delegate to TableMonitor to publish events for what remains
        return super(StalePublisherMonitor, self)._poll_once(old_cache)
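
# The staleness test above is plain wall-clock arithmetic: with
# timeout=30, any other publisher whose last_activity_timestamp is more
# than 30 seconds in the past is deleted on the next poll, e.g.
# (nb_driver and my_publisher are hypothetical handles):
#
#     monitor = StalePublisherMonitor(nb_driver, my_publisher, timeout=30)
#     monitor.daemonize()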