diff --git a/glance/db/simple/api.py b/glance/db/simple/api.py index 6b5d137093..d80c8bd825 100644 --- a/glance/db/simple/api.py +++ b/glance/db/simple/api.py @@ -649,40 +649,6 @@ def is_image_mutable(context, image): return image['owner'] == context.owner -def is_image_sharable(context, image, **kwargs): - """Return True if the image can be shared to others in this context.""" - # Is admin == image sharable - if context.is_admin: - return True - - # Only allow sharing if we have an owner - if context.owner is None: - return False - - # If we own the image, we can share it - if context.owner == image['owner']: - return True - - # Let's get the membership association - if 'membership' in kwargs: - member = kwargs['membership'] - if member is None: - # Not shared with us anyway - return False - else: - members = image_member_find(context, - image_id=image['id'], - member=context.owner) - if members: - member = members[0] - else: - # Not shared with us anyway - return False - - # It's the can_share attribute we're now interested in - return member['can_share'] - - def is_image_visible(context, image, status=None): """Return True if the image is visible in this context.""" # Is admin == image visible diff --git a/glance/db/sqlalchemy/api.py b/glance/db/sqlalchemy/api.py index 738df8c7f5..f486ae1193 100644 --- a/glance/db/sqlalchemy/api.py +++ b/glance/db/sqlalchemy/api.py @@ -353,40 +353,6 @@ def is_image_mutable(context, image): return image['owner'] == context.owner -def is_image_sharable(context, image, **kwargs): - """Return True if the image can be shared to others in this context.""" - # Is admin == image sharable - if context.is_admin: - return True - - # Only allow sharing if we have an owner - if context.owner is None: - return False - - # If we own the image, we can share it - if context.owner == image['owner']: - return True - - # Let's get the membership association - if 'membership' in kwargs: - membership = kwargs['membership'] - if membership is None: - # Not shared with us anyway - return False - else: - members = image_member_find(context, - image_id=image['id'], - member=context.owner) - if members: - member = members[0] - else: - # Not shared with us anyway - return False - - # It's the can_share attribute we're now interested in - return member['can_share'] - - def is_image_visible(context, image, status=None): """Return True if the image is visible in this context.""" # Is admin == image visible diff --git a/glance/registry/api/v1/members.py b/glance/registry/api/v1/members.py index 5b16e28412..2f95fc3693 100644 --- a/glance/registry/api/v1/members.py +++ b/glance/registry/api/v1/members.py @@ -37,6 +37,28 @@ def __init__(self): self.db_api = glance.db.get_api() self.db_api.setup_db_env() + def is_image_sharable(self, context, image): + """Return True if the image can be shared to others in this context.""" + # Is admin == image sharable + if context.is_admin: + return True + + # Only allow sharing if we have an owner + if context.owner is None: + return False + + # If we own the image, we can share it + if context.owner == image['owner']: + return True + + members = self.db_api.image_member_find(context, + image_id=image['id'], + member=context.owner) + if members: + return members[0]['can_share'] + + return False + def index(self, req, image_id): """ Get the members of an image. @@ -89,7 +111,7 @@ def update_all(self, req, image_id, body): raise webob.exc.HTTPNotFound() # Can they manipulate the membership? - if not self.db_api.is_image_sharable(req.context, image): + if not self.is_image_sharable(req.context, image): msg = _("User lacks permission to share image %(id)s") LOG.info(msg % {'id': image_id}) msg = _("No permission to share that image") @@ -202,7 +224,7 @@ def update(self, req, image_id, id, body=None): raise webob.exc.HTTPNotFound() # Can they manipulate the membership? - if not self.db_api.is_image_sharable(req.context, image): + if not self.is_image_sharable(req.context, image): msg = _("User lacks permission to share image %(id)s") LOG.info(msg % {'id': image_id}) msg = _("No permission to share that image") @@ -262,7 +284,7 @@ def delete(self, req, image_id, id): raise webob.exc.HTTPNotFound() # Can they manipulate the membership? - if not self.db_api.is_image_sharable(req.context, image): + if not self.is_image_sharable(req.context, image): msg = _("User lacks permission to share image %(id)s") LOG.info(msg % {'id': image_id}) msg = _("No permission to share that image") diff --git a/glance/tests/unit/test_context.py b/glance/tests/unit/test_context.py index f577999c6d..79150e47ec 100644 --- a/glance/tests/unit/test_context.py +++ b/glance/tests/unit/test_context.py @@ -52,27 +52,6 @@ def do_visible(self, exp_res, img_owner, img_public, **kwargs): self.assertEqual(self.db_api.is_image_visible(ctx, img), exp_res) - def do_sharable(self, exp_res, img_owner, membership=None, **kwargs): - """ - Perform a context sharability test. Creates a (fake) image - with the specified owner and is_public attributes, then - creates a context with the given keyword arguments and expects - exp_res as the result of an is_image_sharable() call on the - context. If membership is not None, its value will be passed - in as the 'membership' keyword argument of - is_image_sharable(). - """ - - img = _fake_image(img_owner, True) - ctx = context.RequestContext(**kwargs) - - sharable_args = {} - if membership is not None: - sharable_args['membership'] = membership - - output = self.db_api.is_image_sharable(ctx, img, **sharable_args) - self.assertEqual(exp_res, output) - def test_empty_public(self): """ Tests that an empty context (with is_admin set to True) can @@ -101,15 +80,6 @@ def test_empty_private_owned(self): """ self.do_visible(True, 'pattieblack', False, is_admin=True) - def test_empty_shared(self): - """ - Tests that an empty context (with is_admin set to False) can - not share an image, with or without membership. - """ - self.do_sharable(False, 'pattieblack', None, is_admin=False) - self.do_sharable(False, 'pattieblack', _fake_membership(True), - is_admin=False) - def test_anon_public(self): """ Tests that an anonymous context (with is_admin set to False) @@ -138,14 +108,6 @@ def test_anon_private_owned(self): """ self.do_visible(False, 'pattieblack', False) - def test_anon_shared(self): - """ - Tests that an empty context (with is_admin set to True) can - not share an image, with or without membership. - """ - self.do_sharable(False, 'pattieblack', None) - self.do_sharable(False, 'pattieblack', _fake_membership(True)) - def test_auth_public(self): """ Tests that an authenticated context (with is_admin set to @@ -192,49 +154,6 @@ def test_auth_private_owned(self): """ self.do_visible(True, 'pattieblack', False, tenant='pattieblack') - def test_auth_sharable(self): - """ - Tests that an authenticated context (with is_admin set to - False) cannot share an image it neither owns nor is shared - with it. - """ - self.do_sharable(False, 'pattieblack', None, tenant='froggy') - - def test_auth_sharable_admin(self): - """ - Tests that an authenticated context (with is_admin set to - True) can share an image it neither owns nor is shared with - it. - """ - self.do_sharable(True, 'pattieblack', None, tenant='froggy', - is_admin=True) - - def test_auth_sharable_owned(self): - """ - Tests that an authenticated context (with is_admin set to - False) can share an image it owns, even if it is not shared - with it. - """ - self.do_sharable(True, 'pattieblack', None, tenant='pattieblack') - - def test_auth_sharable_cannot_share(self): - """ - Tests that an authenticated context (with is_admin set to - False) cannot share an image it does not own even if it is - shared with it, but with can_share = False. - """ - self.do_sharable(False, 'pattieblack', _fake_membership(False), - tenant='froggy') - - def test_auth_sharable_can_share(self): - """ - Tests that an authenticated context (with is_admin set to - False) can share an image it does not own if it is shared with - it with can_share = True. - """ - self.do_sharable(True, 'pattieblack', _fake_membership(True), - tenant='froggy') - def test_request_id(self): contexts = [context.RequestContext().request_id for _ in range(5)] # Check for uniqueness -- set() will normalize its argument diff --git a/glance/tests/unit/v1/test_registry_api.py b/glance/tests/unit/v1/test_registry_api.py index fe4eff3094..6413c6d30b 100644 --- a/glance/tests/unit/v1/test_registry_api.py +++ b/glance/tests/unit/v1/test_registry_api.py @@ -28,8 +28,9 @@ import glance.api.common import glance.common.config from glance.common import crypt -import glance.context +from glance import context from glance.db.sqlalchemy import api as db_api +from glance.db.sqlalchemy import models as db_models from glance.openstack.common import timeutils from glance.openstack.common import uuidutils from glance.registry.api import v1 as rserver @@ -120,7 +121,7 @@ def _get_extra_fixture(id, name, **kwargs): _get_extra_fixture(UUID2, 'fake image #2', min_disk=5, min_ram=256, size=19, properties={})] - self.context = glance.context.RequestContext(is_admin=True) + self.context = context.RequestContext(is_admin=True) db_api.setup_db_env() db_api.get_engine() self.destroy_fixtures() @@ -1689,7 +1690,7 @@ def _get_extra_fixture(id, name, **kwargs): _get_extra_fixture(UUID2, 'fake image #2', min_disk=5, min_ram=256, size=19, properties={})] - self.context = glance.context.RequestContext(is_admin=True) + self.context = context.RequestContext(is_admin=True) db_api.setup_db_env() db_api.get_engine() self.destroy_fixtures() @@ -1783,3 +1784,138 @@ def test_create_from_location_data_with_encryption(self): encryption_key, image_entry['locations'][1]['url']) self.assertEqual(location_url1, decrypted_location_url1) self.assertEqual(location_url2, decrypted_location_url2) + + +class TestSharability(test_utils.BaseTestCase): + + def setUp(self): + super(TestSharability, self).setUp() + self.setup_db() + self.controller = glance.registry.api.v1.members.Controller() + + def setup_db(self): + db_api.setup_db_env() + db_api.get_engine() + db_models.unregister_models(db_api._ENGINE) + db_models.register_models(db_api._ENGINE) + + def test_is_image_sharable_as_admin(self): + TENANT1 = uuidutils.generate_uuid() + TENANT2 = uuidutils.generate_uuid() + ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1, + auth_tok='user:%s:user' % TENANT1, + owner_is_tenant=True) + ctxt2 = context.RequestContext(is_admin=True, user=TENANT2, + auth_tok='user:%s:admin' % TENANT2, + owner_is_tenant=False) + UUIDX = uuidutils.generate_uuid() + #we need private image and context.owner should not match image owner + image = db_api.image_create(ctxt1, {'id': UUIDX, + 'status': 'queued', + 'is_public': False, + 'owner': TENANT1}) + + result = self.controller.is_image_sharable(ctxt2, image) + self.assertTrue(result) + + def test_is_image_sharable_owner_can_share(self): + TENANT1 = uuidutils.generate_uuid() + ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1, + auth_tok='user:%s:user' % TENANT1, + owner_is_tenant=True) + UUIDX = uuidutils.generate_uuid() + #we need private image and context.owner should not match image owner + image = db_api.image_create(ctxt1, {'id': UUIDX, + 'status': 'queued', + 'is_public': False, + 'owner': TENANT1}) + + result = self.controller.is_image_sharable(ctxt1, image) + self.assertTrue(result) + + def test_is_image_sharable_non_owner_cannot_share(self): + TENANT1 = uuidutils.generate_uuid() + TENANT2 = uuidutils.generate_uuid() + ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1, + auth_tok='user:%s:user' % TENANT1, + owner_is_tenant=True) + ctxt2 = context.RequestContext(is_admin=False, user=TENANT2, + auth_tok='user:%s:user' % TENANT2, + owner_is_tenant=False) + UUIDX = uuidutils.generate_uuid() + #we need private image and context.owner should not match image owner + image = db_api.image_create(ctxt1, {'id': UUIDX, + 'status': 'queued', + 'is_public': False, + 'owner': TENANT1}) + + result = self.controller.is_image_sharable(ctxt2, image) + self.assertFalse(result) + + def test_is_image_sharable_non_owner_can_share_as_image_member(self): + TENANT1 = uuidutils.generate_uuid() + TENANT2 = uuidutils.generate_uuid() + ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1, + auth_tok='user:%s:user' % TENANT1, + owner_is_tenant=True) + ctxt2 = context.RequestContext(is_admin=False, user=TENANT2, + auth_tok='user:%s:user' % TENANT2, + owner_is_tenant=False) + UUIDX = uuidutils.generate_uuid() + #we need private image and context.owner should not match image owner + image = db_api.image_create(ctxt1, {'id': UUIDX, + 'status': 'queued', + 'is_public': False, + 'owner': TENANT1}) + + membership = {'can_share': True, + 'member': TENANT2, + 'image_id': UUIDX} + + db_api.image_member_create(ctxt1, membership) + + result = self.controller.is_image_sharable(ctxt2, image) + self.assertTrue(result) + + def test_is_image_sharable_non_owner_as_image_member_without_sharing(self): + TENANT1 = uuidutils.generate_uuid() + TENANT2 = uuidutils.generate_uuid() + ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1, + auth_tok='user:%s:user' % TENANT1, + owner_is_tenant=True) + ctxt2 = context.RequestContext(is_admin=False, user=TENANT2, + auth_tok='user:%s:user' % TENANT2, + owner_is_tenant=False) + UUIDX = uuidutils.generate_uuid() + #we need private image and context.owner should not match image owner + image = db_api.image_create(ctxt1, {'id': UUIDX, + 'status': 'queued', + 'is_public': False, + 'owner': TENANT1}) + + membership = {'can_share': False, + 'member': TENANT2, + 'image_id': UUIDX} + + db_api.image_member_create(ctxt1, membership) + + result = self.controller.is_image_sharable(ctxt2, image) + self.assertFalse(result) + + def test_is_image_sharable_owner_is_none(self): + TENANT1 = uuidutils.generate_uuid() + ctxt1 = context.RequestContext(is_admin=False, tenant=TENANT1, + auth_tok='user:%s:user' % TENANT1, + owner_is_tenant=True) + ctxt2 = context.RequestContext(is_admin=False, tenant=None, + auth_tok='user:%s:user' % TENANT1, + owner_is_tenant=True) + UUIDX = uuidutils.generate_uuid() + #we need private image and context.owner should not match image owner + image = db_api.image_create(ctxt1, {'id': UUIDX, + 'status': 'queued', + 'is_public': False, + 'owner': TENANT1}) + + result = self.controller.is_image_sharable(ctxt2, image) + self.assertFalse(result)