Skip to content

Commit

Permalink
Improve port dhcp Provisioning
Browse files Browse the repository at this point in the history
Currently, DHCP provisioning of ports is the critical bottleneck
when booting multiple VMs concurrently.

The root cause is that ports belonging to the same network are processed
one by one by the DHCP agent, and a port whose provisioning is already
complete still blocks the processing of other ports in other DHCP agents.
This patch aims to optimize the strategy for dispatching port casts to
agents in order to improve the provisioning process.

On the server side, messages are classified into multiple levels. In
particular, port_update_end and port_create_end messages are split into two
levels: the high-level message is cast to only one agent, while the low-level
message is cast to all agents. On the agent side, these messages are put into
the `resource_processing_queue`; with this queue, we can remove `_net_lock`
and process the messages in order of priority.

Additionally, I modified `resource_processing_queue` to suit this need: the
`_queue` in `ExclusiveResourceProcessor` is changed from a list to a
PriorityQueue, so that all messages cached in `ExclusiveResourceProcessor`
are sorted by priority.

Related-Bug: #1760047
Change-Id: I255caa0571c42fb012fe882259ef181070beccef
  • Loading branch information
jeffyjf committed Jan 28, 2019
1 parent f03797a commit 99f4495
Show file tree
Hide file tree
Showing 6 changed files with 357 additions and 128 deletions.
19 changes: 10 additions & 9 deletions neutron/agent/common/resource_processing_queue.py
Expand Up @@ -89,7 +89,7 @@ def __init__(self, id):

if id not in self._masters:
self._masters[id] = self
self._queue = []
self._queue = Queue.PriorityQueue(-1)

self._master = self._masters[id]

Expand Down Expand Up @@ -119,7 +119,7 @@ def queue_update(self, update):
resource is being processed. These updates have already bubbled to
the front of the ResourceProcessingQueue.
"""
self._master._queue.append(update)
self._master._queue.put(update)

def updates(self):
"""Processes the resource until updates stop coming
Expand All @@ -128,13 +128,14 @@ def updates(self):
may come in from other workers while it is in progress. This method
loops until they stop coming.
"""
if self._i_am_master():
while self._queue:
# Remove the update from the queue even if it is old.
update = self._queue.pop(0)
# Process the update only if it is fresh.
if self._get_resource_data_timestamp() < update.timestamp:
yield update
while self._i_am_master():
if self._queue.empty():
return
# Get the update from the queue even if it is old.
update = self._queue.get()
# Process the update only if it is fresh.
if self._get_resource_data_timestamp() < update.timestamp:
yield update


class ResourceProcessingQueue(object):
Expand Down
217 changes: 146 additions & 71 deletions neutron/agent/dhcp/agent.py
Expand Up @@ -23,7 +23,6 @@
from neutron_lib import constants
from neutron_lib import context
from neutron_lib import exceptions
from neutron_lib.utils import runtime
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
Expand All @@ -34,6 +33,7 @@
import six

from neutron._i18n import _
from neutron.agent.common import resource_processing_queue as queue
from neutron.agent.linux import dhcp
from neutron.agent.linux import external_process
from neutron.agent.metadata import driver as metadata_driver
Expand All @@ -46,6 +46,8 @@
LOG = logging.getLogger(__name__)
_SYNC_STATE_LOCK = lockutils.ReaderWriterLock()

DEFAULT_PRIORITY = 255


def _sync_lock(f):
"""Decorator to block all operations for a global sync call."""
Expand All @@ -65,12 +67,6 @@ def wrapped(*args, **kwargs):
return wrapped


def _net_lock(network_id):
"""Returns a context manager lock based on network_id."""
lock_name = 'dhcp-agent-network-lock-%s' % network_id
return lockutils.lock(lock_name, runtime.SYNCHRONIZED_PREFIX)


class DhcpAgent(manager.Manager):
"""DHCP agent service manager.
Expand Down Expand Up @@ -110,6 +106,7 @@ def __init__(self, host=None, conf=None):
self._process_monitor = external_process.ProcessMonitor(
config=self.conf,
resource_type='dhcp')
self._queue = queue.ResourceProcessingQueue()

def init_host(self):
self.sync_state()
Expand Down Expand Up @@ -138,6 +135,7 @@ def run(self):
"""Activate the DHCP agent."""
self.periodic_resync()
self.start_ready_ports_loop()
eventlet.spawn_n(self._process_loop)

