-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
driver.py
407 lines (350 loc) · 16.2 KB
/
driver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
import random
from neutron_lib.api.definitions import portbindings
from neutron_lib.callbacks import resources
from neutron_lib import exceptions as n_exceptions
from neutron_lib.plugins import constants as plugin_constants
from neutron_lib.plugins import directory
from neutron_lib.services.logapi import constants as log_const
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
from ovsdbapp.backend.ovs_idl import idlutils
from neutron._i18n import _
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils
from neutron.conf.services import logging as log_cfg
from neutron.services.logapi.common import db_api
from neutron.services.logapi.common import sg_callback
from neutron.services.logapi.drivers import base
from neutron.services.logapi.drivers import manager
# Module-level logger used by every function and method in this file.
LOG = logging.getLogger(__name__)
# Driver singleton; stays None until register() creates the OVNDriver.
DRIVER = None
# Executed at import time. Presumably this registers the ``network_log``
# option group that OVNDriver.__init__ reads through cfg.CONF -- confirm
# against neutron.conf.services.logging.
log_cfg.register_log_driver_opts()
# Exclusive upper bound handed to random.randrange() when picking an ACL
# label, so generated labels fall in the range 1 .. 2**32 - 1.
MAX_INT_LABEL = 2**32
# Logging resource types passed to the base driver as supported.
SUPPORTED_LOGGING_TYPES = [log_const.SECURITY_GROUP]
class LoggingNotSupported(n_exceptions.NeutronException):
    """Error raised when the connected OVN cannot do network logging.

    In this file it is raised by ``OVNDriver.create_log_precommit`` when
    ``OVNDriver.network_logging_supported`` returns False.
    """

    message = _(
        "The current OVN version does not offer support for neutron "
        "network log functionality."
    )
