Skip to content

Commit

Permalink
Add binding activation to OVS agent
Browse files Browse the repository at this point in the history
As part of the implementation of multiple port bindings [1], add binding
activation support to the OVS agent. This will enable the execution in
OVS agents of the complete sequence of steps outlined in [1] during an
instance migration:

1) Create inactive port bindings for destination host
2) Migrate the instance to the destination host and plug its VIFs
3) Activate the port bindings in the destination host
4) Delete the port bindings for the source host

[1] https://review.openstack.org/#/c/309416/

Change-Id: Iabca39364ec95633b2a8891fc295b3ada5f4f5e0
Partial-Bug: #1580880
  • Loading branch information
Miguel Lavalle committed Jul 18, 2018
1 parent 0694beb commit 5c3bf12
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 41 deletions.
43 changes: 31 additions & 12 deletions neutron/agent/rpc.py
Expand Up @@ -35,6 +35,7 @@
from neutron import objects

LOG = logging.getLogger(__name__)
BINDING_DEACTIVATE = 'binding_deactivate'


def create_consumers(endpoints, prefix, topic_details, start_listening=True):
Expand Down Expand Up @@ -201,14 +202,23 @@ def _legacy_notifier(self, rtype, event, trigger, context, resource_id,
the payloads the handlers are expecting (an ID).
"""
rtype = rtype.lower() # all legacy handlers don't camelcase
method, host = self._get_method_host(rtype, event, **kwargs)
method, host_with_activation, host_with_deactivation = (
self._get_method_host(rtype, event, **kwargs))
if not hasattr(self._legacy_interface, method):
# TODO(kevinbenton): once these notifications are stable, emit
# a deprecation warning for legacy handlers
return
payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id,
'host': host}
getattr(self._legacy_interface, method)(context, **payload)
# If there is a binding deactivation, we must also notify the
# corresponding activation
if method == BINDING_DEACTIVATE:
self._legacy_interface.binding_deactivate(
context, port_id=resource_id, host=host_with_deactivation)
self._legacy_interface.binding_activate(
context, port_id=resource_id, host=host_with_activation)
else:
payload = {rtype: {'id': resource_id},
'%s_id' % rtype: resource_id}
getattr(self._legacy_interface, method)(context, **payload)

def _get_method_host(self, rtype, event, **kwargs):
"""Constructs the name of method to be called in the legacy interface.
Expand All @@ -222,9 +232,10 @@ def _get_method_host(self, rtype, event, **kwargs):
is_delete = event == callback_events.AFTER_DELETE
suffix = 'delete' if is_delete else 'update'
method = "%s_%s" % (rtype, suffix)
host = None
host_with_activation = None
host_with_deactivation = None
if is_delete or rtype != callback_resources.PORT:
return method, host
return method, host_with_activation, host_with_deactivation

# A port update was received. Find out if it is a binding activation
# where a previous binding was deactivated
Expand All @@ -245,9 +256,10 @@ def _get_method_host(self, rtype, event, **kwargs):
getattr(kwargs['updated'], 'bindings', []),
constants.INACTIVE,
host=existing_active_binding.host)):
method = 'binding_deactivate'
host = existing_active_binding.host
return method, host
method = BINDING_DEACTIVATE
host_with_activation = updated_active_binding.host
host_with_deactivation = existing_active_binding.host
return method, host_with_activation, host_with_deactivation

def get_devices_details_list_and_failed_devices(self, context, devices,
agent_id, host=None):
Expand All @@ -274,15 +286,22 @@ def get_device_details(self, context, device, agent_id, host=None):
if not segment:
LOG.debug("Device %s is not bound to any segment.", port_obj)
return {'device': device}
binding = utils.get_port_binding_by_status_and_host(
port_obj.bindings, constants.ACTIVE, raise_if_not_found=True,
port_id=port_obj.id)
if (port_obj.device_owner.startswith(
constants.DEVICE_OWNER_COMPUTE_PREFIX) and
binding[pb_ext.HOST] != host):
LOG.debug("Device %s has no active binding in this host",
port_obj)
return {'device': device,
n_const.NO_ACTIVE_BINDING: True}
net = self.remote_resource_cache.get_resource_by_id(
resources.NETWORK, port_obj.network_id)
net_qos_policy_id = net.qos_policy_id
# match format of old RPC interface
mac_addr = str(netaddr.EUI(str(port_obj.mac_address),
dialect=netaddr.mac_unix_expanded))
binding = utils.get_port_binding_by_status_and_host(
port_obj.bindings, constants.ACTIVE, raise_if_not_found=True,
port_id=port_obj.id)
entry = {
'device': device,
'network_id': port_obj.network_id,
Expand Down
3 changes: 3 additions & 0 deletions neutron/common/constants.py
Expand Up @@ -227,3 +227,6 @@
# Units base
SI_BASE = 1000
IEC_BASE = 1024

# Port bindings handling
NO_ACTIVE_BINDING = 'no_active_binding'
58 changes: 47 additions & 11 deletions neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
Expand Up @@ -55,6 +55,7 @@
from neutron.api.rpc.handlers import dvr_rpc
from neutron.api.rpc.handlers import securitygroups_rpc as sg_rpc
from neutron.common import config
from neutron.common import constants as c_const
from neutron.common import utils as n_utils
from neutron.conf.agent import xenapi_conf
from neutron.plugins.ml2.drivers.agent import capabilities
Expand Down Expand Up @@ -124,7 +125,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
# 1.2 Support DVR (Distributed Virtual Router) RPC
# 1.3 Added param devices_to_update to security_groups_provider_updated
# 1.4 Added support for network_update
# 1.5 Added binding_deactivate
# 1.5 Added binding_activate and binding_deactivate
target = oslo_messaging.Target(version='1.5')

def __init__(self, bridge_classes, ext_manager, conf=None):
Expand Down Expand Up @@ -177,6 +178,8 @@ def __init__(self, bridge_classes, ext_manager, conf=None):
self.deleted_ports = set()
# Stores the port IDs whose binding has been deactivated
self.deactivated_bindings = set()
# Stores the port IDs whose binding has been activated
self.activated_bindings = set()

self.network_ports = collections.defaultdict(set)
# keeps association between ports and ofports to detect ofport change
Expand Down Expand Up @@ -423,6 +426,12 @@ def binding_deactivate(self, context, **kwargs):
port_id = kwargs.get('port_id')
self.deactivated_bindings.add(port_id)

def binding_activate(self, context, **kwargs):
if kwargs.get('host') != self.conf.host:
return
port_id = kwargs.get('port_id')
self.activated_bindings.add(port_id)

def _clean_network_ports(self, port_id):
for port_set in self.network_ports.values():
if port_id in port_set:
Expand Down Expand Up @@ -467,6 +476,12 @@ def process_deactivated_bindings(self, port_info):
LOG.debug(("Port id %s unplugged from integration bridge because "
"its binding was de-activated"), port_id)

def process_activated_bindings(self, port_info, activated_bindings_copy):
# Compute which ports for activated bindings are still present...
activated_bindings_copy &= port_info['current']
# ...and treat them as just added
port_info['added'] |= activated_bindings_copy

def tunnel_update(self, context, **kwargs):
LOG.debug("tunnel_update received")
if not self.enable_tunneling:
Expand Down Expand Up @@ -1534,6 +1549,7 @@ def cleanup_tunnel_port(self, br, tun_ofport, tunnel_type):
def treat_devices_added_or_updated(self, devices, provisioning_needed):
skipped_devices = []
need_binding_devices = []
binding_no_activated_devices = set()
devices_details_list = (
self.plugin_rpc.get_devices_details_list_and_failed_devices(
self.context,
Expand Down Expand Up @@ -1576,13 +1592,21 @@ def treat_devices_added_or_updated(self, devices, provisioning_needed):
details['network_id'])
self.ext_manager.handle_port(self.context, details)
else:
LOG.warning(
"Device %s not defined on plugin or binding failed",
device)
if c_const.NO_ACTIVE_BINDING in details:
# Port was added to the bridge, but its binding in this
# agent hasn't been activated yet. It will be treated as
# added when binding is activated
binding_no_activated_devices.add(device)
LOG.debug("Device %s has no active binding in host",
device)
else:
LOG.warning(
"Device %s not defined on plugin or binding failed",
device)
if (port and port.ofport != -1):
self.port_dead(port)
return (skipped_devices, need_binding_devices,
failed_devices)
return (skipped_devices, binding_no_activated_devices,
need_binding_devices, failed_devices)

def _update_port_network(self, port_id, network_id):
self._clean_network_ports(port_id)
Expand Down Expand Up @@ -1673,19 +1697,23 @@ def process_network_ports(self, port_info, provisioning_needed):
port_info.get('updated', set()))
need_binding_devices = []
skipped_devices = set()
binding_no_activated_devices = set()
if devices_added_updated:
start = time.time()
(skipped_devices, need_binding_devices,
failed_devices['added']) = (
(skipped_devices, binding_no_activated_devices,
need_binding_devices, failed_devices['added']) = (
self.treat_devices_added_or_updated(
devices_added_updated, provisioning_needed))
LOG.debug("process_network_ports - iteration:%(iter_num)d - "
"treat_devices_added_or_updated completed. "
"Skipped %(num_skipped)d devices of "
"%(num_current)d devices currently available. "
"Skipped %(num_skipped)d and no activated binding "
"devices %(num_no_active_binding)d of %(num_current)d "
"devices currently available. "
"Time elapsed: %(elapsed).3f",
{'iter_num': self.iter_num,
'num_skipped': len(skipped_devices),
'num_no_active_binding':
len(binding_no_activated_devices),
'num_current': len(port_info['current']),
'elapsed': time.time() - start})
# Update the list of current ports storing only those which
Expand All @@ -1695,7 +1723,8 @@ def process_network_ports(self, port_info, provisioning_needed):

# TODO(salv-orlando): Optimize avoiding applying filters
# unnecessarily, (eg: when there are no IP address changes)
added_ports = port_info.get('added', set()) - skipped_devices
added_ports = (port_info.get('added', set()) - skipped_devices -
binding_no_activated_devices)
self._add_port_tag_info(need_binding_devices)
self.sg_agent.setup_port_filters(added_ports,
port_info.get('updated', set()))
Expand Down Expand Up @@ -1810,6 +1839,7 @@ def _agent_has_updates(self, polling_manager):
self.updated_ports or
self.deleted_ports or
self.deactivated_bindings or
self.activated_bindings or
self.sg_agent.firewall_refresh_needed())

def _port_info_has_changes(self, port_info):
Expand Down Expand Up @@ -2031,6 +2061,7 @@ def rpc_loop(self, polling_manager=None, bridges_monitor=None):
sync = False
ports = set()
updated_ports_copy = set()
activated_bindings_copy = set()
ancillary_ports = set()
tunnel_sync = True
ovs_restarted = False
Expand Down Expand Up @@ -2091,6 +2122,8 @@ def rpc_loop(self, polling_manager=None, bridges_monitor=None):
# between these two statements, this will be thread-safe
updated_ports_copy = self.updated_ports
self.updated_ports = set()
activated_bindings_copy = self.activated_bindings
self.activated_bindings = set()
(port_info, ancillary_port_info, consecutive_resyncs,
ports_not_ready_yet) = (self.process_port_info(
start, polling_manager, sync, ovs_restarted,
Expand All @@ -2100,6 +2133,8 @@ def rpc_loop(self, polling_manager=None, bridges_monitor=None):
sync = False
self.process_deleted_ports(port_info)
self.process_deactivated_bindings(port_info)
self.process_activated_bindings(port_info,
activated_bindings_copy)
ofport_changed_ports = self.update_stale_ofport_rules()
if ofport_changed_ports:
port_info.setdefault('updated', set()).update(
Expand Down Expand Up @@ -2154,6 +2189,7 @@ def rpc_loop(self, polling_manager=None, bridges_monitor=None):
LOG.exception("Error while processing VIF ports")
# Put the ports back in self.updated_port
self.updated_ports |= updated_ports_copy
self.activated_bindings |= activated_bindings_copy
sync = True
port_stats = self.get_port_stats(port_info, ancillary_port_info)
self.loop_count_and_wait(start, port_stats)
Expand Down
59 changes: 48 additions & 11 deletions neutron/tests/unit/agent/test_rpc.py
Expand Up @@ -16,6 +16,7 @@
import datetime

import mock
import netaddr
from neutron_lib.agent import topics as lib_topics
from neutron_lib.callbacks import events
from neutron_lib.callbacks import resources
Expand All @@ -24,6 +25,8 @@
from oslo_utils import uuidutils

from neutron.agent import rpc
from neutron.common import constants as n_const
from neutron.objects import network
from neutron.objects import ports
from neutron.tests import base

Expand Down Expand Up @@ -176,21 +179,36 @@ def setUp(self):
super(TestCacheBackedPluginApi, self).setUp()
self._api = rpc.CacheBackedPluginApi(lib_topics.PLUGIN)
self._api._legacy_interface = mock.Mock()
self._api.remote_resource_cache = mock.Mock()
self._network_id = uuidutils.generate_uuid()
self._segment_id = uuidutils.generate_uuid()
self._segment = network.NetworkSegment(
id=self._segment_id, network_id=self._network_id,
network_type=constants.TYPE_FLAT)
self._port_id = uuidutils.generate_uuid()
self._network = network.Network(id=self._network_id,
segments=[self._segment])
self._port = ports.Port(
id=self._port_id,
id=self._port_id, network_id=self._network_id,
mac_address=netaddr.EUI('fa:16:3e:ec:c7:d9'), admin_state_up=True,
security_group_ids=set([uuidutils.generate_uuid()]),
fixed_ips=[], allowed_address_pairs=[],
device_owner=constants.DEVICE_OWNER_COMPUTE_PREFIX,
bindings=[ports.PortBinding(port_id=self._port_id,
host='host1',
status=constants.ACTIVE)])
status=constants.ACTIVE,
profile={})],
binding_levels=[ports.PortBindingLevel(port_id=self._port_id,
host='host1',
segment=self._segment)])

def test__legacy_notifier_resource_delete(self):
self._api._legacy_notifier(resources.PORT, events.AFTER_DELETE, self,
mock.ANY, resource_id=self._port_id,
existing=self._port)
self._api._legacy_interface.port_update.assert_not_called()
self._api._legacy_interface.port_delete.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host=None)
mock.ANY, port={'id': self._port_id}, port_id=self._port_id)
self._api._legacy_interface.binding_deactivate.assert_not_called()

