Permalink
Browse files

Use RBAC policy to determine if context is admin.

Fixes bug 1152716

If the context roles do not match the configured admin_role,
fall back to determining if admin via the "context_is_admin"
RBAC policy rule (for consistency with the approach used by
the other projects).

Note this requires that the "context_is_admin" rule *must*
be set in the policy.json if the out-of-the-box default rule
is used (as this default is so open, the net effect of omitting
the "context_is_admin" rule is for every request to acquire
admin status).

Change-Id: Ide2cf604b48f24bd759ce2d65091ff546cd9d22e
  • Loading branch information...
Eoghan Glynn
Eoghan Glynn committed May 1, 2013
1 parent 2781f41 commit cc938e25f3babd8aa1299ae75cc5fa2cf24a00a0
View
@@ -1,4 +1,5 @@
{
"context_is_admin": "role:admin",
"default": "",
"manage_image_cache": "role:admin"
}
@@ -20,6 +20,7 @@
from oslo.config import cfg
import webob.exc
from glance.api import policy
from glance.common import wsgi
import glance.context
import glance.openstack.common.log as logging
@@ -58,6 +59,10 @@ def process_response(self, resp):
class ContextMiddleware(BaseContextMiddleware):
def __init__(self, app):
self.policy_enforcer = policy.Enforcer()
super(ContextMiddleware, self).__init__(app)
def process_request(self, req):
"""Convert authentication information into a request context
@@ -84,6 +89,7 @@ def _get_anonymous_context(self):
'roles': [],
'is_admin': False,
'read_only': True,
'policy_enforcer': self.policy_enforcer,
}
return glance.context.RequestContext(**kwargs)
@@ -113,6 +119,7 @@ def _get_authenticated_context(self, req):
'auth_tok': req.headers.get('X-Auth-Token', deprecated_token),
'owner_is_tenant': CONF.owner_is_tenant,
'service_catalog': service_catalog,
'policy_enforcer': self.policy_enforcer,
}
return glance.context.RequestContext(**kwargs)
View
@@ -41,6 +41,7 @@
DEFAULT_RULES = {
'context_is_admin': policy.RoleCheck('role', 'admin'),
'default': policy.TrueCheck(),
'manage_image_cache': policy.RoleCheck('role', 'admin'),
}
@@ -143,6 +144,16 @@ def check(self, context, action, target):
"""
return self._check(context, action, target)
def check_is_admin(self, context):
"""Check if the given context is associated with an admin role,
as defined via the 'context_is_admin' RBAC rule.
:param context: Glance request context
:returns: A non-False value if context role is admin.
"""
target = context.to_dict()
return self.check(context, 'context_is_admin', target)
class ImageRepoProxy(glance.domain.proxy.Repo):
View
@@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from glance.api import policy
from glance.openstack.common import local
from glance.openstack.common import uuidutils
@@ -27,17 +28,22 @@ class RequestContext(object):
def __init__(self, auth_tok=None, user=None, tenant=None, roles=None,
is_admin=False, read_only=False, show_deleted=False,
owner_is_tenant=True, service_catalog=None):
owner_is_tenant=True, service_catalog=None,
policy_enforcer=None):
self.auth_tok = auth_tok
self.user = user
self.tenant = tenant
self.roles = roles or []
self.is_admin = is_admin
self.read_only = read_only
self._show_deleted = show_deleted
self.owner_is_tenant = owner_is_tenant
self.request_id = uuidutils.generate_uuid()
self.service_catalog = service_catalog
self.policy_enforcer = policy_enforcer or policy.Enforcer()
self.is_admin = is_admin
if not self.is_admin:
self.is_admin = \
self.policy_enforcer.check_is_admin(self)
if not hasattr(local.store, 'context'):
self.update_store()
@@ -1,3 +1,4 @@
{
"context_is_admin": "role:admin",
"default": ""
}
@@ -21,6 +21,7 @@
from glance.common import exception
from glance import context
import glance.registry.client.v1.api as registry
from glance.tests import utils
class TestCacheMiddlewareURLMatching(testtools.TestCase):
@@ -140,7 +141,7 @@ def delete_cached_image(self, image_id):
self.cache = DummyCache()
class TestCacheMiddlewareProcessRequest(testtools.TestCase):
class TestCacheMiddlewareProcessRequest(utils.BaseTestCase):
def setUp(self):
super(TestCacheMiddlewareProcessRequest, self).setUp()
self.stubs = stubout.StubOutForTesting()
@@ -204,7 +205,7 @@ def dummy_img_iterator():
self.assertEqual(True, actual)
class TestProcessResponse(testtools.TestCase):
class TestProcessResponse(utils.BaseTestCase):
def setUp(self):
super(TestProcessResponse, self).setUp()
self.stubs = stubout.StubOutForTesting()
@@ -75,6 +75,7 @@ def _db_image_member_fixture(image_id, member_id, **kwargs):
class TestImageRepo(test_utils.BaseTestCase):
def setUp(self):
super(TestImageRepo, self).setUp()
self.db = unit_test_utils.FakeDB()
self.db.reset()
self.context = glance.context.RequestContext(
@@ -83,7 +84,6 @@ def setUp(self):
self.image_factory = glance.domain.ImageFactory()
self._create_images()
self._create_image_members()
super(TestImageRepo, self).setUp()
def _create_images(self):
self.db.reset()
@@ -275,6 +275,7 @@ def test_decrypt_locations_on_list(self):
class TestImageMemberRepo(test_utils.BaseTestCase):
def setUp(self):
super(TestImageMemberRepo, self).setUp()
self.db = unit_test_utils.FakeDB()
self.db.reset()
self.context = glance.context.RequestContext(
@@ -286,7 +287,6 @@ def setUp(self):
image = self.image_repo.get(UUID1)
self.image_member_repo = glance.db.ImageMemberRepo(self.context,
self.db, image)
super(TestImageMemberRepo, self).setUp()
def _create_images(self):
self.images = [
@@ -464,6 +464,7 @@ class TestImageNotifications(utils.BaseTestCase):
"""Test Image Notifications work"""
def setUp(self):
super(TestImageNotifications, self).setUp()
self.image = ImageStub(
image_id=UUID1, name='image-1', status='active', size=1024,
created_at=DATETIME, updated_at=DATETIME, owner=TENANT1,
@@ -479,7 +480,6 @@ def setUp(self):
self.image_repo_stub, self.context, self.notifier)
self.image_proxy = glance.notifier.ImageProxy(
self.image, self.context, self.notifier)
super(TestImageNotifications, self).setUp()
def test_image_save_notification(self):
self.image_repo_proxy.save(self.image_proxy)
@@ -258,3 +258,46 @@ def test_image_get_data(self):
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
self.assertRaises(exception.Forbidden, image.get_data)
class TestContextPolicyEnforcer(base.IsolatedUnitTest):
def _do_test_policy_influence_context_admin(self,
policy_admin_role,
context_role,
context_is_admin,
admin_expected):
self.config(policy_file=os.path.join(self.test_dir, 'gobble.gobble'))
rules = {'context_is_admin': 'role:%s' % policy_admin_role}
self.set_policy_rules(rules)
enforcer = glance.api.policy.Enforcer()
context = glance.context.RequestContext(roles=[context_role],
is_admin=context_is_admin,
policy_enforcer=enforcer)
self.assertEquals(context.is_admin, admin_expected)
def test_context_admin_policy_admin(self):
self._do_test_policy_influence_context_admin('test_admin',
'test_admin',
True,
True)
def test_context_nonadmin_policy_admin(self):
self._do_test_policy_influence_context_admin('test_admin',
'test_admin',
False,
True)
def test_context_admin_policy_nonadmin(self):
self._do_test_policy_influence_context_admin('test_admin',
'demo',
True,
True)
def test_context_nonadmin_policy_nonadmin(self):
self._do_test_policy_influence_context_admin('test_admin',
'demo',
False,
False)

0 comments on commit cc938e2

Please sign in to comment.