@@ -21,6 +21,7 @@
from keystone.common import utils
from keystone import exception
from keystone.models import revoke_model
from keystone.revoke.backends import sql
from keystone.tests import unit
from keystone.tests.unit import test_backend_sql
from keystone.token import provider
@@ -113,6 +114,16 @@ def _matches(event, token_values):
class RevokeTests (object ):
def _assertTokenRevoked (self , events , token_data ):
return self .assertTrue(
revoke_model.is_revoked(events, token_data),
' Token should be revoked' )
def _assertTokenNotRevoked (self , events , token_data ):
return self .assertFalse(
revoke_model.is_revoked(events, token_data),
' Token should not be revoked' )
def test_list (self ):
self .revoke_api.revoke_by_user(user_id = 1 )
self .assertEqual(1 , len (self .revoke_api.list_events()))
@@ -129,6 +140,204 @@ def test_list_since(self):
self .assertEqual(0 ,
len (self .revoke_api.list_events(last_fetch = future)))
def test_list_revoked_user (self ):
revocation_backend = sql.Revoke()
events = []
# This simulates creating a token for a specific user. When we revoke
# the token we should have a single revocation event in the list. We
# are going to assert that the token values match the only revocation
# event in the backend.
first_token = _sample_blank_token()
first_token[' user_id' ] = uuid.uuid4().hex
add_event(
events, revoke_model.RevokeEvent(user_id = first_token[' user_id' ])
)
self .revoke_api.revoke_by_user(user_id = first_token[' user_id' ])
self ._assertTokenRevoked(events, first_token)
self .assertEqual(
1 , len (revocation_backend.list_events(token = first_token))
)
# This simulates creating a separate token for a separate user. We are
# going to revoke the token just like we did for the previous token.
# We should have two revocation events stored in the backend but only
# one should match the values of the second token.
second_token = _sample_blank_token()
second_token[' user_id' ] = uuid.uuid4().hex
add_event(
events, revoke_model.RevokeEvent(user_id = second_token[' user_id' ])
)
self .revoke_api.revoke_by_user(user_id = second_token[' user_id' ])
self ._assertTokenRevoked(events, second_token)
self .assertEqual(
1 , len (revocation_backend.list_events(token = second_token))
)
# This simulates creating another separate token for a separate user,
# but we're not going to issue a revocation event. Even though we have
# two revocation events persisted in the backend, neither of them
# should match the values of the third token. If they did - our
# revocation event matching would be too heavy handed, which would
# result in over-generalized revocation patterns.
third_token = _sample_blank_token()
third_token[' user_id' ] = uuid.uuid4().hex
self ._assertTokenNotRevoked(events, third_token)
self .assertEqual(
0 , len (revocation_backend.list_events(token = third_token))
)
# This gets a token but overrides the user_id of the token to be None.
# Technically this should never happen because tokens must belong to
# a user. What we're testing here is that the two revocation events
# we've created won't match None values for the user_id.
fourth_token = _sample_blank_token()
fourth_token[' user_id' ] = None
self ._assertTokenNotRevoked(events, fourth_token)
self .assertEqual(
0 , len (revocation_backend.list_events(token = fourth_token))
)
def test_list_revoked_project (self ):
revocation_backend = sql.Revoke()
events = []
token = _sample_blank_token()
# Create a token for a project, revoke token, check the token we
# created has been revoked, and check the list returned a match for
# the token when passed in.
first_token = _sample_blank_token()
first_token[' project_id' ] = uuid.uuid4().hex
add_event(events, revoke_model.RevokeEvent(
project_id = first_token[' project_id' ]))
revocation_backend.revoke(revoke_model.RevokeEvent(
project_id = first_token[' project_id' ]))
self ._assertTokenRevoked(events, first_token)
self .assertEqual(1 , len (revocation_backend.list_events(
token = first_token)))
# Create a second token, revoke it, check the token has been revoked,
# and check the list to make sure that even though we now have 2
# revoked events in the revocation list, it will only return 1 because
# only one match for our second_token should exist
second_token = _sample_blank_token()
second_token[' project_id' ] = uuid.uuid4().hex
add_event(events, revoke_model.RevokeEvent(
project_id = second_token[' project_id' ]))
revocation_backend.revoke(revoke_model.RevokeEvent(
project_id = second_token[' project_id' ]))
self ._assertTokenRevoked(events, second_token)
self .assertEqual(
1 , len (revocation_backend.list_events(token = second_token)))
# This gets a token but overrides project_id of the token to be None.
# We expect that since there are two events which both have populated
# project_ids, this should not match this third_token with any other
# event in the list so we should recieve 0
third_token = _sample_blank_token()
third_token[' project_id' ] = None
self ._assertTokenNotRevoked(events, token)
self .assertEqual(0 , len (revocation_backend.list_events(token = token)))
def test_list_revoked_audit (self ):
revocation_backend = sql.Revoke()
events = []
# Create a token with audit_id set, revoke it, check it is revoked,
# check to make sure that list_events matches the token to the event we
# just revoked.
first_token = _sample_blank_token()
first_token[' audit_id' ] = provider.random_urlsafe_str()
add_event(events, revoke_model.RevokeEvent(
audit_id = first_token[' audit_id' ]))
self .revoke_api.revoke_by_audit_id(
audit_id = first_token[' audit_id' ])
self ._assertTokenRevoked(events, first_token)
self .assertEqual(
1 , len (revocation_backend.list_events(token = first_token)))
# Create a second token, revoke it, check it is revoked, check to make
# sure that list events only finds 1 match since there are 2 and they
# dont both have different populated audit_id fields
second_token = _sample_blank_token()
second_token[' audit_id' ] = provider.random_urlsafe_str()
add_event(events, revoke_model.RevokeEvent(
audit_id = second_token[' audit_id' ]))
self .revoke_api.revoke_by_audit_id(
audit_id = second_token[' audit_id' ])
self ._assertTokenRevoked(events, second_token)
self .assertEqual(
1 , len (revocation_backend.list_events(token = second_token)))
# Create a third token with audit_id set to None to make sure that
# since there are no events currently revoked with audit_id None this
# finds no matches
third_token = _sample_blank_token()
third_token[' audit_id' ] = None
self ._assertTokenNotRevoked(events, third_token)
self .assertEqual(
0 , len (revocation_backend.list_events(token = third_token)))
def test_list_revoked_since (self ):
revocation_backend = sql.Revoke()
token = _sample_blank_token()
self .revoke_api.revoke_by_user(user_id = None )
self .revoke_api.revoke_by_user(user_id = None )
self .assertEqual(2 , len (revocation_backend.list_events(token = token)))
future = timeutils.utcnow() + datetime.timedelta(seconds = 1000 )
token[' issued_at' ] = future
self .assertEqual(0 , len (revocation_backend.list_events(token = token)))
def test_list_revoked_multiple_filters (self ):
revocation_backend = sql.Revoke()
events = []
# create token that sets key/value filters in list_revoked
first_token = _sample_blank_token()
first_token[' user_id' ] = uuid.uuid4().hex
first_token[' project_id' ] = uuid.uuid4().hex
first_token[' audit_id' ] = provider.random_urlsafe_str()
# revoke event and then verify that that there is only one revocation
# and verify the only revoked event is the token
add_event(events, revoke_model.RevokeEvent(
user_id = first_token[' user_id' ],
project_id = first_token[' project_id' ],
audit_id = first_token[' audit_id' ]))
self .revoke_api.revoke(revoke_model.RevokeEvent(
user_id = first_token[' user_id' ],
project_id = first_token[' project_id' ],
audit_id = first_token[' audit_id' ]))
self ._assertTokenRevoked(events, first_token)
self .assertEqual(
1 , len (revocation_backend.list_events(token = first_token)))
# If a token has None values which the event contains it shouldn't
# match and not be revoked
second_token = _sample_blank_token()
self ._assertTokenNotRevoked(events, second_token)
self .assertEqual(
0 , len (revocation_backend.list_events(token = second_token)))
# If an event column and corresponding dict value don't match, Then
# it should not add the event in the list. Demonstrate for project
third_token = _sample_blank_token()
third_token[' project_id' ] = uuid.uuid4().hex
self ._assertTokenNotRevoked(events, third_token)
self .assertEqual(
0 , len (revocation_backend.list_events(token = third_token)))
# A revoked event with user_id as null and token user_id non null
# should still be return an event and be revoked if other non null
# event fields match non null token fields
fourth_token = _sample_blank_token()
fourth_token[' user_id' ] = uuid.uuid4().hex
fourth_token[' project_id' ] = uuid.uuid4().hex
fourth_token[' audit_id' ] = provider.random_urlsafe_str()
add_event(events, revoke_model.RevokeEvent(
project_id = fourth_token[' project_id' ],
audit_id = fourth_token[' audit_id' ]))
self .revoke_api.revoke(revoke_model.RevokeEvent(
project_id = fourth_token[' project_id' ],
audit_id = fourth_token[' audit_id' ]))
self ._assertTokenRevoked(events, fourth_token)
self .assertEqual(
1 , len (revocation_backend.list_events(token = fourth_token)))
@mock.patch.object (timeutils, ' utcnow' )
def test_expired_events_are_removed (self , mock_utcnow ):
def _sample_token_values ():
0 comments on commit
9e84371