Skip to content

Commit

Permalink
Make DHCP notifier use core resource events
Browse files Browse the repository at this point in the history
This makes the notifier subscribe to core resource events
and leverage them if they are available. This solves the
issue where internal core plugin calls from service plugins
were not generating DHCP agent notifications.

Closes-Bug: #1621345
Change-Id: I607635601caff0322fd0c80c9023f5c4f663ca25
  • Loading branch information
kevinbenton committed Sep 14, 2016
1 parent 9d24490 commit 181bdb3
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 26 deletions.
51 changes: 43 additions & 8 deletions neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py
Expand Up @@ -52,6 +52,7 @@ class DhcpAgentNotifyAPI(object):
'port.delete.end']

def __init__(self, topic=topics.DHCP_AGENT, plugin=None):
self._unsubscribed_resources = []
self._plugin = plugin
target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
Expand All @@ -69,9 +70,22 @@ def __init__(self, topic=topics.DHCP_AGENT, plugin=None):
resources.SUBNET,
resources.SUBNETS,
)
if not cfg.CONF.dhcp_agent_notification:
return
for resource in callback_resources:
registry.subscribe(self._send_dhcp_notification,
resource, events.BEFORE_RESPONSE)
self.uses_native_notifications = {}
for resource in (resources.NETWORK, resources.PORT, resources.SUBNET):
self.uses_native_notifications[resource] = {'create': False,
'update': False,
'delete': False}
registry.subscribe(self._native_event_send_dhcp_notification,
resource, events.AFTER_CREATE)
registry.subscribe(self._native_event_send_dhcp_notification,
resource, events.AFTER_UPDATE)
registry.subscribe(self._native_event_send_dhcp_notification,
resource, events.AFTER_DELETE)

@property
def plugin(self):
Expand Down Expand Up @@ -212,16 +226,37 @@ def _after_router_interface_deleted(self, resource, event, trigger,
{'port_id': kwargs['port']['id']},
kwargs['port']['network_id'])

def _native_event_send_dhcp_notification(self, resource, event, trigger,
context, **kwargs):
action = event.replace('after_', '')
# we unsubscribe the _send_dhcp_notification method now that we know
# the loaded core plugin emits native resource events
if resource not in self._unsubscribed_resources:
self.uses_native_notifications[resource][action] = True
if all(self.uses_native_notifications[resource].values()):
# only unsubscribe the API level listener if we are
# receiving all event types for this resource
self._unsubscribed_resources.append(resource)
registry.unsubscribe_by_resource(self._send_dhcp_notification,
resource)
method_name = '.'.join((resource, action, 'end'))
payload = kwargs[resource]
data = {resource: payload}
self.notify(context, data, method_name)

def _send_dhcp_notification(self, resource, event, trigger, context=None,
data=None, method_name=None, collection=None,
**kwargs):
if cfg.CONF.dhcp_agent_notification:
if collection and collection in data:
for body in data[collection]:
item = {resource: body}
self.notify(context, item, method_name)
else:
self.notify(context, data, method_name)
action='', **kwargs):
action = action.split('_')[0]
if (resource in self.uses_native_notifications and
self.uses_native_notifications[resource][action]):
return
if collection and collection in data:
for body in data[collection]:
item = {resource: body}
self.notify(context, item, method_name)
else:
self.notify(context, data, method_name)

def notify(self, context, data, method_name):
# data is {'key' : 'value'} with only one key
Expand Down
6 changes: 6 additions & 0 deletions neutron/pecan_wsgi/hooks/notifier.py
Expand Up @@ -47,6 +47,8 @@ def _nova_notify(self, action, resource, *args):
self._nova_notifier.send_network_change(action_resource, *args)

def _notify_dhcp_agent(self, context, resource_name, action, resources):
# NOTE(kevinbenton): we should remove this whole method in Ocata and
# make plugins emit the core resource events
plugin = manager.NeutronManager.get_plugin_for_resource(resource_name)
notifier_method = '%s.%s.end' % (resource_name, action)
# use plugin's dhcp notifier, if this is already instantiated
Expand All @@ -55,6 +57,10 @@ def _notify_dhcp_agent(self, context, resource_name, action, resources):
agent_notifiers.get(constants.AGENT_TYPE_DHCP) or
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
native_map = getattr(dhcp_agent_notifier, 'uses_native_notifications',
{})
if native_map.get(resource_name, {}).get(action):
return
# The DHCP Agent does not accept bulk notifications
for resource in resources:
item = {resource_name: resource}
Expand Down
5 changes: 5 additions & 0 deletions neutron/plugins/common/utils.py
Expand Up @@ -19,6 +19,7 @@
import contextlib
import hashlib

