7 changes: 4 additions & 3 deletions neutron/common/ovn/exceptions.py
@@ -33,6 +33,7 @@ class StandardAttributeIDNotFound(n_exc.NeutronException):


class HashRingIsEmpty(n_exc.NeutronException):
message = _('Hash Ring returned empty when hashing "%(key)s". '
'This should never happen in a normal situation, please '
'check the status of your cluster')
message = _('Hash Ring returned empty when hashing "%(key)s". All '
'%(node_count)d nodes were found offline. This should never '
'happen in a normal situation, please check the status '
'of your cluster')
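
As a quick, standalone illustration (plain Python, not the neutron code itself): NeutronException-style classes interpolate their keyword arguments into `message` with %-formatting, so raising the reworked HashRingIsEmpty now requires both `key` and `node_count`. The key value below is made up for the example.

    # Sketch of the %-interpolation behind the new message; in neutron the
    # formatting happens inside neutron_lib's NeutronException machinery.
    message = ('Hash Ring returned empty when hashing "%(key)s". All '
               '%(node_count)d nodes were found offline. This should never '
               'happen in a normal situation, please check the status '
               'of your cluster')
    print(message % {'key': 'ovn-sb-event', 'node_count': 3})
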
9 changes: 8 additions & 1 deletion neutron/common/ovn/hash_ring_manager.py
@@ -38,6 +38,7 @@ def __init__(self, group_name):
# Flag to rate limit the caching log
self._prev_num_nodes = -1
self.admin_ctx = context.get_admin_context()
self._offline_node_count = 0

@property
def _wait_startup_before_caching(self):
@@ -92,6 +93,11 @@ def _load_hash_ring(self, refresh=False):
self._hash_ring = hashring.HashRing({node.node_uuid
for node in nodes})
self._last_time_loaded = timeutils.utcnow()
self._offline_node_count = db_hash_ring.count_offline_nodes(
self.admin_ctx, constants.HASH_RING_NODES_TIMEOUT,
self._group)
LOG.debug("Hash Ring loaded. %d active nodes. %d offline nodes",
len(nodes), self._offline_node_count)

def refresh(self):
self._load_hash_ring(refresh=True)
@@ -108,4 +114,5 @@ def get_node(self, key):
# KeyError is raised
return self._hash_ring[key].pop()
except KeyError:
raise exceptions.HashRingIsEmpty(key=key)
raise exceptions.HashRingIsEmpty(
key=key, node_count=self._offline_node_count)
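
A standalone sketch of the flow this file now implements: the manager caches the offline-node count when the ring is loaded and, when an empty ring makes the lookup fail, re-raises the KeyError as HashRingIsEmpty carrying that count. Class names mirror the patch; the plain dict, the hard-coded count and the event key are simplifications for the sketch.

    class HashRingIsEmpty(Exception):
        """Stand-in for neutron.common.ovn.exceptions.HashRingIsEmpty."""
        def __init__(self, key, node_count):
            super().__init__(
                'Hash Ring returned empty when hashing "%(key)s". All '
                '%(node_count)d nodes were found offline.'
                % {'key': key, 'node_count': node_count})

    class HashRingManager:
        def __init__(self):
            # A plain dict stands in for tooz's hashring.HashRing; it is
            # empty here, so any lookup raises KeyError.
            self._hash_ring = {}
            # In the patch this is refreshed from count_offline_nodes()
            # inside _load_hash_ring(); hard-coded for the sketch.
            self._offline_node_count = 2

        def get_node(self, key):
            try:
                return self._hash_ring[key].pop()
            except KeyError:
                raise HashRingIsEmpty(
                    key=key, node_count=self._offline_node_count)

    try:
        HashRingManager().get_node('some-event-key')
    except HashRingIsEmpty as exc:
        print(exc)  # message reports that 2 nodes were found offline
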
41 changes: 33 additions & 8 deletions neutron/db/ovn_hash_ring_db.py
@@ -17,12 +17,14 @@

from neutron_lib.db import api as db_api
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
from oslo_utils import uuidutils

from neutron.db.models import ovn as ovn_models

CONF = cfg.CONF
LOG = log.getLogger(__name__)


# NOTE(ralonsoh): this was migrated from networking-ovn to neutron and should
@@ -34,6 +36,8 @@ def add_node(context, group_name, node_uuid=None):
with db_api.CONTEXT_WRITER.using(context):
context.session.add(ovn_models.OVNHashRing(
node_uuid=node_uuid, hostname=CONF.host, group_name=group_name))
LOG.info('Node %s from host "%s" and group "%s" added to the Hash Ring',
node_uuid, CONF.host, group_name)
return node_uuid


@@ -42,6 +46,8 @@ def remove_nodes_from_host(context, group_name):
context.session.query(ovn_models.OVNHashRing).filter(
ovn_models.OVNHashRing.hostname == CONF.host,
ovn_models.OVNHashRing.group_name == group_name).delete()
LOG.info('Nodes from host "%s" and group "%s" removed from the Hash Ring',
CONF.host, group_name)


def _touch(context, **filter_args):
@@ -58,12 +64,31 @@ def touch_node(context, node_uuid):
_touch(context, node_uuid=node_uuid)


def get_active_nodes(context, interval, group_name, from_host=False):
def _get_nodes_query(context, interval, group_name, offline=False,
from_host=False):
limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
with db_api.CONTEXT_READER.using(context):
query = context.session.query(ovn_models.OVNHashRing).filter(
ovn_models.OVNHashRing.updated_at >= limit,
ovn_models.OVNHashRing.group_name == group_name)
if from_host:
query = query.filter_by(hostname=CONF.host)
return query.all()
query = context.session.query(ovn_models.OVNHashRing).filter(
ovn_models.OVNHashRing.group_name == group_name)

if offline:
query = query.filter(ovn_models.OVNHashRing.updated_at < limit)
else:
query = query.filter(ovn_models.OVNHashRing.updated_at >= limit)

if from_host:
query = query.filter_by(hostname=CONF.host)

return query


@db_api.CONTEXT_READER
def get_active_nodes(context, interval, group_name, from_host=False):
query = _get_nodes_query(context, interval, group_name,
from_host=from_host)
return query.all()


@db_api.CONTEXT_READER
def count_offline_nodes(context, interval, group_name):
query = _get_nodes_query(context, interval, group_name, offline=True)
return query.count()
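
The refactored helper classifies ring members purely by comparing `updated_at` with a cutoff `interval` seconds in the past: newer-or-equal is active, older is offline. A standalone sketch of that cutoff logic, with a plain dict standing in for the OVNHashRing table and the SQLAlchemy filters:

    import datetime

    def split_nodes(updated_at_by_node, interval, now=None):
        """Mimic _get_nodes_query(): >= cutoff is active, < cutoff is offline."""
        now = now or datetime.datetime.utcnow()
        limit = now - datetime.timedelta(seconds=interval)
        active = {n for n, ts in updated_at_by_node.items() if ts >= limit}
        offline = {n for n, ts in updated_at_by_node.items() if ts < limit}
        return active, offline

    now = datetime.datetime.utcnow()
    nodes = {'node-1': now,                                    # just touched
             'node-2': now - datetime.timedelta(seconds=120)}  # stale
    active, offline = split_nodes(nodes, interval=60, now=now)
    print(sorted(active), len(offline))  # ['node-1'] 1
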
9 changes: 9 additions & 0 deletions neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py
@@ -40,6 +40,7 @@
from oslo_db import exception as os_db_exc
from oslo_log import log
from oslo_utils import timeutils
from ovsdbapp.backend.ovs_idl import idlutils

from neutron._i18n import _
from neutron.common.ovn import acl as ovn_acl
@@ -1322,6 +1323,14 @@ def delete_agent(self, context, id, _driver=None):
chassis_name = agent['configurations']['chassis_name']
_driver.sb_ovn.chassis_del(chassis_name, if_exists=True).execute(
check_error=True)
if _driver.sb_ovn.is_table_present('Chassis_Private'):
# TODO(ralonsoh): implement the corresponding chassis_private
# commands in ovsdbapp.
try:
_driver.sb_ovn.db_destroy('Chassis_Private', chassis_name).execute(
check_error=True)
except idlutils.RowNotFound:
pass
# Send a specific event that all API workers can get to delete the agent
# from their caches. Ideally we could send a single transaction that both
# created and deleted the key, but alas python-ovs is too "smart"
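
A standalone sketch of the cleanup pattern delete_agent() follows here: only touch "Chassis_Private" when the southbound schema actually has that table, and treat an already-missing row as success. FakeSouthbound and its methods are stand-ins invented for this sketch, not the real ovsdbapp connection or its command API.

    class RowNotFound(Exception):
        """Stand-in for ovsdbapp's idlutils.RowNotFound."""

    class FakeSouthbound:
        def __init__(self, tables):
            self.tables = tables  # table name -> set of row names

        def is_table_present(self, table):
            return table in self.tables

        def destroy(self, table, name):
            if name not in self.tables.get(table, set()):
                raise RowNotFound(name)
            self.tables[table].discard(name)

    def delete_chassis(sb, chassis_name):
        sb.destroy('Chassis', chassis_name)         # chassis_del in the driver
        if sb.is_table_present('Chassis_Private'):  # older schemas lack it
            try:
                sb.destroy('Chassis_Private', chassis_name)
            except RowNotFound:
                pass                                # row already gone: fine

    sb = FakeSouthbound({'Chassis': {'chassis-1'}, 'Chassis_Private': set()})
    delete_chassis(sb, 'chassis-1')
    print(sb.tables)  # {'Chassis': set(), 'Chassis_Private': set()}
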
4 changes: 2 additions & 2 deletions neutron/services/qos/qos_plugin.py
@@ -651,9 +651,9 @@ def _validate_create_network_callback(self, resource, event, trigger,
network_id = payload.resource_id
network = network_object.Network.get_object(context, id=network_id)

policy_id = network.qos_policy_id
if policy_id is None:
if not network or not getattr(network, 'qos_policy_id', None):
return
policy_id = network.qos_policy_id

policy = policy_object.QosPolicy.get_object(
context.elevated(), id=policy_id)
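
The QoS change reorders the check so the callback bails out before dereferencing a network object that may be gone. A minimal sketch of the failure mode the old order could hit, assuming Network.get_object() returns None when the network no longer exists:

    network = None  # what Network.get_object() yields for a vanished network

    # Old order: network.qos_policy_id raised AttributeError on None.
    # New order: guard first, then read the policy id.
    if not network or not getattr(network, 'qos_policy_id', None):
        print('network gone or no QoS policy attached; nothing to validate')
    else:
        policy_id = network.qos_policy_id
        print('validating policy', policy_id)
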
30 changes: 25 additions & 5 deletions neutron/services/trunk/drivers/ovn/trunk_driver.py
@@ -49,11 +49,13 @@ def _set_sub_ports(self, parent_port, subports):
context = n_context.get_admin_context()
db_parent_port = port_obj.Port.get_object(context, id=parent_port)
parent_port_status = db_parent_port.status
parent_port_bindings = db_parent_port.bindings[0]
for subport in subports:
with db_api.CONTEXT_WRITER.using(context), (
txn(check_error=True)) as ovn_txn:
port = self._set_binding_profile(context, subport, parent_port,
parent_port_status, ovn_txn)
parent_port_status,
parent_port_bindings, ovn_txn)
db_rev.bump_revision(context, port, ovn_const.TYPE_PORTS)

def _unset_sub_ports(self, subports):
@@ -67,7 +69,8 @@ def _unset_sub_ports(self, subports):

@db_base_plugin_common.convert_result_to_dict
def _set_binding_profile(self, context, subport, parent_port,
parent_port_status, ovn_txn):
parent_port_status,
parent_port_bindings, ovn_txn):
LOG.debug("Setting parent %s for subport %s",
parent_port, subport.port_id)
db_port = port_obj.Port.get_object(context, id=subport.port_id)
@@ -79,6 +82,9 @@ def _set_binding_profile(self, context, subport, parent_port,
check_rev_cmd = self.plugin_driver.nb_ovn.check_revision_number(
db_port.id, db_port, ovn_const.TYPE_PORTS)
ovn_txn.add(check_rev_cmd)
parent_binding_host = ''
if parent_port_bindings.host:
parent_binding_host = parent_port_bindings.host
try:
# NOTE(flaviof): We expect binding's host to be set. Otherwise,
# sub-port will not transition from DOWN to ACTIVE.
@@ -94,6 +100,7 @@ def _set_binding_profile(self, context, subport, parent_port,
port_obj.PortBinding.update_object(
context,
{'profile': binding.profile,
'host': parent_binding_host,
'vif_type': portbindings.VIF_TYPE_OVS},
port_id=subport.port_id,
host=binding.host)
@@ -155,6 +162,14 @@ def _unset_binding_profile(self, context, subport, ovn_txn):
LOG.debug("Done unsetting parent for subport %s", subport.port_id)
return db_port

def trunk_updated(self, trunk):
# Check if parent port is handled by OVN.
if not self.plugin_driver.nb_ovn.lookup('Logical_Switch_Port',
trunk.port_id, default=None):
return
if trunk.sub_ports:
self._set_sub_ports(trunk.port_id, trunk.sub_ports)

def trunk_created(self, trunk):
# Check if parent port is handled by OVN.
if not self.plugin_driver.nb_ovn.lookup('Logical_Switch_Port',
@@ -189,6 +204,8 @@ def subports_deleted(self, trunk, subports):
def trunk_event(self, resource, event, trunk_plugin, payload):
if event == events.AFTER_CREATE:
self.trunk_created(payload.states[0])
elif event == events.AFTER_UPDATE:
self.trunk_updated(payload.states[0])
elif event == events.AFTER_DELETE:
self.trunk_deleted(payload.states[0])

@@ -215,13 +232,16 @@ def register(self, resource, event, trigger, payload=None):
super(OVNTrunkDriver, self).register(
resource, event, trigger, payload=payload)
self._handler = OVNTrunkHandler(self.plugin_driver)
for trunk_event in (events.AFTER_CREATE, events.AFTER_DELETE):
for _event in (events.AFTER_CREATE, events.AFTER_UPDATE,
events.AFTER_DELETE):
registry.subscribe(self._handler.trunk_event,
resources.TRUNK,
trunk_event)
_event)

for _event in (events.AFTER_CREATE, events.AFTER_DELETE):
registry.subscribe(self._handler.subport_event,
resources.SUBPORTS,
trunk_event)
_event)

@classmethod
def create(cls, plugin_driver):
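
Two things change in the trunk driver: trunk AFTER_UPDATE events are now handled, and subport bindings inherit the parent port's binding host so they can transition to ACTIVE. A standalone sketch of the host propagation only, with plain dicts standing in for the Port/PortBinding objects and the OVN transaction omitted:

    def set_binding_profile(subport, parent_port_id, parent_binding):
        """Mirror the patched _set_binding_profile(): copy the parent's host."""
        parent_binding_host = ''
        if parent_binding.get('host'):
            parent_binding_host = parent_binding['host']
        subport.update({
            'profile': {'parent_name': parent_port_id,
                        'tag': subport['segmentation_id']},
            'host': parent_binding_host,  # new in this patch
            'vif_type': 'ovs',            # portbindings.VIF_TYPE_OVS
        })
        return subport

    parent_binding = {'host': 'host1'}
    subport = {'port_id': 'subport-uuid', 'segmentation_id': 42}
    updated = set_binding_profile(subport, 'parent-uuid', parent_binding)
    print(updated['host'])  # host1 -> the subport can now go ACTIVE
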
@@ -1169,6 +1169,19 @@ def _create_test_agent(self):
_, status = self.plugin.create_or_update_agent(self.context, agent)
return status['id']

def _check_chassis_registers(self, present=True):
chassis = self.sb_api.lookup('Chassis', self.chassis, default=None)
chassis_name = chassis.name if chassis else None
if self.sb_api.is_table_present('Chassis_Private'):
ch_private = self.sb_api.lookup(
'Chassis_Private', self.chassis, default=None)
ch_private_name = ch_private.name if ch_private else None
self.assertEqual(chassis_name, ch_private_name)
if present:
self.assertEqual(self.chassis, chassis_name)
else:
self.assertIsNone(chassis)

def test_agent_show(self):
for agent_id in self.agent_types.values():
self.assertTrue(self.plugin.get_agent(self.context, agent_id))
@@ -1217,12 +1230,15 @@ def test_agent_delete(self):
self.assertRaises(agent_exc.AgentNotFound, self.plugin.get_agent,
self.context, agent_id)

# OVN controller agent deletion, that triggers the "Chassis" register
# deletion. The "Chassis" register deletion triggers the host OVN
# agents deletion, both controller and metadata if present.
# OVN controller agent deletion, that triggers the "Chassis" and
# "Chassis_Private" registers deletion. The registers deletion triggers
# the host OVN agents deletion, both controller and metadata if
# present.
controller_id = self.agent_types[ovn_const.OVN_CONTROLLER_AGENT]
metadata_id = self.agent_types[ovn_const.OVN_METADATA_AGENT]
self._check_chassis_registers()
self.plugin.delete_agent(self.context, controller_id)
self._check_chassis_registers(present=False)
self.assertRaises(agent_exc.AgentNotFound, self.plugin.get_agent,
self.context, controller_id)
self.assertEqual(
@@ -14,21 +14,23 @@

import contextlib

from neutron.services.trunk import plugin as trunk_plugin
from neutron.tests.functional import base
from neutron_lib.api.definitions import portbindings
from neutron_lib import constants as n_consts
from neutron_lib.objects import registry as obj_reg
from neutron_lib.db import api as db_api
from neutron_lib.plugins import utils
from neutron_lib.services.trunk import constants as trunk_consts
from oslo_utils import uuidutils

from neutron.common.ovn import constants as ovn_const
from neutron.objects import ports as port_obj
from neutron.services.trunk import plugin as trunk_plugin
from neutron.tests.functional import base


class TestOVNTrunkDriver(base.TestOVNFunctionalBase):

def setUp(self):
super(TestOVNTrunkDriver, self).setUp()
def setUp(self, **kwargs):
super().setUp(**kwargs)
self.trunk_plugin = trunk_plugin.TrunkPlugin()
self.trunk_plugin.add_segmentation_type(
trunk_consts.SEGMENTATION_TYPE_VLAN,
@@ -39,7 +41,8 @@ def trunk(self, sub_ports=None):
sub_ports = sub_ports or []
with self.network() as network:
with self.subnet(network=network) as subnet:
with self.port(subnet=subnet) as parent_port:
with self.port(subnet=subnet,
device_owner='compute:nova') as parent_port:
tenant_id = uuidutils.generate_uuid()
trunk = {'trunk': {
'port_id': parent_port['port']['id'],
@@ -64,17 +67,14 @@ def _get_ovn_trunk_info(self):
if row.parent_name and row.tag:
device_owner = row.external_ids[
ovn_const.OVN_DEVICE_OWNER_EXT_ID_KEY]
revision_number = row.external_ids[
ovn_const.OVN_REV_NUM_EXT_ID_KEY]
ovn_trunk_info.append({'port_id': row.name,
'parent_port_id': row.parent_name,
'tag': row.tag,
'device_owner': device_owner,
'revision_number': revision_number,
})
return ovn_trunk_info

def _verify_trunk_info(self, trunk, has_items):
def _verify_trunk_info(self, trunk, has_items, host=''):
ovn_subports_info = self._get_ovn_trunk_info()
neutron_subports_info = []
for subport in trunk.get('sub_ports', []):
@@ -83,19 +83,27 @@ def _verify_trunk_info(self, trunk, has_items):
'parent_port_id': [trunk['port_id']],
'tag': [subport['segmentation_id']],
'device_owner': trunk_consts.TRUNK_SUBPORT_OWNER,
'revision_number': '2',
})
# Check that the subport has the binding is active.
binding = obj_reg.load_class('PortBinding').get_object(
self.context, port_id=subport['port_id'], host='')
self.assertEqual(n_consts.PORT_STATUS_ACTIVE, binding['status'])
# Check the subport binding.
pb = port_obj.PortBinding.get_object(
self.context, port_id=subport['port_id'], host=host)
self.assertEqual(n_consts.PORT_STATUS_ACTIVE, pb.status)
self.assertEqual(host, pb.host)

self.assertCountEqual(ovn_subports_info, neutron_subports_info)
self.assertEqual(has_items, len(neutron_subports_info) != 0)

if trunk.get('status'):
self.assertEqual(trunk_consts.TRUNK_ACTIVE_STATUS, trunk['status'])

def _bind_port(self, port_id, host):
with db_api.CONTEXT_WRITER.using(self.context):
pb = port_obj.PortBinding.get_object(self.context,
port_id=port_id, host='')
pb.delete()
port_obj.PortBinding(self.context, port_id=port_id, host=host,
vif_type=portbindings.VIF_TYPE_OVS).create()

def test_trunk_create(self):
with self.trunk() as trunk:
self._verify_trunk_info(trunk, has_items=False)
@@ -113,10 +121,22 @@ def test_subport_add(self):
new_trunk = self.trunk_plugin.get_trunk(self.context,
trunk['id'])
self._verify_trunk_info(new_trunk, has_items=True)
# Bind parent port. That will trigger the binding of the
# trunk subports too, using the same host ID.
self._bind_port(trunk['port_id'], 'host1')
self.mech_driver.set_port_status_up(trunk['port_id'])
self._verify_trunk_info(new_trunk, has_items=True,
host='host1')

def test_subport_delete(self):
with self.subport() as subport:
with self.trunk([subport]) as trunk:
# Bind parent port.
self._bind_port(trunk['port_id'], 'host1')
self.mech_driver.set_port_status_up(trunk['port_id'])
self._verify_trunk_info(trunk, has_items=True,
host='host1')

self.trunk_plugin.remove_subports(self.context, trunk['id'],
{'sub_ports': [subport]})
new_trunk = self.trunk_plugin.get_trunk(self.context,