From 4927bee0a1eeaaac32ca710e3ae8f8467599a4e8 Mon Sep 17 00:00:00 2001 From: Tobias Urdin Date: Mon, 12 Feb 2024 09:26:35 +0000 Subject: [PATCH 1/9] Add test with noauth for s3tokens and ec2tokens The s3tokens and ec2tokens endpoint is a public API that does not require authentication to be called but that is what the testing does today. This duplicates the "good request" tests by also testing with noauth set so that we don't send a token in the request, this can prevent bugs where we would unintentionally require auth for these endpoints. Change-Id: Ibbde313ac8bcf3187c139bbd0840702f229534d0 (cherry picked from commit bd70653a2445fe1b9628ff1f92cbc58f2c9f4d70) --- keystone/tests/unit/test_contrib_ec2_core.py | 12 ++++++++++-- keystone/tests/unit/test_contrib_s3_core.py | 11 +++++++++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/keystone/tests/unit/test_contrib_ec2_core.py b/keystone/tests/unit/test_contrib_ec2_core.py index 4b514f8985..1494c721c4 100644 --- a/keystone/tests/unit/test_contrib_ec2_core.py +++ b/keystone/tests/unit/test_contrib_ec2_core.py @@ -37,7 +37,7 @@ def setUp(self): PROVIDERS.credential_api.create_credential( self.credential['id'], self.credential) - def test_valid_authentication_response_with_proper_secret(self): + def _test_valid_authentication_response_with_proper_secret(self, **kwargs): signer = ec2_utils.Ec2Signer(self.cred_blob['secret']) timestamp = utils.isotime(timeutils.utcnow()) credentials = { @@ -56,9 +56,17 @@ def test_valid_authentication_response_with_proper_secret(self): resp = self.post( '/ec2tokens', body={'credentials': credentials}, - expected_status=http.client.OK) + expected_status=http.client.OK, + **kwargs) self.assertValidProjectScopedTokenResponse(resp, self.user) + def test_valid_authentication_response_with_proper_secret(self): + self._test_valid_authentication_response_with_proper_secret() + + def test_valid_authentication_response_with_proper_secret_noauth(self): + self._test_valid_authentication_response_with_proper_secret( + noauth=True) + def test_valid_authentication_response_with_signature_v4(self): signer = ec2_utils.Ec2Signer(self.cred_blob['secret']) timestamp = utils.isotime(timeutils.utcnow()) diff --git a/keystone/tests/unit/test_contrib_s3_core.py b/keystone/tests/unit/test_contrib_s3_core.py index a9c8acd7ce..d03a3de46e 100644 --- a/keystone/tests/unit/test_contrib_s3_core.py +++ b/keystone/tests/unit/test_contrib_s3_core.py @@ -39,7 +39,7 @@ def setUp(self): PROVIDERS.credential_api.create_credential( self.credential['id'], self.credential) - def test_good_response(self): + def _test_good_response(self, **kwargs): sts = 'string to sign' # opaque string from swift3 sig = hmac.new(self.cred_blob['secret'].encode('ascii'), sts.encode('ascii'), hashlib.sha1).digest() @@ -50,10 +50,17 @@ def test_good_response(self): 'signature': base64.b64encode(sig).strip(), 'token': base64.b64encode(sts.encode('ascii')).strip(), }}, - expected_status=http.client.OK) + expected_status=http.client.OK, + **kwargs) self.assertValidProjectScopedTokenResponse(resp, self.user, forbid_token_id=True) + def test_good_response(self): + self._test_good_response() + + def test_good_response_noauth(self): + self._test_good_response(noauth=True) + def test_bad_request(self): self.post( '/s3tokens', From 65bf9315b8384e6f3d24a3d632745e1f4d3c00b4 Mon Sep 17 00:00:00 2001 From: Grzegorz Grasza Date: Fri, 19 Sep 2025 14:02:18 +0200 Subject: [PATCH 2/9] Add service user authentication to ec2 and s3 endpoints Add a policy to enforce authentication with a user in the service group. This maintains AWS compatibility with the added security layer. Closes-Bug: 2119646 Change-Id: Ic84b84247e05f29874e2c5636a033aaedd4de83c Signed-off-by: Grzegorz Grasza Signed-off-by: Jeremy Stanley Signed-off-by: Artem Goncharov (cherry picked from commit b59e533bfd94d7dc8d404573179d00f2490f2ad1) --- doc/source/getting-started/policy_mapping.rst | 2 ++ keystone/api/ec2tokens.py | 8 ++++- keystone/api/s3tokens.py | 7 +++- keystone/common/policies/__init__.py | 4 +++ keystone/common/policies/ec2tokens.py | 34 ++++++++++++++++++ keystone/common/policies/s3tokens.py | 35 +++++++++++++++++++ keystone/tests/unit/test_contrib_ec2_core.py | 24 ++++++++++--- keystone/tests/unit/test_contrib_s3_core.py | 30 ++++++++++------ keystone/tests/unit/test_v3_credential.py | 22 ++++++++---- 9 files changed, 142 insertions(+), 24 deletions(-) create mode 100644 keystone/common/policies/ec2tokens.py create mode 100644 keystone/common/policies/s3tokens.py diff --git a/doc/source/getting-started/policy_mapping.rst b/doc/source/getting-started/policy_mapping.rst index a7cb27cfa7..fab79be12d 100644 --- a/doc/source/getting-started/policy_mapping.rst +++ b/doc/source/getting-started/policy_mapping.rst @@ -245,6 +245,8 @@ identity:delete_application_credential DELETE /v3/users/{use identity:get_access_rule GET /v3/users/{user_id}/access_rules/{access_rule_id} identity:list_access_rules GET /v3/users/{user_id}/access_rules identity:delete_access_rule DELETE /v3/users/{user_id}/access_rules/{access_rule_id} +identity:s3tokens_validate POST /v3/s3tokens +identity:ec2tokens_validate POST /v3/es2tokens ========================================================= === diff --git a/keystone/api/ec2tokens.py b/keystone/api/ec2tokens.py index d21673a031..04af947b54 100644 --- a/keystone/api/ec2tokens.py +++ b/keystone/api/ec2tokens.py @@ -21,6 +21,7 @@ from keystone.api._shared import EC2_S3_Resource from keystone.api._shared import json_home_relations +from keystone.common import rbac_enforcer from keystone.common import render_token from keystone.common import utils from keystone import exception @@ -31,6 +32,9 @@ CRED_TYPE_EC2 = 'ec2' +ENFORCER = rbac_enforcer.RBACEnforcer + + class EC2TokensResource(EC2_S3_Resource.ResourceBase): @staticmethod def _check_signature(creds_ref, credentials): @@ -60,12 +64,14 @@ def _check_signature(creds_ref, credentials): raise exception.Unauthorized( _('EC2 signature not supplied.')) - @ks_flask.unenforced_api def post(self): """Authenticate ec2 token. POST /v3/ec2tokens """ + # Enforce RBAC in the same way as S3 tokens + ENFORCER.enforce_call(action='identity:ec2tokens_validate') + token = self.handle_authenticate() token_reference = render_token.render_token_response_from_model(token) resp_body = jsonutils.dumps(token_reference) diff --git a/keystone/api/s3tokens.py b/keystone/api/s3tokens.py index 4a8439d696..0ba6fb547a 100644 --- a/keystone/api/s3tokens.py +++ b/keystone/api/s3tokens.py @@ -22,12 +22,15 @@ from keystone.api._shared import EC2_S3_Resource from keystone.api._shared import json_home_relations +from keystone.common import rbac_enforcer from keystone.common import render_token from keystone.common import utils from keystone import exception from keystone.i18n import _ from keystone.server import flask as ks_flask +ENFORCER = rbac_enforcer.RBACEnforcer + def _calculate_signature_v1(string_to_sign, secret_key): """Calculate a v1 signature. @@ -90,12 +93,14 @@ def _check_signature(creds_ref, credentials): raise exception.Unauthorized( message=_('Credential signature mismatch')) - @ks_flask.unenforced_api def post(self): """Authenticate s3token. POST /v3/s3tokens """ + # Use standard Keystone policy enforcement for s3tokens access + ENFORCER.enforce_call(action='identity:s3tokens_validate') + token = self.handle_authenticate() token_reference = render_token.render_token_response_from_model(token) resp_body = jsonutils.dumps(token_reference) diff --git a/keystone/common/policies/__init__.py b/keystone/common/policies/__init__.py index 02608c185a..68842b50ce 100644 --- a/keystone/common/policies/__init__.py +++ b/keystone/common/policies/__init__.py @@ -22,6 +22,7 @@ from keystone.common.policies import domain from keystone.common.policies import domain_config from keystone.common.policies import ec2_credential +from keystone.common.policies import ec2tokens from keystone.common.policies import endpoint from keystone.common.policies import endpoint_group from keystone.common.policies import grant @@ -40,6 +41,7 @@ from keystone.common.policies import revoke_event from keystone.common.policies import role from keystone.common.policies import role_assignment +from keystone.common.policies import s3tokens from keystone.common.policies import service from keystone.common.policies import service_provider from keystone.common.policies import token @@ -78,6 +80,8 @@ def list_rules(): revoke_event.list_rules(), role.list_rules(), role_assignment.list_rules(), + s3tokens.list_rules(), + ec2tokens.list_rules(), service.list_rules(), service_provider.list_rules(), token_revocation.list_rules(), diff --git a/keystone/common/policies/ec2tokens.py b/keystone/common/policies/ec2tokens.py new file mode 100644 index 0000000000..7f5f4e21f4 --- /dev/null +++ b/keystone/common/policies/ec2tokens.py @@ -0,0 +1,34 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_policy import policy + +from keystone.common.policies import base + +# Align EC2 tokens API with S3 tokens: require admin or service users +ADMIN_OR_SERVICE = 'rule:service_or_admin' + + +ec2tokens_policies = [ + policy.DocumentedRuleDefault( + name=base.IDENTITY % 'ec2tokens_validate', + check_str=ADMIN_OR_SERVICE, + scope_types=['system', 'domain', 'project'], + description='Validate EC2 credentials and create a Keystone token. ' + 'Restricted to service users or administrators.', + operations=[{'path': '/v3/ec2tokens', 'method': 'POST'}], + ) +] + + +def list_rules(): + return ec2tokens_policies diff --git a/keystone/common/policies/s3tokens.py b/keystone/common/policies/s3tokens.py new file mode 100644 index 0000000000..96088f62e2 --- /dev/null +++ b/keystone/common/policies/s3tokens.py @@ -0,0 +1,35 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_policy import policy + +from keystone.common.policies import base + +# S3 tokens API requires service authentication to prevent presigned URL exploitation +# This policy restricts access to service users or administrators only +ADMIN_OR_SERVICE = 'rule:service_or_admin' + +s3tokens_policies = [ + policy.DocumentedRuleDefault( + name=base.IDENTITY % 's3tokens_validate', + check_str=ADMIN_OR_SERVICE, + scope_types=['system', 'domain', 'project'], + description='Validate S3 credentials and create a Keystone token. ' + 'Restricted to service users or administrators to prevent ' + 'exploitation via presigned URLs.', + operations=[{'path': '/v3/s3tokens', 'method': 'POST'}], + ) +] + + +def list_rules(): + return s3tokens_policies diff --git a/keystone/tests/unit/test_contrib_ec2_core.py b/keystone/tests/unit/test_contrib_ec2_core.py index 1494c721c4..78607c0e6e 100644 --- a/keystone/tests/unit/test_contrib_ec2_core.py +++ b/keystone/tests/unit/test_contrib_ec2_core.py @@ -53,19 +53,35 @@ def _test_valid_authentication_response_with_proper_secret(self, **kwargs): }, } credentials['signature'] = signer.generate(credentials) + # Authenticate as system admin by default unless overridden via kwargs + token = None + if 'noauth' in kwargs and kwargs['noauth']: + token = None + else: + PROVIDERS.assignment_api.create_system_grant_for_user( + self.user_id, self.role_id + ) + token = self.get_system_scoped_token() + + expected_status = kwargs.get('expected_status', http.client.OK) resp = self.post( '/ec2tokens', body={'credentials': credentials}, - expected_status=http.client.OK, - **kwargs) - self.assertValidProjectScopedTokenResponse(resp, self.user) + expected_status=expected_status, + token=token, + noauth=kwargs.get('noauth'), + ) + if expected_status == http.client.OK: + self.assertValidProjectScopedTokenResponse(resp, self.user) def test_valid_authentication_response_with_proper_secret(self): self._test_valid_authentication_response_with_proper_secret() def test_valid_authentication_response_with_proper_secret_noauth(self): + # ec2 endpoint now enforces RBAC; unauthenticated should be denied self._test_valid_authentication_response_with_proper_secret( - noauth=True) + expected_status=http.client.UNAUTHORIZED, noauth=True + ) def test_valid_authentication_response_with_signature_v4(self): signer = ec2_utils.Ec2Signer(self.cred_blob['secret']) diff --git a/keystone/tests/unit/test_contrib_s3_core.py b/keystone/tests/unit/test_contrib_s3_core.py index d03a3de46e..c3647ad317 100644 --- a/keystone/tests/unit/test_contrib_s3_core.py +++ b/keystone/tests/unit/test_contrib_s3_core.py @@ -39,27 +39,35 @@ def setUp(self): PROVIDERS.credential_api.create_credential( self.credential['id'], self.credential) - def _test_good_response(self, **kwargs): + def _test_good_response(self, expected_status=http.client.OK, **kwargs): sts = 'string to sign' # opaque string from swift3 sig = hmac.new(self.cred_blob['secret'].encode('ascii'), sts.encode('ascii'), hashlib.sha1).digest() resp = self.post( '/s3tokens', - body={'credentials': { - 'access': self.cred_blob['access'], - 'signature': base64.b64encode(sig).strip(), - 'token': base64.b64encode(sts.encode('ascii')).strip(), - }}, - expected_status=http.client.OK, - **kwargs) - self.assertValidProjectScopedTokenResponse(resp, self.user, - forbid_token_id=True) + body={ + 'credentials': { + 'access': self.cred_blob['access'], + 'signature': base64.b64encode(sig).strip(), + 'token': base64.b64encode(sts.encode('ascii')).strip(), + } + }, + expected_status=expected_status, + **kwargs, + ) + if expected_status == http.client.OK: + self.assertValidProjectScopedTokenResponse( + resp, self.user, forbid_token_id=True + ) + else: + self.assertValidErrorResponse(resp) def test_good_response(self): self._test_good_response() def test_good_response_noauth(self): - self._test_good_response(noauth=True) + # s3tokens now requires service/admin auth; unauthenticated should be denied + self._test_good_response(http.client.UNAUTHORIZED, noauth=True) def test_bad_request(self): self.post( diff --git a/keystone/tests/unit/test_v3_credential.py b/keystone/tests/unit/test_v3_credential.py index 6573f4402a..a87161b474 100644 --- a/keystone/tests/unit/test_v3_credential.py +++ b/keystone/tests/unit/test_v3_credential.py @@ -79,16 +79,24 @@ def _test_get_token(self, access, secret): # Now make a request to validate the signed dummy request via the # ec2tokens API. This proves the v3 ec2 credentials actually work. - sig_ref = {'access': access, - 'signature': signature, - 'host': 'foo', - 'verb': 'GET', - 'path': '/bar', - 'params': params} + sig_ref = { + 'access': access, + 'signature': signature, + 'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params, + } + PROVIDERS.assignment_api.create_system_grant_for_user( + self.user_id, self.role_id + ) + token = self.get_system_scoped_token() r = self.post( '/ec2tokens', body={'ec2Credentials': sig_ref}, - expected_status=http.client.OK) + expected_status=http.client.OK, + token=token, + ) self.assertValidTokenResponse(r) return r.result['token'] From 7f32072a665ff8cdcdf055275b443a2e635933bb Mon Sep 17 00:00:00 2001 From: Boris Bobrov Date: Tue, 7 Apr 2026 23:45:15 +0200 Subject: [PATCH 3/9] Block restricted app creds from creating EC2 credentials via /credentials The POST /v3/credentials endpoint accepted EC2 credential creation from restricted application credential tokens, bypassing the guard on the dedicated OS-EC2 endpoint. Add the same unrestricted application credential check to the generic credentials API for EC2-type credentials, and update the existing test to use an unrestricted application credential. Related-Bug: #2142138 Generated-By: claude-opus-4-6 (OpenCode) Signed-off-by: Boris Bobrov Change-Id: Idb192a2fd370fc26c7d76788e9ad1856483d3239 (cherry picked from commit afb7c20f705e4747189729c14f95f5caa867d6b4) --- keystone/api/credentials.py | 19 +++++++--- keystone/tests/unit/test_v3_credential.py | 42 +++++++++++++++++++++-- 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/keystone/api/credentials.py b/keystone/api/credentials.py index 90b53dd686..09304185a4 100644 --- a/keystone/api/credentials.py +++ b/keystone/api/credentials.py @@ -32,6 +32,16 @@ ENFORCER = rbac_enforcer.RBACEnforcer +def _check_unrestricted_application_credential(token): + if 'application_credential' in token.methods: + if not token.application_credential['unrestricted']: + action = _( + "Using method 'application_credential' is not " + "allowed for managing additional credentials." + ) + raise exception.ForbiddenAction(action=action) + + def _build_target_enforcement(): target = {} try: @@ -155,12 +165,13 @@ def post(self): ENFORCER.enforce_call( action='identity:create_credential', target_attr=target ) + token = self.auth_context['token'] + if credential.get('type', '').lower() == 'ec2': + _check_unrestricted_application_credential(token) validation.lazy_validate(schema.credential_create, credential) trust_id = getattr(self.oslo_context, 'trust_id', None) - app_cred_id = getattr( - self.auth_context['token'], 'application_credential_id', None) - access_token_id = getattr( - self.auth_context['token'], 'access_token_id', None) + app_cred_id = getattr(token, 'application_credential_id', None) + access_token_id = getattr(token, 'access_token_id', None) ref = self._assign_unique_id( self._normalize_dict(credential), trust_id=trust_id, app_cred_id=app_cred_id, diff --git a/keystone/tests/unit/test_v3_credential.py b/keystone/tests/unit/test_v3_credential.py index a87161b474..5787e339b9 100644 --- a/keystone/tests/unit/test_v3_credential.py +++ b/keystone/tests/unit/test_v3_credential.py @@ -690,9 +690,11 @@ def test_app_cred_ec2_credential(self): Call ``POST /credentials``. """ - # Create the app cred + # Create an unrestricted app cred (restricted app creds are + # blocked from creating EC2 credentials) ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}]) del ref['id'] + ref['unrestricted'] = True r = self.post('/users/%s/application_credentials' % self.user_id, body={'application_credential': ref}) app_cred = r.result['application_credential'] @@ -743,7 +745,43 @@ def test_app_cred_ec2_credential(self): '/credentials', body={'credential': ref}, token=token_id, - expected_status=http.client.CONFLICT) + expected_status=http.client.CONFLICT, + ) + + def _get_app_cred_token(self, unrestricted=False): + """Create an application credential and return its token.""" + ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}]) + del ref['id'] + if unrestricted: + ref['unrestricted'] = True + r = self.post( + f'/users/{self.user_id}/application_credentials', + body={'application_credential': ref}, + ) + app_cred = r.result['application_credential'] + auth_data = self.build_authentication_request( + app_cred_id=app_cred['id'], secret=app_cred['secret'] + ) + r = self.v3_create_token(auth_data) + return r.headers.get('X-Subject-Token') + + def test_restricted_app_cred_cannot_create_ec2_credential(self): + """Test that a restricted app cred cannot create EC2 credentials. + + A restricted application credential must not be allowed to + create EC2 credentials via POST /credentials either, as this + would bypass the guard on the OS-EC2 endpoint. + """ + token_id = self._get_app_cred_token(unrestricted=False) + blob, ref = unit.new_ec2_credential( + user_id=self.user_id, project_id=self.project_id + ) + self.post( + '/credentials', + body={'credential': ref}, + token=token_id, + expected_status=http.client.FORBIDDEN, + ) class TestCredentialAccessToken(CredentialBaseTestCase): From cc72fbed4e3a7750f023e1d4ae0430e9909d2b88 Mon Sep 17 00:00:00 2001 From: Grzegorz Grasza Date: Wed, 22 Apr 2026 13:23:44 +0200 Subject: [PATCH 4/9] Enforce app cred project boundary on EC2 credential paths POST /v3/credentials did not validate that the caller-supplied project_id for an EC2-type credential matched the project of the authenticating application credential. This allowed an attacker holding an unrestricted application credential for project A to create an EC2 credential targeting project B; a subsequent /v3/ec2tokens exchange would then issue a Keystone token scoped to project B while still carrying the original app_cred_id, enabling cross-project lateral movement within the credential owner's role footprint. Two fixes: 1. credentials.py: after extracting app_cred_id from the token, check that credential['project_id'] == app_cred['project_id'] for EC2-type credentials and raise ForbiddenAction otherwise. 2. EC2_S3_Resource.py: in handle_authenticate(), assert that the stored EC2 credential project_id matches the application credential's project before issuing the token. This issue is orthogonal to CVE-2026-33551 (LP#2142138 / Gerrit 983655), which blocks restricted application credentials from creating EC2 credentials at all. The project-boundary check is absent regardless of the restricted flag and requires separate treatment. Closes-Bug: #2149775 Related-Bug: #OSPRH-29345 Assisted-by: claude-sonnet-4-6 Change-Id: I7c10c8a52e57e63cb9c66d03d69540abefe5425c Signed-off-by: Grzegorz Grasza (cherry picked from commit b6fd80996b882890a51f3e2aab41d952d7ff68ae) (cherry picked from commit d9e18a37888cabdea919c58b24f630fd722aa8b0) (cherry picked from commit 0892b69c407900ec7e7f6d4fbf9093ea5faf9a85) --- keystone/api/_shared/EC2_S3_Resource.py | 7 ++ keystone/api/credentials.py | 13 ++++ keystone/tests/unit/test_v3_credential.py | 92 +++++++++++++++++++++++ 3 files changed, 112 insertions(+) diff --git a/keystone/api/_shared/EC2_S3_Resource.py b/keystone/api/_shared/EC2_S3_Resource.py index ff94286b6e..b9083fc8d8 100644 --- a/keystone/api/_shared/EC2_S3_Resource.py +++ b/keystone/api/_shared/EC2_S3_Resource.py @@ -154,6 +154,13 @@ def handle_authenticate(self): app_cred = ac_client.get_application_credential( cred_data['app_cred_id']) roles = [r['id'] for r in app_cred['roles']] + if cred_data['project_id'] != app_cred['project_id']: + raise ks_exceptions.Unauthorized( + _( + 'EC2 credential project does not match the ' + 'application credential project.' + ) + ) elif cred_data['access_token_id']: access_token = PROVIDERS.oauth_api.get_access_token( cred_data['access_token_id']) diff --git a/keystone/api/credentials.py b/keystone/api/credentials.py index 09304185a4..068e9860aa 100644 --- a/keystone/api/credentials.py +++ b/keystone/api/credentials.py @@ -172,6 +172,19 @@ def post(self): trust_id = getattr(self.oslo_context, 'trust_id', None) app_cred_id = getattr(token, 'application_credential_id', None) access_token_id = getattr(token, 'access_token_id', None) + if ( + app_cred_id is not None + and credential.get('type', '').lower() == 'ec2' + ): + ac_api = PROVIDERS.application_credential_api + app_cred = ac_api.get_application_credential(app_cred_id) + if credential.get('project_id') != app_cred['project_id']: + action = _( + 'EC2 credential project_id must match the ' + 'project of the application credential used ' + 'to authenticate' + ) + raise exception.ForbiddenAction(action=action) ref = self._assign_unique_id( self._normalize_dict(credential), trust_id=trust_id, app_cred_id=app_cred_id, diff --git a/keystone/tests/unit/test_v3_credential.py b/keystone/tests/unit/test_v3_credential.py index 5787e339b9..5f25244574 100644 --- a/keystone/tests/unit/test_v3_credential.py +++ b/keystone/tests/unit/test_v3_credential.py @@ -783,6 +783,98 @@ def test_restricted_app_cred_cannot_create_ec2_credential(self): expected_status=http.client.FORBIDDEN, ) + def test_app_cred_ec2_credential_cross_project_forbidden(self): + """EC2 credential project_id must match the app cred project. + + An unrestricted app cred scoped to project A must not be used to + create an EC2 credential targeting a different project B. + + Call ``POST /credentials``. + """ + token_id = self._get_app_cred_token(unrestricted=True) + + other_project = unit.new_project_ref(domain_id=self.domain_id) + PROVIDERS.resource_api.create_project( + other_project['id'], other_project + ) + + _, ec2_ref = unit.new_ec2_credential( + user_id=self.user_id, project_id=other_project['id'] + ) + self.post( + '/credentials', + body={'credential': ec2_ref}, + token=token_id, + expected_status=http.client.FORBIDDEN, + ) + + def test_app_cred_ec2_auth_cross_project_rejected(self): + """EC2 auth is rejected when credential project differs from app cred. + + A pre-existing EC2 credential whose project_id does not match the + linked application credential's project must be rejected at + authentication time, preventing cross-project lateral movement. + + Call ``POST /ec2tokens``. + """ + ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}]) + del ref['id'] + r = self.post( + f'/users/{self.user_id}/application_credentials', + body={'application_credential': ref}, + ) + app_cred = r.result['application_credential'] + + other_project = unit.new_project_ref(domain_id=self.domain_id) + PROVIDERS.resource_api.create_project( + other_project['id'], other_project + ) + + # Bypass the API to plant a credential with a mismatched project_id. + # This simulates a credential that existed before the creation-time + # check was added, or one created via a direct DB write. + blob = { + 'access': uuid.uuid4().hex, + 'secret': uuid.uuid4().hex, + 'trust_id': None, + 'app_cred_id': app_cred['id'], + } + _, ec2_ref = unit.new_ec2_credential( + user_id=self.user_id, project_id=other_project['id'], blob=blob + ) + PROVIDERS.credential_api.create_credential(ec2_ref['id'], ec2_ref) + + signer = ec2_utils.Ec2Signer(blob['secret']) + params = { + 'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': blob['access'], + } + request = { + 'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params, + } + sig_ref = { + 'access': blob['access'], + 'signature': signer.generate(request), + 'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params, + } + PROVIDERS.assignment_api.create_system_grant_for_user( + self.user_id, self.role_id + ) + token = self.get_system_scoped_token() + self.post( + '/ec2tokens', + body={'ec2Credentials': sig_ref}, + token=token, + expected_status=http.client.UNAUTHORIZED, + ) + class TestCredentialAccessToken(CredentialBaseTestCase): """Test credential with access token.""" From d5f87157ee39c2d3b597421ea79fc67794183ef2 Mon Sep 17 00:00:00 2001 From: Grzegorz Grasza Date: Thu, 23 Apr 2026 10:13:20 +0200 Subject: [PATCH 5/9] Enforce delegation project boundary for delegated tokens Delegated tokens (trusts, application credentials, OAuth1 access tokens) are scoped to a single project at delegation time. This must be enforced thoroughly while granting the API access to Keystone resources that might be also bound to a single project. Without this it is possible to gain different access (using trust to see application credentials for a different project, reuse the MFA seed, etc). * Credentials CRUD (/v3/credentials) All five CRUD operations verified ownership via user_id but did not bind credential.project_id to the delegating token's project scope. Fix: _check_credential_project_scope() - no-op for non-delegated tokens, raises ForbiddenAction on project mismatch. For list, out-of-scope credentials are silently filtered. Credentials with project_id=None (TOTP/MFA bindings) are treated as out-of-scope for any delegated token: they are user-level secrets with no project anchor, and a delegated token should never be able to enumerate, read, or mutate them - doing so would allow a stolen delegation token to exfiltrate or destroy a user's MFA binding. * OS-EC2 credential CRUD (/v3/users/{id}/credentials/OS-EC2) POST accepted any tenant_id from a delegated token. GET and DELETE had no delegation check at all. Fix: _check_delegation_for_ec2() enforces the project boundary; list silently filters. Additionally, pre-existing OAuth1 access-token-backed EC2 credentials with a mismatched project_id could be used at auth-time (POST /v3/ec2tokens) to obtain a cross-project token. Added a check in EC2_S3_Resource.py that cred_data['project_id'] matches access_token['project_id'] before issuing the token. The trust branch does not need this check - the token provider uses the trust's project regardless of the credential's project_id. * OS-OAUTH1 access token management (/v3/users/{id}/OS-OAUTH1/access_tokens) GET and DELETE had no delegation check. List blocked trust/OAuth but not app-cred tokens. Fix: _block_delegated_token() raises Forbidden for any delegation type on list, get, and delete. * Application credential management (/v3/users/{id}/application_credentials) Trust-scoped and OAuth1 tokens had no guard on the application credential and access rule management APIs. An impersonating trust could LIST, CREATE, or DELETE application credentials, creating a persistent backdoor that outlives the trust's own expiry. App credential tokens are intentionally excluded - the unrestricted/restricted distinction is handled separately by _check_unrestricted_application_credential. Fix: _block_delegated_token_app_creds() raises Forbidden for trust-scoped and OAuth1 tokens on all six app credential and access rule endpoints. Closes-Bug: #2150089 Related-Bug: #2149789 Related-Bug: #2149775 Assisted-by: Claude Sonnet 4.6 Change-Id: Iaaa0ec713a0a5e062acc3209d6010982899d8f6f Signed-off-by: Grzegorz Grasza Signed-off-by: Artem Goncharov (cherry picked from commit 16582e5192be354e26ebef4badca1213ddc4dc07) (cherry picked from commit d88779a85e7a0e97db61d2be6c1c46758236eed6) --- keystone/api/_shared/EC2_S3_Resource.py | 7 + keystone/api/credentials.py | 71 +++- keystone/api/users.py | 124 +++++- keystone/conf/security_compliance.py | 69 ++- .../unit/test_v3_application_credential.py | 316 +++++++++++++- keystone/tests/unit/test_v3_credential.py | 392 ++++++++++++++++++ keystone/tests/unit/test_v3_oauth1.py | 112 +++++ .../notes/bug-2150089-e91b592c948e5771.yaml | 35 ++ 8 files changed, 1089 insertions(+), 37 deletions(-) create mode 100644 releasenotes/notes/bug-2150089-e91b592c948e5771.yaml diff --git a/keystone/api/_shared/EC2_S3_Resource.py b/keystone/api/_shared/EC2_S3_Resource.py index b9083fc8d8..d543ecd7d5 100644 --- a/keystone/api/_shared/EC2_S3_Resource.py +++ b/keystone/api/_shared/EC2_S3_Resource.py @@ -165,6 +165,13 @@ def handle_authenticate(self): access_token = PROVIDERS.oauth_api.get_access_token( cred_data['access_token_id']) roles = jsonutils.loads(access_token['role_ids']) + if cred_data['project_id'] != access_token['project_id']: + raise ks_exceptions.Unauthorized( + _( + 'EC2 credential project does not match the ' + 'OAuth1 access token project.' + ) + ) auth_context = {'access_token_id': cred_data['access_token_id']} else: roles = PROVIDERS.assignment_api.get_roles_for_user_and_project( diff --git a/keystone/api/credentials.py b/keystone/api/credentials.py index 068e9860aa..be7e1e7208 100644 --- a/keystone/api/credentials.py +++ b/keystone/api/credentials.py @@ -42,6 +42,48 @@ def _check_unrestricted_application_credential(token): raise exception.ForbiddenAction(action=action) +def _check_credential_project_scope(token, oslo_context, credential): + """Enforce project boundary for delegated tokens. + + Non-delegated tokens (password, totp, etc.) are not restricted here -- + an admin with a regular token can legitimately manage credentials across + projects. Delegated tokens (trusts, application credentials, OAuth1) are + always bound to a single project at delegation time; only credentials + whose project_id exactly matches the token's project scope are in bounds. + + Credentials with project_id=None (e.g. TOTP/MFA bindings) are treated as + out-of-scope for any delegated token: they are user-level secrets with no + project anchor, and a delegated token should never be able to enumerate, + read, or mutate them -- doing so would allow a stolen delegation token to + exfiltrate or destroy a user's MFA binding. + """ + trust_id = getattr(oslo_context, 'trust_id', None) + app_cred_id = getattr(token, 'application_credential_id', None) + access_token_id = getattr(token, 'access_token_id', None) + + if not (trust_id or app_cred_id or access_token_id): + return + + token_project_id = oslo_context.project_id + cred_project_id = credential.get('project_id') + + if cred_project_id != token_project_id: + if CONF.security_compliance.allow_insecure_admin_trust_cross_project_credentials_access: + # When insecure cross-project access is enabled, still restrict to + # admin-role delegated tokens only. See LP#2150089. + try: + ENFORCER.enforce_call(action='admin_required') + return + except Exception: + pass + raise exception.ForbiddenAction( + action=_( + 'Credential project does not match the ' + 'project scope of the delegated token' + ) + ) + + def _build_target_enforcement(): target = {} try: @@ -122,6 +164,7 @@ def _list_credentials(self): # If the request was filtered, make sure to return only the # credentials specific to that user. This makes it so that users with # roles on projects can't see credentials that aren't theirs. + token = self.auth_context['token'] filtered_refs = [] for ref in refs: # Check each credential again to make sure the user has access to @@ -134,8 +177,9 @@ def _list_credentials(self): action='identity:get_credential', target_attr={'credential': cred} ) + _check_credential_project_scope(token, self.oslo_context, cred) filtered_refs.append(ref) - except exception.Forbidden: + except (exception.Forbidden, exception.ForbiddenAction): pass refs = filtered_refs refs = [self._blob_to_json(r) for r in refs] @@ -147,6 +191,9 @@ def _get_credential(self, credential_id): build_target=_build_target_enforcement ) credential = PROVIDERS.credential_api.get_credential(credential_id) + _check_credential_project_scope( + self.auth_context['token'], self.oslo_context, credential + ) return self.wrap_member(self._blob_to_json(credential)) def get(self, credential_id=None): @@ -172,19 +219,7 @@ def post(self): trust_id = getattr(self.oslo_context, 'trust_id', None) app_cred_id = getattr(token, 'application_credential_id', None) access_token_id = getattr(token, 'access_token_id', None) - if ( - app_cred_id is not None - and credential.get('type', '').lower() == 'ec2' - ): - ac_api = PROVIDERS.application_credential_api - app_cred = ac_api.get_application_credential(app_cred_id) - if credential.get('project_id') != app_cred['project_id']: - action = _( - 'EC2 credential project_id must match the ' - 'project of the application credential used ' - 'to authenticate' - ) - raise exception.ForbiddenAction(action=action) + _check_credential_project_scope(token, self.oslo_context, credential) ref = self._assign_unique_id( self._normalize_dict(credential), trust_id=trust_id, app_cred_id=app_cred_id, @@ -213,7 +248,9 @@ def patch(self, credential_id): build_target=_build_target_enforcement ) current = PROVIDERS.credential_api.get_credential(credential_id) - + _check_credential_project_scope( + self.auth_context['token'], self.oslo_context, current + ) credential = self.request_body_json.get('credential', {}) validation.lazy_validate(schema.credential_update, credential) self._validate_blob_update_keys(current.copy(), credential.copy()) @@ -233,6 +270,10 @@ def delete(self, credential_id): action='identity:delete_credential', build_target=_build_target_enforcement ) + credential = PROVIDERS.credential_api.get_credential(credential_id) + _check_credential_project_scope( + self.auth_context['token'], self.oslo_context, credential + ) return (PROVIDERS.credential_api.delete_credential( credential_id, initiator=self.audit_initiator), diff --git a/keystone/api/users.py b/keystone/api/users.py index e2821827d1..304e2730d4 100644 --- a/keystone/api/users.py +++ b/keystone/api/users.py @@ -92,6 +92,76 @@ def _check_unrestricted_application_credential(token): raise ks_exception.ForbiddenAction(action=action) +def _is_delegated_token(oslo_context, token): + """Return True if the token is any form of delegation.""" + trust_id = getattr(oslo_context, 'trust_id', None) + app_cred_id = getattr(token, 'application_credential_id', None) + access_token_id = getattr(token, 'access_token_id', None) + return bool(trust_id or app_cred_id or access_token_id) + + +def _check_delegation_for_ec2(oslo_context, token, project_id): + """For delegated tokens raise unless project_id exactly matches scope. + + Credentials with project_id=None (user-scoped secrets such as TOTP) are + treated as out-of-scope: a delegated token must not read or modify them. + """ + if not _is_delegated_token(oslo_context, token): + return + if project_id != oslo_context.project_id: + raise ks_exception.ForbiddenAction( + action=_( + 'EC2 credential project does not match the ' + 'project scope of the delegated token' + ) + ) + + +def _block_delegated_token(oslo_context, token): + """Raise Forbidden if the token is any form of delegation.""" + if oslo_context.is_delegated_auth: + raise ks_exception.Forbidden( + _( + 'Cannot manage OAuth access tokens with a token ' + 'issued via delegation.' + ) + ) + if 'application_credential' in token.methods: + raise ks_exception.Forbidden( + _( + 'Cannot manage OAuth access tokens with a token ' + 'issued via delegation.' + ) + ) + + +def _block_delegated_token_app_creds(oslo_context, token): + """Raise Forbidden if the token is a trust or OAuth1 delegation. + + Trust-scoped and OAuth1 access token-scoped tokens must not be used to + create, list, read, or delete application credentials or access rules. + Creating an application credential via such a token produces a persistent + credential that outlives the delegation's expiry or scope, providing a + backdoor that breaks the accountability model: the trust-scoped token + carries the full delegation chain enabling audit, but a derived application + credential does not. + + Application credential tokens are intentionally excluded from this check. + The unrestricted/restricted distinction for application credentials is a + documented feature handled separately by + _check_unrestricted_application_credential. + """ + trust_id = getattr(oslo_context, 'trust_id', None) + access_token_id = getattr(token, 'access_token_id', None) + if trust_id or access_token_id: + raise ks_exception.Forbidden( + _( + 'Cannot manage application credentials with a token ' + 'issued via delegation.' + ) + ) + + def _build_user_target_enforcement(): target = {} try: @@ -372,10 +442,16 @@ def get(self, user_id): PROVIDERS.identity_api.get_user(user_id) credential_refs = PROVIDERS.credential_api.list_credentials_for_user( user_id, type=CRED_TYPE_EC2) - collection_refs = [ - _convert_v3_to_ec2_credential(cred) - for cred in credential_refs - ] + token = self.auth_context['token'] + collection_refs = [] + for cred in credential_refs: + try: + _check_delegation_for_ec2( + self.oslo_context, token, cred.get('project_id') + ) + except (ks_exception.Forbidden, ks_exception.ForbiddenAction): + continue + collection_refs.append(_convert_v3_to_ec2_credential(cred)) return self.wrap_collection(collection_refs) def post(self, user_id): @@ -392,6 +468,7 @@ def post(self, user_id): PROVIDERS.identity_api.get_user(user_id) tenant_id = self.request_body_json.get('tenant_id') PROVIDERS.resource_api.get_project(tenant_id) + _check_delegation_for_ec2(self.oslo_context, token, tenant_id) blob = dict( access=uuid.uuid4().hex, secret=uuid.uuid4().hex, @@ -412,12 +489,12 @@ def post(self, user_id): class UserOSEC2CredentialsResourceGetDelete(_UserOSEC2CredBaseResource): @staticmethod - def _get_cred_data(credential_id): + def _get_raw_cred(credential_id): cred = PROVIDERS.credential_api.get_credential(credential_id) if not cred or cred['type'] != CRED_TYPE_EC2: raise ks_exception.Unauthorized( message=_('EC2 access key not found.')) - return _convert_v3_to_ec2_credential(cred) + return cred def get(self, user_id, credential_id): """Get a specific EC2 credential. @@ -430,8 +507,13 @@ def get(self, user_id, credential_id): build_target=func) PROVIDERS.identity_api.get_user(user_id) ec2_cred_id = utils.hash_access_key(credential_id) - cred_data = self._get_cred_data(ec2_cred_id) - return self.wrap_member(cred_data) + cred = self._get_raw_cred(ec2_cred_id) + _check_delegation_for_ec2( + self.oslo_context, + self.auth_context['token'], + cred.get('project_id'), + ) + return self.wrap_member(_convert_v3_to_ec2_credential(cred)) def delete(self, user_id, credential_id): """Delete a specific EC2 credential. @@ -443,7 +525,12 @@ def delete(self, user_id, credential_id): build_target=func) PROVIDERS.identity_api.get_user(user_id) ec2_cred_id = utils.hash_access_key(credential_id) - self._get_cred_data(ec2_cred_id) + cred = self._get_raw_cred(ec2_cred_id) + _check_delegation_for_ec2( + self.oslo_context, + self.auth_context['token'], + cred.get('project_id'), + ) PROVIDERS.credential_api.delete_credential(ec2_cred_id) return None, http.client.NO_CONTENT @@ -472,10 +559,7 @@ def get(self, user_id): GET /v3/users/{user_id}/OS-OAUTH1/access_tokens """ ENFORCER.enforce_call(action='identity:list_access_tokens') - if self.oslo_context.is_delegated_auth: - raise ks_exception.Forbidden( - _('Cannot list request tokens with a token ' - 'issued via delegation.')) + _block_delegated_token(self.oslo_context, self.auth_context['token']) refs = PROVIDERS.oauth_api.list_access_tokens(user_id) formatted_refs = ([_format_token_entity(x) for x in refs]) return self.wrap_collection(formatted_refs) @@ -488,6 +572,7 @@ def get(self, user_id, access_token_id): GET/HEAD /v3/users/{user_id}/OS-OAUTH1/access_tokens/{access_token_id} """ ENFORCER.enforce_call(action='identity:get_access_token') + _block_delegated_token(self.oslo_context, self.auth_context['token']) access_token = PROVIDERS.oauth_api.get_access_token(access_token_id) if access_token['authorizing_user_id'] != user_id: raise ks_exception.NotFound() @@ -502,6 +587,7 @@ def delete(self, user_id, access_token_id): ENFORCER.enforce_call( action='identity:ec2_delete_credential', build_target=_build_enforcer_target_data_owner_and_user_id_match) + _block_delegated_token(self.oslo_context, self.auth_context['token']) access_token = PROVIDERS.oauth_api.get_access_token(access_token_id) reason = ( 'Invalidating the token cache because an access token for ' @@ -623,6 +709,8 @@ def get(self, user_id): filters = ('name',) ENFORCER.enforce_call(action='identity:list_application_credentials', filters=filters) + token = self.auth_context['token'] + _block_delegated_token_app_creds(self.oslo_context, token) app_cred_api = PROVIDERS.application_credential_api hints = self.build_driver_hints(filters) refs = app_cred_api.list_application_credentials(user_id, hints=hints) @@ -639,6 +727,7 @@ def post(self, user_id): validation.lazy_validate(app_cred_schema.application_credential_create, app_cred_data) token = self.auth_context['token'] + _block_delegated_token_app_creds(self.oslo_context, token) _check_unrestricted_application_credential(token) if self.oslo_context.user_id != user_id: action = _('Cannot create an application credential for another ' @@ -694,6 +783,8 @@ def get(self, user_id, application_credential_id): action='identity:get_application_credential', target_attr=target, ) + token = self.auth_context['token'] + _block_delegated_token_app_creds(self.oslo_context, token) ref = PROVIDERS.application_credential_api.get_application_credential( application_credential_id) return self.wrap_member(ref) @@ -710,6 +801,7 @@ def delete(self, user_id, application_credential_id): target_attr=target ) token = self.auth_context['token'] + _block_delegated_token_app_creds(self.oslo_context, token) _check_unrestricted_application_credential(token) PROVIDERS.application_credential_api.delete_application_credential( application_credential_id, initiator=self.audit_initiator) @@ -729,6 +821,8 @@ def get(self, user_id): ENFORCER.enforce_call(action='identity:list_access_rules', filters=filters, build_target=_build_user_target_enforcement) + token = self.auth_context['token'] + _block_delegated_token_app_creds(self.oslo_context, token) app_cred_api = PROVIDERS.application_credential_api hints = self.build_driver_hints(filters) refs = app_cred_api.list_access_rules_for_user(user_id, hints=hints) @@ -749,6 +843,8 @@ def get(self, user_id, access_rule_id): action='identity:get_access_rule', build_target=_build_user_target_enforcement ) + token = self.auth_context['token'] + _block_delegated_token_app_creds(self.oslo_context, token) ref = PROVIDERS.application_credential_api.get_access_rule( access_rule_id) return self.wrap_member(ref) @@ -762,6 +858,8 @@ def delete(self, user_id, access_rule_id): action='identity:delete_access_rule', build_target=_build_user_target_enforcement ) + token = self.auth_context['token'] + _block_delegated_token_app_creds(self.oslo_context, token) PROVIDERS.application_credential_api.delete_access_rule( access_rule_id, initiator=self.audit_initiator) return None, http.client.NO_CONTENT diff --git a/keystone/conf/security_compliance.py b/keystone/conf/security_compliance.py index 5d528d0dea..f63607a892 100644 --- a/keystone/conf/security_compliance.py +++ b/keystone/conf/security_compliance.py @@ -125,6 +125,71 @@ """)) +allow_insecure_admin_trust_cross_project_credentials_access = cfg.BoolOpt( + 'allow_insecure_admin_trust_cross_project_credentials_access', + default=False, + deprecated_for_removal=True, + deprecated_reason=utils.fmt( + """ +Migrate automated workflows that use admin-role trusts to access credentials +across multiple projects (e.g. Mistral cron triggers) to use non-delegated +service account credentials instead, then remove this option. +""" + ), + deprecated_since='2026.1', + help=utils.fmt( + """ +INSECURE: When enabled, admin-role delegated tokens (trusts, application +credentials, OAuth1 access tokens) are allowed to access credentials outside +their project scope. By default (False), delegated tokens can only access +credentials whose project_id matches the token's project scope, preventing +cross-project lateral movement via a compromised delegation token. + +Enable this only if you have automated workflows (e.g. Mistral cron triggers) +that use admin-role trusts to access credentials across multiple projects and +cannot be migrated to use non-delegated service account credentials. Enabling +this option weakens the isolation guarantee provided by the delegation boundary +fix for LP#2150089. This option is deprecated and will be removed in a future +release. +""" + ), +) + + +allow_insecure_application_credential_trust_escalation = cfg.BoolOpt( + 'allow_insecure_application_credential_trust_escalation', + default=False, + deprecated_for_removal=True, + deprecated_reason=utils.fmt( + """ +Migrate workflows where application credentials create trusts to use OIDC +federation flows (v3oidcclientcredentials, v3oidcdeviceauthz) instead, then +remove this option. +""" + ), + deprecated_since='2026.1', + help=utils.fmt( + """ +INSECURE: When enabled, application credential tokens (including restricted +ones) are allowed to create, delete, and list trusts. By default (False), +application credential tokens are blocked from all trust operations regardless +of the unrestricted flag, because allowing an application credential to +bootstrap a trust creates a new delegation context. A trust-scoped token +produced from that trust can then access authentication material (EC2 +credentials, TOTP seeds) and operate entirely outside the delegation chain, +breaking the audit trail. The 'unrestricted' flag governs credential +management, not trust management. + +Enable this only if you have workflows where application credentials must +create trusts (e.g. Heat stacks authenticated via application credentials). +Use OIDC federation flows (v3oidcclientcredentials, v3oidcdeviceauthz) as the +proper long-term alternative. This option is deprecated and will be removed +in a future release. +""" + ), +) + + GROUP_NAME = __name__.split('.')[-1] ALL_OPTS = [ disable_user_account_days_inactive, @@ -135,7 +200,9 @@ minimum_password_age, password_regex, password_regex_description, - change_password_upon_first_use + change_password_upon_first_use, + allow_insecure_admin_trust_cross_project_credentials_access, + allow_insecure_application_credential_trust_escalation, ] diff --git a/keystone/tests/unit/test_v3_application_credential.py b/keystone/tests/unit/test_v3_application_credential.py index b60307c05a..38e45690e0 100644 --- a/keystone/tests/unit/test_v3_application_credential.py +++ b/keystone/tests/unit/test_v3_application_credential.py @@ -13,7 +13,7 @@ import datetime from testtools import matchers import uuid - +import unittest import http.client from keystone.common import provider_api @@ -181,7 +181,6 @@ def test_create_application_credential_with_trust(self): self.user_id, self.project_id, second_role['id']) with self.test_client() as c: pw_token = self.get_scoped_token() - # create a self-trust - only the roles are important for this test trust_ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.user_id, @@ -198,14 +197,26 @@ def test_create_application_credential_with_trust(self): trust_token = self.v3_create_token( trust_auth).headers['X-Subject-Token'] app_cred = self._app_cred_body(roles=[{'id': self.role_id}]) - # only the roles from the trust token should be allowed, even if - # the user has the role assigned on the project - c.post('/v3/users/%s/application_credentials' % self.user_id, - headers={'X-Auth-Token': trust_token}, - json=app_cred, - expected_status_code=http.client.BAD_REQUEST) + # Trust-scoped tokens are entirely blocked from managing + # application credentials (LP#2150089). + c.post( + f'/v3/users/{self.user_id}/application_credentials', + headers={'X-Auth-Token': trust_token}, + json=app_cred, + expected_status_code=http.client.FORBIDDEN, + ) def test_create_application_credential_allow_recursion(self): + """Unrestricted app credential token can create new credentials. + + The `unrestricted` flag is a documented (unsafe) feature that allows + an application credential token to create additional application + credentials. This must continue to work -- restricted application + credentials are blocked by _check_unrestricted_application_credential, + but unrestricted ones are explicitly opted-in to this behaviour. + Trust-scoped and OAuth1 tokens are separately blocked by + _block_delegated_token_app_creds (LP#2150089). + """ with self.test_client() as c: roles = [{'id': self.role_id}] app_cred_body_1 = self._app_cred_body(roles=roles) @@ -499,6 +510,12 @@ def test_delete_application_credential_with_application_credential(self): headers={'X-Auth-Token': token}) def test_delete_application_credential_allow_recursion(self): + """Unrestricted app credential token can delete credentials. + + The `unrestricted` flag allows an application credential token to + delete application credentials. Trust-scoped and OAuth1 tokens are + separately blocked by _block_delegated_token_app_creds (LP#2150089). + """ with self.test_client() as c: roles = [{'id': self.role_id}] app_cred_body = self._app_cred_body(roles=roles) @@ -548,3 +565,286 @@ def test_update_application_credential(self): json=app_cred_body, expected_status_code=http.client.METHOD_NOT_ALLOWED, headers={'X-Auth-Token': token}) + + def _get_trust_token(self, c, pw_token): + """Return a trust-scoped token for self.user_id on self.project_id.""" + trust_ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.user_id, + project_id=self.project_id, + role_ids=[self.role_id], + ) + resp = c.post( + '/v3/OS-TRUST/trusts', + headers={'X-Auth-Token': pw_token}, + json={'trust': trust_ref}, + ) + trust_id = resp.json['trust']['id'] + trust_auth = self.build_authentication_request( + user_id=self.user_id, + password=self.user['password'], + trust_id=trust_id, + ) + return self.v3_create_token(trust_auth).headers['X-Subject-Token'] + + def test_delegation_guard_trust_list_app_creds(self): + """Trust-scoped token cannot list application credentials (LP#2150089). + + Previously GET /v3/users/{id}/application_credentials had no delegation + guard at all; any trust-scoped token could enumerate the user's + application credentials. + """ + with self.test_client() as c: + pw_token = self.get_scoped_token() + trust_token = self._get_trust_token(c, pw_token) + c.get( + f'/v3/users/{self.user_id}/application_credentials', + headers={'X-Auth-Token': trust_token}, + expected_status_code=http.client.FORBIDDEN, + ) + + def test_delegation_guard_trust_get_app_cred(self): + """Trust-scoped token cannot read a specific application credential. + + Previously GET /v3/users/{id}/application_credentials/{id} had no + delegation guard at all (LP#2150089). + """ + with self.test_client() as c: + pw_token = self.get_scoped_token() + roles = [{'id': self.role_id}] + resp = c.post( + f'/v3/users/{self.user_id}/application_credentials', + json=self._app_cred_body(roles=roles), + expected_status_code=http.client.CREATED, + headers={'X-Auth-Token': pw_token}, + ) + app_cred_id = resp.json['application_credential']['id'] + trust_token = self._get_trust_token(c, pw_token) + member_path = f'/v3{MEMBER_PATH_FMT}' % { + 'user_id': self.user_id, + 'app_cred_id': app_cred_id, + } + c.get( + member_path, + headers={'X-Auth-Token': trust_token}, + expected_status_code=http.client.FORBIDDEN, + ) + + def test_delegation_guard_trust_delete_app_cred(self): + """Trust-scoped token cannot delete an application credential. + + Previously DELETE /v3/users/{id}/application_credentials/{id} only + checked for restricted app-cred tokens; trust-scoped tokens had no + guard (LP#2150089). + """ + with self.test_client() as c: + pw_token = self.get_scoped_token() + roles = [{'id': self.role_id}] + resp = c.post( + f'/v3/users/{self.user_id}/application_credentials', + json=self._app_cred_body(roles=roles), + expected_status_code=http.client.CREATED, + headers={'X-Auth-Token': pw_token}, + ) + app_cred_id = resp.json['application_credential']['id'] + trust_token = self._get_trust_token(c, pw_token) + member_path = f'/v3{MEMBER_PATH_FMT}' % { + 'user_id': self.user_id, + 'app_cred_id': app_cred_id, + } + c.delete( + member_path, + headers={'X-Auth-Token': trust_token}, + expected_status_code=http.client.FORBIDDEN, + ) + + def test_delegation_guard_trust_list_access_rules(self): + """Trust-scoped token cannot list access rules (LP#2150089).""" + access_rules = [ + {'path': '/v3/projects', 'method': 'GET', 'service': 'identity'} + ] + with self.test_client() as c: + pw_token = self.get_scoped_token() + roles = [{'id': self.role_id}] + c.post( + f'/v3/users/{self.user_id}/application_credentials', + json=self._app_cred_body( + roles=roles, access_rules=access_rules + ), + expected_status_code=http.client.CREATED, + headers={'X-Auth-Token': pw_token}, + ) + trust_token = self._get_trust_token(c, pw_token) + c.get( + f'/v3/users/{self.user_id}/access_rules', + headers={'X-Auth-Token': trust_token}, + expected_status_code=http.client.FORBIDDEN, + ) + + def test_delegation_guard_trust_get_access_rule(self): + """Trust-scoped token cannot read a specific access rule (LP#2150089).""" + access_rules = [ + {'path': '/v3/projects', 'method': 'GET', 'service': 'identity'} + ] + with self.test_client() as c: + pw_token = self.get_scoped_token() + roles = [{'id': self.role_id}] + resp = c.post( + f'/v3/users/{self.user_id}/application_credentials', + json=self._app_cred_body( + roles=roles, access_rules=access_rules + ), + expected_status_code=http.client.CREATED, + headers={'X-Auth-Token': pw_token}, + ) + access_rule_id = resp.json['application_credential'][ + 'access_rules' + ][0]['id'] + trust_token = self._get_trust_token(c, pw_token) + c.get( + f'/v3/users/{self.user_id}/access_rules/{access_rule_id}', + headers={'X-Auth-Token': trust_token}, + expected_status_code=http.client.FORBIDDEN, + ) + + def test_delegation_guard_trust_delete_access_rule(self): + """Trust-scoped token cannot delete an access rule (LP#2150089).""" + access_rules = [ + {'path': '/v3/projects', 'method': 'GET', 'service': 'identity'} + ] + with self.test_client() as c: + pw_token = self.get_scoped_token() + roles = [{'id': self.role_id}] + resp = c.post( + f'/v3/users/{self.user_id}/application_credentials', + json=self._app_cred_body( + roles=roles, access_rules=access_rules + ), + expected_status_code=http.client.CREATED, + headers={'X-Auth-Token': pw_token}, + ) + ac = resp.json['application_credential'] + access_rule_id = ac['access_rules'][0]['id'] + c.delete( + f'/v3/users/{self.user_id}/application_credentials/{ac["id"]}', + headers={'X-Auth-Token': pw_token}, + expected_status_code=http.client.NO_CONTENT, + ) + trust_token = self._get_trust_token(c, pw_token) + c.delete( + f'/v3/users/{self.user_id}/access_rules/{access_rule_id}', + headers={'X-Auth-Token': trust_token}, + expected_status_code=http.client.FORBIDDEN, + ) + + def test_list_access_rules(self): + access_rules: list[dict[str, str]] = [ + {"service": "foo", "method": "GET", "path": "/bar"} + ] + with self.test_client() as c: + roles = [{'id': self.role_id}] + app_cred_body = self._app_cred_body( + roles=roles, access_rules=access_rules + ) + token = self.get_scoped_token() + c.post( + f"/v3/users/{self.user_id}/application_credentials", + json=app_cred_body, + expected_status_code=http.client.CREATED, + headers={"X-Auth-Token": token}, + ) + # Invoke GET access_rules and trigger internal validation + r = c.get( + f"/v3/users/{self.user_id}/access_rules", + expected_status_code=http.client.OK, + headers={"X-Auth-Token": token}, + ) + ar = r.json["access_rules"] + self.assertEqual(access_rules[0]["method"], ar[0]["method"]) + + # TODO(stephenfin): This will pass once we increase strictness of the query + # string validation + @unittest.expectedFailure + def test_list_access_rules_invalid_qs(self): + with self.test_client() as c: + token = self.get_scoped_token() + # Invoke GET access_rules with unsupported query parameters and + # trigger internal validation + c.get( + f"/v3/users/{self.user_id}/access_rules?user_id=foo", + expected_status_code=http.client.BAD_REQUEST, + headers={"X-Auth-Token": token}, + ) + + def test_show_access_rule(self): + access_rules: list[dict[str, str]] = [ + {"service": "foo", "method": "GET", "path": "/bar"} + ] + with self.test_client() as c: + roles = [{'id': self.role_id}] + app_cred_body = self._app_cred_body( + roles=roles, access_rules=access_rules + ) + token = self.get_scoped_token() + resp = c.post( + f"/v3/users/{self.user_id}/application_credentials", + json=app_cred_body, + expected_status_code=http.client.CREATED, + headers={"X-Auth-Token": token}, + ) + access_rule_id = resp.json["application_credential"][ + "access_rules" + ][0]["id"] + # Invoke GET access_rules/{id} and trigger internal validation + c.get( + f"/v3/users/{self.user_id}/access_rules/{access_rule_id}", + expected_status_code=http.client.OK, + headers={"X-Auth-Token": token}, + ) + + # TODO(stephenfin): This will pass once we increase strictness of the query + # string validation + @unittest.expectedFailure + def test_show_access_rule_invalid_qs(self): + with self.test_client() as c: + token = self.get_scoped_token() + # Invoke GET access_rules/{id} with unsupported query parameters and + # trigger internal validation + c.get( + f"/v3/users/{self.user_id}/access_rules/{access_rule_id}" + "?foo=bar", + expected_status_code=http.client.BAD_REQUEST, + headers={"X-Auth-Token": token}, + ) + + def test_delete_access_rule(self): + access_rules: list[dict[str, str]] = [ + {"service": "foo", "method": "GET", "path": "/bar"} + ] + with self.test_client() as c: + roles = [{'id': self.role_id}] + app_cred_body = self._app_cred_body( + roles=roles, access_rules=access_rules + ) + token = self.get_scoped_token() + resp = c.post( + f"/v3/users/{self.user_id}/application_credentials", + json=app_cred_body, + expected_status_code=http.client.CREATED, + headers={"X-Auth-Token": token}, + ) + app_cred: dict = resp.json["application_credential"] + access_rule_id = app_cred["access_rules"][0]["id"] + c.delete( + f"/v3/users/{self.user_id}/application_credentials" + f"/{app_cred['id']}", + json=app_cred_body, + expected_status_code=http.client.NO_CONTENT, + headers={"X-Auth-Token": token}, + ) + # Invoke GET access_rules/{id} and trigger internal validation + c.delete( + f"/v3/users/{self.user_id}/access_rules/{access_rule_id}", + expected_status_code=http.client.NO_CONTENT, + headers={"X-Auth-Token": token}, + ) diff --git a/keystone/tests/unit/test_v3_credential.py b/keystone/tests/unit/test_v3_credential.py index 5f25244574..3f5e4759e0 100644 --- a/keystone/tests/unit/test_v3_credential.py +++ b/keystone/tests/unit/test_v3_credential.py @@ -65,6 +65,30 @@ def _create_dict_blob_credential(self): return json.dumps(blob), credential_id + def _get_ec2_sig_ref(self, blob): + """Return a signed ec2Credentials dict for use with POST /ec2tokens.""" + signer = ec2_utils.Ec2Signer(blob['secret']) + params = { + 'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': blob['access'], + } + return { + 'access': blob['access'], + 'signature': signer.generate( + { + 'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params, + } + ), + 'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params, + } + def _test_get_token(self, access, secret): """Test signature validation with the access/secret provided.""" signer = ec2_utils.Ec2Signer(secret) @@ -671,6 +695,152 @@ def test_trust_scoped_ec2_credential(self): token=token_id, expected_status=http.client.CONFLICT) + def _get_trust_token(self): + ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + impersonation=True, + role_ids=[self.role_id], + ) + del ref['id'] + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r) + auth_data = self.build_authentication_request( + user_id=self.trustee_user['id'], + password=self.trustee_user['password'], + trust_id=trust['id'], + ) + r = self.v3_create_token(auth_data) + return r.headers.get('X-Subject-Token') + + def test_trust_token_cannot_list_totp_credentials(self): + """Trust-scoped token must not see TOTP/MFA credentials (project_id=None). + + TOTP credentials have no project anchor. Before this fix the + project boundary check skipped null-project credentials, allowing a + delegation token to enumerate and exfiltrate MFA secrets. + """ + totp_ref = { + 'user_id': self.user_id, + 'type': 'totp', + 'blob': '{"seed": "JBSWY3DPEHPK3PXP"}', + } + r = self.post('/credentials', body={'credential': totp_ref}) + totp_id = r.result['credential']['id'] + + trust_token = self._get_trust_token() + + r = self.get(f'/credentials?user_id={self.user_id}', token=trust_token) + listed_ids = [c['id'] for c in r.result['credentials']] + self.assertNotIn(totp_id, listed_ids) + + def test_trust_token_cannot_read_totp_credential(self): + """Trust-scoped token must not read a TOTP credential blob.""" + totp_ref = { + 'user_id': self.user_id, + 'type': 'totp', + 'blob': '{"seed": "JBSWY3DPEHPK3PXP"}', + } + r = self.post('/credentials', body={'credential': totp_ref}) + totp_id = r.result['credential']['id'] + + trust_token = self._get_trust_token() + self.get( + f'/credentials/{totp_id}', + token=trust_token, + expected_status=http.client.FORBIDDEN, + ) + + def test_trust_token_cannot_update_totp_credential(self): + """Trust-scoped token must not be able to update a TOTP credential blob.""" + totp_ref = { + 'user_id': self.user_id, + 'type': 'totp', + 'blob': '{"seed": "JBSWY3DPEHPK3PXP"}', + } + r = self.post('/credentials', body={'credential': totp_ref}) + totp_id = r.result['credential']['id'] + + trust_token = self._get_trust_token() + self.patch( + f'/credentials/{totp_id}', + token=trust_token, + body={'credential': totp_ref}, + expected_status=http.client.FORBIDDEN, + ) + + def test_trust_token_cannot_delete_totp_credential(self): + """Trust-scoped token must not delete a TOTP credential.""" + totp_ref = { + 'user_id': self.user_id, + 'type': 'totp', + 'blob': '{"seed": "JBSWY3DPEHPK3PXP"}', + } + r = self.post('/credentials', body={'credential': totp_ref}) + totp_id = r.result['credential']['id'] + + trust_token = self._get_trust_token() + self.delete( + f'/credentials/{totp_id}', + token=trust_token, + expected_status=http.client.FORBIDDEN, + ) + # Confirm it still exists + self.get(f'/credentials/{totp_id}', expected_status=http.client.OK) + + def test_ec2_auth_trust_cross_project_scoped_to_trust(self): + """Trust-backed EC2 credential with mismatched project_id is safe. + + When an EC2 credential's project_id differs from the trust's + project_id, the trust mechanism constrains the resulting token to + the trust's project -- not the credential's project. This means the + cross-project escalation does not occur for trust-backed credentials, + and no additional auth-time check is needed in that branch. + + This test documents and protects that invariant. + """ + trust_ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + impersonation=True, + role_ids=[self.role_id], + ) + del trust_ref['id'] + r = self.post('/OS-TRUST/trusts', body={'trust': trust_ref}) + trust = self.assertValidTrustResponse(r) + + other_project = unit.new_project_ref(domain_id=self.domain_id) + other_project = PROVIDERS.resource_api.create_project( + other_project['id'], other_project + ) + + # Plant a credential with project_id pointing to the other project + # but trust_id from the trust above (scoped to self.project_id). + blob, ref = unit.new_ec2_credential( + user_id=self.user_id, project_id=other_project['id'] + ) + blob['trust_id'] = trust['id'] + ref['blob'] = json.dumps(blob) + PROVIDERS.credential_api.create_credential(ref['id'], ref) + + PROVIDERS.assignment_api.create_system_grant_for_user( + self.user_id, self.role_id + ) + token = self.get_system_scoped_token() + r = self.post( + '/ec2tokens', + body={'ec2Credentials': self._get_ec2_sig_ref(blob)}, + token=token, + expected_status=http.client.OK, + ) + # The resulting token is scoped to the trust's project, not to + # other_project -- the trust mechanism prevents cross-project escalation. + token_project = r.result['token']['project']['id'] + self.assertEqual(self.project_id, token_project) + self.assertNotEqual(other_project['id'], token_project) + class TestCredentialAppCreds(CredentialBaseTestCase): """Test credential with application credential token.""" @@ -875,6 +1045,81 @@ def test_app_cred_ec2_auth_cross_project_rejected(self): expected_status=http.client.UNAUTHORIZED, ) + def test_app_cred_token_cannot_list_totp_credentials(self): + """App cred token must not see TOTP/MFA credentials (project_id=None). + + TOTP credentials have no project anchor. Before this fix the + project boundary check skipped null-project credentials, allowing a + delegation token to enumerate and exfiltrate MFA secrets. + """ + totp_ref = { + 'user_id': self.user_id, + 'type': 'totp', + 'blob': '{"seed": "JBSWY3DPEHPK3PXP"}', + } + r = self.post('/credentials', body={'credential': totp_ref}) + totp_id = r.result['credential']['id'] + + app_cred_token = self._get_app_cred_token(unrestricted=True) + + r = self.get( + f'/credentials?user_id={self.user_id}', token=app_cred_token + ) + listed_ids = [c['id'] for c in r.result['credentials']] + self.assertNotIn(totp_id, listed_ids) + + def test_app_cred_token_cannot_read_totp_credential(self): + """App cred token must not read a TOTP credential blob.""" + totp_ref = { + 'user_id': self.user_id, + 'type': 'totp', + 'blob': '{"seed": "JBSWY3DPEHPK3PXP"}', + } + r = self.post('/credentials', body={'credential': totp_ref}) + totp_id = r.result['credential']['id'] + app_cred_token = self._get_app_cred_token(unrestricted=True) + + self.get( + f'/credentials/{totp_id}', + token=app_cred_token, + expected_status=http.client.FORBIDDEN, + ) + + def test_app_cred_token_cannot_update_totp_credential(self): + """App cred token must not update a TOTP credential blob.""" + totp_ref = { + 'user_id': self.user_id, + 'type': 'totp', + 'blob': '{"seed": "JBSWY3DPEHPK3PXP"}', + } + r = self.post('/credentials', body={'credential': totp_ref}) + totp_id = r.result['credential']['id'] + app_cred_token = self._get_app_cred_token(unrestricted=True) + + self.patch( + f'/credentials/{totp_id}', + token=app_cred_token, + body={'credential': totp_ref}, + expected_status=http.client.FORBIDDEN, + ) + + def test_app_cred_token_cannot_delete_totp_credential(self): + """App cred token must not delete a TOTP credential blob.""" + totp_ref = { + 'user_id': self.user_id, + 'type': 'totp', + 'blob': '{"seed": "JBSWY3DPEHPK3PXP"}', + } + r = self.post('/credentials', body={'credential': totp_ref}) + totp_id = r.result['credential']['id'] + app_cred_token = self._get_app_cred_token(unrestricted=True) + + self.delete( + f'/credentials/{totp_id}', + token=app_cred_token, + expected_status=http.client.FORBIDDEN, + ) + class TestCredentialAccessToken(CredentialBaseTestCase): """Test credential with access token.""" @@ -1022,6 +1267,53 @@ def test_access_token_ec2_credential(self): self.assertIn(self.role_id, ec2_roles) self.assertNotIn(role_id, ec2_roles) + def test_ec2_auth_access_token_cross_project_blocked(self): + """OAuth1 access-token-backed EC2 credential must not auth cross-project. + + Auth-time check: if a cross-project EC2 credential backed by an OAuth1 + access token exists, POST /ec2tokens must reject it when the + credential's project_id differs from the access token's project_id. + """ + access_key, _ = self._get_access_token() + + # Retrieve the stored access token to get its project_id + access_token = PROVIDERS.oauth_api.get_access_token( + access_key.decode('utf-8') + if isinstance(access_key, bytes) + else access_key + ) + + # Create a second project (cross-project target) + other_project = unit.new_project_ref(domain_id=self.domain_id) + other_project = PROVIDERS.resource_api.create_project( + other_project['id'], other_project + ) + + # Directly inject an EC2 credential whose project_id points to the + # other project but whose access_token_id references the token above. + # This simulates a pre-existing cross-project credential. + blob, ref = unit.new_ec2_credential( + user_id=self.user_id, project_id=other_project['id'] + ) + blob['access_token_id'] = ( + access_key.decode('utf-8') + if isinstance(access_key, bytes) + else access_key + ) + ref['blob'] = json.dumps(blob) + PROVIDERS.credential_api.create_credential(ref['id'], ref) + + PROVIDERS.assignment_api.create_system_grant_for_user( + self.user_id, self.role_id + ) + token = self.get_system_scoped_token() + self.post( + '/ec2tokens', + body={'ec2Credentials': self._get_ec2_sig_ref(blob)}, + token=token, + expected_status=http.client.UNAUTHORIZED, + ) + class TestCredentialEc2(CredentialBaseTestCase): """Test v3 credential compatibility with ec2tokens.""" @@ -1126,3 +1418,103 @@ def test_ec2_delete_credential(self): self.assertRaises(exception.CredentialNotFound, PROVIDERS.credential_api.get_credential, cred_from_credential_api[0]['id']) + + def _get_trust_token(self): + """Create a trust and return a trust-scoped token for the trustee.""" + trustee = unit.new_user_ref(domain_id=self.domain_id) + password = trustee['password'] + trustee = PROVIDERS.identity_api.create_user(trustee) + trustee['password'] = password + trust_ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=trustee['id'], + project_id=self.project_id, + impersonation=True, + role_ids=[self.role_id], + ) + del trust_ref['id'] + r = self.post('/OS-TRUST/trusts', body={'trust': trust_ref}) + trust = r.result['trust'] + auth_data = self.build_authentication_request( + user_id=trustee['id'], + password=trustee['password'], + trust_id=trust['id'], + ) + r = self.v3_create_token(auth_data) + return r.headers.get('X-Subject-Token') + + def test_ec2_create_credential_trust_cross_project_blocked(self): + """Trust-scoped token cannot create EC2 cred for a different project.""" + other_project = unit.new_project_ref(domain_id=self.domain_id) + other_project = PROVIDERS.resource_api.create_project( + other_project['id'], other_project + ) + trust_token = self._get_trust_token() + uri = f'/users/{self.user_id}/credentials/OS-EC2' + self.post( + uri, + body={'tenant_id': other_project['id']}, + token=trust_token, + expected_status=http.client.FORBIDDEN, + ) + + def test_ec2_create_credential_trust_same_project_allowed(self): + """Trust-scoped token can create EC2 cred for the trust project.""" + trust_token = self._get_trust_token() + uri = self._get_ec2_cred_uri() + r = self.post( + uri, + body={'tenant_id': self.project_id}, + token=trust_token, + expected_status=http.client.CREATED, + ) + self.assertEqual(self.project_id, r.result['credential']['tenant_id']) + + def test_ec2_get_credential_trust_cross_project_blocked(self): + """Trust-scoped token cannot get an EC2 cred from a different project.""" + other_project = unit.new_project_ref(domain_id=self.domain_id) + other_project = PROVIDERS.resource_api.create_project( + other_project['id'], other_project + ) + PROVIDERS.assignment_api.add_role_to_user_and_project( + self.user_id, other_project['id'], self.role_id + ) + ec2_cred = self._get_ec2_cred() + # Change the credential's project to the other project directly + PROVIDERS.credential_api.update_credential( + next( + c['id'] + for c in PROVIDERS.credential_api.list_credentials_for_user( + self.user_id, type=CRED_TYPE_EC2 + ) + ), + {'project_id': other_project['id']}, + ) + trust_token = self._get_trust_token() + uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) + self.get(uri, token=trust_token, expected_status=http.client.FORBIDDEN) + + def test_ec2_delete_credential_trust_cross_project_blocked(self): + """Trust-scoped token cannot delete EC2 cred from a different project.""" + other_project = unit.new_project_ref(domain_id=self.domain_id) + other_project = PROVIDERS.resource_api.create_project( + other_project['id'], other_project + ) + PROVIDERS.assignment_api.add_role_to_user_and_project( + self.user_id, other_project['id'], self.role_id + ) + ec2_cred = self._get_ec2_cred() + PROVIDERS.credential_api.update_credential( + next( + c['id'] + for c in PROVIDERS.credential_api.list_credentials_for_user( + self.user_id, type=CRED_TYPE_EC2 + ) + ), + {'project_id': other_project['id']}, + ) + trust_token = self._get_trust_token() + uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) + self.delete( + uri, token=trust_token, expected_status=http.client.FORBIDDEN + ) diff --git a/keystone/tests/unit/test_v3_oauth1.py b/keystone/tests/unit/test_v3_oauth1.py index 6b6942b612..debe17aa68 100644 --- a/keystone/tests/unit/test_v3_oauth1.py +++ b/keystone/tests/unit/test_v3_oauth1.py @@ -430,6 +430,118 @@ def test_list_and_delete_access_tokens(self): self.assertEqual([], entities) self.assertValidListLinks(resp.result['links']) + def _get_app_cred_token(self): + app_cred = { + 'id': uuid.uuid4().hex, + 'user_id': self.user_id, + 'project_id': self.project_id, + 'name': uuid.uuid4().hex, + 'roles': [{'id': self.role_id}], + 'secret': uuid.uuid4().hex, + } + PROVIDERS.application_credential_api.create_application_credential( + app_cred + ) + auth_data = self.build_authentication_request( + app_cred_id=app_cred['id'], secret=app_cred['secret'] + ) + r = self.v3_create_token(auth_data) + return r.headers['X-Subject-Token'] + + def test_list_access_tokens_with_app_cred_blocked(self): + """Application credential token must not list OAuth1 access tokens.""" + self.test_oauth_flow() + token = self._get_app_cred_token() + self.get( + f'/users/{self.user_id}/OS-OAUTH1/access_tokens', + token=token, + expected_status=http.client.FORBIDDEN, + ) + + def test_get_access_token_with_app_cred_blocked(self): + """Application credential token must not get a specific access token.""" + self.test_oauth_flow() + token = self._get_app_cred_token() + access_token_key = self.access_token.key.decode() + self.get( + f'/users/{self.user_id}/OS-OAUTH1/access_tokens/{access_token_key}', + token=token, + expected_status=http.client.FORBIDDEN, + ) + + def test_delete_access_token_with_app_cred_blocked(self): + """Application credential token must not delete an access token.""" + self.test_oauth_flow() + token = self._get_app_cred_token() + access_token_key = self.access_token.key.decode() + self.delete( + f'/users/{self.user_id}/OS-OAUTH1/access_tokens/{access_token_key}', + token=token, + expected_status=http.client.FORBIDDEN, + ) + + def test_get_access_token_with_trust_token_blocked(self): + """Trust-scoped token must not get a specific access token.""" + self.test_oauth_flow() + trustee = unit.new_user_ref(domain_id=self.domain_id) + password = trustee['password'] + trustee = PROVIDERS.identity_api.create_user(trustee) + trustee['password'] = password + trust_ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=trustee['id'], + project_id=self.project_id, + impersonation=True, + role_ids=[self.role_id], + ) + del trust_ref['id'] + r = self.post('/OS-TRUST/trusts', body={'trust': trust_ref}) + trust_id = r.result['trust']['id'] + auth_data = self.build_authentication_request( + user_id=trustee['id'], + password=trustee['password'], + trust_id=trust_id, + ) + r = self.v3_create_token(auth_data) + trust_token = r.headers['X-Subject-Token'] + access_token_key = self.access_token.key.decode() + self.get( + f'/users/{self.user_id}/OS-OAUTH1/access_tokens/{access_token_key}', + token=trust_token, + expected_status=http.client.FORBIDDEN, + ) + + def test_delete_access_token_with_trust_token_blocked(self): + """Trust-scoped token must not delete an access token.""" + self.test_oauth_flow() + trustee = unit.new_user_ref(domain_id=self.domain_id) + password = trustee['password'] + trustee = PROVIDERS.identity_api.create_user(trustee) + trustee['password'] = password + trust_ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=trustee['id'], + project_id=self.project_id, + impersonation=True, + role_ids=[self.role_id], + ) + del trust_ref['id'] + r = self.post('/OS-TRUST/trusts', body={'trust': trust_ref}) + trust_id = r.result['trust']['id'] + auth_data = self.build_authentication_request( + user_id=trustee['id'], + password=trustee['password'], + trust_id=trust_id, + ) + r = self.v3_create_token(auth_data) + trust_token = r.headers['X-Subject-Token'] + access_token_key = self.access_token.key.decode() + self.delete( + f'/users/{self.user_id}/OS-OAUTH1/access_tokens/{access_token_key}', + token=trust_token, + expected_status=http.client.FORBIDDEN, + ) + class AuthTokenTests(object): diff --git a/releasenotes/notes/bug-2150089-e91b592c948e5771.yaml b/releasenotes/notes/bug-2150089-e91b592c948e5771.yaml new file mode 100644 index 0000000000..9d44424625 --- /dev/null +++ b/releasenotes/notes/bug-2150089-e91b592c948e5771.yaml @@ -0,0 +1,35 @@ +--- +security: + - | + [`bug 2150089 `_] + Delegated tokens (trusts, application credentials, OAuth1 access tokens) + are now restricted to credentials whose ``project_id`` matches the token's + project scope. This closes a cross-project lateral movement vector where a + delegated token could read, modify, or delete credentials belonging to a + different project, including EC2 keys and TOTP/MFA seed bindings. + + Application credential tokens are now blocked from all trust operations + (create, delete, list, get). Allowing an application credential to bootstrap + a trust creates a new delegation context whose token can access + authentication material outside the delegation chain, breaking the audit + trail. The ``unrestricted`` flag governs credential management, not trust + management. +upgrade: + - | + [`bug 2150089 `_] + Two new ``[security_compliance]`` options control opt-in insecure behaviour + for operators with workflows that break after this upgrade: + + ``allow_insecure_admin_trust_cross_project_credentials_access`` (default + ``False``): set to ``True`` if admin-role trusts or application credentials + need to access credentials across multiple projects (e.g. Mistral cron + triggers syncing EC2 credentials system-wide). + + ``allow_insecure_application_credential_trust_escalation`` (default + ``False``): set to ``True`` if application credentials must create or manage + trusts (e.g. Heat stacks authenticated via application credentials). Use + OIDC federation flows (``v3oidcclientcredentials``, ``v3oidcdeviceauthz``) + as the proper long-term alternative. + + Both options are intentionally named to signal that enabling them is + insecure. Migrate affected workflows away from these options. From a37aab466f65956783cee45bc42ae41105f57f6f Mon Sep 17 00:00:00 2001 From: Grzegorz Grasza Date: Tue, 12 May 2026 09:22:34 +0200 Subject: [PATCH 6/9] Fix user impersonation through application credentials (CVE-2026-42998) When authenticating by application credential ID, the caller can supply a 'user' field in the payload. AppCredInfo conditionally set the user from the credential owner only when no user field was present. If present, BaseUserInfo resolved the caller-supplied user and attributed the resulting token to that user instead of the credential owner. Fix: always set auth_payload['user'] from the credential's stored user_id, ignoring any caller-supplied value. Closes-Bug: #2148477 Assisted-by: Claude Sonnet 4.6 Co-authored-by: Boris Bobrov Change-Id: I2fe6089886eebf3775930451b87771e40b5e179e Signed-off-by: Grzegorz Grasza Signed-off-by: Artem Goncharov (cherry picked from commit 6cd25fecdab8b9261e916ee10f3dba5aeb0c1984) (cherry picked from commit 0a57564b1a273eb8719c31e32ad573eddb3e2136) --- keystone/auth/plugins/core.py | 7 +-- keystone/tests/unit/test_v3_auth.py | 75 +++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 3 deletions(-) diff --git a/keystone/auth/plugins/core.py b/keystone/auth/plugins/core.py index 1ae451df33..f48e360942 100644 --- a/keystone/auth/plugins/core.py +++ b/keystone/auth/plugins/core.py @@ -242,9 +242,10 @@ def _validate_and_normalize_auth_data(self, auth_payload): app_cred = app_cred_api.get_application_credential( auth_payload['id']) self.user_id = app_cred['user_id'] - if not auth_payload.get('user'): - auth_payload['user'] = {} - auth_payload['user']['id'] = self.user_id + # Always bind to the credential owner. Any user field supplied by + # the caller would allow impersonation of an arbitrary user + # (LP#2148477). + auth_payload['user'] = {'id': app_cred['user_id']} super(AppCredInfo, self)._validate_and_normalize_auth_data( auth_payload) elif auth_payload.get('name'): diff --git a/keystone/tests/unit/test_v3_auth.py b/keystone/tests/unit/test_v3_auth.py index cf456518aa..2d2dd80ff9 100644 --- a/keystone/tests/unit/test_v3_auth.py +++ b/keystone/tests/unit/test_v3_auth.py @@ -5736,3 +5736,78 @@ def test_application_credential_access_rules_without_header_fails(self): expected_status=http.client.CREATED) token = resp.headers.get('X-Subject-Token') self._validate_token(token, expected_status=http.client.NOT_FOUND) + + def test_app_cred_auth_with_injected_user_id_is_ignored(self): + """Caller-supplied user ID in app cred payload must be ignored. + + When authenticating by application credential ID, the token must + always be attributed to the credential owner. An attacker-supplied + user ID must not override the credential owner's identity. + LP#2148477 -- user impersonation via app credential auth. + """ + victim = unit.create_user( + PROVIDERS.identity_api, domain_id=self.domain_id + ) + PROVIDERS.assignment_api.add_role_to_user_and_project( + victim['id'], self.project_id, self.role_id + ) + + app_cred = self._make_app_cred() + app_cred_ref = self.app_cred_api.create_application_credential( + app_cred + ) + + auth_body = { + 'auth': { + 'identity': { + 'methods': ['application_credential'], + 'application_credential': { + 'id': app_cred_ref['id'], + 'secret': app_cred['secret'], + 'user': {'id': victim['id']}, + }, + } + } + } + r = self.v3_create_token(auth_body) + token_data = r.result['token'] + self.assertEqual(self.user['id'], token_data['user']['id']) + self.assertNotEqual(victim['id'], token_data['user']['id']) + + def test_app_cred_auth_with_injected_username_is_ignored(self): + """Caller-supplied username in app cred payload must be ignored. + + Same as the user ID variant but uses the victim's name and domain, + which are typically predictable. LP#2148477. + """ + victim = unit.create_user( + PROVIDERS.identity_api, domain_id=self.domain_id + ) + PROVIDERS.assignment_api.add_role_to_user_and_project( + victim['id'], self.project_id, self.role_id + ) + + app_cred = self._make_app_cred() + app_cred_ref = self.app_cred_api.create_application_credential( + app_cred + ) + + auth_body = { + 'auth': { + 'identity': { + 'methods': ['application_credential'], + 'application_credential': { + 'id': app_cred_ref['id'], + 'secret': app_cred['secret'], + 'user': { + 'name': victim['name'], + 'domain': {'name': self.domain['name']}, + }, + }, + } + } + } + r = self.v3_create_token(auth_body) + token_data = r.result['token'] + self.assertEqual(self.user['id'], token_data['user']['id']) + self.assertNotEqual(victim['id'], token_data['user']['id']) From e29c542fedc16ebd086fbb84e90ed7ac9e7cf2f9 Mon Sep 17 00:00:00 2001 From: Grzegorz Grasza Date: Tue, 12 May 2026 09:22:47 +0200 Subject: [PATCH 7/9] Forbid trust operations using application credentials (CVE-2026-43000) Previously only restricted application credentials were blocked, and only for trust create and delete. This change blocks all application credentials (restricted and unrestricted alike) from all trust operations: list, get, create, delete, list-roles, and get-role. The 'unrestricted' flag governs credential management, not trust management. Closes-Bug: #2148477 Related-Bug: #2149789 Related-Bug: #2150089 Assisted-by: Claude Sonnet 4.6 Co-authored-by: Boris Bobrov Change-Id: I750156df18a1d6293ce99c42eb524575fcf16ea3 Signed-off-by: Grzegorz Grasza Signed-off-by: Artem Goncharov (cherry picked from commit 3c2043ab003cb4b8aa34502fe9a5a69b0a6a6e54) (cherry picked from commit 7befe0a268516344f6e3e8e5018a90939c03497a) --- doc/source/user/application_credentials.rst | 6 +- keystone/api/trusts.py | 40 +++++++-- keystone/tests/unit/test_v3_trust.py | 93 +++++++++++++++++++++ 3 files changed, 128 insertions(+), 11 deletions(-) diff --git a/doc/source/user/application_credentials.rst b/doc/source/user/application_credentials.rst index eff86f7b3d..7f168a29c7 100644 --- a/doc/source/user/application_credentials.rst +++ b/doc/source/user/application_credentials.rst @@ -142,9 +142,9 @@ You can provide an expiration date for application credentials: +--------------+----------------------------------------------------------------------------------------+ By default, application credentials are restricted from creating or deleting -other application credentials and from creating or deleting trusts. If your -application needs to be able to perform these actions and you accept the risks -involved, you can disable this protection: +other application credentials. If your application needs to be able to perform +these actions and you accept the risks involved, you can disable this +protection: .. warning:: diff --git a/keystone/api/trusts.py b/keystone/api/trusts.py index 781b99efe5..265d3be829 100644 --- a/keystone/api/trusts.py +++ b/keystone/api/trusts.py @@ -22,6 +22,7 @@ from oslo_policy import _checks as op_checks from keystone.api._shared import json_home_relations +from keystone.common import authorization from keystone.common import context from keystone.common import json_home from keystone.common import provider_api @@ -29,6 +30,7 @@ from keystone.common.rbac_enforcer import policy from keystone.common import utils from keystone.common import validation +import keystone.conf from keystone import exception from keystone.i18n import _ from keystone.server import flask as ks_flask @@ -36,6 +38,7 @@ LOG = log.getLogger(__name__) +CONF = keystone.conf.CONF ENFORCER = rbac_enforcer.RBACEnforcer PROVIDERS = provider_api.ProviderAPIs @@ -46,6 +49,30 @@ parameter_name='trust_id') +def _check_application_credential(): + """Block application credential tokens from all trust operations. + + Application credentials are single-project delegation tokens. Allowing + them to read or manage trusts would permit a compromised application + credential to enumerate or manipulate the trust delegation chain, + expanding its effective scope beyond the single project it was issued for. + This applies regardless of the 'unrestricted' flag. + """ + if CONF.security_compliance.allow_insecure_application_credential_trust_escalation: + return + auth_context = flask.request.environ.get( + authorization.AUTH_CONTEXT_ENV, {} + ) + token = auth_context.get('token') + if token and 'application_credential' in token.methods: + raise exception.ForbiddenAction( + action=_( + "Using method 'application_credential' is not " + "allowed for managing trusts." + ) + ) + + def _build_trust_target_enforcement(): target = {} # NOTE(cmurphy) unlike other APIs, in the event the trust doesn't exist or @@ -101,14 +128,7 @@ class TrustResource(ks_flask.ResourceBase): json_home_parameter_rel_func = _build_parameter_relation def _check_unrestricted(self): - if self.oslo_context.is_admin: - return - token = self.auth_context['token'] - if 'application_credential' in token.methods: - if not token.application_credential['unrestricted']: - action = _("Using method 'application_credential' is not " - "allowed for managing trusts.") - raise exception.ForbiddenAction(action=action) + _check_application_credential() def _find_redelegated_trust(self): # Check if delegated via trust @@ -166,6 +186,7 @@ def _normalize_role_list(self, trust_roles): def _get_trust(self, trust_id): ENFORCER.enforce_call(action='identity:get_trust', build_target=_build_trust_target_enforcement) + _check_application_credential() # NOTE(cmurphy) look up trust before doing is_admin authorization - to # maintain the API contract, we expect a missing trust to raise a 404 @@ -214,6 +235,7 @@ def _list_trusts(self): target_attr=target) else: ENFORCER.enforce_call(action='identity:list_trusts') + _check_application_credential() trusts = [] @@ -361,6 +383,7 @@ def get(self, trust_id): # block access here raise exception.ForbiddenAction( action=_('Requested user has no relation to this trust')) + _check_application_credential() trust = PROVIDERS.trust_api.get_trust(trust_id) @@ -410,6 +433,7 @@ def get(self, trust_id, role_id): # block access here raise exception.ForbiddenAction( action=_('Requested user has no relation to this trust')) + _check_application_credential() trust = PROVIDERS.trust_api.get_trust(trust_id) diff --git a/keystone/tests/unit/test_v3_trust.py b/keystone/tests/unit/test_v3_trust.py index 6cea445f24..474e18bf6f 100644 --- a/keystone/tests/unit/test_v3_trust.py +++ b/keystone/tests/unit/test_v3_trust.py @@ -566,6 +566,99 @@ def test_create_trust_with_application_credential(self): token=token_data.headers['x-subject-token'], expected_status=http.client.FORBIDDEN) + def _get_app_cred_token(self, unrestricted=False): + app_cred = { + 'id': uuid.uuid4().hex, + 'user_id': self.user_id, + 'project_id': self.project_id, + 'name': uuid.uuid4().hex, + 'roles': [{'id': self.role_id}], + 'secret': uuid.uuid4().hex, + } + if unrestricted: + app_cred['unrestricted'] = True + PROVIDERS.application_credential_api.create_application_credential( + app_cred + ) + auth_data = self.build_authentication_request( + app_cred_id=app_cred['id'], secret=app_cred['secret'] + ) + r = self.v3_create_token( + auth_data, expected_status=http.client.CREATED + ) + return r.headers['x-subject-token'] + + def test_create_trust_with_unrestricted_application_credential(self): + """Unrestricted app cred must also be blocked from creating trusts.""" + trust_body = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + role_ids=[self.role_id], + ) + self.post( + '/OS-TRUST/trusts', + body={'trust': trust_body}, + token=self._get_app_cred_token(unrestricted=True), + expected_status=http.client.FORBIDDEN, + ) + + def test_list_trusts_with_application_credential(self): + """App cred token must not be able to list trusts.""" + self.get( + '/OS-TRUST/trusts', + token=self._get_app_cred_token(), + expected_status=http.client.FORBIDDEN, + ) + + def test_get_trust_with_application_credential(self): + """App cred token must not be able to read a specific trust.""" + ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + role_ids=[self.role_id], + ) + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust_id = r.result['trust']['id'] + self.get( + f'/OS-TRUST/trusts/{trust_id}', + token=self._get_app_cred_token(), + expected_status=http.client.FORBIDDEN, + ) + + def test_list_trust_roles_with_application_credential(self): + """App cred token must not be able to list roles for a trust.""" + ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + role_ids=[self.role_id], + ) + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust_id = r.result['trust']['id'] + self.get( + f'/OS-TRUST/trusts/{trust_id}/roles', + token=self._get_app_cred_token(), + expected_status=http.client.FORBIDDEN, + ) + + def test_get_trust_role_with_application_credential(self): + """App cred token must not be able to get a specific trust role.""" + ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + role_ids=[self.role_id], + ) + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust_id = r.result['trust']['id'] + self.get( + f'/OS-TRUST/trusts/{trust_id}/roles/{self.role_id}', + token=self._get_app_cred_token(), + expected_status=http.client.FORBIDDEN, + ) + def test_delete_trust_with_application_credential(self): ref = unit.new_trust_ref( trustor_user_id=self.user_id, From ba55070037fe0f9122c8a81c522ad4eebd48f4c1 Mon Sep 17 00:00:00 2001 From: Grzegorz Grasza Date: Tue, 12 May 2026 09:11:24 +0200 Subject: [PATCH 8/9] Preserve expires_at when rescoping federated tokens (CVE-2026-44394) When a federated token is rescoped via POST /v3/auth/tokens the handle_scoped_token function returned response_data without an expires_at value. Because issue_token falls back to default_expire_time when expires_at is None, each rescope issued a fresh full-TTL token instead of inheriting the remaining lifetime of the original token. A user with a federated token could extend their session indefinitely by rescoping repeatedly before expiry, bypassing operator-configured TTL policies and IdP-level account revocation. Fix: propagate token.expires_at from handle_scoped_token so that issue_token uses the original token's expiry rather than resetting to the default. The non-federated path in token.py already did this via response_data.setdefault('expires_at', token.expires_at). Closes-Bug: #2150379 Assisted-by: Claude Sonnet 4.6 Co-authored-by: Artem Goncharov Change-Id: I0bbb8520e12c52edd01fb47c873f0227819706f5 Signed-off-by: Grzegorz Grasza (cherry picked from commit 75a4a0c354c7f568b28dd85182dc729553fb3a33) (cherry picked from commit 778250f5ebe7846a3628b85fe3d1a77258fda20f) --- keystone/auth/plugins/mapped.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/keystone/auth/plugins/mapped.py b/keystone/auth/plugins/mapped.py index 7a45f109be..c562ffb84d 100644 --- a/keystone/auth/plugins/mapped.py +++ b/keystone/auth/plugins/mapped.py @@ -102,6 +102,10 @@ def handle_scoped_token(token, federation_api, identity_api): response_data['group_ids'] = group_ids response_data[federation_constants.IDENTITY_PROVIDER] = identity_provider response_data[federation_constants.PROTOCOL] = protocol + # Preserve the original token's expiry to prevent users from + # indefinitely extending their session by repeatedly rescoping. + # The non-federated path in token.py does the same via setdefault(). + response_data['expires_at'] = token.expires_at return response_data From 874b028c750c72dd76c134a376b450848c90f62b Mon Sep 17 00:00:00 2001 From: Grzegorz Grasza Date: Tue, 12 May 2026 09:10:20 +0200 Subject: [PATCH 9/9] Prevent RBAC policy bypass via JSON body and query filters (CVE-2026-42999) The RBAC enforcer unconditionally merged the raw JSON request body into the policy enforcement dictionary after trusted target data had been set from the database. An attacker could include a "target" key in the JSON body to overwrite database-sourced RBAC target attributes, causing all %(target.*)s policy substitutions to evaluate against attacker-controlled values. This affected 88 endpoint/method combinations across all Keystone API resources. The fix namespaces user-controlled JSON body data under a "request_body" key in the policy dict, making it structurally impossible for request body fields to collide with internal keys like "target" or view_args. The only in-tree policy rule that depended on the old JSON body merge behavior was identity:create_trust, which referenced %(trust.trustor_user_id)s from the request body at the top level of the policy dict. This is updated to use target_attr and the %(target.trust.trustor_user_id)s substitution, consistent with all other trust policy rules. Additionally, query-string filter values had the same structural issue: _extract_filter_values() results were merged at the top level, meaning a filter key matching a view_arg key (e.g. user_id on /v3/users/{user_id}/... endpoints using ADMIN_OR_SYSTEM_READER_OR_OWNER) could be overwritten by an attacker-controlled ?user_id= query param, bypassing ownership checks. Filter values are now namespaced under "filter_attr". No in-tree policy rule references filter values via %(key)s substitutions, so this is backwards-compatible for upstream deployments. Closes-Bug: #2148398 Assisted-by: Claude Sonnet 4.6 Co-authored-by: Boris Bobrov Co-authored-by: Artem Goncharov Change-Id: I295d1ac27faad05a680bb2b3fac8cfa27fa1c4bd Signed-off-by: Grzegorz Grasza (cherry picked from commit 22b51f5d5d86350d3fbc66697e4097bacf2a8ce9) (cherry picked from commit fda75e53a350edde0bf510bd77912637afbdd40b) --- keystone/api/trusts.py | 4 +- keystone/common/policies/base.py | 2 +- keystone/common/rbac_enforcer/enforcer.py | 21 ++- .../tests/protection/v3/test_credentials.py | 120 ++++++++++++ keystone/tests/protection/v3/test_grants.py | 77 ++++++++ .../tests/unit/common/test_rbac_enforcer.py | 175 +++++++++++++++++- .../notes/bug-2148398-e35dd449b3a330e6.yaml | 40 ++++ 7 files changed, 429 insertions(+), 10 deletions(-) create mode 100644 releasenotes/notes/bug-2148398-e35dd449b3a330e6.yaml diff --git a/keystone/api/trusts.py b/keystone/api/trusts.py index 265d3be829..84e97b5ba5 100644 --- a/keystone/api/trusts.py +++ b/keystone/api/trusts.py @@ -296,8 +296,10 @@ def post(self): The User creating the trust must be the trustor. """ - ENFORCER.enforce_call(action='identity:create_trust') trust = self.request_body_json.get('trust', {}) + ENFORCER.enforce_call( + action='identity:create_trust', target_attr={'trust': trust} + ) validation.lazy_validate(schema.trust_create, trust) self._check_unrestricted() diff --git a/keystone/common/policies/base.py b/keystone/common/policies/base.py index c5cf4d8e41..41142d937a 100644 --- a/keystone/common/policies/base.py +++ b/keystone/common/policies/base.py @@ -30,7 +30,7 @@ RULE_SERVICE_ADMIN_OR_TOKEN_SUBJECT = ( 'rule:service_admin_or_token_subject') # nosec RULE_SERVICE_OR_ADMIN = 'rule:service_or_admin' -RULE_TRUST_OWNER = 'user_id:%(trust.trustor_user_id)s' +RULE_TRUST_OWNER = 'user_id:%(target.trust.trustor_user_id)s' # We are explicitly setting system_scope:all in these check strings because # they provide backwards compatibility in the event a deployment sets diff --git a/keystone/common/rbac_enforcer/enforcer.py b/keystone/common/rbac_enforcer/enforcer.py index 7add048ce8..aa9797f610 100644 --- a/keystone/common/rbac_enforcer/enforcer.py +++ b/keystone/common/rbac_enforcer/enforcer.py @@ -430,14 +430,21 @@ def enforce_call(cls, enforcer=None, action=None, target_attr=None, policy_dict['target'] = target_attr or build_target() - # Pull the data from the submitted json body to generate - # appropriate input/target attributes, we take an explicit copy here - # to ensure we're not somehow corrupting + # Pull the data from the submitted json body. We namespace it under + # 'request_body' to prevent user-controlled input from overwriting + # security-critical keys in the policy dict (e.g. 'target' populated + # by build_target/target_attr, or view_args like 'user_id'). json_input = flask.request.get_json(force=True, silent=True) or {} - policy_dict.update(json_input.copy()) - - # Generate the filter_attr dataset. - policy_dict.update(cls._extract_filter_values(filters)) + if json_input: + policy_dict['request_body'] = json_input.copy() + + # Namespace query-string filter values under 'filter_attr' to prevent + # attacker-controlled query params from overwriting view_args (e.g. + # user_id from /v3/users/{user_id}/...) or other trusted keys in the + # policy dict via %(key)s substitutions in policy rules. + filter_values = cls._extract_filter_values(filters) + if filter_values: + policy_dict['filter_attr'] = filter_values flattened = utils.flatten_dict(policy_dict) if LOG.logger.getEffectiveLevel() <= log.DEBUG: diff --git a/keystone/tests/protection/v3/test_credentials.py b/keystone/tests/protection/v3/test_credentials.py index 800452f670..11edec9fbe 100644 --- a/keystone/tests/protection/v3/test_credentials.py +++ b/keystone/tests/protection/v3/test_credentials.py @@ -18,11 +18,13 @@ from keystone.common.policies import base as bp from keystone.common import provider_api import keystone.conf +from keystone.credential.providers import fernet as credential_fernet from keystone.tests.common import auth as common_auth from keystone.tests import unit from keystone.tests.unit import base_classes from keystone.tests.unit import ksfixtures from keystone.tests.unit.ksfixtures import temporaryfile +from keystone.tests.unit import test_v3 CONF = keystone.conf.CONF PROVIDERS = provider_api.ProviderAPIs @@ -1250,3 +1252,121 @@ def setUp(self): r = c.post('/v3/auth/tokens', json=auth) self.token_id = r.headers['X-Subject-Token'] self.headers = {'X-Auth-Token': self.token_id} + + +class TargetInjectionCredentialTests(test_v3.RestfulTestCase): + """Test that JSON body injection cannot bypass credential RBAC. + + Verifies CVE-2026-42999: the RBAC enforcer must not allow the JSON + request body to overwrite security-critical keys in the policy dict. + """ + + def setUp(self): + super().setUp() + self.useFixture( + ksfixtures.KeyRepository( + self.config_fixture, + 'credential', + credential_fernet.MAX_ACTIVE_KEYS, + ) + ) + + def _make_user_with_project(self, role_id=None): + user = unit.create_user( + PROVIDERS.identity_api, domain_id=self.domain_id + ) + project = unit.new_project_ref(domain_id=self.domain_id) + PROVIDERS.resource_api.create_project(project['id'], project) + if role_id: + PROVIDERS.assignment_api.add_role_to_user_and_project( + user['id'], project['id'], role_id + ) + return user, project + + def test_list_credentials_cannot_read_other_users_secrets(self): + """GET /v3/credentials must not return other users' credentials. + + An attacker injects their own user_id into target.credential.user_id + in the JSON body. Without the fix the per-item policy filter would + see the attacker's user_id and pass every credential through. + """ + role = unit.new_role_ref() + PROVIDERS.role_api.create_role(role['id'], role) + + victim, victim_project = self._make_user_with_project(role['id']) + attacker, attacker_project = self._make_user_with_project(role['id']) + + victim_cred = unit.new_credential_ref( + user_id=victim['id'], project_id=victim_project['id'] + ) + PROVIDERS.credential_api.create_credential( + victim_cred['id'], victim_cred + ) + attacker_cred = unit.new_credential_ref( + user_id=attacker['id'], project_id=attacker_project['id'] + ) + PROVIDERS.credential_api.create_credential( + attacker_cred['id'], attacker_cred + ) + + attacker_auth = self.build_authentication_request( + user_id=attacker['id'], + password=attacker['password'], + project_id=attacker_project['id'], + ) + r = self.get( + '/credentials', + auth=attacker_auth, + body={'target': {'credential': {'user_id': attacker['id']}}}, + ) + + cred_ids = [c['id'] for c in r.result['credentials']] + self.assertIn(attacker_cred['id'], cred_ids) + self.assertNotIn(victim_cred['id'], cred_ids) + + def test_ec2_create_credential_cannot_create_for_other_user(self): + """EC2 credential creation must not allow impersonating other users. + + POST /v3/users/{user_id}/credentials/OS-EC2: the attacker injects + target.credential.user_id to bypass the ownership check. + """ + member_role = unit.new_role_ref(name='member') + PROVIDERS.role_api.create_role(member_role['id'], member_role) + + victim, victim_project = self._make_user_with_project( + member_role['id'] + ) + attacker, attacker_project = self._make_user_with_project( + member_role['id'] + ) + + attacker_auth = self.build_authentication_request( + user_id=attacker['id'], + password=attacker['password'], + project_id=attacker_project['id'], + ) + ec2_uri = f'/users/{victim["id"]}/credentials/OS-EC2' + + self.post( + ec2_uri, + auth=attacker_auth, + body={ + 'tenant_id': victim_project['id'], + 'target': {'credential': {'user_id': attacker['id']}}, + }, + expected_status=http.client.FORBIDDEN, + ) + self.post( + ec2_uri, + auth=attacker_auth, + body={ + 'tenant_id': victim_project['id'], + 'target': { + 'credential': { + 'user_id': attacker['id'], + 'project_id': attacker_project['id'], + } + }, + }, + expected_status=http.client.FORBIDDEN, + ) diff --git a/keystone/tests/protection/v3/test_grants.py b/keystone/tests/protection/v3/test_grants.py index bb74b09014..b4cbf45891 100644 --- a/keystone/tests/protection/v3/test_grants.py +++ b/keystone/tests/protection/v3/test_grants.py @@ -23,6 +23,7 @@ from keystone.tests.unit import base_classes from keystone.tests.unit import ksfixtures from keystone.tests.unit.ksfixtures import temporaryfile +from keystone.tests.unit import test_v3 CONF = keystone.conf.CONF PROVIDERS = provider_api.ProviderAPIs @@ -2275,3 +2276,79 @@ def test_cannot_revoke_grant_from_group_on_domain(self): headers=self.headers, expected_status_code=http.client.FORBIDDEN ) + + +class TargetInjectionGrantTests(test_v3.RestfulTestCase): + """Test that JSON body injection cannot bypass grant RBAC. + + Verifies CVE-2026-42999: the RBAC enforcer must not allow the JSON + request body to overwrite security-critical keys in the policy dict. + """ + + def setUp(self): + super().setUp() + policy_file = self.useFixture(temporaryfile.SecureTempFile()) + self.useFixture( + ksfixtures.Policy( + self.config_fixture, policy_file=policy_file.file_name + ) + ) + with open(policy_file.file_name, 'w') as f: + overrides = { + 'identity:create_grant': ( + '(role:admin and system_scope:all) or ' + '(role:admin and ' + 'domain_id:%(target.user.domain_id)s and ' + 'domain_id:%(target.domain.id)s) and ' + '(domain_id:%(target.role.domain_id)s or ' + 'None:%(target.role.domain_id)s)' + ) + } + f.write(jsonutils.dumps(overrides)) + + def test_inherited_grant_cannot_escalate_cross_domain(self): + """PUT OS-INHERIT grant must not allow cross-domain escalation. + + A domain admin in domain A tries to create an inherited admin role + grant on domain B by injecting target data. Without the fix the + policy would see all domains matching the attacker's domain. + """ + domain_a = unit.new_domain_ref() + PROVIDERS.resource_api.create_domain(domain_a['id'], domain_a) + attacker = unit.create_user( + PROVIDERS.identity_api, domain_id=domain_a['id'] + ) + admin_role = self.role + PROVIDERS.assignment_api.create_grant( + admin_role['id'], user_id=attacker['id'], domain_id=domain_a['id'] + ) + + domain_b = unit.new_domain_ref() + PROVIDERS.resource_api.create_domain(domain_b['id'], domain_b) + + attacker_auth = self.build_authentication_request( + user_id=attacker['id'], + password=attacker['password'], + domain_id=domain_a['id'], + ) + + inherit_url = ( + '/OS-INHERIT/domains/{domain_id}/users/{user_id}' + '/roles/{role_id}/inherited_to_projects' + ).format( + domain_id=domain_b['id'], + user_id=attacker['id'], + role_id=admin_role['id'], + ) + self.put( + inherit_url, + auth=attacker_auth, + body={ + 'target': { + 'user': {'domain_id': domain_a['id']}, + 'domain': {'id': domain_a['id']}, + 'role': {'domain_id': None, 'name': 'member'}, + } + }, + expected_status=http.client.FORBIDDEN, + ) diff --git a/keystone/tests/unit/common/test_rbac_enforcer.py b/keystone/tests/unit/common/test_rbac_enforcer.py index b235eb29d3..b83bbcb724 100644 --- a/keystone/tests/unit/common/test_rbac_enforcer.py +++ b/keystone/tests/unit/common/test_rbac_enforcer.py @@ -176,7 +176,7 @@ def _testing_policy_rules(self): ), policy.RuleDefault( name='example:with_filter', - check_str='user_id:%(user)s', + check_str='user_id:%(filter_attr.user)s', scope_types=['project'], ), policy.RuleDefault( @@ -423,6 +423,135 @@ def test_extract_member_target_data_bad_input(self): self.assertEqual({}, self.enforcer._extract_member_target_data( member_target={}, member_target_type=None)) + def test_json_body_cannot_overwrite_build_target(self): + # Verify that a JSON request body cannot overwrite the target + # data populated by build_target. The enforcer must namespace + # user-controlled JSON input so it cannot collide with the + # trusted 'target' key set from the database. + assertIn = self.assertIn + assertEq = self.assertEqual + + real_owner_id = uuid.uuid4().hex + attacker_id = uuid.uuid4().hex + + def _enforce_mock_func(credentials, action, target, do_raise=True): + assertIn('target.credential.user_id', target) + assertEq(target['target.credential.user_id'], real_owner_id) + + def _build_target(): + return {'credential': {'user_id': real_owner_id}} + + self.useFixture( + fixtures.MockPatchObject( + self.enforcer, '_enforce', _enforce_mock_func + ) + ) + + with self.test_client() as c: + path = '/v3/auth/tokens' + body = self._auth_json() + r = c.post( + path, + json=body, + follow_redirects=True, + expected_status_code=201, + ) + token_id = r.headers['X-Subject-Token'] + + # Send a request with a JSON body that attempts to + # overwrite the build_target-supplied credential owner. + c.get( + f'{self.restful_api_url_prefix}/argument/{uuid.uuid4().hex}', + headers={'X-Auth-Token': token_id}, + json={'target': {'credential': {'user_id': attacker_id}}}, + ) + self.enforcer.enforce_call( + action='example:allowed', build_target=_build_target + ) + + def test_json_body_cannot_overwrite_target_attr(self): + # Verify that a JSON request body cannot overwrite the target + # data populated by target_attr. The enforcer must namespace + # user-controlled JSON input so it cannot collide with the + # trusted 'target' key set explicitly by the API handler. + assertIn = self.assertIn + assertEq = self.assertEqual + + real_owner_id = uuid.uuid4().hex + attacker_id = uuid.uuid4().hex + + def _enforce_mock_func(credentials, action, target, do_raise=True): + assertIn('target.credential.user_id', target) + assertEq(target['target.credential.user_id'], real_owner_id) + + self.useFixture( + fixtures.MockPatchObject( + self.enforcer, '_enforce', _enforce_mock_func + ) + ) + + with self.test_client() as c: + path = '/v3/auth/tokens' + body = self._auth_json() + r = c.post( + path, + json=body, + follow_redirects=True, + expected_status_code=201, + ) + token_id = r.headers['X-Subject-Token'] + + c.get( + f'{self.restful_api_url_prefix}/argument/{uuid.uuid4().hex}', + headers={'X-Auth-Token': token_id}, + json={'target': {'credential': {'user_id': attacker_id}}}, + ) + target_attr = {'credential': {'user_id': real_owner_id}} + self.enforcer.enforce_call( + action='example:allowed', target_attr=target_attr + ) + + def test_json_body_cannot_overwrite_view_args(self): + # Verify that a JSON request body cannot overwrite URL path + # parameters (view_args) in the policy dict. The enforcer must + # namespace user-controlled JSON input so it cannot collide + # with trusted view_args like 'user_id' or 'argument_id'. + assertIn = self.assertIn + assertEq = self.assertEqual + + real_argument_id = uuid.uuid4().hex + injected_argument_id = uuid.uuid4().hex + + def _enforce_mock_func(credentials, action, target, do_raise=True): + assertIn('argument_id', target) + assertEq(target['argument_id'], real_argument_id) + + self.useFixture( + fixtures.MockPatchObject( + self.enforcer, '_enforce', _enforce_mock_func + ) + ) + + with self.test_client() as c: + path = '/v3/auth/tokens' + body = self._auth_json() + r = c.post( + path, + json=body, + follow_redirects=True, + expected_status_code=201, + ) + token_id = r.headers['X-Subject-Token'] + + # URL has argument_id=real_argument_id, but the JSON body + # tries to overwrite it. + c.get( + (f'{self.restful_api_url_prefix}/argument/{real_argument_id}'), + headers={'X-Auth-Token': token_id}, + json={'argument_id': injected_argument_id}, + ) + self.enforcer.enforce_call(action='example:allowed') + def test_call_build_enforcement_target(self): assertIn = self.assertIn assertEq = self.assertEqual @@ -638,6 +767,50 @@ def test_enforce_call_with_filter_values(self): self.enforcer.enforce_call, action='example:with_filter') + def test_query_filter_cannot_overwrite_view_args(self): + """Query-string filter values must not overwrite view_args in policy dict. + + Before the fix, policy_dict.update(filter_values) ran after + policy_dict.update(view_args). If a filter key matched a view_arg key + (e.g. both named 'user_id'), a ?user_id=attacker query param would + overwrite the URL-path-sourced value used in %(user_id)s policy + substitutions, bypassing ownership checks such as + ADMIN_OR_SYSTEM_READER_OR_OWNER on /v3/users/{user_id}/... endpoints. + """ + real_arg_id = uuid.uuid4().hex + injected_arg_id = uuid.uuid4().hex + seen = {} + + def _capture_enforce(credentials, action, target, do_raise=True): + seen.update(target) + + self.useFixture( + fixtures.MockPatchObject( + self.enforcer, '_enforce', _capture_enforce + ) + ) + + with self.test_client() as c: + r = c.post( + '/v3/auth/tokens', + json=self._auth_json(), + expected_status_code=201, + ) + token_id = r.headers['X-Subject-Token'] + c.get( + f'{self.restful_api_url_prefix}/argument/{real_arg_id}' + f'?argument_id={injected_arg_id}', + headers={'X-Auth-Token': token_id}, + ) + self.enforcer.enforce_call( + action='example:allowed', filters=['argument_id'] + ) + + # view_arg survives: argument_id at the top level is real_arg_id + self.assertEqual(real_arg_id, seen.get('argument_id')) + # filter value is namespaced, not overwriting view_arg + self.assertEqual(injected_arg_id, seen.get('filter_attr.argument_id')) + def test_enforce_call_with_pre_instantiated_enforcer(self): token_path = '/v3/auth/tokens' auth_json = self._auth_json() diff --git a/releasenotes/notes/bug-2148398-e35dd449b3a330e6.yaml b/releasenotes/notes/bug-2148398-e35dd449b3a330e6.yaml new file mode 100644 index 0000000000..1685cc04f8 --- /dev/null +++ b/releasenotes/notes/bug-2148398-e35dd449b3a330e6.yaml @@ -0,0 +1,40 @@ +--- +critical: + - | + [`bug 2148398 `_] + The RBAC enforcer unconditionally merged the raw JSON request body into + the policy enforcement dictionary after trusted target data had been set + from the database. An attacker could include a ``target`` key in the JSON + body to overwrite database-sourced RBAC target attributes, causing all + ``%(target.*)s`` policy substitutions to evaluate against + attacker-controlled values. This affected 88 endpoint/method combinations + across all Keystone API resource areas. Any authenticated user could + exploit this to read every credential secret in the deployment, create + EC2 credentials for arbitrary users, or revoke other users' tokens. A + domain administrator could escalate to full cloud admin by creating + inherited role grants on other domains. The vulnerability has been present + since the Rocky release (14.0.0). +security: + - | + [`bug 2148398 `_] + The RBAC policy enforcer now namespaces JSON request body data under a + ``request_body`` key in the policy dictionary instead of merging it at + the top level. This prevents user-controlled input from overwriting + security-critical keys such as ``target`` (populated from the database + by ``build_target`` or ``target_attr``) and URL path parameters like + ``user_id``. All upstream policy rules are unaffected by this change. + Deployments with custom policy rules that reference JSON body fields + directly via ``%(field_name)s`` substitutions (not under ``target.``) + will need to update those references to ``%(request_body.field_name)s``. +upgrade: + - | + [`bug 2148398 `_] + The ``identity:create_trust`` policy rule now uses + ``%(target.trust.trustor_user_id)s`` instead of + ``%(trust.trustor_user_id)s``. The trust data from the request body is + now passed explicitly via ``target_attr`` rather than relying on the + JSON body merge. This aligns ``create_trust`` with all other trust + policy rules which already use the ``target.trust.*`` prefix. + Deployments that override the ``identity:create_trust`` policy and + reference ``%(trust.trustor_user_id)s`` must update to + ``%(target.trust.trustor_user_id)s``.