Skip to content

Commit

Permalink
Fix security issues with EC2 credentials
Browse files Browse the repository at this point in the history
This change addresses several issues in the creation and use of EC2/S3
credentials with keystone tokens.

1. Disable altering credential owner attributes or metadata

Without this patch, an authenticated user can create an EC2 credential
for themself for a project they have a role on, then update the
credential to target a user and project completely unrelated to them. In
the worst case, this could be the admin user and a project the admin
user has a role assignment on. A token granted for an altered credential
like this would allow the user to masquerade as the victim user. This
patch ensures that when updating a credential, the new form of the
credential is one the acting user has access to: if the system admin
user is changing the credential, the new user ID or project ID could be
anything, but regular users may only change the credential to be one
that they still own.

Relatedly, when a user uses an application credential or a trust to
create an EC2 credential, keystone automatically adds the trust ID or
application credential ID as metadata in the EC2 access blob so that it
knows how the token can be scoped when it is used. Without this patch, a
user who has created a credential in this way can update the access blob
to remove or alter this metadata and escalate their privileges to be
fully authorized for the trustor's, application credential creator's, or
OAuth1 access token authorizor's privileges on the project. This patch
fixes the issue by simply disallowing updates to keystone-controlled
metadata in the credential.

2. Respect token roles when creating EC2 credentials

Without this patch, a trustee, an application credential user, or an
OAuth1 access token holder could create an EC2 credential or an
application credential using any roles the trustor, application
credential creator, or access token authorizor had on the project,
regardless of whether the creator had delegated only a limited subset of
roles. This was because the trust_id attribute of the EC2 access blob
was ignored, and no metadata for the application credential or access
token was recorded either. This change ensures that the access
delegation resource is recorded in the metadata of the EC2 credential
when created and passed to the token provider when used for
authentication so that the token provider can look up the correct roles
for the request.

Change-Id: I39d0d705839fbe31ac518ac9a82959e108cb7c1d
Closes-bug: #1872733
Closes-bug: #1872755
Closes-bug: #1872735
  • Loading branch information
cmurphy committed May 2, 2020
1 parent 6c73690 commit 37e9907
Show file tree
Hide file tree
Showing 8 changed files with 603 additions and 59 deletions.
45 changes: 40 additions & 5 deletions keystone/api/_shared/EC2_S3_Resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ def handle_authenticate(self):
project_id=cred.get('project_id'),
access=loaded.get('access'),
secret=loaded.get('secret'),
trust_id=loaded.get('trust_id')
trust_id=loaded.get('trust_id'),
app_cred_id=loaded.get('app_cred_id'),
access_token_id=loaded.get('access_token_id')
)

# validate the signature
Expand All @@ -132,8 +134,34 @@ def handle_authenticate(self):
raise ks_exceptions.Unauthorized from e

self._check_timestamp(credentials)
roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
user_ref['id'], project_ref['id'])

trustee_user_id = None
auth_context = None
if cred_data['trust_id']:
trust = PROVIDERS.trust_api.get_trust(cred_data['trust_id'])
roles = [r['id'] for r in trust['roles']]
# NOTE(cmurphy): if this credential was created using a
# trust-scoped token with impersonation, the user_id will be for
# the trustor, not the trustee. In this case, issuing a
# trust-scoped token to the trustor will fail. In order to get a
# trust-scoped token, use the user ID of the trustee. With
# impersonation, the resulting token will still be for the trustor.
# Without impersonation, the token will be for the trustee.
if trust['impersonation'] is True:
trustee_user_id = trust['trustee_user_id']
elif cred_data['app_cred_id']:
ac_client = PROVIDERS.application_credential_api
app_cred = ac_client.get_application_credential(
cred_data['app_cred_id'])
roles = [r['id'] for r in app_cred['roles']]
elif cred_data['access_token_id']:
access_token = PROVIDERS.oauth_api.get_access_token(
cred_data['access_token_id'])
roles = jsonutils.loads(access_token['role_ids'])
auth_context = {'access_token_id': cred_data['access_token_id']}
else:
roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
user_ref['id'], project_ref['id'])

