diff --git a/neutron/agent/ovn/metadata/agent.py b/neutron/agent/ovn/metadata/agent.py index 423e26f0ae5..f0be39d235c 100644 --- a/neutron/agent/ovn/metadata/agent.py +++ b/neutron/agent/ovn/metadata/agent.py @@ -14,6 +14,7 @@ import collections import functools +from random import randint import re import threading import uuid @@ -21,6 +22,7 @@ import netaddr from neutron_lib import constants as n_const from oslo_concurrency import lockutils +from oslo_config import cfg from oslo_log import log from oslo_utils import netutils from ovsdbapp.backend.ovs_idl import event as row_event @@ -35,10 +37,12 @@ from neutron.common.ovn import constants as ovn_const from neutron.common.ovn import utils as ovn_utils from neutron.common import utils +from neutron.conf.agent.database import agents_db from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf as config LOG = log.getLogger(__name__) +agents_db.register_db_agents_opts() _SYNC_STATE_LOCK = lockutils.ReaderWriterLock() CHASSIS_METADATA_LOCK = 'chassis_metadata_lock' @@ -186,14 +190,34 @@ def __init__(self, metadata_agent): events = (self.ROW_UPDATE,) super(SbGlobalUpdateEvent, self).__init__(events, table, None) self.event_name = self.__class__.__name__ + self.first_run = True def run(self, event, row, old): - table = ('Chassis_Private' if self.agent.has_chassis_private - else 'Chassis') - self.agent.sb_idl.db_set( - table, self.agent.chassis, ('external_ids', { - ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY: - str(row.nb_cfg)})).execute() + + def _update_chassis(self, row): + table = ('Chassis_Private' if self.agent.has_chassis_private + else 'Chassis') + self.agent.sb_idl.db_set( + table, self.agent.chassis, ('external_ids', { + ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY: + str(row.nb_cfg)})).execute() + + delay = 0 + if self.first_run: + self.first_run = False + else: + # We occasionally see port binding failed errors due to + # the ml2 driver refusing to bind the port to a dead agent. 
+ # if all agents heartbeat at the same time, they will all + # cause a load spike on the server. To mitigate that we + # need to spread out the load by introducing a random delay. + # clamp the max delay between 3 and 10 seconds. + max_delay = max(min(cfg.CONF.agent_down_time // 3, 10), 3) + delay = randint(0, max_delay) + + LOG.debug("Delaying updating chassis table for %s seconds", delay) + timer = threading.Timer(delay, _update_chassis, [self, row]) + timer.start() class MetadataAgent(object): diff --git a/neutron/db/db_base_plugin_v2.py b/neutron/db/db_base_plugin_v2.py index 8cdf757293d..973bad7cbd9 100644 --- a/neutron/db/db_base_plugin_v2.py +++ b/neutron/db/db_base_plugin_v2.py @@ -75,7 +75,7 @@ def _ensure_subnet_not_used(context, subnet_id): - models_v2.Subnet.lock_register( + models_v2.Subnet.write_lock_register( context, exc.SubnetInUse(subnet_id=subnet_id), id=subnet_id) try: registry.publish( diff --git a/neutron/db/ipam_backend_mixin.py b/neutron/db/ipam_backend_mixin.py index 088bd650b25..75f4c9f14dc 100644 --- a/neutron/db/ipam_backend_mixin.py +++ b/neutron/db/ipam_backend_mixin.py @@ -688,7 +688,7 @@ def _ipam_get_subnets(self, context, network_id, host, service_type=None, msg = ('This subnet is being modified by another concurrent ' 'operation') for subnet in subnets: - subnet.lock_register( + subnet.read_lock_register( context, exc.SubnetInUse(subnet_id=subnet.id, reason=msg), id=subnet.id) subnet_dicts = [self._make_subnet_dict(subnet, context=context) diff --git a/neutron/db/models_v2.py b/neutron/db/models_v2.py index 9e001c7dbdd..d8c74c27deb 100644 --- a/neutron/db/models_v2.py +++ b/neutron/db/models_v2.py @@ -33,25 +33,51 @@ class HasInUse(object): """NeutronBaseV2 mixin, to add the flag "in_use" to a DB model. - The content of this flag (boolean) parameter is not relevant. The goal of - this field is to be used in a write transaction to mark a DB register as - "in_use". 
Writing any value on this DB parameter will lock the container - register. At the end of the DB transaction, the DB engine will check if - this register was modified or deleted. In such case, the transaction will - fail and won't be commited. - - "lock_register" is the method to write the register "in_use" column. - Because the lifespan of this DB lock is the DB transaction, there isn't an - unlock method. The lock will finish once the transaction ends. + The goal of this class is to allow users to lock specific database rows with + a shared or exclusive lock (without necessarily introducing a change in + the table itself). Having these locks allows the DB engine to prevent + concurrent modifications (e.g. the deletion of a resource while we are + currently adding a new dependency on the resource). + + "read_lock_register" takes a shared DB lock on the row specified by the + filters. The lock is automatically released once the transaction ends. + You can have any number of parallel read locks on the same DB row. But + you can not have any write lock in parallel. + + "write_lock_register" takes an exclusive DB lock on the row specified by + the filters. The lock is automatically released on transaction commit. + You may only have one write lock on each row at a time. It therefore + blocks all other read and write locks to this row. """ + # keep this value to not need to update the database schema + # only at backport in_use = sa.Column(sa.Boolean(), nullable=False, server_default=sql.false(), default=False) @classmethod - def lock_register(cls, context, exception, **filters): + def write_lock_register(cls, context, exception, **filters): + # we use `with_for_update()` to include `FOR UPDATE` in the sql + # statement. + # we need to set `enable_eagerloads(False)` so that we do not try to + # load attached resources (e.g. standardattributes) as this breaks the + # `FOR UPDATE` statement. 
num_reg = context.session.query( - cls).filter_by(**filters).update({'in_use': True}) - if num_reg != 1: + cls).filter_by(**filters).enable_eagerloads( + False).with_for_update().first() + if num_reg is None: + raise exception + + @classmethod + def read_lock_register(cls, context, exception, **filters): + # we use `with_for_update(read=True)` to include `LOCK IN SHARE MODE` + # in the sql statement. + # we need to set `enable_eagerloads(False)` so that we do not try to + # load attached resources (e.g. standardattributes) as this breaks the + # `LOCK IN SHARE MODE` statement. + num_reg = context.session.query( + cls).filter_by(**filters).enable_eagerloads( + False).with_for_update(read=True).first() + if num_reg is None: raise exception diff --git a/zuul.d/base.yaml b/zuul.d/base.yaml index 216044f3388..ec0bc1e1875 100644 --- a/zuul.d/base.yaml +++ b/zuul.d/base.yaml @@ -151,7 +151,7 @@ - job: name: neutron-linuxbridge-tempest-plugin-scenario-nftables - parent: neutron-tempest-plugin-scenario-linuxbridge + parent: neutron-tempest-plugin-scenario-linuxbridge-yoga pre-run: playbooks/install_nftables.yaml vars: devstack_local_conf: @@ -162,7 +162,7 @@ - job: name: neutron-ovs-tempest-plugin-scenario-iptables_hybrid-nftables - parent: neutron-tempest-plugin-scenario-openvswitch-iptables_hybrid + parent: neutron-tempest-plugin-scenario-openvswitch-iptables_hybrid-yoga pre-run: playbooks/install_nftables.yaml vars: devstack_local_conf: