Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
419 lines (351 sloc) 18 KB
# Copyright (c) 2013 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import collections
import functools
import itertools
import random
from neutron_lib.api.definitions import availability_zone as az_def
from neutron_lib import constants as lib_const
from neutron_lib.db import api as lib_db_api
from neutron_lib.exceptions import l3 as l3_exc
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log as logging
import six
from neutron.common import utils
from neutron.conf.db import l3_hamode_db
from neutron.db.models import l3agent as rb_model
from neutron.objects import l3agent as rb_obj
LOG = logging.getLogger(__name__)
cfg.CONF.register_opts(l3_hamode_db.L3_HA_OPTS)
@six.add_metaclass(abc.ABCMeta)
class L3Scheduler(object):
def __init__(self):
self.max_ha_agents = cfg.CONF.max_l3_agents_per_router
def schedule(self, plugin, context, router_id, candidates=None):
"""Schedule the router to an active L3 agent.
Schedule the router only if it is not already scheduled.
"""
return self._schedule_router(
plugin, context, router_id, candidates=candidates)
def _router_has_binding(self, context, router_id, l3_agent_id):
router_binding_model = rb_model.RouterL3AgentBinding
query = context.session.query(router_binding_model.router_id)
query = query.filter(router_binding_model.router_id == router_id,
router_binding_model.l3_agent_id == l3_agent_id)
return query.count() > 0
def _get_routers_can_schedule(self, plugin, context, routers, l3_agent):
"""Get the subset of routers that can be scheduled on the L3 agent."""
ids_to_discard = set()
for router in routers:
# check if the l3 agent is compatible with the router
candidates = plugin.get_l3_agent_candidates(
context, router, [l3_agent])
if not candidates:
ids_to_discard.add(router['id'])
return [r for r in routers if r['id'] not in ids_to_discard]
def auto_schedule_routers(self, plugin, context, host):
"""Schedule under-scheduled routers to L3 Agents.
An under-scheduled router is a router that is either completely
un-scheduled (scheduled to 0 agents), or an HA router that is
under-scheduled (scheduled to less than max_l3_agents configuration
option. The function finds all the under-scheduled routers and
schedules them.
:param host: if unspecified, under-scheduled routers are scheduled to
all agents (not necessarily from the requesting host). If
specified, under-scheduled routers are scheduled only to
the agent on 'host'.
"""
l3_agent = plugin.get_enabled_agent_on_host(
context, lib_const.AGENT_TYPE_L3, host)
if not l3_agent:
return
underscheduled_routers = self._get_underscheduled_routers(
plugin, context)
target_routers = self._get_routers_can_schedule(
plugin, context, underscheduled_routers, l3_agent)
for router in target_routers:
self.schedule(plugin, context, router['id'], candidates=[l3_agent])
def _get_underscheduled_routers(self, plugin, context):
underscheduled_routers = []
max_agents_for_ha = plugin.get_number_of_agents_for_scheduling(context)
for router, count in plugin.get_routers_l3_agents_count(context):
if (count < 1 or
router.get('ha', False) and count < max_agents_for_ha):
# Either the router was un-scheduled (scheduled to 0 agents),
# or it's an HA router and it was under-scheduled (scheduled to
# less than max_agents_for_ha). Either way, it should be added
# to the list of routers we want to handle.
underscheduled_routers.append(router)
return underscheduled_routers
def _get_candidates(self, plugin, context, sync_router):
"""Return L3 agents where a router could be scheduled."""
is_ha = sync_router.get('ha', False)
with lib_db_api.CONTEXT_READER.using(context):
# allow one router is hosted by just
# one enabled l3 agent hosting since active is just a
# timing problem. Non-active l3 agent can return to
# active any time
current_l3_agents = plugin.get_l3_agents_hosting_routers(
context, [sync_router['id']], admin_state_up=True)
if current_l3_agents and not is_ha:
LOG.debug('Router %(router_id)s has already been hosted '
'by L3 agent %(agent_id)s',
{'router_id': sync_router['id'],
'agent_id': current_l3_agents[0]['id']})
return []
active_l3_agents = plugin.get_l3_agents(context, active=True)
if not active_l3_agents:
LOG.warning('No active L3 agents')
return []
candidates = plugin.get_l3_agent_candidates(context,
sync_router,
active_l3_agents)
if not candidates:
LOG.warning('No L3 agents can host the router %s',
sync_router['id'])
return candidates
def _bind_routers(self, plugin, context, routers, l3_agent):
for router in routers:
if router.get('ha'):
if not self._router_has_binding(context, router['id'],
l3_agent.id):
self.create_ha_port_and_bind(
plugin, context, router['id'],
router['tenant_id'], l3_agent)
else:
self.bind_router(plugin, context, router['id'], l3_agent.id)
@lib_db_api.retry_db_errors
def bind_router(self, plugin, context, router_id, agent_id,
is_manual_scheduling=False, is_ha=False):
"""Bind the router to the l3 agent which has been chosen.
The function tries to create a RouterL3AgentBinding object and add it
to the database. It returns the binding that was created or None if it
failed to create it due to some conflict.
In the HA router case, when creating a RouterL3AgentBinding (with some
binding_index) fails because some other RouterL3AgentBinding was
concurrently created using the same binding_index, then the function
will retry to create an entry with a new binding_index. This creation
will be retried up to lib_db_api.MAX_RETRIES times.
If, still in the HA router case, the creation failed because the
router has already been bound to the l3 agent in question or has been
removed (by a concurrent operation), then no further attempts will be
made and the function will return None.
Note that for non-HA routers, the function will always perform exactly
one try, regardless of the error preventing the addition of a new
RouterL3AgentBinding object to the database.
"""
if rb_obj.RouterL3AgentBinding.objects_exist(
context, router_id=router_id, l3_agent_id=agent_id):
LOG.debug('Router %(router_id)s has already been scheduled '
'to L3 agent %(agent_id)s.',
{'router_id': router_id, 'agent_id': agent_id})
return
if not is_ha:
binding_index = rb_model.LOWEST_BINDING_INDEX
if rb_obj.RouterL3AgentBinding.objects_exist(
context, router_id=router_id, binding_index=binding_index):
LOG.debug('Non-HA router %s has already been scheduled',
router_id)
return
else:
binding_index = plugin.get_vacant_binding_index(
context, router_id, is_manual_scheduling)
if binding_index < rb_model.LOWEST_BINDING_INDEX:
LOG.debug('Unable to find a vacant binding_index for '
'router %(router_id)s and agent %(agent_id)s',
{'router_id': router_id,
'agent_id': agent_id})
return
try:
binding = rb_obj.RouterL3AgentBinding(
context, l3_agent_id=agent_id,
router_id=router_id, binding_index=binding_index)
binding.create()
LOG.debug('Router %(router_id)s is scheduled to L3 agent '
'%(agent_id)s with binding_index %(binding_index)d',
{'router_id': router_id,
'agent_id': agent_id,
'binding_index': binding_index})
return binding
except db_exc.DBReferenceError:
LOG.debug('Router %s has already been removed '
'by concurrent operation', router_id)
def _schedule_router(self, plugin, context, router_id,
candidates=None):
if not plugin.router_supports_scheduling(context, router_id):
return
sync_router = plugin.get_router(context, router_id)
candidates = candidates or self._get_candidates(
plugin, context, sync_router)
if not candidates:
return
elif sync_router.get('ha', False):
chosen_agents = self._bind_ha_router(plugin, context,
router_id,
sync_router.get('tenant_id'),
candidates)
if not chosen_agents:
return
chosen_agent = chosen_agents[-1]
else:
chosen_agent = self._choose_router_agent(
plugin, context, candidates)
self.bind_router(plugin, context, router_id, chosen_agent.id)
return chosen_agent
@abc.abstractmethod
def _choose_router_agent(self, plugin, context, candidates):
"""Choose an agent from candidates based on a specific policy."""
pass
@abc.abstractmethod
def _choose_router_agents_for_ha(self, plugin, context, candidates):
"""Choose agents from candidates based on a specific policy."""
pass
def _get_num_of_agents_for_ha(self, candidates_count):
return (min(self.max_ha_agents, candidates_count) if self.max_ha_agents
else candidates_count)
def _add_port_from_net_and_ensure_vr_id(self, plugin, ctxt, router_db,
tenant_id, ha_net):
plugin._ensure_vr_id(ctxt, router_db, ha_net)
return plugin.add_ha_port(ctxt, router_db.id, ha_net.network_id,
tenant_id)
def create_ha_port_and_bind(self, plugin, context, router_id,
tenant_id, agent, is_manual_scheduling=False):
"""Creates and binds a new HA port for this agent."""
ctxt = context.elevated()
router_db = plugin._get_router(ctxt, router_id)
creator = functools.partial(self._add_port_from_net_and_ensure_vr_id,
plugin, ctxt, router_db, tenant_id)
dep_getter = functools.partial(plugin.get_ha_network, ctxt, tenant_id)
dep_creator = functools.partial(plugin._create_ha_network,
ctxt, tenant_id)
dep_deleter = functools.partial(plugin._delete_ha_network, ctxt)
dep_id_attr = 'network_id'
# This might fail in case of concurrent calls, which is good for us
# as we can skip the rest of this function.
binding = self.bind_router(
plugin, context, router_id, agent['id'],
is_manual_scheduling=is_manual_scheduling, is_ha=True)
if not binding:
return
try:
port_binding = utils.create_object_with_dependency(
creator, dep_getter, dep_creator,
dep_id_attr, dep_deleter)[0]
# NOTE(ralonsoh): to be migrated to the new facade that can't be
# used with "create_object_with_dependency".
with lib_db_api.autonested_transaction(context.session):
port_binding.l3_agent_id = agent['id']
except db_exc.DBDuplicateEntry:
LOG.debug("Router %(router)s already scheduled for agent "
"%(agent)s", {'router': router_id,
'agent': agent['id']})
port_id = port_binding.port_id
# Below call will also delete entry from L3HARouterAgentPortBinding
# and RouterPort tables
plugin._core_plugin.delete_port(context, port_id,
l3_port_check=False)
except l3_exc.RouterNotFound:
LOG.debug('Router %s has already been removed '
'by concurrent operation', router_id)
# we try to clear the HA network here in case the port we created
# blocked the concurrent router delete operation from getting rid
# of the HA network
ha_net = plugin.get_ha_network(ctxt, tenant_id)
if ha_net:
plugin.safe_delete_ha_network(ctxt, ha_net, tenant_id)
def _filter_scheduled_agents(self, plugin, context, router_id, candidates):
hosting = plugin.get_l3_agents_hosting_routers(context, [router_id])
# convert to comparable types
hosting_list = [tuple(host) for host in hosting]
return list(set(candidates) - set(hosting_list))
def _bind_ha_router(self, plugin, context, router_id,
tenant_id, candidates):
"""Bind a HA router to agents based on a specific policy."""
candidates = self._filter_scheduled_agents(plugin, context, router_id,
candidates)
chosen_agents = self._choose_router_agents_for_ha(
plugin, context, candidates)
for agent in chosen_agents:
self.create_ha_port_and_bind(plugin, context, router_id,
tenant_id, agent)
return chosen_agents
class ChanceScheduler(L3Scheduler):
"""Randomly allocate an L3 agent for a router."""
def _choose_router_agent(self, plugin, context, candidates):
return random.choice(candidates)
def _choose_router_agents_for_ha(self, plugin, context, candidates):
num_agents = self._get_num_of_agents_for_ha(len(candidates))
return random.sample(candidates, num_agents)
class LeastRoutersScheduler(L3Scheduler):
"""Allocate to an L3 agent with the least number of routers bound."""
def _choose_router_agent(self, plugin, context, candidates):
candidate_ids = [candidate['id'] for candidate in candidates]
chosen_agent = plugin.get_l3_agent_with_min_routers(
context, candidate_ids)
return chosen_agent
def _choose_router_agents_for_ha(self, plugin, context, candidates):
num_agents = self._get_num_of_agents_for_ha(len(candidates))
ordered_agents = plugin.get_l3_agents_ordered_by_num_routers(
context, [candidate['id'] for candidate in candidates])
return ordered_agents[:num_agents]
class AZLeastRoutersScheduler(LeastRoutersScheduler):
"""Availability zone aware scheduler.
If a router is ha router, allocate L3 agents distributed AZs
according to router's az_hints.
"""
def _get_az_hints(self, router):
return (router.get(az_def.AZ_HINTS) or
cfg.CONF.default_availability_zones)
def _get_routers_can_schedule(self, plugin, context, routers, l3_agent):
"""Overwrite L3Scheduler's method to filter by availability zone."""
target_routers = []
for r in routers:
az_hints = self._get_az_hints(r)
if not az_hints or l3_agent['availability_zone'] in az_hints:
target_routers.append(r)
if not target_routers:
return []
return super(AZLeastRoutersScheduler, self)._get_routers_can_schedule(
plugin, context, target_routers, l3_agent)
def _get_candidates(self, plugin, context, sync_router):
"""Overwrite L3Scheduler's method to filter by availability zone."""
all_candidates = (
super(AZLeastRoutersScheduler, self)._get_candidates(
plugin, context, sync_router))
candidates = []
az_hints = self._get_az_hints(sync_router)
for agent in all_candidates:
if not az_hints or agent['availability_zone'] in az_hints:
candidates.append(agent)
return candidates
def _choose_router_agents_for_ha(self, plugin, context, candidates):
ordered_agents = plugin.get_l3_agents_ordered_by_num_routers(
context, [candidate['id'] for candidate in candidates])
num_agents = self._get_num_of_agents_for_ha(len(ordered_agents))
# Order is kept in each az
group_by_az = collections.defaultdict(list)
for agent in ordered_agents:
az = agent['availability_zone']
group_by_az[az].append(agent)
selected_agents = []
for az, agents in itertools.cycle(group_by_az.items()):
if not agents:
continue
selected_agents.append(agents.pop(0))
if len(selected_agents) >= num_agents:
break
return selected_agents
You can’t perform that action at this time.