if not roles:
raise ks_exceptions.Unauthorized(_('User not valid for project.'))
Expand All @@ -144,7 +172,14 @@ def handle_authenticate(self):

method_names = ['ec2credential']

if trustee_user_id:
user_id = trustee_user_id
else:
user_id = user_ref['id']
token = PROVIDERS.token_provider_api.issue_token(
user_id=user_ref['id'], method_names=method_names,
project_id=project_ref['id'])
user_id=user_id, method_names=method_names,
project_id=project_ref['id'],
trust_id=cred_data['trust_id'],
app_cred_id=cred_data['app_cred_id'],
auth_context=auth_context)
return token
71 changes: 53 additions & 18 deletions keystone/api/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,30 +60,41 @@ def _blob_to_json(ref):
ref['blob'] = jsonutils.dumps(blob)
return ref

def _assign_unique_id(self, ref, trust_id=None):
def _validate_blob_json(self, ref):
try:
blob = jsonutils.loads(ref.get('blob'))
except (ValueError, TabError):
raise exception.ValidationError(
message=_('Invalid blob in credential'))
if not blob or not isinstance(blob, dict):
raise exception.ValidationError(attribute='blob',
target='credential')
if blob.get('access') is None:
raise exception.ValidationError(attribute='access',
target='credential')
return blob

def _assign_unique_id(
self, ref, trust_id=None, app_cred_id=None, access_token_id=None):
# Generates an assigns a unique identifier to a credential reference.
if ref.get('type', '').lower() == 'ec2':
try:
blob = jsonutils.loads(ref.get('blob'))
except (ValueError, TabError):
raise exception.ValidationError(
message=_('Invalid blob in credential'))
if not blob or not isinstance(blob, dict):
raise exception.ValidationError(attribute='blob',
target='credential')
if blob.get('access') is None:
raise exception.ValidationError(attribute='access',
target='credential')

blob = self._validate_blob_json(ref)
ref = ref.copy()
ref['id'] = hashlib.sha256(
blob['access'].encode('utf8')).hexdigest()
# update the blob with the trust_id, so credentials created with
# a trust scoped token will result in trust scoped tokens when
# authentication via ec2tokens happens
# update the blob with the trust_id or app_cred_id, so credentials
# created with a trust- or app cred-scoped token will result in
# trust- or app cred-scoped tokens when authentication via
# ec2tokens happens
if trust_id is not None:
blob['trust_id'] = trust_id
ref['blob'] = jsonutils.dumps(blob)
if app_cred_id is not None:
blob['app_cred_id'] = app_cred_id
ref['blob'] = jsonutils.dumps(blob)
if access_token_id is not None:
blob['access_token_id'] = access_token_id
ref['blob'] = jsonutils.dumps(blob)
return ref
else:
return super(CredentialResource, self)._assign_unique_id(ref)
Expand Down Expand Up @@ -146,23 +157,47 @@ def post(self):
)
validation.lazy_validate(schema.credential_create, credential)
trust_id = getattr(self.oslo_context, 'trust_id', None)
app_cred_id = getattr(
self.auth_context['token'], 'application_credential_id', None)
access_token_id = getattr(
self.auth_context['token'], 'access_token_id', None)
ref = self._assign_unique_id(
self._normalize_dict(credential), trust_id=trust_id)
self._normalize_dict(credential),
trust_id=trust_id, app_cred_id=app_cred_id,
access_token_id=access_token_id)
ref = PROVIDERS.credential_api.create_credential(
ref['id'], ref, initiator=self.audit_initiator)
return self.wrap_member(ref), http.client.CREATED

def _validate_blob_update_keys(self, credential, ref):
if credential.get('type', '').lower() == 'ec2':
new_blob = self._validate_blob_json(ref)
old_blob = credential.get('blob')
if isinstance(old_blob, str):
old_blob = jsonutils.loads(old_blob)
# if there was a scope set, prevent changing it or unsetting it
for key in ['trust_id', 'app_cred_id', 'access_token_id']:
if old_blob.get(key) != new_blob.get(key):
message = _('%s can not be updated for credential') % key
raise exception.ValidationError(message=message)

