From a02ef859a6f7a32d14f2809edc77cb1cb9e957f6 Mon Sep 17 00:00:00 2001 From: Alex Meade Date: Sat, 2 Nov 2013 21:43:25 +0000 Subject: [PATCH] Move is_image_sharable to registry api When using the membership kwarg in is_image_sharable with the sqlalchemy driver, an AttributeError may be raised since the 'member' variable is not always defined. This function was copied throughout the db drivers and untested. This patch adds tests and moves the function to the v1 registry api, where it was the only place being used. Tests for this function in test_context were removed as it does not make sense to test this function there and allowed for the removal of the membership kwarg. Fixes bug 1247481 Change-Id: Ia77bc0a144831ddcf7e180afdb0842754ff3fcf2 --- glance/db/simple/api.py | 34 ------ glance/db/sqlalchemy/api.py | 34 ------ glance/registry/api/v1/members.py | 28 ++++- glance/tests/unit/test_context.py | 81 ------------ glance/tests/unit/v1/test_registry_api.py | 142 +++++++++++++++++++++- 5 files changed, 164 insertions(+), 155 deletions(-) diff --git a/glance/db/simple/api.py b/glance/db/simple/api.py index 6b5d137093..d80c8bd825 100644 --- a/glance/db/simple/api.py +++ b/glance/db/simple/api.py @@ -649,40 +649,6 @@ def is_image_mutable(context, image): return image['owner'] == context.owner -def is_image_sharable(context, image, **kwargs): - """Return True if the image can be shared to others in this context.""" - # Is admin == image sharable - if context.is_admin: - return True - - # Only allow sharing if we have an owner - if context.owner is None: - return False - - # If we own the image, we can share it - if context.owner == image['owner']: - return True - - # Let's get the membership association - if 'membership' in kwargs: - member = kwargs['membership'] - if member is None: - # Not shared with us anyway - return False - else: - members = image_member_find(context, - image_id=image['id'], - member=context.owner) - if members: - member = members[0] - else: - # Not shared with us anyway - return False - - # It's the can_share attribute we're now interested in - return member['can_share'] - - def is_image_visible(context, image, status=None): """Return True if the image is visible in this context.""" # Is admin == image visible diff --git a/glance/db/sqlalchemy/api.py b/glance/db/sqlalchemy/api.py index 738df8c7f5..f486ae1193 100644 --- a/glance/db/sqlalchemy/api.py +++ b/glance/db/sqlalchemy/api.py @@ -353,40 +353,6 @@ def is_image_mutable(context, image): return image['owner'] == context.owner -def is_image_sharable(context, image, **kwargs): - """Return True if the image can be shared to others in this context.""" - # Is admin == image sharable - if context.is_admin: - return True - - # Only allow sharing if we have an owner - if context.owner is None: - return False - - # If we own the image, we can share it - if context.owner == image['owner']: - return True - - # Let's get the membership association - if 'membership' in kwargs: - membership = kwargs['membership'] - if membership is None: - # Not shared with us anyway - return False - else: - members = image_member_find(context, - image_id=image['id'], - member=context.owner) - if members: - member = members[0] - else: - # Not shared with us anyway - return False - - # It's the can_share attribute we're now interested in - return member['can_share'] - - def is_image_visible(context, image, status=None): """Return True if the image is visible in this context.""" # Is admin == image visible diff --git a/glance/registry/api/v1/members.py b/glance/registry/api/v1/members.py index 5b16e28412..2f95fc3693 100644 --- a/glance/registry/api/v1/members.py +++ b/glance/registry/api/v1/members.py @@ -37,6 +37,28 @@ def __init__(self): self.db_api = glance.db.get_api() self.db_api.setup_db_env() + def is_image_sharable(self, context, image): + """Return True if the image can be shared to others in this context.""" + # Is admin == image sharable + if context.is_admin: + return True + + # Only allow sharing if we have an owner + if context.owner is None: + return False + + # If we own the image, we can share it + if context.owner == image['owner']: + return True + + members = self.db_api.image_member_find(context, + image_id=image['id'], + member=context.owner) + if members: + return members[0]['can_share'] + + return False + def index(self, req, image_id): """ Get the members of an image. @@ -89,7 +111,7 @@ def update_all(self, req, image_id, body): raise webob.exc.HTTPNotFound() # Can they manipulate the membership? - if not self.db_api.is_image_sharable(req.context, image): + if not self.is_image_sharable(req.context, image): msg = _("User lacks permission to share image %(id)s") LOG.info(msg % {'id': image_id}) msg = _("No permission to share that image") @@ -202,7 +224,7 @@ def update(self, req, image_id, id, body=None): raise webob.exc.HTTPNotFound() # Can they manipulate the membership? - if not self.db_api.is_image_sharable(req.context, image): + if not self.is_image_sharable(req.context, image): msg = _("User lacks permission to share image %(id)s") LOG.info(msg % {'id': image_id}) msg = _("No permission to share that image") @@ -262,7 +284,7 @@ def delete(self, req, image_id, id): raise webob.exc.HTTPNotFound() # Can they manipulate the membership? - if not self.db_api.is_image_sharable(req.context, image): + if not self.is_image_sharable(req.context, image): msg = _("User lacks permission to share image %(id)s") LOG.info(msg % {'id': image_id}) msg = _("No permission to share that image") diff --git a/glance/tests/unit/test_context.py b/glance/tests/unit/test_context.py index f577999c6d..79150e47ec 100644 --- a/glance/tests/unit/test_context.py +++ b/glance/tests/unit/test_context.py @@ -52,27 +52,6 @@ def do_visible(self, exp_res, img_owner, img_public, **kwargs): self.assertEqual(self.db_api.is_image_visible(ctx, img), exp_res) - def do_sharable(self, exp_res, img_owner, membership=None, **kwargs): - """ - Perform a context sharability test. Creates a (fake) image - with the specified owner and is_public attributes, then - creates a context with the given keyword arguments and expects - exp_res as the result of an is_image_sharable() call on the - context. If membership is not None, its value will be passed - in as the 'membership' keyword argument of - is_image_sharable(). - """ - - img = _fake_image(img_owner, True) - ctx = context.RequestContext(**kwargs) - - sharable_args = {} - if membership is not None: - sharable_args['membership'] = membership - - output = self.db_api.is_image_sharable(ctx, img, **sharable_args) - self.assertEqual(exp_res, output) - def test_empty_public(self): """ Tests that an empty context (with is_admin set to True) can @@ -101,15 +80,6 @@ def test_empty_private_owned(self): """ self.do_visible(True, 'pattieblack', False, is_admin=True) - def test_empty_shared(self): - """ - Tests that an empty context (with is_admin set to False) can - not share an image, with or without membership. - """ - self.do_sharable(False, 'pattieblack', None, is_admin=False) - self.do_sharable(False, 'pattieblack', _fake_membership(True), - is_admin=False) - def test_anon_public(self): """ Tests that an anonymous context (with is_admin set to False) @@ -138,14 +108,6 @@ def test_anon_private_owned(self): """ self.do_visible(False, 'pattieblack', False) - def test_anon_shared(self): - """ - Tests that an empty context (with is_admin set to True) can - not share an image, with or without membership. - """ - self.do_sharable(False, 'pattieblack', None) - self.do_sharable(False, 'pattieblack', _fake_membership(True)) - def test_auth_public(self): """ Tests that an authenticated context (with is_admin set to @@ -192,49 +154,6 @@ def test_auth_private_owned(self): """ self.do_visible(True, 'pattieblack', False, tenant='pattieblack') - def test_auth_sharable(self): - """ - Tests that an authenticated context (with is_admin set to - False) cannot share an image it neither owns nor is shared - with it. - """ - self.do_sharable(False, 'pattieblack', None, tenant='froggy') - - def test_auth_sharable_admin(self): - """ - Tests that an authenticated context (with is_admin set to - True) can share an image it neither owns nor is shared with - it. - """ - self.do_sharable(True, 'pattieblack', None, tenant='froggy', - is_admin=True) - - def test_auth_sharable_owned(self): - """ - Tests that an authenticated context (with is_admin set to - False) can share an image it owns, even if it is not shared - with it. - """ - self.do_sharable(True, 'pattieblack', None, tenant='pattieblack') - - def test_auth_sharable_cannot_share(self): - """ - Tests that an authenticated context (with is_admin set to - False) cannot share an image it does not own even if it is - shared with it, but with can_share = False. - """ - self.do_sharable(False, 'pattieblack', _fake_membership(False), - tenant='froggy') - - def test_auth_sharable_can_share(self): - """ - Tests that an authenticated context (with is_admin set to - False) can share an image it does not own if it is shared with - it with can_share = True. - """ - self.do_sharable(True, 'pattieblack', _fake_membership(True), - tenant='froggy') - def test_request_id(self): contexts = [context.RequestContext().request_id for _ in range(5)] # Check for uniqueness -- set() will normalize its argument diff --git a/glance/tests/unit/v1/test_registry_api.py b/glance/tests/unit/v1/test_registry_api.py index fe4eff3094..6413c6d30b 100644 --- a/glance/tests/unit/v1/test_registry_api.py +++ b/glance/tests/unit/v1/test_registry_api.py @@ -28,8 +28,9 @@ import glance.api.common import glance.common.config from glance.common import crypt -import glance.context +from glance import context from glance.db.sqlalchemy import api as db_api +from glance.db.sqlalchemy import models as db_models from glance.openstack.common import timeutils from glance.openstack.common import uuidutils from glance.registry.api import v1 as rserver @@ -120,7 +121,7 @@ def _get_extra_fixture(id, name, **kwargs): _get_extra_fixture(UUID2, 'fake image #2', min_disk=5, min_ram=256, size=19, properties={})] - self.context = glance.context.RequestContext(is_admin=True) + self.context = context.RequestContext(is_admin=True) db_api.setup_db_env() db_api.get_engine() self.destroy_fixtures() @@ -1689,7 +1690,7 @@ def _get_extra_fixture(id, name, **kwargs): _get_extra_fixture(UUID2, 'fake image #2', min_disk=5, min_ram=256, size=19, properties={})] - self.context = glance.context.RequestContext(is_admin=True) + self.context = context.RequestContext(is_admin=True) db_api.setup_db_env() db_api.get_engine() self.destroy_fixtures() @@ -1783,3 +1784,138 @@ def test_create_from_location_data_with_encryption(self): encryption_key, image_entry['locations'][1]['url']) self.assertEqual(location_url1, decrypted_location_url1) self.assertEqual(location_url2, decrypted_location_url2) + + +class TestSharability(test_utils.BaseTestCase): + + def setUp(self): + super(TestSharability, self).setUp() + self.setup_db() + self.controller = glance.registry.api.v1.members.Controller() + + def setup_db(self): + db_api.setup_db_env() + db_api.get_engine() + db_models.unregister_models(db_api._ENGINE) + db_models.register_models(db_api._ENGINE) + + def test_is_image_sharable_as_admin(self): + TENANT1 = uuidutils.generate_uuid() + TENANT2 = uuidutils.generate_uuid() + ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1, + auth_tok='user:%s:user' % TENANT1, + owner_is_tenant=True) + ctxt2 = context.RequestContext(is_admin=True, user=TENANT2, + auth_tok='user:%s:admin' % TENANT2, + owner_is_tenant=False) + UUIDX = uuidutils.generate_uuid() + #we need private image and context.owner should not match image owner + image = db_api.image_create(ctxt1, {'id': UUIDX, + 'status': 'queued', + 'is_public': False, + 'owner': TENANT1}) + + result = self.controller.is_image_sharable(ctxt2, image) + self.assertTrue(result) + + def test_is_image_sharable_owner_can_share(self): + TENANT1 = uuidutils.generate_uuid() + ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1, + auth_tok='user:%s:user' % TENANT1, + owner_is_tenant=True) + UUIDX = uuidutils.generate_uuid() + #we need private image and context.owner should not match image owner + image = db_api.image_create(ctxt1, {'id': UUIDX, + 'status': 'queued', + 'is_public': False, + 'owner': TENANT1}) + + result = self.controller.is_image_sharable(ctxt1, image) + self.assertTrue(result) + + def test_is_image_sharable_non_owner_cannot_share(self): + TENANT1 = uuidutils.generate_uuid() + TENANT2 = uuidutils.generate_uuid() + ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1, + auth_tok='user:%s:user' % TENANT1, + owner_is_tenant=True) + ctxt2 = context.RequestContext(is_admin=False, user=TENANT2, + auth_tok='user:%s:user' % TENANT2, + owner_is_tenant=False) + UUIDX = uuidutils.generate_uuid() + #we need private image and context.owner should not match image owner + image = db_api.image_create(ctxt1, {'id': UUIDX, + 'status': 'queued', + 'is_public': False, + 'owner': TENANT1}) + + result = self.controller.is_image_sharable(ctxt2, image) + self.assertFalse(result) + + def test_is_image_sharable_non_owner_can_share_as_image_member(self): + TENANT1 = uuidutils.generate_uuid() + TENANT2 = uuidutils.generate_uuid() + ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1, + auth_tok='user:%s:user' % TENANT1, + owner_is_tenant=True) + ctxt2 = context.RequestContext(is_admin=False, user=TENANT2, + auth_tok='user:%s:user' % TENANT2, + owner_is_tenant=False) + UUIDX = uuidutils.generate_uuid() + #we need private image and context.owner should not match image owner + image = db_api.image_create(ctxt1, {'id': UUIDX, + 'status': 'queued', + 'is_public': False, + 'owner': TENANT1}) + + membership = {'can_share': True, + 'member': TENANT2, + 'image_id': UUIDX} + + db_api.image_member_create(ctxt1, membership) + + result = self.controller.is_image_sharable(ctxt2, image) + self.assertTrue(result) + + def test_is_image_sharable_non_owner_as_image_member_without_sharing(self): + TENANT1 = uuidutils.generate_uuid() + TENANT2 = uuidutils.generate_uuid() + ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1, + auth_tok='user:%s:user' % TENANT1, + owner_is_tenant=True) + ctxt2 = context.RequestContext(is_admin=False, user=TENANT2, + auth_tok='user:%s:user' % TENANT2, + owner_is_tenant=False) + UUIDX = uuidutils.generate_uuid() + #we need private image and context.owner should not match image owner + image = db_api.image_create(ctxt1, {'id': UUIDX, + 'status': 'queued', + 'is_public': False, + 'owner': TENANT1}) + + membership = {'can_share': False, + 'member': TENANT2, + 'image_id': UUIDX} + + db_api.image_member_create(ctxt1, membership) + + result = self.controller.is_image_sharable(ctxt2, image) + self.assertFalse(result) + + def test_is_image_sharable_owner_is_none(self): + TENANT1 = uuidutils.generate_uuid() + ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1, + auth_tok='user:%s:user' % TENANT1, + owner_is_tenant=True) + ctxt2 = context.RequestContext(is_admin=False, tenant=None, + auth_tok='user:%s:user' % TENANT1, + owner_is_tenant=True) + UUIDX = uuidutils.generate_uuid() + #we need private image and context.owner should not match image owner + image = db_api.image_create(ctxt1, {'id': UUIDX, + 'status': 'queued', + 'is_public': False, + 'owner': TENANT1}) + + result = self.controller.is_image_sharable(ctxt2, image) + self.assertFalse(result)