From 0853c9fd512a0f92895cfba411b866cf67ace78e Mon Sep 17 00:00:00 2001 From: Boris Bobrov Date: Tue, 7 Apr 2026 23:45:15 +0200 Subject: [PATCH 1/7] Block restricted app creds from creating EC2 credentials via /credentials The POST /v3/credentials endpoint accepted EC2 credential creation from restricted application credential tokens, bypassing the guard on the dedicated OS-EC2 endpoint. Add the same unrestricted application credential check to the generic credentials API for EC2-type credentials, and update the existing test to use an unrestricted application credential. Related-Bug: #2142138 Generated-By: claude-opus-4-6 (OpenCode) Signed-off-by: Boris Bobrov Change-Id: Idb192a2fd370fc26c7d76788e9ad1856483d3239 --- keystone/api/credentials.py | 19 +++++++--- keystone/tests/unit/test_v3_credential.py | 42 +++++++++++++++++++++-- 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/keystone/api/credentials.py b/keystone/api/credentials.py index 90b53dd686..09304185a4 100644 --- a/keystone/api/credentials.py +++ b/keystone/api/credentials.py @@ -32,6 +32,16 @@ ENFORCER = rbac_enforcer.RBACEnforcer +def _check_unrestricted_application_credential(token): + if 'application_credential' in token.methods: + if not token.application_credential['unrestricted']: + action = _( + "Using method 'application_credential' is not " + "allowed for managing additional credentials." + ) + raise exception.ForbiddenAction(action=action) + + def _build_target_enforcement(): target = {} try: @@ -155,12 +165,13 @@ def post(self): ENFORCER.enforce_call( action='identity:create_credential', target_attr=target ) + token = self.auth_context['token'] + if credential.get('type', '').lower() == 'ec2': + _check_unrestricted_application_credential(token) validation.lazy_validate(schema.credential_create, credential) trust_id = getattr(self.oslo_context, 'trust_id', None) - app_cred_id = getattr( - self.auth_context['token'], 'application_credential_id', None) - access_token_id = getattr( - self.auth_context['token'], 'access_token_id', None) + app_cred_id = getattr(token, 'application_credential_id', None) + access_token_id = getattr(token, 'access_token_id', None) ref = self._assign_unique_id( self._normalize_dict(credential), trust_id=trust_id, app_cred_id=app_cred_id, diff --git a/keystone/tests/unit/test_v3_credential.py b/keystone/tests/unit/test_v3_credential.py index b0d0e7fcc6..b0a1a06e08 100644 --- a/keystone/tests/unit/test_v3_credential.py +++ b/keystone/tests/unit/test_v3_credential.py @@ -688,9 +688,11 @@ def test_app_cred_ec2_credential(self): Call ``POST /credentials``. """ - # Create the app cred + # Create an unrestricted app cred (restricted app creds are + # blocked from creating EC2 credentials) ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}]) del ref['id'] + ref['unrestricted'] = True r = self.post('/users/%s/application_credentials' % self.user_id, body={'application_credential': ref}) app_cred = r.result['application_credential'] @@ -741,7 +743,43 @@ def test_app_cred_ec2_credential(self): '/credentials', body={'credential': ref}, token=token_id, - expected_status=http.client.CONFLICT) + expected_status=http.client.CONFLICT, + ) + + def _get_app_cred_token(self, unrestricted=False): + """Create an application credential and return its token.""" + ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}]) + del ref['id'] + if unrestricted: + ref['unrestricted'] = True + r = self.post( + f'/users/{self.user_id}/application_credentials', + body={'application_credential': ref}, + ) + app_cred = r.result['application_credential'] + auth_data = self.build_authentication_request( + app_cred_id=app_cred['id'], secret=app_cred['secret'] + ) + r = self.v3_create_token(auth_data) + return r.headers.get('X-Subject-Token') + + def test_restricted_app_cred_cannot_create_ec2_credential(self): + """Test that a restricted app cred cannot create EC2 credentials. + + A restricted application credential must not be allowed to + create EC2 credentials via POST /credentials either, as this + would bypass the guard on the OS-EC2 endpoint. + """ + token_id = self._get_app_cred_token(unrestricted=False) + blob, ref = unit.new_ec2_credential( + user_id=self.user_id, project_id=self.project_id + ) + self.post( + '/credentials', + body={'credential': ref}, + token=token_id, + expected_status=http.client.FORBIDDEN, + ) class TestCredentialAccessToken(CredentialBaseTestCase): From 01fb5bdfea0e7af7aba5b213c14b7769bee3a28d Mon Sep 17 00:00:00 2001 From: Grzegorz Grasza Date: Wed, 22 Apr 2026 13:23:44 +0200 Subject: [PATCH 2/7] Enforce app cred project boundary on EC2 credential paths POST /v3/credentials did not validate that the caller-supplied project_id for an EC2-type credential matched the project of the authenticating application credential. This allowed an attacker holding an unrestricted application credential for project A to create an EC2 credential targeting project B; a subsequent /v3/ec2tokens exchange would then issue a Keystone token scoped to project B while still carrying the original app_cred_id, enabling cross-project lateral movement within the credential owner's role footprint. Two fixes: 1. credentials.py: after extracting app_cred_id from the token, check that credential['project_id'] == app_cred['project_id'] for EC2-type credentials and raise ForbiddenAction otherwise. 2. EC2_S3_Resource.py: in handle_authenticate(), assert that the stored EC2 credential project_id matches the application credential's project before issuing the token. This issue is orthogonal to CVE-2026-33551 (LP#2142138 / Gerrit 983655), which blocks restricted application credentials from creating EC2 credentials at all. The project-boundary check is absent regardless of the restricted flag and requires separate treatment. Closes-Bug: #2149775 Related-Bug: #OSPRH-29345 Assisted-by: claude-sonnet-4-6 Change-Id: I7c10c8a52e57e63cb9c66d03d69540abefe5425c Signed-off-by: Grzegorz Grasza (cherry picked from commit b6fd80996b882890a51f3e2aab41d952d7ff68ae) (cherry picked from commit d9e18a37888cabdea919c58b24f630fd722aa8b0) --- keystone/api/_shared/EC2_S3_Resource.py | 7 ++ keystone/api/credentials.py | 13 ++++ keystone/tests/unit/test_v3_credential.py | 92 +++++++++++++++++++++++ 3 files changed, 112 insertions(+) diff --git a/keystone/api/_shared/EC2_S3_Resource.py b/keystone/api/_shared/EC2_S3_Resource.py index 7b2fc21b29..80ef7fc35e 100644 --- a/keystone/api/_shared/EC2_S3_Resource.py +++ b/keystone/api/_shared/EC2_S3_Resource.py @@ -155,6 +155,13 @@ def handle_authenticate(self): app_cred = ac_client.get_application_credential( cred_data['app_cred_id']) roles = [r['id'] for r in app_cred['roles']] + if cred_data['project_id'] != app_cred['project_id']: + raise ks_exceptions.Unauthorized( + _( + 'EC2 credential project does not match the ' + 'application credential project.' + ) + ) elif cred_data['access_token_id']: access_token = PROVIDERS.oauth_api.get_access_token( cred_data['access_token_id']) diff --git a/keystone/api/credentials.py b/keystone/api/credentials.py index 09304185a4..068e9860aa 100644 --- a/keystone/api/credentials.py +++ b/keystone/api/credentials.py @@ -172,6 +172,19 @@ def post(self): trust_id = getattr(self.oslo_context, 'trust_id', None) app_cred_id = getattr(token, 'application_credential_id', None) access_token_id = getattr(token, 'access_token_id', None) + if ( + app_cred_id is not None + and credential.get('type', '').lower() == 'ec2' + ): + ac_api = PROVIDERS.application_credential_api + app_cred = ac_api.get_application_credential(app_cred_id) + if credential.get('project_id') != app_cred['project_id']: + action = _( + 'EC2 credential project_id must match the ' + 'project of the application credential used ' + 'to authenticate' + ) + raise exception.ForbiddenAction(action=action) ref = self._assign_unique_id( self._normalize_dict(credential), trust_id=trust_id, app_cred_id=app_cred_id, diff --git a/keystone/tests/unit/test_v3_credential.py b/keystone/tests/unit/test_v3_credential.py index b0a1a06e08..c3c3a7b55e 100644 --- a/keystone/tests/unit/test_v3_credential.py +++ b/keystone/tests/unit/test_v3_credential.py @@ -781,6 +781,98 @@ def test_restricted_app_cred_cannot_create_ec2_credential(self): expected_status=http.client.FORBIDDEN, ) + def test_app_cred_ec2_credential_cross_project_forbidden(self): + """EC2 credential project_id must match the app cred project. + + An unrestricted app cred scoped to project A must not be used to + create an EC2 credential targeting a different project B. + + Call ``POST /credentials``. + """ + token_id = self._get_app_cred_token(unrestricted=True) + + other_project = unit.new_project_ref(domain_id=self.domain_id) + PROVIDERS.resource_api.create_project( + other_project['id'], other_project + ) + + _, ec2_ref = unit.new_ec2_credential( + user_id=self.user_id, project_id=other_project['id'] + ) + self.post( + '/credentials', + body={'credential': ec2_ref}, + token=token_id, + expected_status=http.client.FORBIDDEN, + ) + + def test_app_cred_ec2_auth_cross_project_rejected(self): + """EC2 auth is rejected when credential project differs from app cred. + + A pre-existing EC2 credential whose project_id does not match the + linked application credential's project must be rejected at + authentication time, preventing cross-project lateral movement. + + Call ``POST /ec2tokens``. + """ + ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}]) + del ref['id'] + r = self.post( + f'/users/{self.user_id}/application_credentials', + body={'application_credential': ref}, + ) + app_cred = r.result['application_credential'] + + other_project = unit.new_project_ref(domain_id=self.domain_id) + PROVIDERS.resource_api.create_project( + other_project['id'], other_project + ) + + # Bypass the API to plant a credential with a mismatched project_id. + # This simulates a credential that existed before the creation-time + # check was added, or one created via a direct DB write. + blob = { + 'access': uuid.uuid4().hex, + 'secret': uuid.uuid4().hex, + 'trust_id': None, + 'app_cred_id': app_cred['id'], + } + _, ec2_ref = unit.new_ec2_credential( + user_id=self.user_id, project_id=other_project['id'], blob=blob + ) + PROVIDERS.credential_api.create_credential(ec2_ref['id'], ec2_ref) + + signer = ec2_utils.Ec2Signer(blob['secret']) + params = { + 'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': blob['access'], + } + request = { + 'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params, + } + sig_ref = { + 'access': blob['access'], + 'signature': signer.generate(request), + 'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params, + } + PROVIDERS.assignment_api.create_system_grant_for_user( + self.user_id, self.role_id + ) + token = self.get_system_scoped_token() + self.post( + '/ec2tokens', + body={'ec2Credentials': sig_ref}, + token=token, + expected_status=http.client.UNAUTHORIZED, + ) + class TestCredentialAccessToken(CredentialBaseTestCase): """Test credential with access token.""" From 50109fcb6a192cf348fb6c5299f7d882db6c1ed7 Mon Sep 17 00:00:00 2001 From: Grzegorz Grasza Date: Thu, 23 Apr 2026 10:13:20 +0200 Subject: [PATCH 3/7] Enforce delegation project boundary for delegated tokens Delegated tokens (trusts, application credentials, OAuth1 access tokens) are scoped to a single project at delegation time. This must be enforced thoroughly while granting the API access to Keystone resources that might be also bound to a single project. Without this it is possible to gain different access (using trust to see application credentials for a different project, reuse the MFA seed, etc). * Credentials CRUD (/v3/credentials) All five CRUD operations verified ownership via user_id but did not bind credential.project_id to the delegating token's project scope. Fix: _check_credential_project_scope() - no-op for non-delegated tokens, raises ForbiddenAction on project mismatch. For list, out-of-scope credentials are silently filtered. Credentials with project_id=None (TOTP/MFA bindings) are treated as out-of-scope for any delegated token: they are user-level secrets with no project anchor, and a delegated token should never be able to enumerate, read, or mutate them - doing so would allow a stolen delegation token to exfiltrate or destroy a user's MFA binding. * OS-EC2 credential CRUD (/v3/users/{id}/credentials/OS-EC2) POST accepted any tenant_id from a delegated token. GET and DELETE had no delegation check at all. Fix: _check_delegation_for_ec2() enforces the project boundary; list silently filters. Additionally, pre-existing OAuth1 access-token-backed EC2 credentials with a mismatched project_id could be used at auth-time (POST /v3/ec2tokens) to obtain a cross-project token. Added a check in EC2_S3_Resource.py that cred_data['project_id'] matches access_token['project_id'] before issuing the token. The trust branch does not need this check - the token provider uses the trust's project regardless of the credential's project_id. * OS-OAUTH1 access token management (/v3/users/{id}/OS-OAUTH1/access_tokens) GET and DELETE had no delegation check. List blocked trust/OAuth but not app-cred tokens. Fix: _block_delegated_token() raises Forbidden for any delegation type on list, get, and delete. * Application credential management (/v3/users/{id}/application_credentials) Trust-scoped and OAuth1 tokens had no guard on the application credential and access rule management APIs. An impersonating trust could LIST, CREATE, or DELETE application credentials, creating a persistent backdoor that outlives the trust's own expiry. App credential tokens are intentionally excluded - the unrestricted/restricted distinction is handled separately by _check_unrestricted_application_credential. Fix: _block_delegated_token_app_creds() raises Forbidden for trust-scoped and OAuth1 tokens on all six app credential and access rule endpoints. Closes-Bug: #2150089 Related-Bug: #2149789 Related-Bug: #2149775 Assisted-by: Claude Sonnet 4.6 Change-Id: Iaaa0ec713a0a5e062acc3209d6010982899d8f6f Signed-off-by: Grzegorz Grasza Signed-off-by: Artem Goncharov (cherry picked from commit 16582e5192be354e26ebef4badca1213ddc4dc07) --- keystone/api/_shared/EC2_S3_Resource.py | 7 + keystone/api/credentials.py | 58 ++- keystone/api/users.py | 127 +++++- keystone/conf/security_compliance.py | 69 ++- .../unit/test_v3_application_credential.py | 328 ++++++++++++++- keystone/tests/unit/test_v3_credential.py | 392 ++++++++++++++++++ keystone/tests/unit/test_v3_oauth1.py | 112 +++++ .../notes/bug-2150089-e91b592c948e5771.yaml | 35 ++ 8 files changed, 1099 insertions(+), 29 deletions(-) create mode 100644 releasenotes/notes/bug-2150089-e91b592c948e5771.yaml diff --git a/keystone/api/_shared/EC2_S3_Resource.py b/keystone/api/_shared/EC2_S3_Resource.py index 80ef7fc35e..5e6f3c062a 100644 --- a/keystone/api/_shared/EC2_S3_Resource.py +++ b/keystone/api/_shared/EC2_S3_Resource.py @@ -166,6 +166,13 @@ def handle_authenticate(self): access_token = PROVIDERS.oauth_api.get_access_token( cred_data['access_token_id']) roles = jsonutils.loads(access_token['role_ids']) + if cred_data['project_id'] != access_token['project_id']: + raise ks_exceptions.Unauthorized( + _( + 'EC2 credential project does not match the ' + 'OAuth1 access token project.' + ) + ) auth_context = {'access_token_id': cred_data['access_token_id']} else: roles = PROVIDERS.assignment_api.get_roles_for_user_and_project( diff --git a/keystone/api/credentials.py b/keystone/api/credentials.py index 068e9860aa..6b0c0317a6 100644 --- a/keystone/api/credentials.py +++ b/keystone/api/credentials.py @@ -42,6 +42,48 @@ def _check_unrestricted_application_credential(token): raise exception.ForbiddenAction(action=action) +def _check_credential_project_scope(token, oslo_context, credential): + """Enforce project boundary for delegated tokens. + + Non-delegated tokens (password, totp, etc.) are not restricted here -- + an admin with a regular token can legitimately manage credentials across + projects. Delegated tokens (trusts, application credentials, OAuth1) are + always bound to a single project at delegation time; only credentials + whose project_id exactly matches the token's project scope are in bounds. + + Credentials with project_id=None (e.g. TOTP/MFA bindings) are treated as + out-of-scope for any delegated token: they are user-level secrets with no + project anchor, and a delegated token should never be able to enumerate, + read, or mutate them -- doing so would allow a stolen delegation token to + exfiltrate or destroy a user's MFA binding. + """ + trust_id = getattr(oslo_context, 'trust_id', None) + app_cred_id = getattr(token, 'application_credential_id', None) + access_token_id = getattr(token, 'access_token_id', None) + + if not (trust_id or app_cred_id or access_token_id): + return + + token_project_id = oslo_context.project_id + cred_project_id = credential.get('project_id') + + if cred_project_id != token_project_id: + if CONF.security_compliance.allow_insecure_admin_trust_cross_project_credentials_access: + # When insecure cross-project access is enabled, still restrict to + # admin-role delegated tokens only. See LP#2150089. + try: + ENFORCER.enforce_call(action='admin_required') + return + except Exception: + pass + raise exception.ForbiddenAction( + action=_( + 'Credential project does not match the ' + 'project scope of the delegated token' + ) + ) + + def _build_target_enforcement(): target = {} try: @@ -122,6 +164,7 @@ def _list_credentials(self): # If the request was filtered, make sure to return only the # credentials specific to that user. This makes it so that users with # roles on projects can't see credentials that aren't theirs. + token = self.auth_context['token'] filtered_refs = [] for ref in refs: # Check each credential again to make sure the user has access to @@ -134,8 +177,9 @@ def _list_credentials(self): action='identity:get_credential', target_attr={'credential': cred} ) + _check_credential_project_scope(token, self.oslo_context, cred) filtered_refs.append(ref) - except exception.Forbidden: + except (exception.Forbidden, exception.ForbiddenAction): pass refs = filtered_refs refs = [self._blob_to_json(r) for r in refs] @@ -147,6 +191,9 @@ def _get_credential(self, credential_id): build_target=_build_target_enforcement ) credential = PROVIDERS.credential_api.get_credential(credential_id) + _check_credential_project_scope( + self.auth_context['token'], self.oslo_context, credential + ) return self.wrap_member(self._blob_to_json(credential)) def get(self, credential_id=None): @@ -172,6 +219,7 @@ def post(self): trust_id = getattr(self.oslo_context, 'trust_id', None) app_cred_id = getattr(token, 'application_credential_id', None) access_token_id = getattr(token, 'access_token_id', None) + _check_credential_project_scope(token, self.oslo_context, credential) if ( app_cred_id is not None and credential.get('type', '').lower() == 'ec2' @@ -213,7 +261,9 @@ def patch(self, credential_id): build_target=_build_target_enforcement ) current = PROVIDERS.credential_api.get_credential(credential_id) - + _check_credential_project_scope( + self.auth_context['token'], self.oslo_context, current + ) credential = self.request_body_json.get('credential', {}) validation.lazy_validate(schema.credential_update, credential) self._validate_blob_update_keys(current.copy(), credential.copy()) @@ -233,6 +283,10 @@ def delete(self, credential_id): action='identity:delete_credential', build_target=_build_target_enforcement ) + credential = PROVIDERS.credential_api.get_credential(credential_id) + _check_credential_project_scope( + self.auth_context['token'], self.oslo_context, credential + ) return (PROVIDERS.credential_api.delete_credential( credential_id, initiator=self.audit_initiator), diff --git a/keystone/api/users.py b/keystone/api/users.py index 41c0c5011a..eb7b1366ec 100644 --- a/keystone/api/users.py +++ b/keystone/api/users.py @@ -92,6 +92,76 @@ def _check_unrestricted_application_credential(token): raise ks_exception.ForbiddenAction(action=action) +def _is_delegated_token(oslo_context, token): + """Return True if the token is any form of delegation.""" + trust_id = getattr(oslo_context, 'trust_id', None) + app_cred_id = getattr(token, 'application_credential_id', None) + access_token_id = getattr(token, 'access_token_id', None) + return bool(trust_id or app_cred_id or access_token_id) + + +def _check_delegation_for_ec2(oslo_context, token, project_id): + """For delegated tokens raise unless project_id exactly matches scope. + + Credentials with project_id=None (user-scoped secrets such as TOTP) are + treated as out-of-scope: a delegated token must not read or modify them. + """ + if not _is_delegated_token(oslo_context, token): + return + if project_id != oslo_context.project_id: + raise ks_exception.ForbiddenAction( + action=_( + 'EC2 credential project does not match the ' + 'project scope of the delegated token' + ) + ) + + +def _block_delegated_token(oslo_context, token): + """Raise Forbidden if the token is any form of delegation.""" + if oslo_context.is_delegated_auth: + raise ks_exception.Forbidden( + _( + 'Cannot manage OAuth access tokens with a token ' + 'issued via delegation.' + ) + ) + if 'application_credential' in token.methods: + raise ks_exception.Forbidden( + _( + 'Cannot manage OAuth access tokens with a token ' + 'issued via delegation.' + ) + ) + + +def _block_delegated_token_app_creds(oslo_context, token): + """Raise Forbidden if the token is a trust or OAuth1 delegation. + + Trust-scoped and OAuth1 access token-scoped tokens must not be used to + create, list, read, or delete application credentials or access rules. + Creating an application credential via such a token produces a persistent + credential that outlives the delegation's expiry or scope, providing a + backdoor that breaks the accountability model: the trust-scoped token + carries the full delegation chain enabling audit, but a derived application + credential does not. + + Application credential tokens are intentionally excluded from this check. + The unrestricted/restricted distinction for application credentials is a + documented feature handled separately by + _check_unrestricted_application_credential. + """ + trust_id = getattr(oslo_context, 'trust_id', None) + access_token_id = getattr(token, 'access_token_id', None) + if trust_id or access_token_id: + raise ks_exception.Forbidden( + _( + 'Cannot manage application credentials with a token ' + 'issued via delegation.' + ) + ) + + def _build_user_target_enforcement(): target = {} try: @@ -372,11 +442,18 @@ def get(self, user_id): ENFORCER.enforce_call(action='identity:ec2_list_credentials') PROVIDERS.identity_api.get_user(user_id) credential_refs = PROVIDERS.credential_api.list_credentials_for_user( - user_id, type=CRED_TYPE_EC2) - collection_refs = [ - _convert_v3_to_ec2_credential(cred) - for cred in credential_refs - ] + user_id, type=CRED_TYPE_EC2 + ) + token = self.auth_context['token'] + collection_refs = [] + for cred in credential_refs: + try: + _check_delegation_for_ec2( + self.oslo_context, token, cred.get('project_id') + ) + except (ks_exception.Forbidden, ks_exception.ForbiddenAction): + continue + collection_refs.append(_convert_v3_to_ec2_credential(cred)) return self.wrap_collection(collection_refs) def post(self, user_id): @@ -393,6 +470,7 @@ def post(self, user_id): PROVIDERS.identity_api.get_user(user_id) tenant_id = self.request_body_json.get('tenant_id') PROVIDERS.resource_api.get_project(tenant_id) + _check_delegation_for_ec2(self.oslo_context, token, tenant_id) blob = dict( access=uuid.uuid4().hex, secret=uuid.uuid4().hex, @@ -413,12 +491,12 @@ def post(self, user_id): class UserOSEC2CredentialsResourceGetDelete(_UserOSEC2CredBaseResource): @staticmethod - def _get_cred_data(credential_id): + def _get_raw_cred(credential_id): cred = PROVIDERS.credential_api.get_credential(credential_id) if not cred or cred['type'] != CRED_TYPE_EC2: raise ks_exception.Unauthorized( message=_('EC2 access key not found.')) - return _convert_v3_to_ec2_credential(cred) + return cred def get(self, user_id, credential_id): """Get a specific EC2 credential. @@ -431,8 +509,13 @@ def get(self, user_id, credential_id): build_target=func) PROVIDERS.identity_api.get_user(user_id) ec2_cred_id = utils.hash_access_key(credential_id) - cred_data = self._get_cred_data(ec2_cred_id) - return self.wrap_member(cred_data) + cred = self._get_raw_cred(ec2_cred_id) + _check_delegation_for_ec2( + self.oslo_context, + self.auth_context['token'], + cred.get('project_id'), + ) + return self.wrap_member(_convert_v3_to_ec2_credential(cred)) def delete(self, user_id, credential_id): """Delete a specific EC2 credential. @@ -444,7 +527,12 @@ def delete(self, user_id, credential_id): build_target=func) PROVIDERS.identity_api.get_user(user_id) ec2_cred_id = utils.hash_access_key(credential_id) - self._get_cred_data(ec2_cred_id) + cred = self._get_raw_cred(ec2_cred_id) + _check_delegation_for_ec2( + self.oslo_context, + self.auth_context['token'], + cred.get('project_id'), + ) PROVIDERS.credential_api.delete_credential(ec2_cred_id) return None, http.client.NO_CONTENT @@ -473,10 +561,7 @@ def get(self, user_id): GET /v3/users/{user_id}/OS-OAUTH1/access_tokens """ ENFORCER.enforce_call(action='identity:list_access_tokens') - if self.oslo_context.is_delegated_auth: - raise ks_exception.Forbidden( - _('Cannot list request tokens with a token ' - 'issued via delegation.')) + _block_delegated_token(self.oslo_context, self.auth_context['token']) refs = PROVIDERS.oauth_api.list_access_tokens(user_id) formatted_refs = ([_format_token_entity(x) for x in refs]) return self.wrap_collection(formatted_refs) @@ -489,6 +574,7 @@ def get(self, user_id, access_token_id): GET/HEAD /v3/users/{user_id}/OS-OAUTH1/access_tokens/{access_token_id} """ ENFORCER.enforce_call(action='identity:get_access_token') + _block_delegated_token(self.oslo_context, self.auth_context['token']) access_token = PROVIDERS.oauth_api.get_access_token(access_token_id) if access_token['authorizing_user_id'] != user_id: raise ks_exception.NotFound() @@ -503,6 +589,7 @@ def delete(self, user_id, access_token_id): ENFORCER.enforce_call( action='identity:ec2_delete_credential', build_target=_build_enforcer_target_data_owner_and_user_id_match) + _block_delegated_token(self.oslo_context, self.auth_context['token']) access_token = PROVIDERS.oauth_api.get_access_token(access_token_id) reason = ( 'Invalidating the token cache because an access token for ' @@ -637,6 +724,8 @@ def get(self, user_id): filters = ('name',) ENFORCER.enforce_call(action='identity:list_application_credentials', filters=filters) + token = self.auth_context['token'] + _block_delegated_token_app_creds(self.oslo_context, token) app_cred_api = PROVIDERS.application_credential_api hints = self.build_driver_hints(filters) refs = app_cred_api.list_application_credentials(user_id, hints=hints) @@ -653,6 +742,7 @@ def post(self, user_id): validation.lazy_validate(app_cred_schema.application_credential_create, app_cred_data) token = self.auth_context['token'] + _block_delegated_token_app_creds(self.oslo_context, token) _check_unrestricted_application_credential(token) if self.oslo_context.user_id != user_id: action = _('Cannot create an application credential for another ' @@ -708,6 +798,8 @@ def get(self, user_id, application_credential_id): action='identity:get_application_credential', target_attr=target, ) + token = self.auth_context['token'] + _block_delegated_token_app_creds(self.oslo_context, token) ref = PROVIDERS.application_credential_api.get_application_credential( application_credential_id) return self.wrap_member(ref) @@ -724,6 +816,7 @@ def delete(self, user_id, application_credential_id): target_attr=target ) token = self.auth_context['token'] + _block_delegated_token_app_creds(self.oslo_context, token) _check_unrestricted_application_credential(token) PROVIDERS.application_credential_api.delete_application_credential( application_credential_id, initiator=self.audit_initiator) @@ -743,6 +836,8 @@ def get(self, user_id): ENFORCER.enforce_call(action='identity:list_access_rules', filters=filters, build_target=_build_user_target_enforcement) + token = self.auth_context['token'] + _block_delegated_token_app_creds(self.oslo_context, token) app_cred_api = PROVIDERS.application_credential_api hints = self.build_driver_hints(filters) refs = app_cred_api.list_access_rules_for_user(user_id, hints=hints) @@ -763,6 +858,8 @@ def get(self, user_id, access_rule_id): action='identity:get_access_rule', build_target=_build_user_target_enforcement ) + token = self.auth_context['token'] + _block_delegated_token_app_creds(self.oslo_context, token) ref = PROVIDERS.application_credential_api.get_access_rule( access_rule_id) return self.wrap_member(ref) @@ -776,6 +873,8 @@ def delete(self, user_id, access_rule_id): action='identity:delete_access_rule', build_target=_build_user_target_enforcement ) + token = self.auth_context['token'] + _block_delegated_token_app_creds(self.oslo_context, token) PROVIDERS.application_credential_api.delete_access_rule( access_rule_id, initiator=self.audit_initiator) return None, http.client.NO_CONTENT diff --git a/keystone/conf/security_compliance.py b/keystone/conf/security_compliance.py index 686a957d29..2c7a74505e 100644 --- a/keystone/conf/security_compliance.py +++ b/keystone/conf/security_compliance.py @@ -125,6 +125,71 @@ """)) +allow_insecure_admin_trust_cross_project_credentials_access = cfg.BoolOpt( + 'allow_insecure_admin_trust_cross_project_credentials_access', + default=False, + deprecated_for_removal=True, + deprecated_reason=utils.fmt( + """ +Migrate automated workflows that use admin-role trusts to access credentials +across multiple projects (e.g. Mistral cron triggers) to use non-delegated +service account credentials instead, then remove this option. +""" + ), + deprecated_since='2026.1', + help=utils.fmt( + """ +INSECURE: When enabled, admin-role delegated tokens (trusts, application +credentials, OAuth1 access tokens) are allowed to access credentials outside +their project scope. By default (False), delegated tokens can only access +credentials whose project_id matches the token's project scope, preventing +cross-project lateral movement via a compromised delegation token. + +Enable this only if you have automated workflows (e.g. Mistral cron triggers) +that use admin-role trusts to access credentials across multiple projects and +cannot be migrated to use non-delegated service account credentials. Enabling +this option weakens the isolation guarantee provided by the delegation boundary +fix for LP#2150089. This option is deprecated and will be removed in a future +release. +""" + ), +) + + +allow_insecure_application_credential_trust_escalation = cfg.BoolOpt( + 'allow_insecure_application_credential_trust_escalation', + default=False, + deprecated_for_removal=True, + deprecated_reason=utils.fmt( + """ +Migrate workflows where application credentials create trusts to use OIDC +federation flows (v3oidcclientcredentials, v3oidcdeviceauthz) instead, then +remove this option. +""" + ), + deprecated_since='2026.1', + help=utils.fmt( + """ +INSECURE: When enabled, application credential tokens (including restricted +ones) are allowed to create, delete, and list trusts. By default (False), +application credential tokens are blocked from all trust operations regardless +of the unrestricted flag, because allowing an application credential to +bootstrap a trust creates a new delegation context. A trust-scoped token +produced from that trust can then access authentication material (EC2 +credentials, TOTP seeds) and operate entirely outside the delegation chain, +breaking the audit trail. The 'unrestricted' flag governs credential +management, not trust management. + +Enable this only if you have workflows where application credentials must +create trusts (e.g. Heat stacks authenticated via application credentials). +Use OIDC federation flows (v3oidcclientcredentials, v3oidcdeviceauthz) as the +proper long-term alternative. This option is deprecated and will be removed +in a future release. +""" + ), +) + + GROUP_NAME = __name__.split('.')[-1] ALL_OPTS = [ disable_user_account_days_inactive, @@ -135,7 +200,9 @@ minimum_password_age, password_regex, password_regex_description, - change_password_upon_first_use + change_password_upon_first_use, + allow_insecure_admin_trust_cross_project_credentials_access, + allow_insecure_application_credential_trust_escalation, ] diff --git a/keystone/tests/unit/test_v3_application_credential.py b/keystone/tests/unit/test_v3_application_credential.py index e581d0dbfa..1de466ff9e 100644 --- a/keystone/tests/unit/test_v3_application_credential.py +++ b/keystone/tests/unit/test_v3_application_credential.py @@ -12,6 +12,7 @@ import datetime from testtools import matchers +import unittest import uuid import http.client @@ -211,7 +212,6 @@ def test_create_application_credential_with_trust(self): self.user_id, self.project_id, second_role['id']) with self.test_client() as c: pw_token = self.get_scoped_token() - # create a self-trust - only the roles are important for this test trust_ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.user_id, @@ -228,14 +228,26 @@ def test_create_application_credential_with_trust(self): trust_token = self.v3_create_token( trust_auth).headers['X-Subject-Token'] app_cred = self._app_cred_body(roles=[{'id': self.role_id}]) - # only the roles from the trust token should be allowed, even if - # the user has the role assigned on the project - c.post('/v3/users/%s/application_credentials' % self.user_id, - headers={'X-Auth-Token': trust_token}, - json=app_cred, - expected_status_code=http.client.BAD_REQUEST) + # Trust-scoped tokens are entirely blocked from managing + # application credentials (LP#2150089). + c.post( + f'/v3/users/{self.user_id}/application_credentials', + headers={'X-Auth-Token': trust_token}, + json=app_cred, + expected_status_code=http.client.FORBIDDEN, + ) def test_create_application_credential_allow_recursion(self): + """Unrestricted app credential token can create new credentials. + + The `unrestricted` flag is a documented (unsafe) feature that allows + an application credential token to create additional application + credentials. This must continue to work -- restricted application + credentials are blocked by _check_unrestricted_application_credential, + but unrestricted ones are explicitly opted-in to this behaviour. + Trust-scoped and OAuth1 tokens are separately blocked by + _block_delegated_token_app_creds (LP#2150089). + """ with self.test_client() as c: roles = [{'id': self.role_id}] app_cred_body_1 = self._app_cred_body(roles=roles) @@ -554,6 +566,12 @@ def test_delete_application_credential_with_application_credential(self): headers={'X-Auth-Token': token}) def test_delete_application_credential_allow_recursion(self): + """Unrestricted app credential token can delete credentials. + + The `unrestricted` flag allows an application credential token to + delete application credentials. Trust-scoped and OAuth1 tokens are + separately blocked by _block_delegated_token_app_creds (LP#2150089). + """ with self.test_client() as c: roles = [{'id': self.role_id}] app_cred_body = self._app_cred_body(roles=roles) @@ -598,8 +616,294 @@ def test_update_application_credential(self): # need to be rolled into the base MEMBER_PATH_FMT member_path = '/v3%s' % MEMBER_PATH_FMT % { 'user_id': self.user_id, - 'app_cred_id': app_cred_id} - c.patch(member_path, - json=app_cred_body, - expected_status_code=http.client.METHOD_NOT_ALLOWED, - headers={'X-Auth-Token': token}) + 'app_cred_id': app_cred_id, + } + c.patch( + member_path, + json=app_cred_body, + expected_status_code=http.client.METHOD_NOT_ALLOWED, + headers={'X-Auth-Token': token}, + ) + + def _get_trust_token(self, c, pw_token): + """Return a trust-scoped token for self.user_id on self.project_id.""" + trust_ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.user_id, + project_id=self.project_id, + role_ids=[self.role_id], + ) + resp = c.post( + '/v3/OS-TRUST/trusts', + headers={'X-Auth-Token': pw_token}, + json={'trust': trust_ref}, + ) + trust_id = resp.json['trust']['id'] + trust_auth = self.build_authentication_request( + user_id=self.user_id, + password=self.user['password'], + trust_id=trust_id, + ) + return self.v3_create_token(trust_auth).headers['X-Subject-Token'] + + def test_delegation_guard_trust_list_app_creds(self): + """Trust-scoped token cannot list application credentials (LP#2150089). + + Previously GET /v3/users/{id}/application_credentials had no delegation + guard at all; any trust-scoped token could enumerate the user's + application credentials. + """ + with self.test_client() as c: + pw_token = self.get_scoped_token() + trust_token = self._get_trust_token(c, pw_token) + c.get( + f'/v3/users/{self.user_id}/application_credentials', + headers={'X-Auth-Token': trust_token}, + expected_status_code=http.client.FORBIDDEN, + ) + + def test_delegation_guard_trust_get_app_cred(self): + """Trust-scoped token cannot read a specific application credential. + + Previously GET /v3/users/{id}/application_credentials/{id} had no + delegation guard at all (LP#2150089). + """ + with self.test_client() as c: + pw_token = self.get_scoped_token() + roles = [{'id': self.role_id}] + resp = c.post( + f'/v3/users/{self.user_id}/application_credentials', + json=self._app_cred_body(roles=roles), + expected_status_code=http.client.CREATED, + headers={'X-Auth-Token': pw_token}, + ) + app_cred_id = resp.json['application_credential']['id'] + trust_token = self._get_trust_token(c, pw_token) + member_path = f'/v3{MEMBER_PATH_FMT}' % { + 'user_id': self.user_id, + 'app_cred_id': app_cred_id, + } + c.get( + member_path, + headers={'X-Auth-Token': trust_token}, + expected_status_code=http.client.FORBIDDEN, + ) + + def test_delegation_guard_trust_delete_app_cred(self): + """Trust-scoped token cannot delete an application credential. + + Previously DELETE /v3/users/{id}/application_credentials/{id} only + checked for restricted app-cred tokens; trust-scoped tokens had no + guard (LP#2150089). + """ + with self.test_client() as c: + pw_token = self.get_scoped_token() + roles = [{'id': self.role_id}] + resp = c.post( + f'/v3/users/{self.user_id}/application_credentials', + json=self._app_cred_body(roles=roles), + expected_status_code=http.client.CREATED, + headers={'X-Auth-Token': pw_token}, + ) + app_cred_id = resp.json['application_credential']['id'] + trust_token = self._get_trust_token(c, pw_token) + member_path = f'/v3{MEMBER_PATH_FMT}' % { + 'user_id': self.user_id, + 'app_cred_id': app_cred_id, + } + c.delete( + member_path, + headers={'X-Auth-Token': trust_token}, + expected_status_code=http.client.FORBIDDEN, + ) + + def test_delegation_guard_trust_list_access_rules(self): + """Trust-scoped token cannot list access rules (LP#2150089).""" + access_rules = [ + {'path': '/v3/projects', 'method': 'GET', 'service': 'identity'} + ] + with self.test_client() as c: + pw_token = self.get_scoped_token() + roles = [{'id': self.role_id}] + c.post( + f'/v3/users/{self.user_id}/application_credentials', + json=self._app_cred_body( + roles=roles, access_rules=access_rules + ), + expected_status_code=http.client.CREATED, + headers={'X-Auth-Token': pw_token}, + ) + trust_token = self._get_trust_token(c, pw_token) + c.get( + f'/v3/users/{self.user_id}/access_rules', + headers={'X-Auth-Token': trust_token}, + expected_status_code=http.client.FORBIDDEN, + ) + + def test_delegation_guard_trust_get_access_rule(self): + """Trust-scoped token cannot read a specific access rule (LP#2150089).""" + access_rules = [ + {'path': '/v3/projects', 'method': 'GET', 'service': 'identity'} + ] + with self.test_client() as c: + pw_token = self.get_scoped_token() + roles = [{'id': self.role_id}] + resp = c.post( + f'/v3/users/{self.user_id}/application_credentials', + json=self._app_cred_body( + roles=roles, access_rules=access_rules + ), + expected_status_code=http.client.CREATED, + headers={'X-Auth-Token': pw_token}, + ) + access_rule_id = resp.json['application_credential'][ + 'access_rules' + ][0]['id'] + trust_token = self._get_trust_token(c, pw_token) + c.get( + f'/v3/users/{self.user_id}/access_rules/{access_rule_id}', + headers={'X-Auth-Token': trust_token}, + expected_status_code=http.client.FORBIDDEN, + ) + + def test_delegation_guard_trust_delete_access_rule(self): + """Trust-scoped token cannot delete an access rule (LP#2150089).""" + access_rules = [ + {'path': '/v3/projects', 'method': 'GET', 'service': 'identity'} + ] + with self.test_client() as c: + pw_token = self.get_scoped_token() + roles = [{'id': self.role_id}] + resp = c.post( + f'/v3/users/{self.user_id}/application_credentials', + json=self._app_cred_body( + roles=roles, access_rules=access_rules + ), + expected_status_code=http.client.CREATED, + headers={'X-Auth-Token': pw_token}, + ) + ac = resp.json['application_credential'] + access_rule_id = ac['access_rules'][0]['id'] + c.delete( + f'/v3/users/{self.user_id}/application_credentials/{ac["id"]}', + headers={'X-Auth-Token': pw_token}, + expected_status_code=http.client.NO_CONTENT, + ) + trust_token = self._get_trust_token(c, pw_token) + c.delete( + f'/v3/users/{self.user_id}/access_rules/{access_rule_id}', + headers={'X-Auth-Token': trust_token}, + expected_status_code=http.client.FORBIDDEN, + ) + + def test_list_access_rules(self): + access_rules: list[dict[str, str]] = [ + {"service": "foo", "method": "GET", "path": "/bar"} + ] + with self.test_client() as c: + roles = [{'id': self.role_id}] + app_cred_body = self._app_cred_body( + roles=roles, access_rules=access_rules + ) + token = self.get_scoped_token() + c.post( + f"/v3/users/{self.user_id}/application_credentials", + json=app_cred_body, + expected_status_code=http.client.CREATED, + headers={"X-Auth-Token": token}, + ) + # Invoke GET access_rules and trigger internal validation + r = c.get( + f"/v3/users/{self.user_id}/access_rules", + expected_status_code=http.client.OK, + headers={"X-Auth-Token": token}, + ) + ar = r.json["access_rules"] + self.assertEqual(access_rules[0]["method"], ar[0]["method"]) + + # TODO(stephenfin): This will pass once we increase strictness of the query + # string validation + @unittest.expectedFailure + def test_list_access_rules_invalid_qs(self): + with self.test_client() as c: + token = self.get_scoped_token() + # Invoke GET access_rules with unsupported query parameters and + # trigger internal validation + c.get( + f"/v3/users/{self.user_id}/access_rules?user_id=foo", + expected_status_code=http.client.BAD_REQUEST, + headers={"X-Auth-Token": token}, + ) + + def test_show_access_rule(self): + access_rules: list[dict[str, str]] = [ + {"service": "foo", "method": "GET", "path": "/bar"} + ] + with self.test_client() as c: + roles = [{'id': self.role_id}] + app_cred_body = self._app_cred_body( + roles=roles, access_rules=access_rules + ) + token = self.get_scoped_token() + resp = c.post( + f"/v3/users/{self.user_id}/application_credentials", + json=app_cred_body, + expected_status_code=http.client.CREATED, + headers={"X-Auth-Token": token}, + ) + access_rule_id = resp.json["application_credential"][ + "access_rules" + ][0]["id"] + # Invoke GET access_rules/{id} and trigger internal validation + c.get( + f"/v3/users/{self.user_id}/access_rules/{access_rule_id}", + expected_status_code=http.client.OK, + headers={"X-Auth-Token": token}, + ) + + # TODO(stephenfin): This will pass once we increase strictness of the query + # string validation + @unittest.expectedFailure + def test_show_access_rule_invalid_qs(self): + with self.test_client() as c: + token = self.get_scoped_token() + # Invoke GET access_rules/{id} with unsupported query parameters and + # trigger internal validation + c.get( + f"/v3/users/{self.user_id}/access_rules/{access_rule_id}" + "?foo=bar", + expected_status_code=http.client.BAD_REQUEST, + headers={"X-Auth-Token": token}, + ) + + def test_delete_access_rule(self): + access_rules: list[dict[str, str]] = [ + {"service": "foo", "method": "GET", "path": "/bar"} + ] + with self.test_client() as c: + roles = [{'id': self.role_id}] + app_cred_body = self._app_cred_body( + roles=roles, access_rules=access_rules + ) + token = self.get_scoped_token() + resp = c.post( + f"/v3/users/{self.user_id}/application_credentials", + json=app_cred_body, + expected_status_code=http.client.CREATED, + headers={"X-Auth-Token": token}, + ) + app_cred: dict = resp.json["application_credential"] + access_rule_id = app_cred["access_rules"][0]["id"] + c.delete( + f"/v3/users/{self.user_id}/application_credentials" + f"/{app_cred['id']}", + json=app_cred_body, + expected_status_code=http.client.NO_CONTENT, + headers={"X-Auth-Token": token}, + ) + # Invoke GET access_rules/{id} and trigger internal validation + c.delete( + f"/v3/users/{self.user_id}/access_rules/{access_rule_id}", + expected_status_code=http.client.NO_CONTENT, + headers={"X-Auth-Token": token}, + ) diff --git a/keystone/tests/unit/test_v3_credential.py b/keystone/tests/unit/test_v3_credential.py index c3c3a7b55e..7daec4af0e 100644 --- a/keystone/tests/unit/test_v3_credential.py +++ b/keystone/tests/unit/test_v3_credential.py @@ -65,6 +65,30 @@ def _create_dict_blob_credential(self): return json.dumps(blob), credential_id + def _get_ec2_sig_ref(self, blob): + """Return a signed ec2Credentials dict for use with POST /ec2tokens.""" + signer = ec2_utils.Ec2Signer(blob['secret']) + params = { + 'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': blob['access'], + } + return { + 'access': blob['access'], + 'signature': signer.generate( + { + 'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params, + } + ), + 'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params, + } + def _test_get_token(self, access, secret): """Test signature validation with the access/secret provided.""" signer = ec2_utils.Ec2Signer(secret) @@ -669,6 +693,152 @@ def test_trust_scoped_ec2_credential(self): token=token_id, expected_status=http.client.CONFLICT) + def _get_trust_token(self): + ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + impersonation=True, + role_ids=[self.role_id], + ) + del ref['id'] + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r) + auth_data = self.build_authentication_request( + user_id=self.trustee_user['id'], + password=self.trustee_user['password'], + trust_id=trust['id'], + ) + r = self.v3_create_token(auth_data) + return r.headers.get('X-Subject-Token') + + def test_trust_token_cannot_list_totp_credentials(self): + """Trust-scoped token must not see TOTP/MFA credentials (project_id=None). + + TOTP credentials have no project anchor. Before this fix the + project boundary check skipped null-project credentials, allowing a + delegation token to enumerate and exfiltrate MFA secrets. + """ + totp_ref = { + 'user_id': self.user_id, + 'type': 'totp', + 'blob': '{"seed": "JBSWY3DPEHPK3PXP"}', + } + r = self.post('/credentials', body={'credential': totp_ref}) + totp_id = r.result['credential']['id'] + + trust_token = self._get_trust_token() + + r = self.get(f'/credentials?user_id={self.user_id}', token=trust_token) + listed_ids = [c['id'] for c in r.result['credentials']] + self.assertNotIn(totp_id, listed_ids) + + def test_trust_token_cannot_read_totp_credential(self): + """Trust-scoped token must not read a TOTP credential blob.""" + totp_ref = { + 'user_id': self.user_id, + 'type': 'totp', + 'blob': '{"seed": "JBSWY3DPEHPK3PXP"}', + } + r = self.post('/credentials', body={'credential': totp_ref}) + totp_id = r.result['credential']['id'] + + trust_token = self._get_trust_token() + self.get( + f'/credentials/{totp_id}', + token=trust_token, + expected_status=http.client.FORBIDDEN, + ) + + def test_trust_token_cannot_update_totp_credential(self): + """Trust-scoped token must not be able to update a TOTP credential blob.""" + totp_ref = { + 'user_id': self.user_id, + 'type': 'totp', + 'blob': '{"seed": "JBSWY3DPEHPK3PXP"}', + } + r = self.post('/credentials', body={'credential': totp_ref}) + totp_id = r.result['credential']['id'] + + trust_token = self._get_trust_token() + self.patch( + f'/credentials/{totp_id}', + token=trust_token, + body={'credential': totp_ref}, + expected_status=http.client.FORBIDDEN, + ) + + def test_trust_token_cannot_delete_totp_credential(self): + """Trust-scoped token must not delete a TOTP credential.""" + totp_ref = { + 'user_id': self.user_id, + 'type': 'totp', + 'blob': '{"seed": "JBSWY3DPEHPK3PXP"}', + } + r = self.post('/credentials', body={'credential': totp_ref}) + totp_id = r.result['credential']['id'] + + trust_token = self._get_trust_token() + self.delete( + f'/credentials/{totp_id}', + token=trust_token, + expected_status=http.client.FORBIDDEN, + ) + # Confirm it still exists + self.get(f'/credentials/{totp_id}', expected_status=http.client.OK) + + def test_ec2_auth_trust_cross_project_scoped_to_trust(self): + """Trust-backed EC2 credential with mismatched project_id is safe. + + When an EC2 credential's project_id differs from the trust's + project_id, the trust mechanism constrains the resulting token to + the trust's project -- not the credential's project. This means the + cross-project escalation does not occur for trust-backed credentials, + and no additional auth-time check is needed in that branch. + + This test documents and protects that invariant. + """ + trust_ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + impersonation=True, + role_ids=[self.role_id], + ) + del trust_ref['id'] + r = self.post('/OS-TRUST/trusts', body={'trust': trust_ref}) + trust = self.assertValidTrustResponse(r) + + other_project = unit.new_project_ref(domain_id=self.domain_id) + other_project = PROVIDERS.resource_api.create_project( + other_project['id'], other_project + ) + + # Plant a credential with project_id pointing to the other project + # but trust_id from the trust above (scoped to self.project_id). + blob, ref = unit.new_ec2_credential( + user_id=self.user_id, project_id=other_project['id'] + ) + blob['trust_id'] = trust['id'] + ref['blob'] = json.dumps(blob) + PROVIDERS.credential_api.create_credential(ref['id'], ref) + + PROVIDERS.assignment_api.create_system_grant_for_user( + self.user_id, self.role_id + ) + token = self.get_system_scoped_token() + r = self.post( + '/ec2tokens', + body={'ec2Credentials': self._get_ec2_sig_ref(blob)}, + token=token, + expected_status=http.client.OK, + ) + # The resulting token is scoped to the trust's project, not to + # other_project -- the trust mechanism prevents cross-project escalation. + token_project = r.result['token']['project']['id'] + self.assertEqual(self.project_id, token_project) + self.assertNotEqual(other_project['id'], token_project) + class TestCredentialAppCreds(CredentialBaseTestCase): """Test credential with application credential token.""" @@ -873,6 +1043,81 @@ def test_app_cred_ec2_auth_cross_project_rejected(self): expected_status=http.client.UNAUTHORIZED, ) + def test_app_cred_token_cannot_list_totp_credentials(self): + """App cred token must not see TOTP/MFA credentials (project_id=None). + + TOTP credentials have no project anchor. Before this fix the + project boundary check skipped null-project credentials, allowing a + delegation token to enumerate and exfiltrate MFA secrets. + """ + totp_ref = { + 'user_id': self.user_id, + 'type': 'totp', + 'blob': '{"seed": "JBSWY3DPEHPK3PXP"}', + } + r = self.post('/credentials', body={'credential': totp_ref}) + totp_id = r.result['credential']['id'] + + app_cred_token = self._get_app_cred_token(unrestricted=True) + + r = self.get( + f'/credentials?user_id={self.user_id}', token=app_cred_token + ) + listed_ids = [c['id'] for c in r.result['credentials']] + self.assertNotIn(totp_id, listed_ids) + + def test_app_cred_token_cannot_read_totp_credential(self): + """App cred token must not read a TOTP credential blob.""" + totp_ref = { + 'user_id': self.user_id, + 'type': 'totp', + 'blob': '{"seed": "JBSWY3DPEHPK3PXP"}', + } + r = self.post('/credentials', body={'credential': totp_ref}) + totp_id = r.result['credential']['id'] + app_cred_token = self._get_app_cred_token(unrestricted=True) + + self.get( + f'/credentials/{totp_id}', + token=app_cred_token, + expected_status=http.client.FORBIDDEN, + ) + + def test_app_cred_token_cannot_update_totp_credential(self): + """App cred token must not update a TOTP credential blob.""" + totp_ref = { + 'user_id': self.user_id, + 'type': 'totp', + 'blob': '{"seed": "JBSWY3DPEHPK3PXP"}', + } + r = self.post('/credentials', body={'credential': totp_ref}) + totp_id = r.result['credential']['id'] + app_cred_token = self._get_app_cred_token(unrestricted=True) + + self.patch( + f'/credentials/{totp_id}', + token=app_cred_token, + body={'credential': totp_ref}, + expected_status=http.client.FORBIDDEN, + ) + + def test_app_cred_token_cannot_delete_totp_credential(self): + """App cred token must not delete a TOTP credential blob.""" + totp_ref = { + 'user_id': self.user_id, + 'type': 'totp', + 'blob': '{"seed": "JBSWY3DPEHPK3PXP"}', + } + r = self.post('/credentials', body={'credential': totp_ref}) + totp_id = r.result['credential']['id'] + app_cred_token = self._get_app_cred_token(unrestricted=True) + + self.delete( + f'/credentials/{totp_id}', + token=app_cred_token, + expected_status=http.client.FORBIDDEN, + ) + class TestCredentialAccessToken(CredentialBaseTestCase): """Test credential with access token.""" @@ -1020,6 +1265,53 @@ def test_access_token_ec2_credential(self): self.assertIn(self.role_id, ec2_roles) self.assertNotIn(role_id, ec2_roles) + def test_ec2_auth_access_token_cross_project_blocked(self): + """OAuth1 access-token-backed EC2 credential must not auth cross-project. + + Auth-time check: if a cross-project EC2 credential backed by an OAuth1 + access token exists, POST /ec2tokens must reject it when the + credential's project_id differs from the access token's project_id. + """ + access_key, _ = self._get_access_token() + + # Retrieve the stored access token to get its project_id + access_token = PROVIDERS.oauth_api.get_access_token( + access_key.decode('utf-8') + if isinstance(access_key, bytes) + else access_key + ) + + # Create a second project (cross-project target) + other_project = unit.new_project_ref(domain_id=self.domain_id) + other_project = PROVIDERS.resource_api.create_project( + other_project['id'], other_project + ) + + # Directly inject an EC2 credential whose project_id points to the + # other project but whose access_token_id references the token above. + # This simulates a pre-existing cross-project credential. + blob, ref = unit.new_ec2_credential( + user_id=self.user_id, project_id=other_project['id'] + ) + blob['access_token_id'] = ( + access_key.decode('utf-8') + if isinstance(access_key, bytes) + else access_key + ) + ref['blob'] = json.dumps(blob) + PROVIDERS.credential_api.create_credential(ref['id'], ref) + + PROVIDERS.assignment_api.create_system_grant_for_user( + self.user_id, self.role_id + ) + token = self.get_system_scoped_token() + self.post( + '/ec2tokens', + body={'ec2Credentials': self._get_ec2_sig_ref(blob)}, + token=token, + expected_status=http.client.UNAUTHORIZED, + ) + class TestCredentialEc2(CredentialBaseTestCase): """Test v3 credential compatibility with ec2tokens.""" @@ -1124,3 +1416,103 @@ def test_ec2_delete_credential(self): self.assertRaises(exception.CredentialNotFound, PROVIDERS.credential_api.get_credential, cred_from_credential_api[0]['id']) + + def _get_trust_token(self): + """Create a trust and return a trust-scoped token for the trustee.""" + trustee = unit.new_user_ref(domain_id=self.domain_id) + password = trustee['password'] + trustee = PROVIDERS.identity_api.create_user(trustee) + trustee['password'] = password + trust_ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=trustee['id'], + project_id=self.project_id, + impersonation=True, + role_ids=[self.role_id], + ) + del trust_ref['id'] + r = self.post('/OS-TRUST/trusts', body={'trust': trust_ref}) + trust = r.result['trust'] + auth_data = self.build_authentication_request( + user_id=trustee['id'], + password=trustee['password'], + trust_id=trust['id'], + ) + r = self.v3_create_token(auth_data) + return r.headers.get('X-Subject-Token') + + def test_ec2_create_credential_trust_cross_project_blocked(self): + """Trust-scoped token cannot create EC2 cred for a different project.""" + other_project = unit.new_project_ref(domain_id=self.domain_id) + other_project = PROVIDERS.resource_api.create_project( + other_project['id'], other_project + ) + trust_token = self._get_trust_token() + uri = f'/users/{self.user_id}/credentials/OS-EC2' + self.post( + uri, + body={'tenant_id': other_project['id']}, + token=trust_token, + expected_status=http.client.FORBIDDEN, + ) + + def test_ec2_create_credential_trust_same_project_allowed(self): + """Trust-scoped token can create EC2 cred for the trust project.""" + trust_token = self._get_trust_token() + uri = self._get_ec2_cred_uri() + r = self.post( + uri, + body={'tenant_id': self.project_id}, + token=trust_token, + expected_status=http.client.CREATED, + ) + self.assertEqual(self.project_id, r.result['credential']['tenant_id']) + + def test_ec2_get_credential_trust_cross_project_blocked(self): + """Trust-scoped token cannot get an EC2 cred from a different project.""" + other_project = unit.new_project_ref(domain_id=self.domain_id) + other_project = PROVIDERS.resource_api.create_project( + other_project['id'], other_project + ) + PROVIDERS.assignment_api.add_role_to_user_and_project( + self.user_id, other_project['id'], self.role_id + ) + ec2_cred = self._get_ec2_cred() + # Change the credential's project to the other project directly + PROVIDERS.credential_api.update_credential( + next( + c['id'] + for c in PROVIDERS.credential_api.list_credentials_for_user( + self.user_id, type=CRED_TYPE_EC2 + ) + ), + {'project_id': other_project['id']}, + ) + trust_token = self._get_trust_token() + uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) + self.get(uri, token=trust_token, expected_status=http.client.FORBIDDEN) + + def test_ec2_delete_credential_trust_cross_project_blocked(self): + """Trust-scoped token cannot delete EC2 cred from a different project.""" + other_project = unit.new_project_ref(domain_id=self.domain_id) + other_project = PROVIDERS.resource_api.create_project( + other_project['id'], other_project + ) + PROVIDERS.assignment_api.add_role_to_user_and_project( + self.user_id, other_project['id'], self.role_id + ) + ec2_cred = self._get_ec2_cred() + PROVIDERS.credential_api.update_credential( + next( + c['id'] + for c in PROVIDERS.credential_api.list_credentials_for_user( + self.user_id, type=CRED_TYPE_EC2 + ) + ), + {'project_id': other_project['id']}, + ) + trust_token = self._get_trust_token() + uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) + self.delete( + uri, token=trust_token, expected_status=http.client.FORBIDDEN + ) diff --git a/keystone/tests/unit/test_v3_oauth1.py b/keystone/tests/unit/test_v3_oauth1.py index 6b6942b612..debe17aa68 100644 --- a/keystone/tests/unit/test_v3_oauth1.py +++ b/keystone/tests/unit/test_v3_oauth1.py @@ -430,6 +430,118 @@ def test_list_and_delete_access_tokens(self): self.assertEqual([], entities) self.assertValidListLinks(resp.result['links']) + def _get_app_cred_token(self): + app_cred = { + 'id': uuid.uuid4().hex, + 'user_id': self.user_id, + 'project_id': self.project_id, + 'name': uuid.uuid4().hex, + 'roles': [{'id': self.role_id}], + 'secret': uuid.uuid4().hex, + } + PROVIDERS.application_credential_api.create_application_credential( + app_cred + ) + auth_data = self.build_authentication_request( + app_cred_id=app_cred['id'], secret=app_cred['secret'] + ) + r = self.v3_create_token(auth_data) + return r.headers['X-Subject-Token'] + + def test_list_access_tokens_with_app_cred_blocked(self): + """Application credential token must not list OAuth1 access tokens.""" + self.test_oauth_flow() + token = self._get_app_cred_token() + self.get( + f'/users/{self.user_id}/OS-OAUTH1/access_tokens', + token=token, + expected_status=http.client.FORBIDDEN, + ) + + def test_get_access_token_with_app_cred_blocked(self): + """Application credential token must not get a specific access token.""" + self.test_oauth_flow() + token = self._get_app_cred_token() + access_token_key = self.access_token.key.decode() + self.get( + f'/users/{self.user_id}/OS-OAUTH1/access_tokens/{access_token_key}', + token=token, + expected_status=http.client.FORBIDDEN, + ) + + def test_delete_access_token_with_app_cred_blocked(self): + """Application credential token must not delete an access token.""" + self.test_oauth_flow() + token = self._get_app_cred_token() + access_token_key = self.access_token.key.decode() + self.delete( + f'/users/{self.user_id}/OS-OAUTH1/access_tokens/{access_token_key}', + token=token, + expected_status=http.client.FORBIDDEN, + ) + + def test_get_access_token_with_trust_token_blocked(self): + """Trust-scoped token must not get a specific access token.""" + self.test_oauth_flow() + trustee = unit.new_user_ref(domain_id=self.domain_id) + password = trustee['password'] + trustee = PROVIDERS.identity_api.create_user(trustee) + trustee['password'] = password + trust_ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=trustee['id'], + project_id=self.project_id, + impersonation=True, + role_ids=[self.role_id], + ) + del trust_ref['id'] + r = self.post('/OS-TRUST/trusts', body={'trust': trust_ref}) + trust_id = r.result['trust']['id'] + auth_data = self.build_authentication_request( + user_id=trustee['id'], + password=trustee['password'], + trust_id=trust_id, + ) + r = self.v3_create_token(auth_data) + trust_token = r.headers['X-Subject-Token'] + access_token_key = self.access_token.key.decode() + self.get( + f'/users/{self.user_id}/OS-OAUTH1/access_tokens/{access_token_key}', + token=trust_token, + expected_status=http.client.FORBIDDEN, + ) + + def test_delete_access_token_with_trust_token_blocked(self): + """Trust-scoped token must not delete an access token.""" + self.test_oauth_flow() + trustee = unit.new_user_ref(domain_id=self.domain_id) + password = trustee['password'] + trustee = PROVIDERS.identity_api.create_user(trustee) + trustee['password'] = password + trust_ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=trustee['id'], + project_id=self.project_id, + impersonation=True, + role_ids=[self.role_id], + ) + del trust_ref['id'] + r = self.post('/OS-TRUST/trusts', body={'trust': trust_ref}) + trust_id = r.result['trust']['id'] + auth_data = self.build_authentication_request( + user_id=trustee['id'], + password=trustee['password'], + trust_id=trust_id, + ) + r = self.v3_create_token(auth_data) + trust_token = r.headers['X-Subject-Token'] + access_token_key = self.access_token.key.decode() + self.delete( + f'/users/{self.user_id}/OS-OAUTH1/access_tokens/{access_token_key}', + token=trust_token, + expected_status=http.client.FORBIDDEN, + ) + class AuthTokenTests(object): diff --git a/releasenotes/notes/bug-2150089-e91b592c948e5771.yaml b/releasenotes/notes/bug-2150089-e91b592c948e5771.yaml new file mode 100644 index 0000000000..9d44424625 --- /dev/null +++ b/releasenotes/notes/bug-2150089-e91b592c948e5771.yaml @@ -0,0 +1,35 @@ +--- +security: + - | + [`bug 2150089 `_] + Delegated tokens (trusts, application credentials, OAuth1 access tokens) + are now restricted to credentials whose ``project_id`` matches the token's + project scope. This closes a cross-project lateral movement vector where a + delegated token could read, modify, or delete credentials belonging to a + different project, including EC2 keys and TOTP/MFA seed bindings. + + Application credential tokens are now blocked from all trust operations + (create, delete, list, get). Allowing an application credential to bootstrap + a trust creates a new delegation context whose token can access + authentication material outside the delegation chain, breaking the audit + trail. The ``unrestricted`` flag governs credential management, not trust + management. +upgrade: + - | + [`bug 2150089 `_] + Two new ``[security_compliance]`` options control opt-in insecure behaviour + for operators with workflows that break after this upgrade: + + ``allow_insecure_admin_trust_cross_project_credentials_access`` (default + ``False``): set to ``True`` if admin-role trusts or application credentials + need to access credentials across multiple projects (e.g. Mistral cron + triggers syncing EC2 credentials system-wide). + + ``allow_insecure_application_credential_trust_escalation`` (default + ``False``): set to ``True`` if application credentials must create or manage + trusts (e.g. Heat stacks authenticated via application credentials). Use + OIDC federation flows (``v3oidcclientcredentials``, ``v3oidcdeviceauthz``) + as the proper long-term alternative. + + Both options are intentionally named to signal that enabling them is + insecure. Migrate affected workflows away from these options. From 7ae065dde0874a319ec1f87ef1e62759e7f7141c Mon Sep 17 00:00:00 2001 From: Grzegorz Grasza Date: Tue, 12 May 2026 09:22:34 +0200 Subject: [PATCH 4/7] Fix user impersonation through application credentials (CVE-2026-42998) When authenticating by application credential ID, the caller can supply a 'user' field in the payload. AppCredInfo conditionally set the user from the credential owner only when no user field was present. If present, BaseUserInfo resolved the caller-supplied user and attributed the resulting token to that user instead of the credential owner. Fix: always set auth_payload['user'] from the credential's stored user_id, ignoring any caller-supplied value. Closes-Bug: #2148477 Assisted-by: Claude Sonnet 4.6 Co-authored-by: Boris Bobrov Change-Id: I2fe6089886eebf3775930451b87771e40b5e179e Signed-off-by: Grzegorz Grasza Signed-off-by: Artem Goncharov (cherry picked from commit 6cd25fecdab8b9261e916ee10f3dba5aeb0c1984) --- keystone/auth/plugins/core.py | 7 +-- keystone/tests/unit/test_v3_auth.py | 75 +++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 3 deletions(-) diff --git a/keystone/auth/plugins/core.py b/keystone/auth/plugins/core.py index 1ae451df33..f48e360942 100644 --- a/keystone/auth/plugins/core.py +++ b/keystone/auth/plugins/core.py @@ -242,9 +242,10 @@ def _validate_and_normalize_auth_data(self, auth_payload): app_cred = app_cred_api.get_application_credential( auth_payload['id']) self.user_id = app_cred['user_id'] - if not auth_payload.get('user'): - auth_payload['user'] = {} - auth_payload['user']['id'] = self.user_id + # Always bind to the credential owner. Any user field supplied by + # the caller would allow impersonation of an arbitrary user + # (LP#2148477). + auth_payload['user'] = {'id': app_cred['user_id']} super(AppCredInfo, self)._validate_and_normalize_auth_data( auth_payload) elif auth_payload.get('name'): diff --git a/keystone/tests/unit/test_v3_auth.py b/keystone/tests/unit/test_v3_auth.py index eb7ea0e292..37256fe76c 100644 --- a/keystone/tests/unit/test_v3_auth.py +++ b/keystone/tests/unit/test_v3_auth.py @@ -5919,3 +5919,78 @@ def test_application_credential_access_rules_without_header_fails(self): expected_status=http.client.CREATED) token = resp.headers.get('X-Subject-Token') self._validate_token(token, expected_status=http.client.NOT_FOUND) + + def test_app_cred_auth_with_injected_user_id_is_ignored(self): + """Caller-supplied user ID in app cred payload must be ignored. + + When authenticating by application credential ID, the token must + always be attributed to the credential owner. An attacker-supplied + user ID must not override the credential owner's identity. + LP#2148477 -- user impersonation via app credential auth. + """ + victim = unit.create_user( + PROVIDERS.identity_api, domain_id=self.domain_id + ) + PROVIDERS.assignment_api.add_role_to_user_and_project( + victim['id'], self.project_id, self.role_id + ) + + app_cred = self._make_app_cred() + app_cred_ref = self.app_cred_api.create_application_credential( + app_cred + ) + + auth_body = { + 'auth': { + 'identity': { + 'methods': ['application_credential'], + 'application_credential': { + 'id': app_cred_ref['id'], + 'secret': app_cred['secret'], + 'user': {'id': victim['id']}, + }, + } + } + } + r = self.v3_create_token(auth_body) + token_data = r.result['token'] + self.assertEqual(self.user['id'], token_data['user']['id']) + self.assertNotEqual(victim['id'], token_data['user']['id']) + + def test_app_cred_auth_with_injected_username_is_ignored(self): + """Caller-supplied username in app cred payload must be ignored. + + Same as the user ID variant but uses the victim's name and domain, + which are typically predictable. LP#2148477. + """ + victim = unit.create_user( + PROVIDERS.identity_api, domain_id=self.domain_id + ) + PROVIDERS.assignment_api.add_role_to_user_and_project( + victim['id'], self.project_id, self.role_id + ) + + app_cred = self._make_app_cred() + app_cred_ref = self.app_cred_api.create_application_credential( + app_cred + ) + + auth_body = { + 'auth': { + 'identity': { + 'methods': ['application_credential'], + 'application_credential': { + 'id': app_cred_ref['id'], + 'secret': app_cred['secret'], + 'user': { + 'name': victim['name'], + 'domain': {'name': self.domain['name']}, + }, + }, + } + } + } + r = self.v3_create_token(auth_body) + token_data = r.result['token'] + self.assertEqual(self.user['id'], token_data['user']['id']) + self.assertNotEqual(victim['id'], token_data['user']['id']) From f350c25b47394e743853e9ab053c8420569da79a Mon Sep 17 00:00:00 2001 From: Grzegorz Grasza Date: Tue, 12 May 2026 09:22:47 +0200 Subject: [PATCH 5/7] Forbid trust operations using application credentials (CVE-2026-43000) Previously only restricted application credentials were blocked, and only for trust create and delete. This change blocks all application credentials (restricted and unrestricted alike) from all trust operations: list, get, create, delete, list-roles, and get-role. The 'unrestricted' flag governs credential management, not trust management. Closes-Bug: #2148477 Related-Bug: #2149789 Related-Bug: #2150089 Assisted-by: Claude Sonnet 4.6 Co-authored-by: Boris Bobrov Change-Id: I750156df18a1d6293ce99c42eb524575fcf16ea3 Signed-off-by: Grzegorz Grasza Signed-off-by: Artem Goncharov (cherry picked from commit 3c2043ab003cb4b8aa34502fe9a5a69b0a6a6e54) --- doc/source/user/application_credentials.rst | 6 +- keystone/api/trusts.py | 40 +++++++-- keystone/tests/unit/test_v3_trust.py | 93 +++++++++++++++++++++ 3 files changed, 128 insertions(+), 11 deletions(-) diff --git a/doc/source/user/application_credentials.rst b/doc/source/user/application_credentials.rst index 5455a04e7b..2fa808ef53 100644 --- a/doc/source/user/application_credentials.rst +++ b/doc/source/user/application_credentials.rst @@ -142,9 +142,9 @@ You can provide an expiration date for application credentials: +--------------+----------------------------------------------------------------------------------------+ By default, application credentials are restricted from creating or deleting -other application credentials and from creating or deleting trusts. If your -application needs to be able to perform these actions and you accept the risks -involved, you can disable this protection: +other application credentials. If your application needs to be able to perform +these actions and you accept the risks involved, you can disable this +protection: .. warning:: diff --git a/keystone/api/trusts.py b/keystone/api/trusts.py index 781b99efe5..265d3be829 100644 --- a/keystone/api/trusts.py +++ b/keystone/api/trusts.py @@ -22,6 +22,7 @@ from oslo_policy import _checks as op_checks from keystone.api._shared import json_home_relations +from keystone.common import authorization from keystone.common import context from keystone.common import json_home from keystone.common import provider_api @@ -29,6 +30,7 @@ from keystone.common.rbac_enforcer import policy from keystone.common import utils from keystone.common import validation +import keystone.conf from keystone import exception from keystone.i18n import _ from keystone.server import flask as ks_flask @@ -36,6 +38,7 @@ LOG = log.getLogger(__name__) +CONF = keystone.conf.CONF ENFORCER = rbac_enforcer.RBACEnforcer PROVIDERS = provider_api.ProviderAPIs @@ -46,6 +49,30 @@ parameter_name='trust_id') +def _check_application_credential(): + """Block application credential tokens from all trust operations. + + Application credentials are single-project delegation tokens. Allowing + them to read or manage trusts would permit a compromised application + credential to enumerate or manipulate the trust delegation chain, + expanding its effective scope beyond the single project it was issued for. + This applies regardless of the 'unrestricted' flag. + """ + if CONF.security_compliance.allow_insecure_application_credential_trust_escalation: + return + auth_context = flask.request.environ.get( + authorization.AUTH_CONTEXT_ENV, {} + ) + token = auth_context.get('token') + if token and 'application_credential' in token.methods: + raise exception.ForbiddenAction( + action=_( + "Using method 'application_credential' is not " + "allowed for managing trusts." + ) + ) + + def _build_trust_target_enforcement(): target = {} # NOTE(cmurphy) unlike other APIs, in the event the trust doesn't exist or @@ -101,14 +128,7 @@ class TrustResource(ks_flask.ResourceBase): json_home_parameter_rel_func = _build_parameter_relation def _check_unrestricted(self): - if self.oslo_context.is_admin: - return - token = self.auth_context['token'] - if 'application_credential' in token.methods: - if not token.application_credential['unrestricted']: - action = _("Using method 'application_credential' is not " - "allowed for managing trusts.") - raise exception.ForbiddenAction(action=action) + _check_application_credential() def _find_redelegated_trust(self): # Check if delegated via trust @@ -166,6 +186,7 @@ def _normalize_role_list(self, trust_roles): def _get_trust(self, trust_id): ENFORCER.enforce_call(action='identity:get_trust', build_target=_build_trust_target_enforcement) + _check_application_credential() # NOTE(cmurphy) look up trust before doing is_admin authorization - to # maintain the API contract, we expect a missing trust to raise a 404 @@ -214,6 +235,7 @@ def _list_trusts(self): target_attr=target) else: ENFORCER.enforce_call(action='identity:list_trusts') + _check_application_credential() trusts = [] @@ -361,6 +383,7 @@ def get(self, trust_id): # block access here raise exception.ForbiddenAction( action=_('Requested user has no relation to this trust')) + _check_application_credential() trust = PROVIDERS.trust_api.get_trust(trust_id) @@ -410,6 +433,7 @@ def get(self, trust_id, role_id): # block access here raise exception.ForbiddenAction( action=_('Requested user has no relation to this trust')) + _check_application_credential() trust = PROVIDERS.trust_api.get_trust(trust_id) diff --git a/keystone/tests/unit/test_v3_trust.py b/keystone/tests/unit/test_v3_trust.py index fe6462c62e..82685bb69c 100644 --- a/keystone/tests/unit/test_v3_trust.py +++ b/keystone/tests/unit/test_v3_trust.py @@ -569,6 +569,99 @@ def test_create_trust_with_application_credential(self): token=token_data.headers['x-subject-token'], expected_status=http.client.FORBIDDEN) + def _get_app_cred_token(self, unrestricted=False): + app_cred = { + 'id': uuid.uuid4().hex, + 'user_id': self.user_id, + 'project_id': self.project_id, + 'name': uuid.uuid4().hex, + 'roles': [{'id': self.role_id}], + 'secret': uuid.uuid4().hex, + } + if unrestricted: + app_cred['unrestricted'] = True + PROVIDERS.application_credential_api.create_application_credential( + app_cred + ) + auth_data = self.build_authentication_request( + app_cred_id=app_cred['id'], secret=app_cred['secret'] + ) + r = self.v3_create_token( + auth_data, expected_status=http.client.CREATED + ) + return r.headers['x-subject-token'] + + def test_create_trust_with_unrestricted_application_credential(self): + """Unrestricted app cred must also be blocked from creating trusts.""" + trust_body = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + role_ids=[self.role_id], + ) + self.post( + '/OS-TRUST/trusts', + body={'trust': trust_body}, + token=self._get_app_cred_token(unrestricted=True), + expected_status=http.client.FORBIDDEN, + ) + + def test_list_trusts_with_application_credential(self): + """App cred token must not be able to list trusts.""" + self.get( + '/OS-TRUST/trusts', + token=self._get_app_cred_token(), + expected_status=http.client.FORBIDDEN, + ) + + def test_get_trust_with_application_credential(self): + """App cred token must not be able to read a specific trust.""" + ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + role_ids=[self.role_id], + ) + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust_id = r.result['trust']['id'] + self.get( + f'/OS-TRUST/trusts/{trust_id}', + token=self._get_app_cred_token(), + expected_status=http.client.FORBIDDEN, + ) + + def test_list_trust_roles_with_application_credential(self): + """App cred token must not be able to list roles for a trust.""" + ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + role_ids=[self.role_id], + ) + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust_id = r.result['trust']['id'] + self.get( + f'/OS-TRUST/trusts/{trust_id}/roles', + token=self._get_app_cred_token(), + expected_status=http.client.FORBIDDEN, + ) + + def test_get_trust_role_with_application_credential(self): + """App cred token must not be able to get a specific trust role.""" + ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + role_ids=[self.role_id], + ) + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust_id = r.result['trust']['id'] + self.get( + f'/OS-TRUST/trusts/{trust_id}/roles/{self.role_id}', + token=self._get_app_cred_token(), + expected_status=http.client.FORBIDDEN, + ) + def test_delete_trust_with_application_credential(self): ref = unit.new_trust_ref( trustor_user_id=self.user_id, From c525411c699dad731c6be403cef52f9690f6b1fe Mon Sep 17 00:00:00 2001 From: Grzegorz Grasza Date: Tue, 12 May 2026 09:11:24 +0200 Subject: [PATCH 6/7] Preserve expires_at when rescoping federated tokens (CVE-2026-44394) When a federated token is rescoped via POST /v3/auth/tokens the handle_scoped_token function returned response_data without an expires_at value. Because issue_token falls back to default_expire_time when expires_at is None, each rescope issued a fresh full-TTL token instead of inheriting the remaining lifetime of the original token. A user with a federated token could extend their session indefinitely by rescoping repeatedly before expiry, bypassing operator-configured TTL policies and IdP-level account revocation. Fix: propagate token.expires_at from handle_scoped_token so that issue_token uses the original token's expiry rather than resetting to the default. The non-federated path in token.py already did this via response_data.setdefault('expires_at', token.expires_at). Closes-Bug: #2150379 Assisted-by: Claude Sonnet 4.6 Co-authored-by: Artem Goncharov Change-Id: I0bbb8520e12c52edd01fb47c873f0227819706f5 Signed-off-by: Grzegorz Grasza (cherry picked from commit 75a4a0c354c7f568b28dd85182dc729553fb3a33) --- keystone/auth/plugins/mapped.py | 4 + .../tests/unit/auth/plugins/test_mapped.py | 120 ++++++++++++++++++ 2 files changed, 124 insertions(+) diff --git a/keystone/auth/plugins/mapped.py b/keystone/auth/plugins/mapped.py index 23ba337246..0bd9cd304f 100644 --- a/keystone/auth/plugins/mapped.py +++ b/keystone/auth/plugins/mapped.py @@ -102,6 +102,10 @@ def handle_scoped_token(token, federation_api, identity_api): response_data['group_ids'] = group_ids response_data[federation_constants.IDENTITY_PROVIDER] = identity_provider response_data[federation_constants.PROTOCOL] = protocol + # Preserve the original token's expiry to prevent users from + # indefinitely extending their session by repeatedly rescoping. + # The non-federated path in token.py does the same via setdefault(). + response_data['expires_at'] = token.expires_at return response_data diff --git a/keystone/tests/unit/auth/plugins/test_mapped.py b/keystone/tests/unit/auth/plugins/test_mapped.py index a79bdaa6c7..0a1cec8abf 100644 --- a/keystone/tests/unit/auth/plugins/test_mapped.py +++ b/keystone/tests/unit/auth/plugins/test_mapped.py @@ -16,6 +16,7 @@ from keystone.assignment.core import Manager as AssignmentApi from keystone.auth.plugins import mapped from keystone.exception import ProjectNotFound +from keystone.federation import constants as federation_constants from keystone.resource.core import Manager as ResourceApi from keystone.tests import unit @@ -150,3 +151,122 @@ def create_project_mock_for_shadow_project(self, shadow_project): project = shadow_project.copy() project['id'] = uuid.uuid4().hex return project + + def _make_federated_token_mock(self, expires_at): + token = mock.Mock() + token.audit_id = uuid.uuid4().hex + token.user_id = uuid.uuid4().hex + token.identity_provider_id = 'test-idp' + token.protocol_id = 'mapped' + token.federated_groups = [{'id': uuid.uuid4().hex}] + token.expires_at = expires_at + return token + + +class TestHandleScopedToken(unit.TestCase): + """Tests for the handle_scoped_token security fix. + + Verify that rescoping a federated token preserves the original + token's expires_at rather than falling back to a fresh TTL. + Without the fix, an attacker can extend their session indefinitely + by rescoping before expiry, bypassing IdP-level account revocation. + """ + + def setUp(self): + super().setUp() + self.federation_api = mock.Mock() + self.identity_api = mock.Mock() + mapping_ref = {'id': uuid.uuid4().hex} + self.federation_api.get_mapping_from_idp_and_protocol.return_value = ( + mapping_ref + ) + + @mock.patch( + 'keystone.auth.plugins.mapped.notifications' + '.send_saml_audit_notification', + autospec=True, + ) + @mock.patch( + 'keystone.auth.plugins.mapped.utils.validate_mapped_group_ids', + autospec=True, + ) + @mock.patch( + 'keystone.auth.plugins.mapped.utils.assert_enabled_identity_provider', + autospec=True, + ) + @mock.patch( + 'keystone.auth.plugins.mapped.utils.validate_expiration', autospec=True + ) + def test_handle_scoped_token_preserves_expires_at( + self, + mock_validate_exp, + mock_assert_idp, + mock_validate_groups, + mock_notify, + ): + """Rescoped federated token must inherit original expiry (not fresh TTL). + + This is the security regression test for the authentication expiry + bypass vulnerability: handle_scoped_token must include expires_at in + the returned response_data so that issue_token() does not fall back to + default_expire_time(). + """ + original_expiry = '2026-04-26T08:59:30.000000Z' + token = self._make_federated_token_mock(original_expiry) + + result = mapped.handle_scoped_token( + token, self.federation_api, self.identity_api + ) + + self.assertIn('expires_at', result) + self.assertEqual(original_expiry, result['expires_at']) + + @mock.patch( + 'keystone.auth.plugins.mapped.notifications' + '.send_saml_audit_notification', + autospec=True, + ) + @mock.patch( + 'keystone.auth.plugins.mapped.utils.validate_mapped_group_ids', + autospec=True, + ) + @mock.patch( + 'keystone.auth.plugins.mapped.utils.assert_enabled_identity_provider', + autospec=True, + ) + @mock.patch( + 'keystone.auth.plugins.mapped.utils.validate_expiration', autospec=True + ) + def test_handle_scoped_token_returns_federation_metadata( + self, + mock_validate_exp, + mock_assert_idp, + mock_validate_groups, + mock_notify, + ): + """Rescoped federated token still returns all required federation data.""" + token = self._make_federated_token_mock('2026-04-26T08:59:30.000000Z') + + result = mapped.handle_scoped_token( + token, self.federation_api, self.identity_api + ) + + self.assertEqual(token.user_id, result['user_id']) + self.assertEqual( + token.identity_provider_id, + result[federation_constants.IDENTITY_PROVIDER], + ) + self.assertEqual( + token.protocol_id, result[federation_constants.PROTOCOL] + ) + self.assertIsInstance(result['group_ids'], list) + + def _make_federated_token_mock(self, expires_at): + token = mock.Mock() + token.audit_id = uuid.uuid4().hex + token.user_id = uuid.uuid4().hex + token.identity_provider_id = 'test-idp' + token.protocol_id = 'mapped' + token.federated_groups = [{'id': uuid.uuid4().hex}] + token.expires_at = expires_at + return token From 7114e0f66c7e852edd7af8c2182a1c6339c9c266 Mon Sep 17 00:00:00 2001 From: Grzegorz Grasza Date: Tue, 12 May 2026 09:10:20 +0200 Subject: [PATCH 7/7] Prevent RBAC policy bypass via JSON body and query filters (CVE-2026-42999) The RBAC enforcer unconditionally merged the raw JSON request body into the policy enforcement dictionary after trusted target data had been set from the database. An attacker could include a "target" key in the JSON body to overwrite database-sourced RBAC target attributes, causing all %(target.*)s policy substitutions to evaluate against attacker-controlled values. This affected 88 endpoint/method combinations across all Keystone API resources. The fix namespaces user-controlled JSON body data under a "request_body" key in the policy dict, making it structurally impossible for request body fields to collide with internal keys like "target" or view_args. The only in-tree policy rule that depended on the old JSON body merge behavior was identity:create_trust, which referenced %(trust.trustor_user_id)s from the request body at the top level of the policy dict. This is updated to use target_attr and the %(target.trust.trustor_user_id)s substitution, consistent with all other trust policy rules. Additionally, query-string filter values had the same structural issue: _extract_filter_values() results were merged at the top level, meaning a filter key matching a view_arg key (e.g. user_id on /v3/users/{user_id}/... endpoints using ADMIN_OR_SYSTEM_READER_OR_OWNER) could be overwritten by an attacker-controlled ?user_id= query param, bypassing ownership checks. Filter values are now namespaced under "filter_attr". No in-tree policy rule references filter values via %(key)s substitutions, so this is backwards-compatible for upstream deployments. Closes-Bug: #2148398 Assisted-by: Claude Sonnet 4.6 Co-authored-by: Boris Bobrov Co-authored-by: Artem Goncharov Change-Id: I295d1ac27faad05a680bb2b3fac8cfa27fa1c4bd Signed-off-by: Grzegorz Grasza (cherry picked from commit 22b51f5d5d86350d3fbc66697e4097bacf2a8ce9) --- keystone/api/trusts.py | 4 +- keystone/common/policies/base.py | 2 +- keystone/common/rbac_enforcer/enforcer.py | 21 ++- .../tests/protection/v3/test_credentials.py | 120 ++++++++++++ keystone/tests/protection/v3/test_grants.py | 77 ++++++++ .../tests/unit/common/test_rbac_enforcer.py | 175 +++++++++++++++++- .../notes/bug-2148398-e35dd449b3a330e6.yaml | 40 ++++ 7 files changed, 429 insertions(+), 10 deletions(-) create mode 100644 releasenotes/notes/bug-2148398-e35dd449b3a330e6.yaml diff --git a/keystone/api/trusts.py b/keystone/api/trusts.py index 265d3be829..84e97b5ba5 100644 --- a/keystone/api/trusts.py +++ b/keystone/api/trusts.py @@ -296,8 +296,10 @@ def post(self): The User creating the trust must be the trustor. """ - ENFORCER.enforce_call(action='identity:create_trust') trust = self.request_body_json.get('trust', {}) + ENFORCER.enforce_call( + action='identity:create_trust', target_attr={'trust': trust} + ) validation.lazy_validate(schema.trust_create, trust) self._check_unrestricted() diff --git a/keystone/common/policies/base.py b/keystone/common/policies/base.py index 73a2ca26f0..aa444733eb 100644 --- a/keystone/common/policies/base.py +++ b/keystone/common/policies/base.py @@ -30,7 +30,7 @@ RULE_SERVICE_ADMIN_OR_TOKEN_SUBJECT = ( 'rule:service_admin_or_token_subject') # nosec RULE_SERVICE_OR_ADMIN = 'rule:service_or_admin' -RULE_TRUST_OWNER = 'user_id:%(trust.trustor_user_id)s' +RULE_TRUST_OWNER = 'user_id:%(target.trust.trustor_user_id)s' # We are explicitly setting system_scope:all in these check strings because # they provide backwards compatibility in the event a deployment sets diff --git a/keystone/common/rbac_enforcer/enforcer.py b/keystone/common/rbac_enforcer/enforcer.py index 7add048ce8..aa9797f610 100644 --- a/keystone/common/rbac_enforcer/enforcer.py +++ b/keystone/common/rbac_enforcer/enforcer.py @@ -430,14 +430,21 @@ def enforce_call(cls, enforcer=None, action=None, target_attr=None, policy_dict['target'] = target_attr or build_target() - # Pull the data from the submitted json body to generate - # appropriate input/target attributes, we take an explicit copy here - # to ensure we're not somehow corrupting + # Pull the data from the submitted json body. We namespace it under + # 'request_body' to prevent user-controlled input from overwriting + # security-critical keys in the policy dict (e.g. 'target' populated + # by build_target/target_attr, or view_args like 'user_id'). json_input = flask.request.get_json(force=True, silent=True) or {} - policy_dict.update(json_input.copy()) - - # Generate the filter_attr dataset. - policy_dict.update(cls._extract_filter_values(filters)) + if json_input: + policy_dict['request_body'] = json_input.copy() + + # Namespace query-string filter values under 'filter_attr' to prevent + # attacker-controlled query params from overwriting view_args (e.g. + # user_id from /v3/users/{user_id}/...) or other trusted keys in the + # policy dict via %(key)s substitutions in policy rules. + filter_values = cls._extract_filter_values(filters) + if filter_values: + policy_dict['filter_attr'] = filter_values flattened = utils.flatten_dict(policy_dict) if LOG.logger.getEffectiveLevel() <= log.DEBUG: diff --git a/keystone/tests/protection/v3/test_credentials.py b/keystone/tests/protection/v3/test_credentials.py index 5a1960e38e..a08ba033b1 100644 --- a/keystone/tests/protection/v3/test_credentials.py +++ b/keystone/tests/protection/v3/test_credentials.py @@ -18,11 +18,13 @@ from keystone.common.policies import base as bp from keystone.common import provider_api import keystone.conf +from keystone.credential.providers import fernet as credential_fernet from keystone.tests.common import auth as common_auth from keystone.tests import unit from keystone.tests.unit import base_classes from keystone.tests.unit import ksfixtures from keystone.tests.unit.ksfixtures import temporaryfile +from keystone.tests.unit import test_v3 CONF = keystone.conf.CONF PROVIDERS = provider_api.ProviderAPIs @@ -1252,3 +1254,121 @@ def setUp(self): r = c.post('/v3/auth/tokens', json=auth) self.token_id = r.headers['X-Subject-Token'] self.headers = {'X-Auth-Token': self.token_id} + + +class TargetInjectionCredentialTests(test_v3.RestfulTestCase): + """Test that JSON body injection cannot bypass credential RBAC. + + Verifies CVE-2026-42999: the RBAC enforcer must not allow the JSON + request body to overwrite security-critical keys in the policy dict. + """ + + def setUp(self): + super().setUp() + self.useFixture( + ksfixtures.KeyRepository( + self.config_fixture, + 'credential', + credential_fernet.MAX_ACTIVE_KEYS, + ) + ) + + def _make_user_with_project(self, role_id=None): + user = unit.create_user( + PROVIDERS.identity_api, domain_id=self.domain_id + ) + project = unit.new_project_ref(domain_id=self.domain_id) + PROVIDERS.resource_api.create_project(project['id'], project) + if role_id: + PROVIDERS.assignment_api.add_role_to_user_and_project( + user['id'], project['id'], role_id + ) + return user, project + + def test_list_credentials_cannot_read_other_users_secrets(self): + """GET /v3/credentials must not return other users' credentials. + + An attacker injects their own user_id into target.credential.user_id + in the JSON body. Without the fix the per-item policy filter would + see the attacker's user_id and pass every credential through. + """ + role = unit.new_role_ref() + PROVIDERS.role_api.create_role(role['id'], role) + + victim, victim_project = self._make_user_with_project(role['id']) + attacker, attacker_project = self._make_user_with_project(role['id']) + + victim_cred = unit.new_credential_ref( + user_id=victim['id'], project_id=victim_project['id'] + ) + PROVIDERS.credential_api.create_credential( + victim_cred['id'], victim_cred + ) + attacker_cred = unit.new_credential_ref( + user_id=attacker['id'], project_id=attacker_project['id'] + ) + PROVIDERS.credential_api.create_credential( + attacker_cred['id'], attacker_cred + ) + + attacker_auth = self.build_authentication_request( + user_id=attacker['id'], + password=attacker['password'], + project_id=attacker_project['id'], + ) + r = self.get( + '/credentials', + auth=attacker_auth, + body={'target': {'credential': {'user_id': attacker['id']}}}, + ) + + cred_ids = [c['id'] for c in r.result['credentials']] + self.assertIn(attacker_cred['id'], cred_ids) + self.assertNotIn(victim_cred['id'], cred_ids) + + def test_ec2_create_credential_cannot_create_for_other_user(self): + """EC2 credential creation must not allow impersonating other users. + + POST /v3/users/{user_id}/credentials/OS-EC2: the attacker injects + target.credential.user_id to bypass the ownership check. + """ + member_role = unit.new_role_ref(name='member') + PROVIDERS.role_api.create_role(member_role['id'], member_role) + + victim, victim_project = self._make_user_with_project( + member_role['id'] + ) + attacker, attacker_project = self._make_user_with_project( + member_role['id'] + ) + + attacker_auth = self.build_authentication_request( + user_id=attacker['id'], + password=attacker['password'], + project_id=attacker_project['id'], + ) + ec2_uri = f'/users/{victim["id"]}/credentials/OS-EC2' + + self.post( + ec2_uri, + auth=attacker_auth, + body={ + 'tenant_id': victim_project['id'], + 'target': {'credential': {'user_id': attacker['id']}}, + }, + expected_status=http.client.FORBIDDEN, + ) + self.post( + ec2_uri, + auth=attacker_auth, + body={ + 'tenant_id': victim_project['id'], + 'target': { + 'credential': { + 'user_id': attacker['id'], + 'project_id': attacker_project['id'], + } + }, + }, + expected_status=http.client.FORBIDDEN, + ) diff --git a/keystone/tests/protection/v3/test_grants.py b/keystone/tests/protection/v3/test_grants.py index bb74b09014..b4cbf45891 100644 --- a/keystone/tests/protection/v3/test_grants.py +++ b/keystone/tests/protection/v3/test_grants.py @@ -23,6 +23,7 @@ from keystone.tests.unit import base_classes from keystone.tests.unit import ksfixtures from keystone.tests.unit.ksfixtures import temporaryfile +from keystone.tests.unit import test_v3 CONF = keystone.conf.CONF PROVIDERS = provider_api.ProviderAPIs @@ -2275,3 +2276,79 @@ def test_cannot_revoke_grant_from_group_on_domain(self): headers=self.headers, expected_status_code=http.client.FORBIDDEN ) + + +class TargetInjectionGrantTests(test_v3.RestfulTestCase): + """Test that JSON body injection cannot bypass grant RBAC. + + Verifies CVE-2026-42999: the RBAC enforcer must not allow the JSON + request body to overwrite security-critical keys in the policy dict. + """ + + def setUp(self): + super().setUp() + policy_file = self.useFixture(temporaryfile.SecureTempFile()) + self.useFixture( + ksfixtures.Policy( + self.config_fixture, policy_file=policy_file.file_name + ) + ) + with open(policy_file.file_name, 'w') as f: + overrides = { + 'identity:create_grant': ( + '(role:admin and system_scope:all) or ' + '(role:admin and ' + 'domain_id:%(target.user.domain_id)s and ' + 'domain_id:%(target.domain.id)s) and ' + '(domain_id:%(target.role.domain_id)s or ' + 'None:%(target.role.domain_id)s)' + ) + } + f.write(jsonutils.dumps(overrides)) + + def test_inherited_grant_cannot_escalate_cross_domain(self): + """PUT OS-INHERIT grant must not allow cross-domain escalation. + + A domain admin in domain A tries to create an inherited admin role + grant on domain B by injecting target data. Without the fix the + policy would see all domains matching the attacker's domain. + """ + domain_a = unit.new_domain_ref() + PROVIDERS.resource_api.create_domain(domain_a['id'], domain_a) + attacker = unit.create_user( + PROVIDERS.identity_api, domain_id=domain_a['id'] + ) + admin_role = self.role + PROVIDERS.assignment_api.create_grant( + admin_role['id'], user_id=attacker['id'], domain_id=domain_a['id'] + ) + + domain_b = unit.new_domain_ref() + PROVIDERS.resource_api.create_domain(domain_b['id'], domain_b) + + attacker_auth = self.build_authentication_request( + user_id=attacker['id'], + password=attacker['password'], + domain_id=domain_a['id'], + ) + + inherit_url = ( + '/OS-INHERIT/domains/{domain_id}/users/{user_id}' + '/roles/{role_id}/inherited_to_projects' + ).format( + domain_id=domain_b['id'], + user_id=attacker['id'], + role_id=admin_role['id'], + ) + self.put( + inherit_url, + auth=attacker_auth, + body={ + 'target': { + 'user': {'domain_id': domain_a['id']}, + 'domain': {'id': domain_a['id']}, + 'role': {'domain_id': None, 'name': 'member'}, + } + }, + expected_status=http.client.FORBIDDEN, + ) diff --git a/keystone/tests/unit/common/test_rbac_enforcer.py b/keystone/tests/unit/common/test_rbac_enforcer.py index b235eb29d3..b83bbcb724 100644 --- a/keystone/tests/unit/common/test_rbac_enforcer.py +++ b/keystone/tests/unit/common/test_rbac_enforcer.py @@ -176,7 +176,7 @@ def _testing_policy_rules(self): ), policy.RuleDefault( name='example:with_filter', - check_str='user_id:%(user)s', + check_str='user_id:%(filter_attr.user)s', scope_types=['project'], ), policy.RuleDefault( @@ -423,6 +423,135 @@ def test_extract_member_target_data_bad_input(self): self.assertEqual({}, self.enforcer._extract_member_target_data( member_target={}, member_target_type=None)) + def test_json_body_cannot_overwrite_build_target(self): + # Verify that a JSON request body cannot overwrite the target + # data populated by build_target. The enforcer must namespace + # user-controlled JSON input so it cannot collide with the + # trusted 'target' key set from the database. + assertIn = self.assertIn + assertEq = self.assertEqual + + real_owner_id = uuid.uuid4().hex + attacker_id = uuid.uuid4().hex + + def _enforce_mock_func(credentials, action, target, do_raise=True): + assertIn('target.credential.user_id', target) + assertEq(target['target.credential.user_id'], real_owner_id) + + def _build_target(): + return {'credential': {'user_id': real_owner_id}} + + self.useFixture( + fixtures.MockPatchObject( + self.enforcer, '_enforce', _enforce_mock_func + ) + ) + + with self.test_client() as c: + path = '/v3/auth/tokens' + body = self._auth_json() + r = c.post( + path, + json=body, + follow_redirects=True, + expected_status_code=201, + ) + token_id = r.headers['X-Subject-Token'] + + # Send a request with a JSON body that attempts to + # overwrite the build_target-supplied credential owner. + c.get( + f'{self.restful_api_url_prefix}/argument/{uuid.uuid4().hex}', + headers={'X-Auth-Token': token_id}, + json={'target': {'credential': {'user_id': attacker_id}}}, + ) + self.enforcer.enforce_call( + action='example:allowed', build_target=_build_target + ) + + def test_json_body_cannot_overwrite_target_attr(self): + # Verify that a JSON request body cannot overwrite the target + # data populated by target_attr. The enforcer must namespace + # user-controlled JSON input so it cannot collide with the + # trusted 'target' key set explicitly by the API handler. + assertIn = self.assertIn + assertEq = self.assertEqual + + real_owner_id = uuid.uuid4().hex + attacker_id = uuid.uuid4().hex + + def _enforce_mock_func(credentials, action, target, do_raise=True): + assertIn('target.credential.user_id', target) + assertEq(target['target.credential.user_id'], real_owner_id) + + self.useFixture( + fixtures.MockPatchObject( + self.enforcer, '_enforce', _enforce_mock_func + ) + ) + + with self.test_client() as c: + path = '/v3/auth/tokens' + body = self._auth_json() + r = c.post( + path, + json=body, + follow_redirects=True, + expected_status_code=201, + ) + token_id = r.headers['X-Subject-Token'] + + c.get( + f'{self.restful_api_url_prefix}/argument/{uuid.uuid4().hex}', + headers={'X-Auth-Token': token_id}, + json={'target': {'credential': {'user_id': attacker_id}}}, + ) + target_attr = {'credential': {'user_id': real_owner_id}} + self.enforcer.enforce_call( + action='example:allowed', target_attr=target_attr + ) + + def test_json_body_cannot_overwrite_view_args(self): + # Verify that a JSON request body cannot overwrite URL path + # parameters (view_args) in the policy dict. The enforcer must + # namespace user-controlled JSON input so it cannot collide + # with trusted view_args like 'user_id' or 'argument_id'. + assertIn = self.assertIn + assertEq = self.assertEqual + + real_argument_id = uuid.uuid4().hex + injected_argument_id = uuid.uuid4().hex + + def _enforce_mock_func(credentials, action, target, do_raise=True): + assertIn('argument_id', target) + assertEq(target['argument_id'], real_argument_id) + + self.useFixture( + fixtures.MockPatchObject( + self.enforcer, '_enforce', _enforce_mock_func + ) + ) + + with self.test_client() as c: + path = '/v3/auth/tokens' + body = self._auth_json() + r = c.post( + path, + json=body, + follow_redirects=True, + expected_status_code=201, + ) + token_id = r.headers['X-Subject-Token'] + + # URL has argument_id=real_argument_id, but the JSON body + # tries to overwrite it. + c.get( + (f'{self.restful_api_url_prefix}/argument/{real_argument_id}'), + headers={'X-Auth-Token': token_id}, + json={'argument_id': injected_argument_id}, + ) + self.enforcer.enforce_call(action='example:allowed') + def test_call_build_enforcement_target(self): assertIn = self.assertIn assertEq = self.assertEqual @@ -638,6 +767,50 @@ def test_enforce_call_with_filter_values(self): self.enforcer.enforce_call, action='example:with_filter') + def test_query_filter_cannot_overwrite_view_args(self): + """Query-string filter values must not overwrite view_args in policy dict. + + Before the fix, policy_dict.update(filter_values) ran after + policy_dict.update(view_args). If a filter key matched a view_arg key + (e.g. both named 'user_id'), a ?user_id=attacker query param would + overwrite the URL-path-sourced value used in %(user_id)s policy + substitutions, bypassing ownership checks such as + ADMIN_OR_SYSTEM_READER_OR_OWNER on /v3/users/{user_id}/... endpoints. + """ + real_arg_id = uuid.uuid4().hex + injected_arg_id = uuid.uuid4().hex + seen = {} + + def _capture_enforce(credentials, action, target, do_raise=True): + seen.update(target) + + self.useFixture( + fixtures.MockPatchObject( + self.enforcer, '_enforce', _capture_enforce + ) + ) + + with self.test_client() as c: + r = c.post( + '/v3/auth/tokens', + json=self._auth_json(), + expected_status_code=201, + ) + token_id = r.headers['X-Subject-Token'] + c.get( + f'{self.restful_api_url_prefix}/argument/{real_arg_id}' + f'?argument_id={injected_arg_id}', + headers={'X-Auth-Token': token_id}, + ) + self.enforcer.enforce_call( + action='example:allowed', filters=['argument_id'] + ) + + # view_arg survives: argument_id at the top level is real_arg_id + self.assertEqual(real_arg_id, seen.get('argument_id')) + # filter value is namespaced, not overwriting view_arg + self.assertEqual(injected_arg_id, seen.get('filter_attr.argument_id')) + def test_enforce_call_with_pre_instantiated_enforcer(self): token_path = '/v3/auth/tokens' auth_json = self._auth_json() diff --git a/releasenotes/notes/bug-2148398-e35dd449b3a330e6.yaml b/releasenotes/notes/bug-2148398-e35dd449b3a330e6.yaml new file mode 100644 index 0000000000..1685cc04f8 --- /dev/null +++ b/releasenotes/notes/bug-2148398-e35dd449b3a330e6.yaml @@ -0,0 +1,40 @@ +--- +critical: + - | + [`bug 2148398 `_] + The RBAC enforcer unconditionally merged the raw JSON request body into + the policy enforcement dictionary after trusted target data had been set + from the database. An attacker could include a ``target`` key in the JSON + body to overwrite database-sourced RBAC target attributes, causing all + ``%(target.*)s`` policy substitutions to evaluate against + attacker-controlled values. This affected 88 endpoint/method combinations + across all Keystone API resource areas. Any authenticated user could + exploit this to read every credential secret in the deployment, create + EC2 credentials for arbitrary users, or revoke other users' tokens. A + domain administrator could escalate to full cloud admin by creating + inherited role grants on other domains. The vulnerability has been present + since the Rocky release (14.0.0). +security: + - | + [`bug 2148398 `_] + The RBAC policy enforcer now namespaces JSON request body data under a + ``request_body`` key in the policy dictionary instead of merging it at + the top level. This prevents user-controlled input from overwriting + security-critical keys such as ``target`` (populated from the database + by ``build_target`` or ``target_attr``) and URL path parameters like + ``user_id``. All upstream policy rules are unaffected by this change. + Deployments with custom policy rules that reference JSON body fields + directly via ``%(field_name)s`` substitutions (not under ``target.``) + will need to update those references to ``%(request_body.field_name)s``. +upgrade: + - | + [`bug 2148398 `_] + The ``identity:create_trust`` policy rule now uses + ``%(target.trust.trustor_user_id)s`` instead of + ``%(trust.trustor_user_id)s``. The trust data from the request body is + now passed explicitly via ``target_attr`` rather than relying on the + JSON body merge. This aligns ``create_trust`` with all other trust + policy rules which already use the ``target.trust.*`` prefix. + Deployments that override the ``identity:create_trust`` policy and + reference ``%(trust.trustor_user_id)s`` must update to + ``%(target.trust.trustor_user_id)s``.