This repository has been archived by the owner on Jul 21, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 53
/
topology.py
317 lines (271 loc) · 12.6 KB
/
topology.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from neutron_lib import constants as n_const
from oslo_log import log
from dragonflow.common import constants
from dragonflow.db import db_store
from dragonflow.db.models import l2
from dragonflow.db.models import migration
from dragonflow.db.models import switch
LOG = log.getLogger(__name__)

# Value type stored in Topology.ovs_to_lport_mapping. That dict is keyed by
# the switch port's ID and maps each switch port to its Logical Port.
# lport_id is the Logical Port's ID. topic is the tenant (or project).
OvsLportMapping = collections.namedtuple(
    'OvsLportMapping', ('lport_id', 'topic'))

# Switch port types that Topology handles; ports of any other type are
# logged as unmanaged and ignored.
_SWITCH_PORT_TYPES = (
    constants.SWITCH_COMPUTE_INTERFACE,
    constants.SWITCH_TUNNEL_INTERFACE,
)
class Topology(object):
    """Track local switch ports and the topics their logical ports need.

    The instance registers itself for SwitchPort created/updated/deleted
    events. For compute ports it records which logical port sits behind
    each switch port and, when selective topology distribution is enabled,
    registers a topic on the controller when its first local port appears
    and unregisters it when its last local port disappears. For tunnel
    ports it re-applies or removes the remote logical ports of the logical
    switches that use the tunnel's network type.
    """

    def __init__(self, controller, enable_selective_topology_distribution):
        """
        @param controller: object providing get_nb_api(),
                           get_chassis_name(), update(), delete(),
                           notify_port_status(), register_topic() and
                           unregister_topic()
        @param enable_selective_topology_distribution: when false, topic
                           subscription tracking is disabled entirely
        """
        # Stores topics(tenants) subscribed by lports in the current local
        # controller. I.e. {tenant1: {lport1, lport2}, tenant2: {lport3}}
        self.topic_subscribed = {}
        self.enable_selective_topo_dist = \
            enable_selective_topology_distribution
        # Maps a switch port ID to the OvsLportMapping of its logical port.
        self.ovs_to_lport_mapping = {}

        self.controller = controller
        self.nb_api = controller.get_nb_api()
        self.chassis_name = controller.get_chassis_name()
        self.db_store = db_store.get_instance()

        # TODO(snapiri) this should not be ovs specific
        switch.SwitchPort.register_created(self.switch_port_updated)
        switch.SwitchPort.register_updated(self.switch_port_updated)
        switch.SwitchPort.register_deleted(self.switch_port_deleted)

    def switch_port_updated(self, switch_port, orig_switch_port=None):
        """
        Changes in ovs port status will be monitored by ovsdb monitor thread
        and notified to topology. This method is the entry point to process
        port online/update events. Ports whose type is not in
        _SWITCH_PORT_TYPES are ignored. Exceptions raised while handling
        the port are logged and not propagated.

        @param switch_port: the switch port in its current state
        @param orig_switch_port: the previous state of the port; None means
                                 the port is handled as newly added
        @return : None
        """
        LOG.info("Ovs port updated: %r", switch_port)

        if switch_port.type not in _SWITCH_PORT_TYPES:
            LOG.info("Unmanaged port online: %r", switch_port)
            return

        try:
            if orig_switch_port is None:
                self._handle_switch_port_added(switch_port)
            else:
                self._handle_switch_port_updated(switch_port)
        except Exception:
            LOG.exception(
                "Exception occurred when handling port online event")

    def _dispatch_by_port_type(self, switch_port, on_compute, on_tunnel):
        """Call the handler that matches the type of switch_port.

        @param switch_port: the switch port being processed
        @param on_compute: callable invoked with switch_port for compute
                           interfaces
        @param on_tunnel: callable invoked with switch_port for tunnel
                          interfaces
        @return : None
        """
        port_type = switch_port.type
        if port_type == constants.SWITCH_COMPUTE_INTERFACE:
            on_compute(switch_port)
        elif port_type == constants.SWITCH_TUNNEL_INTERFACE:
            on_tunnel(switch_port)
        else:
            LOG.warning('Invalid port type on %r', switch_port)

    def _handle_switch_port_added(self, switch_port):
        """Route a newly added switch port to its per-type handler."""
        self._dispatch_by_port_type(switch_port,
                                    self._compute_port_added,
                                    self._tunnel_port_added)

    def _handle_switch_port_updated(self, switch_port):
        """Route an updated switch port to its per-type handler."""
        self._dispatch_by_port_type(switch_port,
                                    self._compute_port_updated,
                                    self._tunnel_port_updated)

    def switch_port_deleted(self, switch_port):
        """
        Changes in ovs port status will be monitored by ovsdb monitor thread
        and notified to topology. This method is the entry point to process
        port offline events. Ports whose type is not in _SWITCH_PORT_TYPES
        are ignored. Exceptions raised while handling the port are logged
        and not propagated.

        @param switch_port: the switch port that was removed
        @return : None
        """
        if switch_port.type not in _SWITCH_PORT_TYPES:
            LOG.info("Unmanaged port offline: %r", switch_port)
            return

        try:
            self._handle_switch_port_deleted(switch_port)
        except Exception:
            LOG.exception("Exception occurred when handling "
                          "ovs port offline event")

    def _handle_switch_port_deleted(self, switch_port):
        """Route a deleted switch port to its per-type handler."""
        self._dispatch_by_port_type(switch_port,
                                    self._compute_port_deleted,
                                    self._tunnel_port_deleted)

    def _tunnel_port_added(self, switch_port):
        """Handle a new tunnel port exactly like an updated one."""
        self._tunnel_port_updated(switch_port)

    def _process_ovs_tunnel_port(self, switch_port, action):
        """Re-apply or remove the remote lports that use a tunnel port.

        Every non-local logical port on a logical switch whose network type
        equals the port's tunnel type is passed to controller.update() when
        action is "set", and to controller.delete() otherwise. A failure on
        one logical port is logged and does not stop the others.

        @param switch_port: the tunnel switch port; nothing is done when its
                            tunnel_type is empty
        @param action: "set" to update the lports, anything else to delete
        @return : None
        """
        tunnel_type = switch_port.tunnel_type
        if not tunnel_type:
            return

        lswitches = self.db_store.get_all(
            l2.LogicalSwitch(network_type=tunnel_type),
            l2.LogicalSwitch.get_index('network_type'))
        # The index does not depend on the switch, so look it up once.
        lport_index = l2.LogicalPort.get_index('lswitch_id')
        for lswitch in lswitches:
            lports = self.db_store.get_all(l2.LogicalPort(lswitch=lswitch),
                                           index=lport_index)
            for lport in lports:
                if lport.is_local:
                    continue

                # Update of virtual tunnel port should update remote port in
                # the lswitch of same type.
                try:
                    if action == "set":
                        self.controller.update(lport)
                    else:
                        self.controller.delete(lport)
                except Exception:
                    # The trailing space keeps the two literals from being
                    # glued together as "portwhen".
                    LOG.exception("Failed to process logical port "
                                  "when %(action)s tunnel %(lport)s",
                                  {'action': action, 'lport': lport})

    def _tunnel_port_updated(self, switch_port):
        """Re-apply the remote lports reachable through the tunnel port."""
        self._process_ovs_tunnel_port(switch_port, "set")

    def _tunnel_port_deleted(self, switch_port):
        """Remove the remote lports reachable through the tunnel port."""
        self._process_ovs_tunnel_port(switch_port, "delete")

    def _compute_port_added(self, switch_port):
        """Process a new compute port, then report it as active."""
        self._compute_port_updated(switch_port)
        self.controller.notify_port_status(
            switch_port, n_const.PORT_STATUS_ACTIVE)

    def _compute_port_updated(self, switch_port):
        """Record a compute port's logical port and bring it online.

        Subscribes to the logical port's topic and stores the switch port
        to logical port mapping. If the logical port is bound to another
        chassis and is owned by a compute device, a Migration record is
        created and processing stops. Otherwise, a logical port that is not
        yet in the local cache is pushed to the controller.

        @param switch_port: the compute switch port
        @return : None
        """
        lport = self._get_lport(switch_port)
        if lport is None:
            LOG.warning("No logical port found for ovs port: %r",
                        switch_port)
            return

        topic = lport.topic
        if not topic:
            return

        self._add_to_topic_subscribed(topic, lport.id)
        self.ovs_to_lport_mapping[switch_port.id] = OvsLportMapping(
            lport_id=lport.id, topic=topic)

        chassis = lport.binding.chassis
        # check if migration occurs
        if chassis.id != self.chassis_name:
            device_owner = lport.device_owner
            if n_const.DEVICE_OWNER_COMPUTE_PREFIX in device_owner:
                LOG.info("Prepare migrate lport %(lport)s to %(chassis)s",
                         {"lport": lport.id, "chassis": chassis})
                self.nb_api.create(migration.Migration(
                    id=lport.id, dest_chassis=self.chassis_name,
                    status=migration.MIGRATION_STATUS_DEST_PLUG))
                return

        cached_lport = switch_port.lport.get_object()
        if not cached_lport:
            # If the logical port is not in db store it has not been applied
            # to dragonflow apps. We need to update it in dragonflow
            # controller
            LOG.info("A local logical port(%s) is online", lport)
            try:
                self.controller.update(lport)
            except Exception:
                LOG.exception('Failed to process logical port online '
                              'event: %s', lport)

    def _compute_port_deleted(self, switch_port):
        """Take a compute port's logical port offline and forget it.

        When the logical port is not available locally, only the local
        bookkeeping (mapping and topic subscription) is cleaned up. When it
        is, the logical port is deleted from the controller, the port is
        reported as down, a pending Migration record is advanced to the
        source-unplug status, and the bookkeeping is cleaned up.

        @param switch_port: the compute switch port that was removed
        @return : None
        """
        switch_port_id = switch_port.id
        lport_ref = switch_port.lport
        # A port without an lport reference is handled like one whose lport
        # is not cached, instead of failing on None.get_object().
        lport = lport_ref.get_object() if lport_ref is not None else None
        if lport is None:
            lport_mapping = self.ovs_to_lport_mapping.pop(
                switch_port_id, None)
            if lport_mapping is None:
                return
            self._del_from_topic_subscribed(lport_mapping.topic,
                                            lport_mapping.lport_id)
            return

        topic = lport.topic

        LOG.info("The logical port(%s) is offline", lport)
        try:
            self.controller.delete(lport)
        except Exception:
            LOG.exception('Failed to process logical port offline event %s',
                          lport_ref.id)
        finally:
            self.controller.notify_port_status(
                switch_port, n_const.PORT_STATUS_DOWN)

            migration_obj = self.nb_api.get(
                migration.Migration(id=lport_ref.id))
            if migration_obj and migration_obj.chassis:
                LOG.info("Sending migrating event for %s", lport_ref.id)
                migration_obj.lport = lport_ref
                migration_obj.status = migration.MIGRATION_STATUS_SRC_UNPLUG
                self.nb_api.update(migration_obj)

            # The port may never have been mapped (e.g. its lport had no
            # topic when it came online). pop() keeps that case from
            # raising KeyError and skipping the unsubscribe below.
            self.ovs_to_lport_mapping.pop(switch_port_id, None)
            self._del_from_topic_subscribed(topic, lport_ref.id)

    def _add_to_topic_subscribed(self, topic, lport_id):
        """Record that lport_id needs topic, registering it if new.

        Does nothing when selective topology distribution is disabled or
        topic is empty.

        @param topic: the topic (tenant) the logical port belongs to
        @param lport_id: ID of the logical port
        @return : None
        """
        if not self.enable_selective_topo_dist or not topic:
            return

        if topic not in self.topic_subscribed:
            LOG.info("Subscribe topic: %(topic)s by lport: %(id)s",
                     {"topic": topic, "id": lport_id})
            self.controller.register_topic(topic)
            self.topic_subscribed[topic] = {lport_id}
        else:
            self.topic_subscribed[topic].add(lport_id)

    def _del_from_topic_subscribed(self, topic, lport_id):
        """Record that lport_id no longer needs topic.

        The topic is unregistered from the controller once no logical port
        uses it. Does nothing when selective topology distribution is
        disabled or topic is empty. Removing a topic or port ID that was
        never recorded is a no-op rather than an error.

        @param topic: the topic (tenant) the logical port belongs to
        @param lport_id: ID of the logical port
        @return : None
        """
        if not self.enable_selective_topo_dist or not topic:
            return

        port_ids = self.topic_subscribed.get(topic)
        if port_ids is None:
            # The topic was never subscribed, so there is nothing to undo.
            return

        port_ids.discard(lport_id)
        if not port_ids:
            LOG.info("Unsubscribe topic: %(topic)s by lport: %(id)s",
                     {"topic": topic, "id": lport_id})
            del self.topic_subscribed[topic]
            self.controller.unregister_topic(topic)

    def get_subscribed_topics(self):
        """Return the subscribed topics.

        @return : None when selective topology distribution is disabled,
                  otherwise a new set of the subscribed topics (possibly
                  empty)
        """
        if not self.enable_selective_topo_dist:
            # Just return None when enable_selective_topo_dist is False
            return None

        # Return the actual topics that are subscribed. It could be empty
        # set, which represents no topic is subscribed now.
        return set(self.topic_subscribed)

    def _get_lport(self, switch_port):
        """Return the logical port behind switch_port, or None.

        The locally cached object is preferred; when there is none, the
        logical port is fetched through nb_api.

        @param switch_port: the switch port to resolve
        @return : the logical port, or None when the switch port has no
                  lport reference or the lookup finds nothing
        """
        if not switch_port.lport:
            return None
        lport = switch_port.lport.get_object()
        if lport is None:
            lport = self.nb_api.get(switch_port.lport)
        return lport

    def check_topology_info(self):
        """
        In order to prevent the situation that the connection between
        df controller and df db break down, we should recheck the local
        ovs ports to make sure all the topics of these ovs ports could
        be subscribed and all the vms could work well.

        The switch port to logical port mapping is rebuilt from the compute
        ports in the db store. Topics are subscribed for ports that were
        not mapped before and unsubscribed for mapped ports that are gone.
        """
        new_ovs_to_lport_mapping = {}
        add_ovs_to_lport_mapping = {}
        # Work on a copy: entries are popped while scanning, and an error
        # halfway through must not leave the live mapping partly emptied.
        delete_ovs_to_lport_mapping = dict(self.ovs_to_lport_mapping)
        for switch_port in self.db_store.get_all(switch.SwitchPort):
            if switch_port.type != constants.SWITCH_COMPUTE_INTERFACE:
                continue

            lport = self._get_lport(switch_port)
            if lport is None:
                LOG.warning("No logical port found for ovs port: %r",
                            switch_port)
                continue

            topic = lport.topic
            if not topic:
                continue

            key = switch_port.id
            mapping = OvsLportMapping(lport_id=lport.id, topic=topic)
            new_ovs_to_lport_mapping[key] = mapping
            # Whatever is left in the delete dict after the scan belongs to
            # ports that no longer exist.
            if delete_ovs_to_lport_mapping.pop(key, None) is None:
                add_ovs_to_lport_mapping[key] = mapping

        self.ovs_to_lport_mapping = new_ovs_to_lport_mapping

        for value in add_ovs_to_lport_mapping.values():
            self._add_to_topic_subscribed(value.topic, value.lport_id)

        for value in delete_ovs_to_lport_mapping.values():
            self._del_from_topic_subscribed(value.topic, value.lport_id)