def patch(self, credential_id):
# Update Credential
ENFORCER.enforce_call(
action='identity:update_credential',
build_target=_build_target_enforcement
)
PROVIDERS.credential_api.get_credential(credential_id)
current = PROVIDERS.credential_api.get_credential(credential_id)

credential = self.request_body_json.get('credential', {})
validation.lazy_validate(schema.credential_update, credential)
self._validate_blob_update_keys(current.copy(), credential.copy())
self._require_matching_id(credential)
# Check that the user hasn't illegally modified the owner or scope
target = {'credential': dict(current, **credential)}
ENFORCER.enforce_call(
action='identity:update_credential', target_attr=target
)
ref = PROVIDERS.credential_api.update_credential(
credential_id, credential)
return self.wrap_member(ref)
Expand Down
22 changes: 20 additions & 2 deletions keystone/api/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,25 @@ def _normalize_role_list(app_cred_roles):
role['name']))
return roles

def _get_roles(self, app_cred_data, token):
if app_cred_data.get('roles'):
roles = self._normalize_role_list(app_cred_data['roles'])
# NOTE(cmurphy): The user is not allowed to add a role that is not
# in their token. This is to prevent trustees or application
# credential users from escallating their privileges to include
# additional roles that the trustor or application credential
# creator has assigned on the project.
token_roles = [r['id'] for r in token.roles]
for role in roles:
if role['id'] not in token_roles:
detail = _('Cannot create an application credential with '
'unassigned role')
raise ks_exception.ApplicationCredentialValidationError(
detail=detail)
else:
roles = token.roles
return roles

def get(self, user_id):
"""List application credentials for user.
Expand Down Expand Up @@ -594,8 +613,7 @@ def post(self, user_id):
app_cred_data['secret'] = self._generate_secret()
app_cred_data['user_id'] = user_id
app_cred_data['project_id'] = project_id
app_cred_data['roles'] = self._normalize_role_list(
app_cred_data.get('roles', token.roles))
app_cred_data['roles'] = self._get_roles(app_cred_data, token)
if app_cred_data.get('expires_at'):
app_cred_data['expires_at'] = utils.parse_expiration_date(
app_cred_data['expires_at'])
Expand Down
31 changes: 31 additions & 0 deletions keystone/tests/unit/test_v3_application_credential.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,37 @@ def test_create_application_credential_with_application_credential(self):
expected_status_code=http.client.FORBIDDEN,
headers={'X-Auth-Token': token})

def test_create_application_credential_with_trust(self):
second_role = unit.new_role_ref(name='reader')
PROVIDERS.role_api.create_role(second_role['id'], second_role)
PROVIDERS.assignment_api.add_role_to_user_and_project(
self.user_id, self.project_id, second_role['id'])
with self.test_client() as c:
pw_token = self.get_scoped_token()
# create a self-trust - only the roles are important for this test
trust_ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.user_id,
project_id=self.project_id,
role_ids=[second_role['id']])
resp = c.post('/v3/OS-TRUST/trusts',
headers={'X-Auth-Token': pw_token},
json={'trust': trust_ref})
trust_id = resp.json['trust']['id']
trust_auth = self.build_authentication_request(
user_id=self.user_id,
password=self.user['password'],
trust_id=trust_id)
trust_token = self.v3_create_token(
trust_auth).headers['X-Subject-Token']
app_cred = self._app_cred_body(roles=[{'id': self.role_id}])
# only the roles from the trust token should be allowed, even if
# the user has the role assigned on the project
c.post('/v3/users/%s/application_credentials' % self.user_id,
headers={'X-Auth-Token': trust_token},
json=app_cred,
expected_status_code=http.client.BAD_REQUEST)

def test_create_application_credential_allow_recursion(self):
with self.test_client() as c:
roles = [{'id': self.role_id}]
Expand Down

0 comments on commit 37e9907

Please sign in to comment.