-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
dhcp_rpc_agent_api.py
240 lines (214 loc) · 10.4 KB
/
dhcp_rpc_agent_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# Copyright (c) 2013 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from neutron_lib import constants
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from neutron._i18n import _LE, _LW
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import constants as n_const
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron import manager
LOG = logging.getLogger(__name__)
class DhcpAgentNotifyAPI(object):
"""API for plugin to notify DHCP agent.
This class implements the client side of an rpc interface. The server side
is neutron.agent.dhcp.agent.DhcpAgent. For more information about changing
rpc interfaces, please see doc/source/devref/rpc_api.rst.
"""
# It seems dhcp agent does not support bulk operation
VALID_RESOURCES = ['network', 'subnet', 'port']
VALID_METHOD_NAMES = ['network.create.end',
'network.update.end',
'network.delete.end',
'subnet.create.end',
'subnet.update.end',
'subnet.delete.end',
'port.create.end',
'port.update.end',
'port.delete.end']
def __init__(self, topic=topics.DHCP_AGENT, plugin=None):
self._plugin = plugin
target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
# register callbacks for router interface changes
registry.subscribe(self._after_router_interface_created,
resources.ROUTER_INTERFACE, events.AFTER_CREATE)
registry.subscribe(self._after_router_interface_deleted,
resources.ROUTER_INTERFACE, events.AFTER_DELETE)
# register callbacks for events pertaining resources affecting DHCP
callback_resources = (
resources.NETWORK,
resources.NETWORKS,
resources.PORT,
resources.PORTS,
resources.SUBNET,
resources.SUBNETS,
)
for resource in callback_resources:
registry.subscribe(self._send_dhcp_notification,
resource, events.BEFORE_RESPONSE)
@property
def plugin(self):
if self._plugin is None:
self._plugin = manager.NeutronManager.get_plugin()
return self._plugin
def _schedule_network(self, context, network, existing_agents):
"""Schedule the network to new agents
:return: all agents associated with the network
"""
new_agents = self.plugin.schedule_network(context, network) or []
if new_agents:
for agent in new_agents:
self._cast_message(
context, 'network_create_end',
{'network': {'id': network['id']}}, agent['host'])
elif not existing_agents:
LOG.warning(_LW('Unable to schedule network %s: no agents '
'available; will retry on subsequent port '
'and subnet creation events.'),
network['id'])
return new_agents + existing_agents
def _get_enabled_agents(self, context, network, agents, method, payload):
"""Get the list of agents who can provide services."""
if not agents:
return []
network_id = network['id']
enabled_agents = agents
if not cfg.CONF.enable_services_on_agents_with_admin_state_down:
enabled_agents = [x for x in agents if x.admin_state_up]
active_agents = [x for x in agents if x.is_active]
len_enabled_agents = len(enabled_agents)
len_active_agents = len(active_agents)
if len_active_agents < len_enabled_agents:
LOG.warning(_LW("Only %(active)d of %(total)d DHCP agents "
"associated with network '%(net_id)s' "
"are marked as active, so notifications "
"may be sent to inactive agents."),
{'active': len_active_agents,
'total': len_enabled_agents,
'net_id': network_id})
if not enabled_agents:
num_ports = self.plugin.get_ports_count(
context, {'network_id': [network_id]})
notification_required = (
num_ports > 0 and len(network['subnets']) >= 1)
if notification_required:
LOG.error(_LE("Will not send event %(method)s for network "
"%(net_id)s: no agent available. Payload: "
"%(payload)s"),
{'method': method,
'net_id': network_id,
'payload': payload})
return enabled_agents
def _is_reserved_dhcp_port(self, port):
return port.get('device_id') == n_const.DEVICE_ID_RESERVED_DHCP_PORT
def _notify_agents(self, context, method, payload, network_id):
"""Notify all the agents that are hosting the network."""
# fanout is required as we do not know who is "listening"
no_agents = not utils.is_extension_supported(
self.plugin, constants.DHCP_AGENT_SCHEDULER_EXT_ALIAS)
fanout_required = method == 'network_delete_end' or no_agents
# we do nothing on network creation because we want to give the
# admin the chance to associate an agent to the network manually
cast_required = method != 'network_create_end'
if fanout_required:
self._fanout_message(context, method, payload)
elif cast_required:
admin_ctx = (context if context.is_admin else context.elevated())
network = self.plugin.get_network(admin_ctx, network_id)
agents = self.plugin.get_dhcp_agents_hosting_networks(
context, [network_id])
# schedule the network first, if needed
schedule_required = (
method == 'subnet_create_end' or
method == 'port_create_end' and
not self._is_reserved_dhcp_port(payload['port']))
if schedule_required:
agents = self._schedule_network(admin_ctx, network, agents)
if not agents:
LOG.debug("Network %s is not hosted by any dhcp agent",
network_id)
return
enabled_agents = self._get_enabled_agents(
context, network, agents, method, payload)
for agent in enabled_agents:
self._cast_message(
context, method, payload, agent.host, agent.topic)
def _cast_message(self, context, method, payload, host,
topic=topics.DHCP_AGENT):
"""Cast the payload to the dhcp agent running on the host."""
cctxt = self.client.prepare(topic=topic, server=host)
cctxt.cast(context, method, payload=payload)
def _fanout_message(self, context, method, payload):
"""Fanout the payload to all dhcp agents."""
cctxt = self.client.prepare(fanout=True)
cctxt.cast(context, method, payload=payload)
def network_removed_from_agent(self, context, network_id, host):
self._cast_message(context, 'network_delete_end',
{'network_id': network_id}, host)
def network_added_to_agent(self, context, network_id, host):
self._cast_message(context, 'network_create_end',
{'network': {'id': network_id}}, host)
def agent_updated(self, context, admin_state_up, host):
self._cast_message(context, 'agent_updated',
{'admin_state_up': admin_state_up}, host)
def _after_router_interface_created(self, resource, event, trigger,
**kwargs):
self._notify_agents(kwargs['context'], 'port_create_end',
{'port': kwargs['port']},
kwargs['port']['network_id'])
def _after_router_interface_deleted(self, resource, event, trigger,
**kwargs):
self._notify_agents(kwargs['context'], 'port_delete_end',
{'port_id': kwargs['port']['id']},
kwargs['port']['network_id'])
def _send_dhcp_notification(self, resource, event, trigger, context=None,
data=None, method_name=None, collection=None,
**kwargs):
if cfg.CONF.dhcp_agent_notification:
if collection and collection in data:
for body in data[collection]:
item = {resource: body}
self.notify(context, item, method_name)
else:
self.notify(context, data, method_name)
def notify(self, context, data, method_name):
# data is {'key' : 'value'} with only one key
if method_name not in self.VALID_METHOD_NAMES:
return
obj_type = list(data.keys())[0]
if obj_type not in self.VALID_RESOURCES:
return
obj_value = data[obj_type]
network_id = None
if obj_type == 'network' and 'id' in obj_value:
network_id = obj_value['id']
elif obj_type in ['port', 'subnet'] and 'network_id' in obj_value:
network_id = obj_value['network_id']
if not network_id:
return
method_name = method_name.replace(".", "_")
if method_name.endswith("_delete_end"):
if 'id' in obj_value:
self._notify_agents(context, method_name,
{obj_type + '_id': obj_value['id']},
network_id)
else:
self._notify_agents(context, method_name, data, network_id)