Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions doc/source/user/application_credentials.rst
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ You can provide an expiration date for application credentials:
+--------------+----------------------------------------------------------------------------------------+

By default, application credentials are restricted from creating or deleting
other application credentials and from creating or deleting trusts. If your
application needs to be able to perform these actions and you accept the risks
involved, you can disable this protection:
other application credentials. If your application needs to be able to perform
these actions and you accept the risks involved, you can disable this
protection:

.. warning::

Expand Down
14 changes: 14 additions & 0 deletions keystone/api/_shared/EC2_S3_Resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,24 @@ def handle_authenticate(self):
app_cred = ac_client.get_application_credential(
cred_data['app_cred_id'])
roles = [r['id'] for r in app_cred['roles']]
if cred_data['project_id'] != app_cred['project_id']:
raise ks_exceptions.Unauthorized(
_(
'EC2 credential project does not match the '
'application credential project.'
)
)
elif cred_data['access_token_id']:
access_token = PROVIDERS.oauth_api.get_access_token(
cred_data['access_token_id'])
roles = jsonutils.loads(access_token['role_ids'])
if cred_data['project_id'] != access_token['project_id']:
raise ks_exceptions.Unauthorized(
_(
'EC2 credential project does not match the '
'OAuth1 access token project.'
)
)
auth_context = {'access_token_id': cred_data['access_token_id']}
else:
roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
Expand Down
90 changes: 84 additions & 6 deletions keystone/api/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,58 @@
ENFORCER = rbac_enforcer.RBACEnforcer


def _check_unrestricted_application_credential(token):
if 'application_credential' in token.methods:
if not token.application_credential['unrestricted']:
action = _(
"Using method 'application_credential' is not "
"allowed for managing additional credentials."
)
raise exception.ForbiddenAction(action=action)


def _check_credential_project_scope(token, oslo_context, credential):
"""Enforce project boundary for delegated tokens.

Non-delegated tokens (password, totp, etc.) are not restricted here --
an admin with a regular token can legitimately manage credentials across
projects. Delegated tokens (trusts, application credentials, OAuth1) are
always bound to a single project at delegation time; only credentials
whose project_id exactly matches the token's project scope are in bounds.

Credentials with project_id=None (e.g. TOTP/MFA bindings) are treated as
out-of-scope for any delegated token: they are user-level secrets with no
project anchor, and a delegated token should never be able to enumerate,
read, or mutate them -- doing so would allow a stolen delegation token to
exfiltrate or destroy a user's MFA binding.
"""
trust_id = getattr(oslo_context, 'trust_id', None)
app_cred_id = getattr(token, 'application_credential_id', None)
access_token_id = getattr(token, 'access_token_id', None)

if not (trust_id or app_cred_id or access_token_id):
return

token_project_id = oslo_context.project_id
cred_project_id = credential.get('project_id')

if cred_project_id != token_project_id:
if CONF.security_compliance.allow_insecure_admin_trust_cross_project_credentials_access:
# When insecure cross-project access is enabled, still restrict to
# admin-role delegated tokens only. See LP#2150089.
try:
ENFORCER.enforce_call(action='admin_required')
return
except Exception:
pass
raise exception.ForbiddenAction(
action=_(
'Credential project does not match the '
'project scope of the delegated token'
)
)


def _build_target_enforcement():
target = {}
try:
Expand Down Expand Up @@ -112,6 +164,7 @@ def _list_credentials(self):
# If the request was filtered, make sure to return only the
# credentials specific to that user. This makes it so that users with
# roles on projects can't see credentials that aren't theirs.
token = self.auth_context['token']
filtered_refs = []
for ref in refs:
# Check each credential again to make sure the user has access to
Expand All @@ -124,8 +177,9 @@ def _list_credentials(self):
action='identity:get_credential',
target_attr={'credential': cred}
)
_check_credential_project_scope(token, self.oslo_context, cred)
filtered_refs.append(ref)
except exception.Forbidden:
except (exception.Forbidden, exception.ForbiddenAction):
pass
refs = filtered_refs
refs = [self._blob_to_json(r) for r in refs]
Expand All @@ -137,6 +191,9 @@ def _get_credential(self, credential_id):
build_target=_build_target_enforcement
)
credential = PROVIDERS.credential_api.get_credential(credential_id)
_check_credential_project_scope(
self.auth_context['token'], self.oslo_context, credential
)
return self.wrap_member(self._blob_to_json(credential))

def get(self, credential_id=None):
Expand All @@ -155,12 +212,27 @@ def post(self):
ENFORCER.enforce_call(
action='identity:create_credential', target_attr=target
)
token = self.auth_context['token']
if credential.get('type', '').lower() == 'ec2':
_check_unrestricted_application_credential(token)
validation.lazy_validate(schema.credential_create, credential)
trust_id = getattr(self.oslo_context, 'trust_id', None)
app_cred_id = getattr(
self.auth_context['token'], 'application_credential_id', None)
access_token_id = getattr(
self.auth_context['token'], 'access_token_id', None)
app_cred_id = getattr(token, 'application_credential_id', None)
access_token_id = getattr(token, 'access_token_id', None)
_check_credential_project_scope(token, self.oslo_context, credential)
if (
app_cred_id is not None
and credential.get('type', '').lower() == 'ec2'
):
ac_api = PROVIDERS.application_credential_api
app_cred = ac_api.get_application_credential(app_cred_id)
if credential.get('project_id') != app_cred['project_id']:
action = _(
'EC2 credential project_id must match the '
'project of the application credential used '
'to authenticate'
)
raise exception.ForbiddenAction(action=action)
ref = self._assign_unique_id(
self._normalize_dict(credential),
trust_id=trust_id, app_cred_id=app_cred_id,
Expand Down Expand Up @@ -189,7 +261,9 @@ def patch(self, credential_id):
build_target=_build_target_enforcement
)
current = PROVIDERS.credential_api.get_credential(credential_id)