def call_driver(self, action, network, **action_kwargs):
"""Invoke an action on a DHCP driver instance."""
Expand Down Expand Up @@ -381,36 +379,64 @@ def refresh_dhcp_helper(self, network_id):
# Update the metadata proxy after the dhcp driver has been updated
self.update_isolated_metadata_proxy(network)

@_wait_if_syncing
def network_create_end(self, context, payload):
"""Handle the network.create.end notification event."""
network_id = payload['network']['id']
with _net_lock(network_id):
self.enable_dhcp_helper(network_id)
update = queue.ResourceUpdate(payload['network']['id'],
payload.get('priority',
DEFAULT_PRIORITY),
action='_network_create',
resource=payload)
self._queue.add(update)

@_wait_if_syncing
def _network_create(self, payload):
network_id = payload['network']['id']
self.enable_dhcp_helper(network_id)

def network_update_end(self, context, payload):
"""Handle the network.update.end notification event."""
network_id = payload['network']['id']
with _net_lock(network_id):
if payload['network']['admin_state_up']:
self.enable_dhcp_helper(network_id)
else:
self.disable_dhcp_helper(network_id)
update = queue.ResourceUpdate(payload['network']['id'],
payload.get('priority',
DEFAULT_PRIORITY),
action='_network_update',
resource=payload)
self._queue.add(update)

@_wait_if_syncing
def _network_update(self, payload):
network_id = payload['network']['id']
if payload['network']['admin_state_up']:
self.enable_dhcp_helper(network_id)
else:
self.disable_dhcp_helper(network_id)

def network_delete_end(self, context, payload):
"""Handle the network.delete.end notification event."""
network_id = payload['network_id']
with _net_lock(network_id):
self.disable_dhcp_helper(network_id)
update = queue.ResourceUpdate(payload['network_id'],
payload.get('priority',
DEFAULT_PRIORITY),
action='_network_delete',
resource=payload)
self._queue.add(update)

@_wait_if_syncing
def _network_delete(self, payload):
network_id = payload['network_id']
self.disable_dhcp_helper(network_id)

def subnet_update_end(self, context, payload):
"""Handle the subnet.update.end notification event."""
update = queue.ResourceUpdate(payload['subnet']['network_id'],
payload.get('priority',
DEFAULT_PRIORITY),
action='_subnet_update',
resource=payload)
self._queue.add(update)

@_wait_if_syncing
def _subnet_update(self, payload):
network_id = payload['subnet']['network_id']
with _net_lock(network_id):
self.refresh_dhcp_helper(network_id)
self.refresh_dhcp_helper(network_id)

# Use the update handler for the subnet create event.
subnet_create_end = subnet_update_end
Expand All @@ -433,31 +459,63 @@ def _get_network_lock_id(self, payload):
port = self.cache.get_port_by_id(port_id)
return port.network_id if port else None

@_wait_if_syncing
def subnet_delete_end(self, context, payload):
"""Handle the subnet.delete.end notification event."""
network_id = self._get_network_lock_id(payload)
if not network_id:
return
with _net_lock(network_id):
subnet_id = payload['subnet_id']
network = self.cache.get_network_by_subnet_id(subnet_id)
if not network:
return
self.refresh_dhcp_helper(network.id)
update = queue.ResourceUpdate(network_id,
payload.get('priority',
DEFAULT_PRIORITY),
action='_subnet_delete',
resource=payload)
self._queue.add(update)

@_wait_if_syncing
def _subnet_delete(self, payload):
network_id = self._get_network_lock_id(payload)
if not network_id:
return
subnet_id = payload['subnet_id']
network = self.cache.get_network_by_subnet_id(subnet_id)
if not network:
return
self.refresh_dhcp_helper(network.id)

def _process_loop(self):
LOG.debug("Starting _process_loop")

pool = eventlet.GreenPool(size=8)
while True:
pool.spawn_n(self._process_resource_update)

def _process_resource_update(self):
for tmp, update in self._queue.each_update_to_next_resource():
method = getattr(self, update.action)
method(update.resource)

