Permalink
Browse files

Merge "Create unit tests for endpoint policy drivers"

  • Loading branch information...
Jenkins authored and openstack-gerrit committed Nov 10, 2016
2 parents dacd57f + c4784d7 commit 498d700c65c951a308eecbe5025d285920a70fe3
@@ -47,7 +47,7 @@ def create_policy_association(self, policy_id, endpoint_id=None,
@abc.abstractmethod
def check_policy_association(self, policy_id, endpoint_id=None,
service_id=None, region_id=None):
- """Check existence a policy association.
+ """Check existence of a policy association.
:param policy_id: identity of policy that is being associated
:type policy_id: string
@@ -98,7 +98,8 @@ def get_policy_association(self, endpoint_id=None,
:type region_id: string
:raises keystone.exception.PolicyAssociationNotFound: If there is no
match for the specified association.
- :returns: dict containing policy_id
+ :returns: dict containing policy_id (value is a tuple containing only
+ the policy_id)
"""
raise exception.NotImplemented() # pragma: no cover
@@ -0,0 +1,152 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from keystone import exception
+
+
+class DriverTestCase(object):
+ """Test cases to validate the endpoint policy driver behavior."""
+
+ @property
+ def driver(self):
+ raise exception.NotImplemented()
+
+ def create_association(self, **kwargs):
+ association = {'policy_id': uuid.uuid4().hex,
+ 'endpoint_id': None,
+ 'service_id': None,
+ 'region_id': None}
+ association.update(kwargs)
+ self.driver.create_policy_association(**association)
+ return association
+
+ def test_create_policy_association(self):
+ association = self.create_association(endpoint_id=uuid.uuid4().hex)
+ self.driver.check_policy_association(**association)
+
+ association = self.create_association(service_id=uuid.uuid4().hex,
+ region_id=uuid.uuid4().hex)
+ self.driver.check_policy_association(**association)
+
+ association = self.create_association(service_id=uuid.uuid4().hex)
+ self.driver.check_policy_association(**association)
+
+ def test_recreate_policy_association(self):
+ # Creating a policy association to a target that already has a policy
+ # associated to it, will cause the original policy to be overridden
+ original_association = self.create_association(
+ service_id=uuid.uuid4().hex)
+ override_association = original_association.copy()
+ override_association.update({'policy_id': uuid.uuid4().hex})
+
+ self.driver.create_policy_association(**override_association)
+
+ self.driver.check_policy_association(**override_association)
+ self.assertRaises(exception.PolicyAssociationNotFound,
+ self.driver.check_policy_association,
+ **original_association)
+
+ def test_check_policy_association(self):
+ association = self.create_association(service_id=uuid.uuid4().hex,
+ region_id=uuid.uuid4().hex)
+ self.driver.check_policy_association(**association)
+
+ # An association is uniquely identified by its target. Omitting any
+ # attribute (region_id in this case) will result in a different check
+ association.pop('region_id')
+
+ self.assertRaises(exception.PolicyAssociationNotFound,
+ self.driver.check_policy_association,
+ **association)
+
+ def test_delete_policy_association(self):
+ association = self.create_association(endpoint_id=uuid.uuid4().hex)
+ self.driver.delete_policy_association(**association)
+
+ self.assertRaises(exception.PolicyAssociationNotFound,
+ self.driver.check_policy_association,
+ **association)
+
+ def test_get_policy_association(self):
+ association = self.create_association(service_id=uuid.uuid4().hex)
+
+ # Extract the policy_id from the association and query it by the target
+ policy_id = association.pop('policy_id')
+
+ association_ref = self.driver.get_policy_association(**association)
+ self.assertEqual({'policy_id': (policy_id,)}, association_ref)
+
+ def test_list_associations_for_policy(self):
+ policy_id = uuid.uuid4().hex
+ first = self.create_association(endpoint_id=uuid.uuid4().hex,
+ policy_id=policy_id)
+ second = self.create_association(service_id=uuid.uuid4().hex,
+ policy_id=policy_id)
+
+ associations_ref = self.driver.list_associations_for_policy(policy_id)
+
+ self.assertIn(first, associations_ref)
+ self.assertIn(second, associations_ref)
+
+ def test_delete_association_by_endpoint(self):
+ endpoint_id = uuid.uuid4().hex
+ associations = [self.create_association(endpoint_id=endpoint_id),
+ self.create_association(endpoint_id=endpoint_id)]
+
+ self.driver.delete_association_by_endpoint(endpoint_id)
+
+ for association in associations:
+ self.assertRaises(exception.PolicyAssociationNotFound,
+ self.driver.check_policy_association,
+ **association)
+
+ def test_delete_association_by_service(self):
+ service_id = uuid.uuid4().hex
+ associations = [self.create_association(service_id=service_id),
+ self.create_association(service_id=service_id)]
+
+ self.driver.delete_association_by_service(service_id)
+
+ for association in associations:
+ self.assertRaises(exception.PolicyAssociationNotFound,
+ self.driver.check_policy_association,
+ **association)
+
+ def test_delete_association_by_region(self):
+ region_id = uuid.uuid4().hex
+ first = self.create_association(service_id=uuid.uuid4().hex,
+ region_id=region_id)
+ second = self.create_association(service_id=uuid.uuid4().hex,
+ region_id=region_id)
+
+ self.driver.delete_association_by_region(region_id)
+
+ for association in [first, second]:
+ self.assertRaises(exception.PolicyAssociationNotFound,
+ self.driver.check_policy_association,
+ **association)
+
+ def test_delete_association_by_policy(self):
+ policy_id = uuid.uuid4().hex
+ first = self.create_association(endpoint_id=uuid.uuid4().hex,
+ policy_id=policy_id)
+ second = self.create_association(service_id=uuid.uuid4().hex,
+ policy_id=policy_id)
+
+ self.driver.delete_association_by_policy(policy_id)
+
+ for association in [first, second]:
+ self.assertRaises(exception.PolicyAssociationNotFound,
+ self.driver.check_policy_association,
+ **association)
@@ -0,0 +1,43 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from keystone.common import sql
+from keystone.endpoint_policy.backends import sql as sql_driver
+from keystone.tests import unit
+from keystone.tests.unit.backend import core_sql
+from keystone.tests.unit.endpoint_policy.backends import test_base
+from keystone.tests.unit.ksfixtures import database
+
+
+class SQLModelTestCase(core_sql.BaseBackendSqlModels):
+ """Test cases to validate the table structure."""
+
+ def test_policy_association_model(self):
+ cols = (('id', sql.String, 64),
+ ('policy_id', sql.String, 64),
+ ('endpoint_id', sql.String, 64),
+ ('service_id', sql.String, 64),
+ ('region_id', sql.String, 64))
+
+ self.assertExpectedSchema('policy_association', cols)
+
+
+class SQLDriverTestCase(test_base.DriverTestCase, unit.TestCase):
+
+ def setUp(self):
+ super(SQLDriverTestCase, self).setUp()
+ self.useFixture(database.Database())
+ self._driver = sql_driver.EndpointPolicy()
+
+ @property
+ def driver(self):
+ return self._driver

0 comments on commit 498d700

Please sign in to comment.