Skip to content

Commit

Permalink
When checking if there is a duplicate rule ignore the id field
Browse files Browse the repository at this point in the history
If a plugin wants to specify the id resource for a security group rule
to the db_base_class the code currently does not ignore this id field
when checking if the rules is a duplicate. This patch updates the code
so that it ignores the id field when checking for duplicate rules

Change-Id: Id4906cbdebd820d3349a4a3211ebb34491516c68
  • Loading branch information
aaronorosen committed Apr 8, 2016
1 parent 1af2a3e commit 3c1a068
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 1 deletion.
12 changes: 11 additions & 1 deletion neutron/db/securitygroups_db.py
Expand Up @@ -556,11 +556,20 @@ def _make_security_group_rule_filter_dict(self, security_group_rule):
res['protocol'] = self._get_ip_proto_name_and_num(value)
return res

def _rules_equal(self, rule1, rule2):
"""Determines if two rules are equal ignoring id field."""
rule1_copy = rule1.copy()
rule2_copy = rule2.copy()
rule1_copy.pop('id', None)
rule2_copy.pop('id', None)
return rule1_copy == rule2_copy

def _check_for_duplicate_rules(self, context, security_group_rules):
for i in security_group_rules:
found_self = False
for j in security_group_rules:
if i['security_group_rule'] == j['security_group_rule']:
if self._rules_equal(i['security_group_rule'],
j['security_group_rule']):
if found_self:
raise ext_sg.DuplicateSecurityGroupRuleInPost(rule=i)
found_self = True
Expand All @@ -585,6 +594,7 @@ def _check_for_duplicate_rules_in_db(self, context, security_group_rule):
# relying on this behavior. Therefore, we do the filtering
# below to check for these corner cases.
rule_dict = security_group_rule['security_group_rule'].copy()
rule_dict.pop('id', None)
sg_protocol = rule_dict.pop('protocol', None)
for db_rule in db_rules:
rule_id = db_rule.pop('id', None)
Expand Down
29 changes: 29 additions & 0 deletions neutron/tests/unit/db/test_securitygroups_db.py
Expand Up @@ -104,6 +104,35 @@ def test__check_for_duplicate_rules_in_db_does_not_drop_protocol(self):
self.mixin._check_for_duplicate_rules_in_db(context, rule_dict)
self.assertIn('protocol', rule_dict['security_group_rule'])

def test__check_for_duplicate_rules_ignores_rule_id(self):
rules = [{'security_group_rule': {'protocol': 'tcp', 'id': 'fake1'}},
{'security_group_rule': {'protocol': 'tcp', 'id': 'fake2'}}]

# NOTE(arosen): the name of this exception is a little misleading
# in this case as this test, tests that the id fields are dropped
# while being compared. This is in the case if a plugin specifies
# the rule ids themselves.
self.assertRaises(securitygroup.DuplicateSecurityGroupRuleInPost,
self.mixin._check_for_duplicate_rules,
context, rules)

def test__check_for_duplicate_rules_in_db_ignores_rule_id(self):
db_rules = {'protocol': 'tcp', 'id': 'fake', 'tenant_id': 'fake',
'direction': 'ingress', 'security_group_id': 'fake'}
with mock.patch.object(self.mixin, 'get_security_group_rules',
return_value=[db_rules]):
context = mock.Mock()
rule_dict = {
'security_group_rule': {'protocol': 'tcp',
'id': 'fake2',
'tenant_id': 'fake',
'security_group_id': 'fake',
'direction': 'ingress'}
}
self.assertRaises(securitygroup.SecurityGroupRuleExists,
self.mixin._check_for_duplicate_rules_in_db,
context, rule_dict)

def test_delete_security_group_rule_in_use(self):
with mock.patch.object(registry, "notify") as mock_notify:
mock_notify.side_effect = exceptions.CallbackFailure(Exception())
Expand Down

0 comments on commit 3c1a068

Please sign in to comment.