def port_update_end(self, context, payload):
"""Handle the port.update.end notification event."""
updated_port = dhcp.DictModel(payload['port'])
with _net_lock(updated_port.network_id):
if self.cache.is_port_message_stale(payload['port']):
LOG.debug("Discarding stale port update: %s", updated_port)
return
network = self.cache.get_network_by_id(updated_port.network_id)
if not network:
return
self.reload_allocations(updated_port, network)
if self.cache.is_port_message_stale(updated_port):
LOG.debug("Discarding stale port update: %s", updated_port)
return
update = queue.ResourceUpdate(updated_port.network_id,
payload.get('priority',
DEFAULT_PRIORITY),
action='_port_update',
resource=updated_port)
self._queue.add(update)

@_wait_if_syncing
def _port_update(self, updated_port):
if self.cache.is_port_message_stale(updated_port):
LOG.debug("Discarding stale port update: %s", updated_port)
return
network = self.cache.get_network_by_id(updated_port.network_id)
if not network:
return
self.reload_allocations(updated_port, network)

def reload_allocations(self, port, network):
LOG.info("Trigger reload_allocations for port %s", port)
Expand Down Expand Up @@ -493,50 +551,67 @@ def _is_port_on_this_agent(self, port):
port['network_id'], self.conf.host)
return port['device_id'] == thishost

@_wait_if_syncing
def port_create_end(self, context, payload):
"""Handle the port.create.end notification event."""
created_port = dhcp.DictModel(payload['port'])
with _net_lock(created_port.network_id):
network = self.cache.get_network_by_id(created_port.network_id)
if not network:
return
new_ips = {i['ip_address'] for i in created_port['fixed_ips']}
for port_cached in network.ports:
# if there are other ports cached with the same ip address in
# the same network this indicate that the cache is out of sync
cached_ips = {i['ip_address']
for i in port_cached['fixed_ips']}
if new_ips.intersection(cached_ips):
self.schedule_resync("Duplicate IP addresses found, "
"DHCP cache is out of sync",
created_port.network_id)
return
self.reload_allocations(created_port, network)
update = queue.ResourceUpdate(created_port.network_id,
payload.get('priority',
DEFAULT_PRIORITY),
action='_port_create',
resource=created_port)
self._queue.add(update)

@_wait_if_syncing
def _port_create(self, created_port):
network = self.cache.get_network_by_id(created_port.network_id)
if not network:
return
new_ips = {i['ip_address'] for i in created_port['fixed_ips']}
for port_cached in network.ports:
# if there are other ports cached with the same ip address in
# the same network this indicate that the cache is out of sync
cached_ips = {i['ip_address']
for i in port_cached['fixed_ips']}
if new_ips.intersection(cached_ips):
self.schedule_resync("Duplicate IP addresses found, "
"DHCP cache is out of sync",
created_port.network_id)
return
self.reload_allocations(created_port, network)

def port_delete_end(self, context, payload):
"""Handle the port.delete.end notification event."""
network_id = self._get_network_lock_id(payload)
if not network_id:
return
with _net_lock(network_id):
port_id = payload['port_id']
port = self.cache.get_port_by_id(port_id)
self.cache.deleted_ports.add(port_id)
if not port:
return
network = self.cache.get_network_by_id(port.network_id)
self.cache.remove_port(port)
if self._is_port_on_this_agent(port):
# the agent's port has been deleted. disable the service
# and add the network to the resync list to create
# (or acquire a reserved) port.
self.call_driver('disable', network)
self.schedule_resync("Agent port was deleted", port.network_id)
else:
self.call_driver('reload_allocations', network)
self.update_isolated_metadata_proxy(network)
update = queue.ResourceUpdate(network_id,
payload.get('priority',
DEFAULT_PRIORITY),
action='_port_delete',
resource=payload)
self._queue.add(update)

@_wait_if_syncing
def _port_delete(self, payload):
network_id = self._get_network_lock_id(payload)
if not network_id:
return
port_id = payload['port_id']
port = self.cache.get_port_by_id(port_id)
self.cache.deleted_ports.add(port_id)
if not port:
return
network = self.cache.get_network_by_id(port.network_id)
self.cache.remove_port(port)
if self._is_port_on_this_agent(port):
# the agent's port has been deleted. disable the service
# and add the network to the resync list to create
# (or acquire a reserved) port.
self.call_driver('disable', network)
self.schedule_resync("Agent port was deleted", port.network_id)
else:
self.call_driver('reload_allocations', network)
self.update_isolated_metadata_proxy(network)

def update_isolated_metadata_proxy(self, network):
"""Spawn or kill metadata proxy.
Expand Down

0 comments on commit 99f4495

Please sign in to comment.