import debtcollector
from neutron_lib import constants as n_const
from neutron_lib import exceptions
from oslo_config import cfg
Expand Down Expand Up @@ -172,6 +173,10 @@ def create_network(core_plugin, context, net, check_allow_post=True):
return core_plugin.create_network(context, {'network': net_data})


@debtcollector.removals.remove(
message="This will be removed in the O cycle. "
"Please call update_network directly on the plugin."
)
def update_network(core_plugin, context, network_id, net_data):
network = core_plugin.update_network(
context, network_id, {resources.NETWORK: net_data})
Expand Down
6 changes: 3 additions & 3 deletions neutron/services/auto_allocate/db.py
Expand Up @@ -340,9 +340,9 @@ def _save(self, context, tenant_id, network_id, router_id, subnets):
tenant_id=tenant_id,
network_id=network_id,
router_id=router_id))
p_utils.update_network(
self.core_plugin, context,
network_id, {'admin_state_up': True})
self.core_plugin.update_network(
context, network_id,
{'network': {'admin_state_up': True}})
except db_exc.DBDuplicateEntry:
LOG.error(_LE("Multiple auto-allocated networks detected for "
"tenant %(tenant)s. Attempting clean up for "
Expand Down
24 changes: 19 additions & 5 deletions neutron/tests/functional/pecan_wsgi/test_hooks.py
Expand Up @@ -222,7 +222,7 @@ def test_after_on_list_excludes_admin_attribute(self):
self.assertNotIn('restricted_attr', json_response['mehs'][0])


class TestDHCPNotifierHook(test_functional.PecanFunctionalTest):
class DHCPNotifierTestBase(test_functional.PecanFunctionalTest):

def setUp(self):
# the DHCP notifier needs to be mocked so that correct operations can
Expand All @@ -232,29 +232,42 @@ def setUp(self):
patcher = mock.patch('neutron.api.rpc.agentnotifiers.'
'dhcp_rpc_agent_api.DhcpAgentNotifyAPI.notify')
self.mock_notifier = patcher.start()
super(TestDHCPNotifierHook, self).setUp()
super(DHCPNotifierTestBase, self).setUp()

def test_dhcp_notifications_disabled(self):

class TestDHCPNotifierHookNegative(DHCPNotifierTestBase):

def setUp(self):
cfg.CONF.set_override('dhcp_agent_notification', False)
super(TestDHCPNotifierHookNegative, self).setUp()

def test_dhcp_notifications_disabled(self):
self.app.post_json(
'/v2.0/networks.json',
params={'network': {'name': 'meh'}},
headers={'X-Project-Id': 'tenid'})
self.assertEqual(0, self.mock_notifier.call_count)


class TestDHCPNotifierHook(DHCPNotifierTestBase):

def test_get_does_not_trigger_notification(self):
self.do_request('/v2.0/networks', tenant_id='tenid')
self.assertEqual(0, self.mock_notifier.call_count)

def test_post_put_delete_triggers_notification(self):
ctx = context.get_admin_context()
plugin = manager.NeutronManager.get_plugin()

req_headers = {'X-Project-Id': 'tenid', 'X-Roles': 'admin'}
response = self.app.post_json(
'/v2.0/networks.json',
params={'network': {'name': 'meh'}}, headers=req_headers)
self.assertEqual(201, response.status_int)
json_body = jsonutils.loads(response.body)
net = {'network': plugin.get_network(ctx, json_body['network']['id'])}
self.assertEqual(1, self.mock_notifier.call_count)
self.assertEqual(mock.call(mock.ANY, json_body, 'network.create.end'),
self.assertEqual(mock.call(mock.ANY, net, 'network.create.end'),
self.mock_notifier.mock_calls[-1])
network_id = json_body['network']['id']

Expand All @@ -264,8 +277,9 @@ def test_post_put_delete_triggers_notification(self):
headers=req_headers)
self.assertEqual(200, response.status_int)
json_body = jsonutils.loads(response.body)
net = {'network': plugin.get_network(ctx, json_body['network']['id'])}
self.assertEqual(2, self.mock_notifier.call_count)
self.assertEqual(mock.call(mock.ANY, json_body, 'network.update.end'),
self.assertEqual(mock.call(mock.ANY, net, 'network.update.end'),
self.mock_notifier.mock_calls[-1])

response = self.app.delete(
Expand Down
Expand Up @@ -19,6 +19,9 @@
from oslo_utils import timeutils

from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import utils
from neutron.db import agents_db
from neutron.db.agentschedulers_db import cfg
Expand Down Expand Up @@ -215,3 +218,23 @@ def test__fanout_message(self):
def test__cast_message(self):
self.notifier._cast_message(mock.ANY, mock.ANY, mock.ANY)
self.assertEqual(1, self.mock_cast.call_count)

def test__native_notification_unsubscribes(self):
self.assertFalse(self.notifier._unsubscribed_resources)
for res in (resources.PORT, resources.NETWORK, resources.SUBNET):
self.notifier._unsubscribed_resources = []
kwargs = {res: {}}
registry.notify(res, events.AFTER_CREATE, self,
context=mock.Mock(), **kwargs)
# don't unsubscribe until all three types are observed
self.assertEqual([], self.notifier._unsubscribed_resources)
registry.notify(res, events.AFTER_UPDATE, self,
context=mock.Mock(), **kwargs)
self.assertEqual([], self.notifier._unsubscribed_resources)
registry.notify(res, events.AFTER_DELETE, self,
context=mock.Mock(), **kwargs)
self.assertEqual([res], self.notifier._unsubscribed_resources)
# after first time, no further unsubscribing should happen
registry.notify(res, events.AFTER_CREATE, self,
context=mock.Mock(), **kwargs)
self.assertEqual([res], self.notifier._unsubscribed_resources)
18 changes: 17 additions & 1 deletion neutron/tests/unit/db/test_agentschedulers_db.py
Expand Up @@ -1359,7 +1359,7 @@ def test_agent_updated_dhcp_agent_notification(self):
mock.ANY, 'agent_updated',
{'admin_state_up': False}, DHCP_HOSTA)

def _network_port_create(
def _api_network_port_create(
self, hosts, gateway=constants.ATTR_NOT_SPECIFIED, owner=None):
for host in hosts:
helpers.register_dhcp_agent(host)
Expand All @@ -1374,6 +1374,22 @@ def _network_port_create(
with self.port(subnet=subnet1) as port:
return [net1, subnet1, port]

def _network_port_create(self, *args, **kwargs):
net, sub, port = self._api_network_port_create(*args, **kwargs)

dhcp_notifier = self.plugin.agent_notifiers[constants.AGENT_TYPE_DHCP]
if (not hasattr(dhcp_notifier, 'uses_native_notifications') or
not all(dhcp_notifier.uses_native_notifications[r]['create']
for r in ('port', 'subnet', 'network'))):
return net, sub, port
# since plugin has native dhcp notifications, the payloads will be the
# same as the getter outputs
ctx = context.get_admin_context()
net['network'] = self.plugin.get_network(ctx, net['network']['id'])
sub['subnet'] = self.plugin.get_subnet(ctx, sub['subnet']['id'])
port['port'] = self.plugin.get_port(ctx, port['port']['id'])
return net, sub, port

def _notification_mocks(self, hosts, net, subnet, port):
host_calls = {}
for host in hosts:
Expand Down
1 change: 1 addition & 0 deletions neutron/tests/unit/plugins/ml2/test_plugin.py
Expand Up @@ -780,6 +780,7 @@ def test_update_port_fixed_ip_changed(self):
self.assertTrue(sg_member_update.called)

def test_update_port_status_with_network(self):
registry.clear() # don't care about callback behavior
ctx = context.get_admin_context()
plugin = manager.NeutronManager.get_plugin()
with self.port() as port:
Expand Down
17 changes: 8 additions & 9 deletions neutron/tests/unit/services/auto_allocate/test_db.py
Expand Up @@ -20,7 +20,6 @@

from neutron.common import exceptions as c_exc
from neutron import context
from neutron.plugins.common import utils
from neutron.services.auto_allocate import db
from neutron.services.auto_allocate import exceptions
from neutron.tests.unit import testlib_api
Expand Down Expand Up @@ -160,14 +159,14 @@ def test__build_topology_error_network_with_router_and_interfaces(self):
provisioning_exception)

def test__save_with_provisioning_error(self):
with mock.patch.object(utils, "update_network", side_effect=Exception):
with testtools.ExpectedException(
exceptions.UnknownProvisioningError) as e:
self.mixin._save(self.ctx, 'foo_t', 'foo_n', 'foo_r',
[{'id': 'foo_s'}])
self.assertEqual('foo_n', e.network_id)
self.assertEqual('foo_r', e.router_id)
self.assertEqual([{'id': 'foo_s'}], e.subnets)
self.mixin._core_plugin.update_network.side_effect = Exception
with testtools.ExpectedException(
exceptions.UnknownProvisioningError) as e:
self.mixin._save(self.ctx, 'foo_t', 'foo_n', 'foo_r',
[{'id': 'foo_s'}])
self.assertEqual('foo_n', e.network_id)
self.assertEqual('foo_r', e.router_id)
self.assertEqual([{'id': 'foo_s'}], e.subnets)

def test__provision_external_connectivity_with_provisioning_error(self):
self.mixin._l3_plugin.create_router.side_effect = Exception
Expand Down

0 comments on commit 181bdb3

Please sign in to comment.