diff --git a/glance/common/store_utils.py b/glance/common/store_utils.py index 8c461892b5..679057e9f0 100644 --- a/glance/common/store_utils.py +++ b/glance/common/store_utils.py @@ -38,6 +38,8 @@ CONF = cfg.CONF CONF.register_opts(store_utils_opts) +RESTRICTED_URI_SCHEMAS = frozenset(['file', 'filesystem', 'swift+config']) + def safe_delete_from_backend(context, image_id, location): """ @@ -136,8 +138,7 @@ def validate_external_location(uri): """ # TODO(zhiyan): This function could be moved to glance_store. - - pieces = urlparse.urlparse(uri) - valid_schemes = [scheme for scheme in store_api.get_known_schemes() - if scheme != 'file' and scheme != 'swift+config'] - return pieces.scheme in valid_schemes + # TODO(gm): Use a whitelist of allowed schemes + scheme = urlparse.urlparse(uri).scheme + return (scheme in store_api.get_known_schemes() and + scheme not in RESTRICTED_URI_SCHEMAS) diff --git a/glance/tests/unit/test_store_location.py b/glance/tests/unit/test_store_location.py index 2f82146d5c..ae527ae55c 100644 --- a/glance/tests/unit/test_store_location.py +++ b/glance/tests/unit/test_store_location.py @@ -67,12 +67,15 @@ def test_add_location_with_restricted_sources(self): loc1 = {'url': 'file:///fake1.img.tar.gz', 'metadata': {}} loc2 = {'url': 'swift+config:///xxx', 'metadata': {}} + loc3 = {'url': 'filesystem:///foo.img.tar.gz', 'metadata': {}} # Test for insert location image1 = TestStoreLocation.FakeImageProxy() locations = glance.location.StoreLocations(image1, []) self.assertRaises(exception.BadStoreUri, locations.insert, 0, loc1) + self.assertRaises(exception.BadStoreUri, locations.insert, 0, loc3) self.assertNotIn(loc1, locations) + self.assertNotIn(loc3, locations) # Test for set_attr of _locations_proxy image2 = TestStoreLocation.FakeImageProxy() diff --git a/glance/tests/unit/v1/test_api.py b/glance/tests/unit/v1/test_api.py index 5ddae5543f..acdc7d99a9 100644 --- a/glance/tests/unit/v1/test_api.py +++ b/glance/tests/unit/v1/test_api.py @@ -1070,31 +1070,23 @@ def test_add_copy_from_with_location(self): def test_add_copy_from_with_restricted_sources(self): """Tests creates an image from copy-from with restricted sources""" - fixture_headers = {'x-image-meta-store': 'file', + header_template = {'x-image-meta-store': 'file', 'x-image-meta-disk-format': 'vhd', - 'x-glance-api-copy-from': 'file:///etc/passwd', 'x-image-meta-container-format': 'ovf', 'x-image-meta-name': 'fake image #F'} - req = webob.Request.blank("/images") - req.method = 'POST' - for k, v in six.iteritems(fixture_headers): - req.headers[k] = v - res = req.get_response(self.api) - self.assertEqual(400, res.status_int) + schemas = ["file:///etc/passwd", + "swift+config:///xxx", + "filesystem:///etc/passwd"] - fixture_headers = {'x-image-meta-store': 'file', - 'x-image-meta-disk-format': 'vhd', - 'x-glance-api-copy-from': 'swift+config://xxx', - 'x-image-meta-container-format': 'ovf', - 'x-image-meta-name': 'fake image #F'} - - req = webob.Request.blank("/images") - req.method = 'POST' - for k, v in six.iteritems(fixture_headers): - req.headers[k] = v - res = req.get_response(self.api) - self.assertEqual(400, res.status_int) + for schema in schemas: + req = webob.Request.blank("/images") + req.method = 'POST' + for k, v in six.iteritems(header_template): + req.headers[k] = v + req.headers['x-glance-api-copy-from'] = schema + res = req.get_response(self.api) + self.assertEqual(400, res.status_int) def test_add_copy_from_upload_image_unauthorized_with_body(self): rules = {"upload_image": '!', "modify_image": '@',