class OVNDriver(base.DriverBase):
    """Network-log driver that implements logging through OVN ACLs.

    Logging is turned on and off by rewriting the ``log``, ``meter``,
    ``name`` and ``severity`` columns (plus ``label`` and the
    ``log-related`` option when the ACL row has a ``label`` attribute) of
    the ACL rows referenced by the affected OVN port groups.

    ``plugin_driver`` is a class attribute set by :meth:`create`; it must
    provide ``nb_ovn`` and ``_ovn_client``.
    """

    def __init__(self):
        super().__init__(
            name="ovn",
            vif_types=[portbindings.VIF_TYPE_OVS,
                       portbindings.VIF_TYPE_VHOST_USER],
            vnic_types=[portbindings.VNIC_NORMAL],
            supported_logging_types=SUPPORTED_LOGGING_TYPES,
            requires_rpc=False)
        # Resolved lazily by the ``_log_plugin`` property.
        self._log_plugin_property = None
        # Name of the meter attached to logged ACLs; falls back to a fixed
        # name when the configuration option is unset/empty.
        self.meter_name = (
            cfg.CONF.network_log.local_output_log_base or "acl_log_meter")

    @staticmethod
    def network_logging_supported(ovn_nb):
        """Tell whether the NB ``Meter`` table has a ``fair`` column.

        :param ovn_nb: OVN northbound API object exposing ``_tables``
        :returns: True if the column exists, otherwise False.
        """
        columns = list(ovn_nb._tables["Meter"].columns)
        return "fair" in columns

    @classmethod
    def create(cls, plugin_driver):
        """Attach ``plugin_driver`` to the class and build a driver.

        :param plugin_driver: object providing ``nb_ovn`` and
                              ``_ovn_client``
        :returns: a new instance of ``cls``.
        """
        cls.plugin_driver = plugin_driver
        # Instantiate ``cls`` (not a hard-coded class) so that subclasses
        # calling create() get an instance of themselves.
        return cls()

    @property
    def _log_plugin(self):
        """Return the LOG_API service plugin, looked up once and cached."""
        if self._log_plugin_property is None:
            self._log_plugin_property = directory.get_plugin(
                plugin_constants.LOG_API)
        return self._log_plugin_property

    @staticmethod
    def _log_dict_to_obj(log_dict):
        """Wrap a log dict in a namedtuple so keys read as attributes.

        :param log_dict: dict describing one log resource
        :returns: a ``Log_obj`` namedtuple with one field per dict key.
        """
        log_cls = namedtuple('Log_obj', log_dict)
        # Field order and value order both follow the dict's iteration
        # order, so positional construction lines them up.
        return log_cls(*log_dict.values())

    @staticmethod
    def _pg_as_dict(pg):
        """Reduce a Port_Group row to the ``name``/``acls`` dict form."""
        return {"name": pg.name, "acls": [r.uuid for r in pg.acls]}

    def _get_logs(self, context):
        """Return every log known to the log plugin as attribute objects.

        :param context: current running context information
        """
        log_objs = self._log_plugin.get_logs(context)
        return [self._log_dict_to_obj(lo) for lo in log_objs]

    @property
    def _ovn_client(self):
        """OVN client of the plugin driver given to :meth:`create`."""
        return self.plugin_driver._ovn_client

    @property
    def ovn_nb(self):
        """Northbound API of the plugin driver given to :meth:`create`."""
        return self.plugin_driver.nb_ovn

    @staticmethod
    def _acl_actions_enabled(log_obj):
        """Return the set of ACL actions that ``log_obj`` wants logged.

        :param log_obj: log object providing ``enabled`` and ``event``
        :returns: empty set when the log is disabled; the allow-type
                  actions for ACCEPT, the drop/reject actions for DROP,
                  and the union of both for any other event value.
        """
        if not log_obj.enabled:
            return set()
        if log_obj.event == log_const.ACCEPT_EVENT:
            return {ovn_const.ACL_ACTION_ALLOW_RELATED,
                    ovn_const.ACL_ACTION_ALLOW_STATELESS,
                    ovn_const.ACL_ACTION_ALLOW}
        if log_obj.event == log_const.DROP_EVENT:
            return {ovn_const.ACL_ACTION_DROP,
                    ovn_const.ACL_ACTION_REJECT}
        # Fall through case: log_const.ALL_EVENT
        return {ovn_const.ACL_ACTION_DROP,
                ovn_const.ACL_ACTION_REJECT,
                ovn_const.ACL_ACTION_ALLOW_RELATED,
                ovn_const.ACL_ACTION_ALLOW_STATELESS,
                ovn_const.ACL_ACTION_ALLOW}

    def _remove_acls_log(self, pgs, ovn_txn, log_name=None):
        """Queue commands that clear the logging columns of ACLs.

        :param pgs: iterable of dicts with an ``acls`` list of ACL uuids
        :param ovn_txn: transaction the commands are added to
        :param log_name: when given, ACLs whose name belongs to a
                         different network log are left untouched
        """
        acl_absents, acl_changes, acl_visits = 0, 0, 0
        for pg in pgs:
            for acl_uuid in pg["acls"]:
                acl_visits += 1
                acl = self.ovn_nb.lookup("ACL", acl_uuid, default=None)
                # Log message if ACL is not found, as deleted concurrently
                if acl is None:
                    LOG.debug("ACL %s not found, deleted concurrently",
                              acl_uuid)
                    acl_absents += 1
                    continue
                # skip acls used by a different network log
                if log_name and acl.name and acl.name[0] != log_name:
                    continue
                columns = {
                    'log': False,
                    'meter': [],
                    'name': [],
                    'severity': []
                }
                # TODO(egarciar): There wont be a need to check if label
                # exists once minimum version for OVN is >= 22.03
                if hasattr(acl, 'label'):
                    columns['label'] = 0
                    ovn_txn.add(self.ovn_nb.db_remove(
                        "ACL", acl_uuid, 'options', 'log-related',
                        if_exists=True))
                ovn_txn.add(self.ovn_nb.db_set(
                    "ACL", acl_uuid, *columns.items()))
                acl_changes += 1
        msg = "Cleared %d, Not found %d (out of %d visited) ACLs"
        args = [acl_changes, acl_absents, acl_visits]
        if log_name:
            # Keep the name a lazy logging argument: splicing it into the
            # format string would break on a '%' inside the name.
            msg += " for network log %s"
            args.append(log_name)
        LOG.info(msg, *args)

    def _set_acls_log(self, pgs, ovn_txn, actions_enabled, log_name):
        """Queue commands that configure logging on ACLs.

        Each visited ACL gets its ``log`` column set according to whether
        its action is in ``actions_enabled``; meter, name and severity are
        always written.

        :param pgs: iterable of dicts with an ``acls`` list of ACL uuids
        :param ovn_txn: transaction the commands are added to
        :param actions_enabled: set of ACL actions that must be logged
        :param log_name: OVN name of the network log owning the ACLs
        """
        acl_changes, acl_visits = 0, 0
        for pg in pgs:
            for acl_uuid in pg["acls"]:
                acl_visits += 1
                # Same tolerance as _remove_acls_log: an ACL listed in the
                # port group may have been deleted in the meantime.
                acl = self.ovn_nb.lookup("ACL", acl_uuid, default=None)
                if acl is None:
                    LOG.debug("ACL %s not found, deleted concurrently",
                              acl_uuid)
                    continue
                # skip acls used by a different network log
                if acl.name and acl.name[0] != log_name:
                    continue
                columns = {
                    'log': acl.action in actions_enabled,
                    'meter': self.meter_name,
                    'name': log_name,
                    'severity': "info"
                }
                # TODO(egarciar): There wont be a need to check if label
                # exists once minimum version for OVN is >= 22.03
                if hasattr(acl, "label"):
                    # Label needs to be an unsigned 32 bit number and not 0.
                    columns["label"] = random.randrange(1, MAX_INT_LABEL)
                    columns["options"] = {'log-related': "true"}
                ovn_txn.add(self.ovn_nb.db_set(
                    "ACL", acl_uuid, *columns.items()))
                acl_changes += 1
        LOG.info("Set %d (out of %d visited) ACLs for network log %s",
                 acl_changes, acl_visits, log_name)

    def _update_log_objs(self, context, ovn_txn, log_objs):
        """(Re)apply the ACL logging configuration of each log object.

        :param context: current running context information
        :param ovn_txn: transaction the commands are added to
        :param log_objs: iterable of log objects to apply
        """
        for log_obj in log_objs:
            pgs = self._pgs_from_log_obj(context, log_obj)
            actions_enabled = self._acl_actions_enabled(log_obj)
            self._set_acls_log(pgs, ovn_txn, actions_enabled,
                               utils.ovn_name(log_obj.id))

    def _pgs_all(self):
        """Return the ``name`` and ``acls`` columns of every Port_Group."""
        return self.ovn_nb.db_list(
            "Port_Group", columns=["name", "acls"]).execute(check_error=True)

    def _pgs_from_log_obj(self, context, log_obj):
        """Map Neutron log_obj into affected port groups in OVN.

        :param context: current running context information
        :param log_obj: a log_object to be analyzed.
        :returns: list of dicts with ``name`` and ``acls`` (ACL uuids);
                  port groups that cannot be found are left out.
        """
        if not log_obj.resource_id and not log_obj.target_id:
            # No sg, no port, ALL: return all pgs
            if log_obj.event == log_const.ALL_EVENT:
                return self._pgs_all()
            try:
                pg_drop = self.ovn_nb.lookup(
                    "Port_Group", ovn_const.OVN_DROP_PORT_GROUP_NAME)
                # No sg, no port, DROP: return DROP pg
                if log_obj.event == log_const.DROP_EVENT:
                    return [self._pg_as_dict(pg_drop)]
                # No sg, no port, ACCEPT: return all except DROP pg.
                # Filter on the name rather than removing an exactly
                # equal dict, which raises ValueError when the listing
                # and the looked-up row disagree on the acls.
                return [pg for pg in self._pgs_all()
                        if pg["name"] != pg_drop.name]
            except idlutils.RowNotFound:
                pass

        pgs = []
        # include special pg_drop to log DROP and ALL actions
        if not log_obj.event or log_obj.event in (log_const.DROP_EVENT,
                                                  log_const.ALL_EVENT):
            try:
                pg = self.ovn_nb.lookup("Port_Group",
                                        ovn_const.OVN_DROP_PORT_GROUP_NAME)
                pgs.append(self._pg_as_dict(pg))
            except idlutils.RowNotFound:
                pass
            if log_obj.event == log_const.DROP_EVENT:
                return pgs

        if log_obj.resource_id:
            try:
                pg = self.ovn_nb.lookup(
                    "Port_Group",
                    utils.ovn_port_group_name(log_obj.resource_id))
                pgs.append(self._pg_as_dict(pg))
            except idlutils.RowNotFound:
                pass
            # Note: when sg is provided, it is redundant to get sgs from
            # port, because model will ensure that sg is associated with
            # neutron port
        elif log_obj.target_id:
            sg_ids = db_api._get_sgs_attached_to_port(context,
                                                      log_obj.target_id)
            for sg_id in sg_ids:
                try:
                    pg = self.ovn_nb.lookup(
                        "Port_Group", utils.ovn_port_group_name(sg_id))
                    pgs.append(self._pg_as_dict(pg))
                except idlutils.RowNotFound:
                    pass
        return pgs

    def create_log(self, context, log_obj):
        """Create a log_obj invocation.

        Ensures the fair meter exists and configures logging on the ACLs
        of the affected port groups, all in one transaction.

        :param context: current running context information
        :param log_obj: a log objects being created
        """
        LOG.debug("Create_log %s", log_obj)

        pgs = self._pgs_from_log_obj(context, log_obj)
        actions_enabled = self._acl_actions_enabled(log_obj)
        with self.ovn_nb.transaction(check_error=True) as ovn_txn:
            self._ovn_client.create_ovn_fair_meter(self.meter_name,
                                                   txn=ovn_txn)
            self._set_acls_log(pgs, ovn_txn, actions_enabled,
                               utils.ovn_name(log_obj.id))

    def create_log_precommit(self, context, log_obj):
        """Create a log_obj precommit.

        :param context: current running context information
        :param log_obj: a log object being created
        :raises LoggingNotSupported: if the NB schema lacks fair meters.
        """
        LOG.debug("Create_log_precommit %s", log_obj)

        if not self.network_logging_supported(self.ovn_nb):
            raise LoggingNotSupported()

    def _unset_disabled_acls(self, context, log_obj, ovn_txn):
        """Check if we need to disable any ACLs after an update.

        Will return True if there were more logs, and False if there was
        nothing to check.

        :param context: current running context information
        :param log_obj: a log_object which was updated
        :param ovn_txn: transaction the commands are added to
        :returns: True if there were other logs enabled, otherwise False.
        """
        if log_obj.enabled:
            return False

        pgs = self._pgs_from_log_obj(context, log_obj)
        other_logs = [log for log in self._get_logs(context)
                      if log.id != log_obj.id and log.enabled]
        if not other_logs:
            return False

        if log_obj.event == log_const.ALL_EVENT:
            # No port group resolved for this log: nothing to clear.
            if not pgs:
                return True
            # NOTE(review): only the first port group of each log is
            # considered in this branch -- confirm this is intended.
            acls_to_check = list(pgs[0]["acls"])
            if not acls_to_check:
                return True
            for log in other_logs:
                other_pgs = self._pgs_from_log_obj(context, log)
                if not other_pgs:
                    continue
                still_used = other_pgs[0]["acls"]
                # Keep only the ACLs no other enabled log relies on.
                acls_to_check = [acl for acl in acls_to_check
                                 if acl not in still_used]
                if not acls_to_check:
                    return True
            acls_to_remove = [{"name": pgs[0]["name"],
                               "acls": acls_to_check}]
            self._remove_acls_log(acls_to_remove, ovn_txn)
        else:
            # Events of the other enabled logs that are either global or
            # bound to the same resource as the updated log.
            all_events = {log.event for log in other_logs
                          if (not log.resource_id or
                              log.resource_id == log_obj.resource_id)}
            if (log_const.ALL_EVENT not in all_events and
                    log_obj.event not in all_events):
                self._remove_acls_log(pgs, ovn_txn)
        return True

    def update_log(self, context, log_obj):
        """Update a log_obj invocation.

        :param context: current running context information
        :param log_obj: a log object being updated
        """
        LOG.debug("Update_log %s", log_obj)

        with self.ovn_nb.transaction(check_error=True) as ovn_txn:
            # When the disabled-log cleanup did not handle the update,
            # (re)apply the log's configuration to its ACLs.
            if not self._unset_disabled_acls(context, log_obj, ovn_txn):
                pgs = self._pgs_from_log_obj(context, log_obj)
                actions_enabled = self._acl_actions_enabled(log_obj)
                self._set_acls_log(pgs, ovn_txn, actions_enabled,
                                   utils.ovn_name(log_obj.id))

    def delete_log(self, context, log_obj):
        """Delete a log_obj invocation.

        :param context: current running context information
        :param log_obj: a log_object being deleted
        """
        LOG.debug("Delete_log %s", log_obj)

        # If we are removing the last log_obj, let's clear log from all
        # acls. This is a simple way of ensuring that no acl logs are left
        # behind!
        log_objs = self._get_logs(context)
        if not log_objs or (
                len(log_objs) == 1 and log_objs[0].id == log_obj.id):
            pgs = self._pgs_all()
            with self.ovn_nb.transaction(check_error=True) as ovn_txn:
                self._remove_acls_log(pgs, ovn_txn)
                ovn_txn.add(self.ovn_nb.meter_del(self.meter_name,
                                                  if_exists=True))
            LOG.info("All ACL logs cleared after deletion of log_obj %s",
                     log_obj.id)
            return

        # Remove log_obj and revisit all remaining ones, since the acls
        # that were serving the removed log_obj may be usable by the
        # remaining log_objs.
        pgs = self._pgs_from_log_obj(context, log_obj)
        with self.ovn_nb.transaction(check_error=True) as ovn_txn:
            self._remove_acls_log(pgs, ovn_txn, utils.ovn_name(log_obj.id))

        # TODO(flaviof): We needed to break this second part into a
        # separate transaction because logic that determines the value of
        # the 'freed up' acl rows will not see the modified rows unless it
        # was inside an idl command.
        with self.ovn_nb.transaction(check_error=True) as ovn_txn:
            self._update_log_objs(context, ovn_txn,
                                  [lo for lo in log_objs
                                   if lo.id != log_obj.id])

    def resource_update(self, context, log_objs):
        """Tell the agent when resources related to log_objects are
        being updated

        :param context: current running context information
        :param log_objs: a list of log_objects, whose related resources are
                         being updated.
        """
        LOG.debug("Resource_update %s", log_objs)

        with self.ovn_nb.transaction(check_error=True) as ovn_txn:
            self._update_log_objs(context, ovn_txn, log_objs)
def register(plugin_driver):
    """Register the driver.

    Creates the module-wide ``DRIVER`` on first use, imports the security
    group validation module, registers the security-group-rule callback
    with the logging driver manager and returns the driver.

    :param plugin_driver: object forwarded to ``OVNDriver.create``
    :returns: the module-wide driver instance.
    """
    global DRIVER
    # Reuse an existing driver; build one only when none is set yet.
    DRIVER = DRIVER or OVNDriver.create(plugin_driver)

    # Trigger decorator: the module is imported only for what it does at
    # import time (presumably registering validators -- confirm).
    sg_validate_module = "neutron.services.logapi.common.sg_validate"
    importutils.import_module(sg_validate_module)

    # Register resource callback handler
    callback_cls = sg_callback.SecurityGroupRuleCallBack
    manager.register(resources.SECURITY_GROUP_RULE, callback_cls)

    LOG.info("OVN logging driver registered")
    return DRIVER