Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions neutron/agent/ovn/metadata/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

import collections
import functools
from random import randint
import re
import threading
import uuid

import netaddr
from neutron_lib import constants as n_const
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log
from oslo_utils import netutils
from ovsdbapp.backend.ovs_idl import event as row_event
Expand All @@ -35,10 +37,12 @@
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils as ovn_utils
from neutron.common import utils
from neutron.conf.agent.database import agents_db
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf as config


LOG = log.getLogger(__name__)
agents_db.register_db_agents_opts()
_SYNC_STATE_LOCK = lockutils.ReaderWriterLock()
CHASSIS_METADATA_LOCK = 'chassis_metadata_lock'

Expand Down Expand Up @@ -186,14 +190,34 @@ def __init__(self, metadata_agent):
events = (self.ROW_UPDATE,)
super(SbGlobalUpdateEvent, self).__init__(events, table, None)
self.event_name = self.__class__.__name__
self.first_run = True

def run(self, event, row, old):
table = ('Chassis_Private' if self.agent.has_chassis_private
else 'Chassis')
self.agent.sb_idl.db_set(
table, self.agent.chassis, ('external_ids', {
ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY:
str(row.nb_cfg)})).execute()

def _update_chassis(self, row):
table = ('Chassis_Private' if self.agent.has_chassis_private
else 'Chassis')
self.agent.sb_idl.db_set(
table, self.agent.chassis, ('external_ids', {
ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY:
str(row.nb_cfg)})).execute()

delay = 0
if self.first_run:
self.first_run = False
else:
# We occasionally see "port binding failed" errors because
# the ML2 driver refuses to bind a port to a dead agent.
# If all agents heartbeat at the same time, they will all
# cause a load spike on the server. To mitigate that, we
# spread out the load by introducing a random delay.
# Clamp the max delay between 3 and 10 seconds.
max_delay = max(min(cfg.CONF.agent_down_time // 3, 10), 3)
delay = randint(0, max_delay)

LOG.debug("Delaying updating chassis table for %s seconds", delay)
timer = threading.Timer(delay, _update_chassis, [self, row])
timer.start()


class MetadataAgent(object):
Expand Down
2 changes: 1 addition & 1 deletion neutron/db/db_base_plugin_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@


def _ensure_subnet_not_used(context, subnet_id):
models_v2.Subnet.lock_register(
models_v2.Subnet.write_lock_register(
context, exc.SubnetInUse(subnet_id=subnet_id), id=subnet_id)
try:
registry.publish(
Expand Down
2 changes: 1 addition & 1 deletion neutron/db/ipam_backend_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ def _ipam_get_subnets(self, context, network_id, host, service_type=None,
msg = ('This subnet is being modified by another concurrent '
'operation')
for subnet in subnets:
subnet.lock_register(
subnet.read_lock_register(
context, exc.SubnetInUse(subnet_id=subnet.id, reason=msg),
id=subnet.id)
subnet_dicts = [self._make_subnet_dict(subnet, context=context)
Expand Down
52 changes: 39 additions & 13 deletions neutron/db/models_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,51 @@
class HasInUse(object):
"""NeutronBaseV2 mixin, to add the flag "in_use" to a DB model.

The content of this flag (boolean) parameter is not relevant. The goal of
this field is to be used in a write transaction to mark a DB register as
"in_use". Writing any value on this DB parameter will lock the container
register. At the end of the DB transaction, the DB engine will check if
this register was modified or deleted. In such case, the transaction will
fail and won't be committed.

"lock_register" is the method to write the register "in_use" column.
Because the lifespan of this DB lock is the DB transaction, there isn't an
unlock method. The lock will finish once the transaction ends.
The goal of this class is to allow users to lock specific database rows with
a shared or exclusive lock (without necessarily introducing a change in
the table itself). Having these locks allows the DB engine to prevent
concurrent modifications (e.g. the deletion of a resource while we are
currently adding a new dependency on the resource).

"read_lock_register" takes a shared DB lock on the row specified by the
filters. The lock is automatically released once the transaction ends.
You can have any number of parallel read locks on the same DB row. But
you can not have any write lock in parallel.

"write_lock_register" takes an exclusive DB lock on the row specified by
the filters. The lock is automatically released once the transaction ends.
You may only have one write lock on each row at a time. It therefore
blocks all other read and write locks to this row.
"""
# keep this column so that the database schema does not need to be updated
# (presumably this only matters for the backport -- confirm)
in_use = sa.Column(sa.Boolean(), nullable=False,
server_default=sql.false(), default=False)

@classmethod
def lock_register(cls, context, exception, **filters):
def write_lock_register(cls, context, exception, **filters):
# we use `with_for_update()` to include `FOR UPDATE` in the sql
# statement.
# we need to set `enable_eagerloads(False)` so that we do not try to
# load attached resources (e.g. standardattributes) as this breaks the
# `FOR UPDATE` statement.
num_reg = context.session.query(
cls).filter_by(**filters).update({'in_use': True})
if num_reg != 1:
cls).filter_by(**filters).enable_eagerloads(
False).with_for_update().first()
if num_reg is None:
raise exception

@classmethod
def read_lock_register(cls, context, exception, **filters):
# we use `with_for_update(read=True)` to include a shared-lock clause in
# the sql statement (`LOCK IN SHARE MODE` on MySQL/MariaDB, `FOR SHARE`
# on PostgreSQL).
# we need to set `enable_eagerloads(False)` so that we do not try to
# load attached resources (e.g. standardattributes) as this breaks the
# shared-lock statement.
num_reg = context.session.query(
cls).filter_by(**filters).enable_eagerloads(
False).with_for_update(read=True).first()
if num_reg is None:
raise exception


Expand Down
4 changes: 2 additions & 2 deletions zuul.d/base.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@

- job:
name: neutron-linuxbridge-tempest-plugin-scenario-nftables
parent: neutron-tempest-plugin-scenario-linuxbridge
parent: neutron-tempest-plugin-scenario-linuxbridge-yoga
pre-run: playbooks/install_nftables.yaml
vars:
devstack_local_conf:
Expand All @@ -162,7 +162,7 @@

- job:
name: neutron-ovs-tempest-plugin-scenario-iptables_hybrid-nftables
parent: neutron-tempest-plugin-scenario-openvswitch-iptables_hybrid
parent: neutron-tempest-plugin-scenario-openvswitch-iptables_hybrid-yoga
pre-run: playbooks/install_nftables.yaml
vars:
devstack_local_conf:
Expand Down