def test__legacy_notifier_resource_update(self):
Expand All @@ -201,8 +219,7 @@ def test__legacy_notifier_resource_update(self):
existing=self._port, updated=updated_port)
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.port_update.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host=None)
mock.ANY, port={'id': self._port_id}, port_id=self._port_id)
self._api._legacy_interface.binding_deactivate.assert_not_called()

def _test__legacy_notifier_binding_activated(self):
Expand All @@ -225,8 +242,9 @@ def _test__legacy_notifier_binding_activated(self):
def test__legacy_notifier_new_binding_activated(self):
self._test__legacy_notifier_binding_activated()
self._api._legacy_interface.binding_deactivate.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host='host1')
mock.ANY, host='host1', port_id=self._port_id)
self._api._legacy_interface.binding_activate.assert_called_once_with(
mock.ANY, host='host2', port_id=self._port_id)

def test__legacy_notifier_no_new_binding_activated(self):
updated_port = ports.Port(
Expand All @@ -240,8 +258,7 @@ def test__legacy_notifier_no_new_binding_activated(self):
resource_id=self._port_id,
existing=self._port, updated=updated_port)
self._api._legacy_interface.port_update.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host=None)
mock.ANY, port={'id': self._port_id}, port_id=self._port_id)
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.binding_deactivate.assert_not_called()

Expand All @@ -257,11 +274,31 @@ def test__legacy_notifier_existing_or_updated_is_none(self):
resource_id=self._port_id,
existing=self._port, updated=None)
call = mock.call(mock.ANY, port={'id': self._port_id},
port_id=self._port_id, host=None)
port_id=self._port_id)
self._api._legacy_interface.port_update.assert_has_calls([call, call])
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.binding_deactivate.assert_not_called()

def test__legacy_notifier_binding_activated_not_supported(self):
del self._api._legacy_interface.binding_deactivate
self._test__legacy_notifier_binding_activated()

def test_get_device_details_binding_in_host(self):
self._api.remote_resource_cache.get_resource_by_id.side_effect = [
self._port, self._network]
entry = self._api.get_device_details(mock.ANY, self._port_id, mock.ANY,
'host1')
self.assertEqual(self._port_id, entry['device'])
self.assertEqual(self._port_id, entry['port_id'])
self.assertEqual(self._network_id, entry['network_id'])
self.assertNotIn(n_const.NO_ACTIVE_BINDING, entry)

def test_get_device_details_binding_not_in_host(self):
self._api.remote_resource_cache.get_resource_by_id.side_effect = [
self._port, self._network]
entry = self._api.get_device_details(mock.ANY, self._port_id, mock.ANY,
'host2')
self.assertEqual(self._port_id, entry['device'])
self.assertNotIn('port_id', entry)
self.assertNotIn('network_id', entry)
self.assertIn(n_const.NO_ACTIVE_BINDING, entry)

0 comments on commit 5c3bf12

Please sign in to comment.