/
rpc.py
490 lines (447 loc) · 22.4 KB
/
rpc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.agent import topics
from neutron_lib.api.definitions import port_security as psec
from neutron_lib.api.definitions import portbindings
from neutron_lib.api.definitions import uplink_status_propagation as usp
from neutron_lib.callbacks import resources
from neutron_lib import constants as n_const
from neutron_lib.plugins import directory
from neutron_lib.plugins.ml2 import api
from neutron_lib import rpc as n_rpc
from neutron_lib.services.qos import constants as qos_consts
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
from osprofiler import profiler
from sqlalchemy.orm import exc
from neutron.api.rpc.handlers import dvr_rpc
from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
from neutron.db import l3_hamode_db
from neutron.db import provisioning_blocks
from neutron.plugins.ml2 import db as ml2_db
from neutron.plugins.ml2.drivers import type_tunnel
# REVISIT(kmestery): Allow the type and mechanism drivers to supply the
# mixins and eventually remove the direct dependencies on type_tunnel.
LOG = log.getLogger(__name__)
class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
# history
# 1.0 Initial version (from openvswitch/linuxbridge)
# 1.1 Support Security Group RPC
# 1.2 Support get_devices_details_list
# 1.3 get_device_details rpc signature upgrade to obtain 'host' and
# return value to include fixed_ips and device_owner for
# the device port
# 1.4 tunnel_sync rpc signature upgrade to obtain 'host'
# 1.5 Support update_device_list and
# get_devices_details_list_and_failed_devices
# 1.6 Support get_network_details
# 1.7 Support get_ports_by_vnic_type_and_host
# 1.8 Rename agent_restarted to refresh_tunnels in
# update_device_list to reflect its expanded purpose
target = oslo_messaging.Target(version='1.8')
def __init__(self, notifier, type_manager):
self.setup_tunnel_callback_mixin(notifier, type_manager)
super(RpcCallbacks, self).__init__()
def _get_new_status(self, host, port_context):
port = port_context.current
if not host or host == port_context.host:
new_status = (n_const.PORT_STATUS_BUILD if port['admin_state_up']
else n_const.PORT_STATUS_DOWN)
if port['status'] != new_status:
return new_status
@staticmethod
def _get_request_details(kwargs):
return (kwargs.get('agent_id'),
kwargs.get('host'),
kwargs.get('device') or kwargs.get('network'))
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""
agent_id, host, device = self._get_request_details(kwargs)
# cached networks used for reducing number of network db calls
# for server internal usage only
cached_networks = kwargs.get('cached_networks')
LOG.debug("Device %(device)s details requested by agent "
"%(agent_id)s with host %(host)s",
{'device': device, 'agent_id': agent_id, 'host': host})
plugin = directory.get_plugin()
port_id = plugin._device_to_port_id(rpc_context, device)
port_context = plugin.get_bound_port_context(rpc_context,
port_id,
host,
cached_networks)
if not port_context:
LOG.debug("Device %(device)s requested by agent "
"%(agent_id)s not found in database",
{'device': device, 'agent_id': agent_id})
return {'device': device}
port = port_context.current
# caching information about networks for future use
if cached_networks is not None:
if port['network_id'] not in cached_networks:
cached_networks[port['network_id']] = (
port_context.network.current)
result = self._get_device_details(rpc_context, agent_id=agent_id,
host=host, device=device,
port_context=port_context)
if 'network_id' in result:
# success so we update status
new_status = self._get_new_status(host, port_context)
if new_status:
plugin.update_port_status(rpc_context,
port_id,
new_status,
host,
port_context.network.current)
return result
def _get_device_details(self, rpc_context, agent_id, host, device,
port_context):
segment = port_context.bottom_bound_segment
port = port_context.current
if not segment:
LOG.warning("Device %(device)s requested by agent "
"%(agent_id)s on network %(network_id)s not "
"bound, vif_type: %(vif_type)s",
{'device': device,
'agent_id': agent_id,
'network_id': port['network_id'],
'vif_type': port_context.vif_type})
return {'device': device}
if (port['device_owner'].startswith(
n_const.DEVICE_OWNER_COMPUTE_PREFIX) and
port[portbindings.HOST_ID] != host):
LOG.debug("Device %(device)s has no active binding in host "
"%(host)s", {'device': device,
'host': host})
return {'device': device,
n_const.NO_ACTIVE_BINDING: True}
network_qos_policy_id = port_context.network._network.get(
qos_consts.QOS_POLICY_ID)
entry = {'device': device,
'network_id': port['network_id'],
'port_id': port['id'],
'mac_address': port['mac_address'],
'admin_state_up': port['admin_state_up'],
'network_type': segment[api.NETWORK_TYPE],
'segmentation_id': segment[api.SEGMENTATION_ID],
'physical_network': segment[api.PHYSICAL_NETWORK],
'mtu': port_context.network._network.get('mtu'),
'fixed_ips': port['fixed_ips'],
'device_owner': port['device_owner'],
'allowed_address_pairs': port['allowed_address_pairs'],
'port_security_enabled': port.get(psec.PORTSECURITY, True),
'qos_policy_id': port.get(qos_consts.QOS_POLICY_ID),
'network_qos_policy_id': network_qos_policy_id,
'profile': port[portbindings.PROFILE],
'propagate_uplink_status': port.get(
usp.PROPAGATE_UPLINK_STATUS, False)}
LOG.debug("Returning: %s", entry)
return entry
def get_devices_details_list(self, rpc_context, **kwargs):
# cached networks used for reducing number of network db calls
cached_networks = {}
return [
self.get_device_details(
rpc_context,
device=device,
cached_networks=cached_networks,
**kwargs
)
for device in kwargs.pop('devices', [])
]
def get_devices_details_list_and_failed_devices(self,
rpc_context,
**kwargs):
devices = []
failed_devices = []
devices_to_fetch = kwargs.pop('devices', [])
plugin = directory.get_plugin()
host = kwargs.get('host')
bound_contexts = plugin.get_bound_ports_contexts(rpc_context,
devices_to_fetch,
host)
for device in devices_to_fetch:
if not bound_contexts.get(device):
# unbound bound
LOG.debug("Device %(device)s requested by agent "
"%(agent_id)s not found in database",
{'device': device,
'agent_id': kwargs.get('agent_id')})
devices.append({'device': device})
continue
try:
devices.append(self._get_device_details(
rpc_context,
agent_id=kwargs.get('agent_id'),
host=host,
device=device,
port_context=bound_contexts[device]))
except Exception:
LOG.exception("Failed to get details for device %s",
device)
failed_devices.append(device)
new_status_map = {ctxt.current['id']: self._get_new_status(host, ctxt)
for ctxt in bound_contexts.values() if ctxt}
# filter out any without status changes
new_status_map = {p: s for p, s in new_status_map.items() if s}
try:
plugin.update_port_statuses(rpc_context, new_status_map, host)
except Exception:
LOG.exception("Failure updating statuses, retrying all")
failed_devices = devices_to_fetch
devices = []
return {'devices': devices,
'failed_devices': failed_devices}
def get_network_details(self, rpc_context, **kwargs):
"""Agent requests network details."""
agent_id, host, network = self._get_request_details(kwargs)
LOG.debug("Network %(network)s details requested by agent "
"%(agent_id)s with host %(host)s",
{'network': network, 'agent_id': agent_id, 'host': host})
plugin = directory.get_plugin()
return plugin.get_network(rpc_context, network)
@profiler.trace("rpc")
def update_device_down(self, rpc_context, **kwargs):
"""Device no longer exists on agent."""
# TODO(garyk) - live migration and port status
agent_id, host, device = self._get_request_details(kwargs)
LOG.debug("Device %(device)s no longer exists at agent "
"%(agent_id)s",
{'device': device, 'agent_id': agent_id})
plugin = directory.get_plugin()
port_id = plugin._device_to_port_id(rpc_context, device)
port_exists = True
if (host and not plugin.port_bound_to_host(rpc_context,
port_id, host)):
LOG.debug("Device %(device)s not bound to the"
" agent host %(host)s",
{'device': device, 'host': host})
else:
try:
port_exists = bool(plugin.update_port_status(
rpc_context, port_id, n_const.PORT_STATUS_DOWN, host))
except exc.StaleDataError:
port_exists = False
LOG.debug("delete_port and update_device_down are being "
"executed concurrently. Ignoring StaleDataError.")
return {'device': device,
'exists': port_exists}
self.notify_l2pop_port_wiring(port_id, rpc_context,
n_const.PORT_STATUS_DOWN, host)
return {'device': device,
'exists': port_exists}
@profiler.trace("rpc")
def update_device_up(self, rpc_context, **kwargs):
"""Device is up on agent."""
refresh_tunnels = kwargs.pop('refresh_tunnels', False)
if not refresh_tunnels:
# For backward compatibility with older agents
refresh_tunnels = kwargs.pop('agent_restarted', False)
agent_id, host, device = self._get_request_details(kwargs)
LOG.debug("Device %(device)s up at agent %(agent_id)s",
{'device': device, 'agent_id': agent_id})
plugin = directory.get_plugin()
port_id = plugin._device_to_port_id(rpc_context, device)
port = plugin.port_bound_to_host(rpc_context, port_id, host)
if host and not port:
LOG.debug("Device %(device)s not bound to the"
" agent host %(host)s",
{'device': device, 'host': host})
# this might mean that a VM is in the process of live migration
# and vif was plugged on the destination compute node;
# need to notify nova explicitly
port = ml2_db.get_port(rpc_context, port_id)
# _device_to_port_id may have returned a truncated UUID if the
# agent did not provide a full one (e.g. Linux Bridge case).
if not port:
LOG.debug("Port %s not found, will not notify nova.", port_id)
return
else:
if port.device_owner.startswith(
n_const.DEVICE_OWNER_COMPUTE_PREFIX):
# NOTE(haleyb): It is possible for a test to override a
# config option after the plugin has been initialized so
# the nova_notifier attribute is not set on the plugin.
if (cfg.CONF.notify_nova_on_port_status_changes and
hasattr(plugin, 'nova_notifier')):
plugin.nova_notifier.notify_port_active_direct(port)
return
else:
self.update_port_status_to_active(port, rpc_context, port_id, host)
self.notify_l2pop_port_wiring(port_id, rpc_context,
n_const.PORT_STATUS_ACTIVE, host,
refresh_tunnels)
def update_port_status_to_active(self, port, rpc_context, port_id, host):
plugin = directory.get_plugin()
if port and port['device_owner'] == n_const.DEVICE_OWNER_DVR_INTERFACE:
# NOTE(kevinbenton): we have to special case DVR ports because of
# the special multi-binding status update logic they have that
# depends on the host
plugin.update_port_status(rpc_context, port_id,
n_const.PORT_STATUS_ACTIVE, host)
else:
# _device_to_port_id may have returned a truncated UUID if the
# agent did not provide a full one (e.g. Linux Bridge case). We
# need to look up the full one before calling provisioning_complete
if not port:
port = ml2_db.get_port(rpc_context, port_id)
if not port:
# port doesn't exist, no need to add a provisioning block
return
provisioning_blocks.provisioning_complete(
rpc_context, port['id'], resources.PORT,
provisioning_blocks.L2_AGENT_ENTITY)
def notify_l2pop_port_wiring(self, port_id, rpc_context,
status, host, refresh_tunnels=False):
"""Notify the L2pop driver that a port has been wired/unwired.
The L2pop driver uses this notification to broadcast forwarding
entries to other agents on the same network as the port for port_id.
"""
plugin = directory.get_plugin()
l2pop_driver = plugin.mechanism_manager.mech_drivers.get(
'l2population')
if not l2pop_driver:
return
port = ml2_db.get_port(rpc_context, port_id)
if not port:
return
port_context = plugin.get_bound_port_context(
rpc_context, port_id, host)
if not port_context:
# port deleted
return
# NOTE: DVR ports are already handled and updated through l2pop
# and so we don't need to update it again here. But, l2pop did not
# handle DVR ports while restart neutron-*-agent, we need to handle
# it here.
if (port['device_owner'] == n_const.DEVICE_OWNER_DVR_INTERFACE and
not refresh_tunnels):
return
port = port_context.current
if (port['device_owner'] != n_const.DEVICE_OWNER_DVR_INTERFACE and
status == n_const.PORT_STATUS_ACTIVE and
port[portbindings.HOST_ID] != host and
not l3_hamode_db.is_ha_router_port(rpc_context,
port['device_owner'],
port['device_id'])):
# don't setup ACTIVE forwarding entries unless bound to this
# host or if it's an HA or DVR port (which is special-cased in
# the mech driver)
return
port_context.current['status'] = status
port_context.current[portbindings.HOST_ID] = host
if status == n_const.PORT_STATUS_ACTIVE:
l2pop_driver.obj.update_port_up(port_context, refresh_tunnels)
else:
l2pop_driver.obj.update_port_down(port_context)
@profiler.trace("rpc")
def update_device_list(self, rpc_context, **kwargs):
devices_up = []
failed_devices_up = []
devices_down = []
failed_devices_down = []
devices = kwargs.get('devices_up')
if devices:
for device in devices:
try:
self.update_device_up(
rpc_context,
device=device,
**kwargs)
except Exception:
failed_devices_up.append(device)
LOG.error("Failed to update device %s up", device)
else:
devices_up.append(device)
devices = kwargs.get('devices_down')
if devices:
for device in devices:
try:
dev = self.update_device_down(
rpc_context,
device=device,
**kwargs)
except Exception:
failed_devices_down.append(device)
LOG.error("Failed to update device %s down", device)
else:
devices_down.append(dev)
return {'devices_up': devices_up,
'failed_devices_up': failed_devices_up,
'devices_down': devices_down,
'failed_devices_down': failed_devices_down}
def get_ports_by_vnic_type_and_host(self, rpc_context, vnic_type, host):
plugin = directory.get_plugin()
return plugin.get_ports_by_vnic_type_and_host(
rpc_context, vnic_type=vnic_type, host=host)
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
sg_rpc.SecurityGroupAgentRpcApiMixin,
type_tunnel.TunnelAgentRpcApiMixin):
"""Agent side of the openvswitch rpc API.
API version history:
1.0 - Initial version.
1.1 - Added get_active_networks_info, create_dhcp_port,
update_dhcp_port, and removed get_dhcp_port methods.
1.4 - Added network_update
1.5 - Added binding_activate and binding_deactivate
"""
def __init__(self, topic):
self.topic = topic
self.topic_network_delete = topics.get_topic_name(topic,
topics.NETWORK,
topics.DELETE)
self.topic_port_update = topics.get_topic_name(topic,
topics.PORT,
topics.UPDATE)
self.topic_port_delete = topics.get_topic_name(topic,
topics.PORT,
topics.DELETE)
self.topic_network_update = topics.get_topic_name(topic,
topics.NETWORK,
topics.UPDATE)
self.topic_port_binding_deactivate = topics.get_topic_name(
topic, topics.PORT_BINDING, topics.DEACTIVATE)
self.topic_port_binding_activate = topics.get_topic_name(
topic, topics.PORT_BINDING, topics.ACTIVATE)
target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
def network_delete(self, context, network_id):
cctxt = self.client.prepare(topic=self.topic_network_delete,
fanout=True)
cctxt.cast(context, 'network_delete', network_id=network_id)
def port_update(self, context, port, network_type, segmentation_id,
physical_network):
cctxt = self.client.prepare(topic=self.topic_port_update,
fanout=True)
cctxt.cast(context, 'port_update', port=port,
network_type=network_type, segmentation_id=segmentation_id,
physical_network=physical_network)
def port_delete(self, context, port_id):
cctxt = self.client.prepare(topic=self.topic_port_delete,
fanout=True)
cctxt.cast(context, 'port_delete', port_id=port_id)
def network_update(self, context, network):
cctxt = self.client.prepare(topic=self.topic_network_update,
fanout=True, version='1.4')
cctxt.cast(context, 'network_update', network=network)
def binding_deactivate(self, context, port_id, host, network_id):
cctxt = self.client.prepare(topic=self.topic_port_binding_deactivate,
fanout=True, version='1.5')
cctxt.cast(context, 'binding_deactivate', port_id=port_id, host=host,
network_id=network_id)
def binding_activate(self, context, port_id, host):
cctxt = self.client.prepare(topic=self.topic_port_binding_activate,
fanout=True, version='1.5')
cctxt.cast(context, 'binding_activate', port_id=port_id, host=host)