_check_credential_project_scope(
self.auth_context['token'], self.oslo_context, current
)
credential = self.request_body_json.get('credential', {})
validation.lazy_validate(schema.credential_update, credential)
self._validate_blob_update_keys(current.copy(), credential.copy())
Expand All @@ -209,6 +283,10 @@ def delete(self, credential_id):
action='identity:delete_credential',
build_target=_build_target_enforcement
)
credential = PROVIDERS.credential_api.get_credential(credential_id)
_check_credential_project_scope(
self.auth_context['token'], self.oslo_context, credential
)

return (PROVIDERS.credential_api.delete_credential(
credential_id, initiator=self.audit_initiator),
Expand Down
44 changes: 35 additions & 9 deletions keystone/api/trusts.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,23 @@
from oslo_policy import _checks as op_checks

from keystone.api._shared import json_home_relations
from keystone.common import authorization
from keystone.common import context
from keystone.common import json_home
from keystone.common import provider_api
from keystone.common import rbac_enforcer
from keystone.common.rbac_enforcer import policy
from keystone.common import utils
from keystone.common import validation
import keystone.conf
from keystone import exception
from keystone.i18n import _
from keystone.server import flask as ks_flask
from keystone.trust import schema


LOG = log.getLogger(__name__)
CONF = keystone.conf.CONF
ENFORCER = rbac_enforcer.RBACEnforcer
PROVIDERS = provider_api.ProviderAPIs

Expand All @@ -46,6 +49,30 @@
parameter_name='trust_id')


def _check_application_credential():
"""Block application credential tokens from all trust operations.

Application credentials are single-project delegation tokens. Allowing
them to read or manage trusts would permit a compromised application
credential to enumerate or manipulate the trust delegation chain,
expanding its effective scope beyond the single project it was issued for.
This applies regardless of the 'unrestricted' flag.
"""
if CONF.security_compliance.allow_insecure_application_credential_trust_escalation:
return
auth_context = flask.request.environ.get(
authorization.AUTH_CONTEXT_ENV, {}
)
token = auth_context.get('token')
if token and 'application_credential' in token.methods:
raise exception.ForbiddenAction(
action=_(
"Using method 'application_credential' is not "
"allowed for managing trusts."
)
)


def _build_trust_target_enforcement():
target = {}
# NOTE(cmurphy) unlike other APIs, in the event the trust doesn't exist or
Expand Down Expand Up @@ -101,14 +128,7 @@ class TrustResource(ks_flask.ResourceBase):
json_home_parameter_rel_func = _build_parameter_relation

def _check_unrestricted(self):
if self.oslo_context.is_admin:
return
token = self.auth_context['token']
if 'application_credential' in token.methods:
if not token.application_credential['unrestricted']:
action = _("Using method 'application_credential' is not "
"allowed for managing trusts.")
raise exception.ForbiddenAction(action=action)
_check_application_credential()

def _find_redelegated_trust(self):
# Check if delegated via trust
Expand Down Expand Up @@ -166,6 +186,7 @@ def _normalize_role_list(self, trust_roles):
def _get_trust(self, trust_id):
ENFORCER.enforce_call(action='identity:get_trust',
build_target=_build_trust_target_enforcement)
_check_application_credential()

# NOTE(cmurphy) look up trust before doing is_admin authorization - to
# maintain the API contract, we expect a missing trust to raise a 404
Expand Down Expand Up @@ -214,6 +235,7 @@ def _list_trusts(self):
target_attr=target)
else:
ENFORCER.enforce_call(action='identity:list_trusts')
_check_application_credential()

trusts = []

Expand Down Expand Up @@ -274,8 +296,10 @@ def post(self):

The User creating the trust must be the trustor.
"""
ENFORCER.enforce_call(action='identity:create_trust')
trust = self.request_body_json.get('trust', {})
ENFORCER.enforce_call(
action='identity:create_trust', target_attr={'trust': trust}
)
validation.lazy_validate(schema.trust_create, trust)
self._check_unrestricted()

Expand Down Expand Up @@ -361,6 +385,7 @@ def get(self, trust_id):
# block access here
raise exception.ForbiddenAction(
action=_('Requested user has no relation to this trust'))
_check_application_credential()

trust = PROVIDERS.trust_api.get_trust(trust_id)

Expand Down Expand Up @@ -410,6 +435,7 @@ def get(self, trust_id, role_id):
# block access here
raise exception.ForbiddenAction(
action=_('Requested user has no relation to this trust'))
_check_application_credential()

trust = PROVIDERS.trust_api.get_trust(trust_id)

Expand Down
Loading
Loading