Skip to content

Commit

Permalink
Merge "Don't drop 'protocol' from client supplied security_group_rule…
Browse files Browse the repository at this point in the history
… dict"
  • Loading branch information
Jenkins authored and openstack-gerrit committed Apr 5, 2016
2 parents 4996c66 + 5a41caa commit 90d9af6
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
13 changes: 3 additions & 10 deletions neutron/db/securitygroups_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,23 +584,16 @@ def _check_for_duplicate_rules_in_db(self, context, security_group_rule):
# is changed which cannot be because other methods are already
# relying on this behavior. Therefore, we do the filtering
# below to check for these corner cases.
rule_dict = security_group_rule['security_group_rule'].copy()
sg_protocol = rule_dict.pop('protocol', None)
for db_rule in db_rules:
rule_id = db_rule.pop('id', None)
# remove protocol and match separately for number and type
db_protocol = db_rule.pop('protocol', None)
sg_protocol = (
security_group_rule['security_group_rule'].pop('protocol',
None))
is_protocol_matching = (
self._get_ip_proto_name_and_num(db_protocol) ==
self._get_ip_proto_name_and_num(sg_protocol))
are_rules_matching = (
security_group_rule['security_group_rule'] == db_rule)
# reinstate protocol field for further processing
if sg_protocol:
security_group_rule['security_group_rule']['protocol'] = (
sg_protocol)
if (is_protocol_matching and are_rules_matching):
if (is_protocol_matching and rule_dict == db_rule):
raise ext_sg.SecurityGroupRuleExists(rule_id=rule_id)

def _validate_ip_prefix(self, rule):
Expand Down
13 changes: 13 additions & 0 deletions neutron/tests/unit/db/test_securitygroups_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,19 @@ def test_create_security_group_rule_conflict(self):
self.mixin.create_security_group_rule(
self.ctx, mock.MagicMock())

def test__check_for_duplicate_rules_in_db_does_not_drop_protocol(self):
with mock.patch.object(self.mixin, 'get_security_group_rules',
return_value=[mock.Mock()]):
context = mock.Mock()
rule_dict = {
'security_group_rule': {'protocol': None,
'tenant_id': 'fake',
'security_group_id': 'fake',
'direction': 'fake'}
}
self.mixin._check_for_duplicate_rules_in_db(context, rule_dict)
self.assertIn('protocol', rule_dict['security_group_rule'])

def test_delete_security_group_rule_in_use(self):
with mock.patch.object(registry, "notify") as mock_notify:
mock_notify.side_effect = exceptions.CallbackFailure(Exception())
Expand Down

0 comments on commit 90d9af6

Please sign in to comment.