Skip to content


Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

87 lines (69 sloc) 2.623 kb
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import memcache
from keystone.common import utils
from keystone import exception
from keystone.openstack.common import timeutils
from keystone import test
from keystone.token.backends import memcache as token_memcache
import test_backend
class MemcacheClient(object):
    """Replicates a tiny subset of memcached client interface.

    Entries are kept in an in-process dict that maps each key to a
    ``(value, time)`` tuple; a ``time`` of 0 is treated by ``get`` as
    "never expires".
    """

    def __init__(self, *args, **kwargs):
        """Ignores the passed in args."""
        self.cache = {}

    def add(self, key, value):
        """Stores the value only when ``get`` finds nothing for the key.

        Returns False if ``get(key)`` yields a truthy value, otherwise the
        result of ``set`` (True), so callers can tell success from failure.
        """
        if self.get(key):
            return False
        return self.set(key, value)

    def append(self, key, value):
        """Appends to an existing value.

        Returns True if ``get(key)`` yielded a truthy value that was
        extended with ``value``; returns False (and stores nothing)
        otherwise.
        """
        existing_value = self.get(key)
        if existing_value:
            self.set(key, existing_value + value)
            return True
        return False

    def check_key(self, key):
        """Raises MemcachedStringEncodingError unless the key is a ``str``."""
        # NOTE(review): none of the other methods in this class call
        # check_key -- confirm whether get/set/delete were meant to
        # validate their keys with it.
        if not isinstance(key, str):
            raise memcache.Client.MemcachedStringEncodingError()

    def get(self, key):
        """Retrieves the value for a key or None."""
        obj = self.cache.get(key)
        now = utils.unixtime(timeutils.utcnow())
        # An entry is live when its stored time is 0 or still lies after
        # "now"; anything else falls through and yields None.
        if obj and (obj[1] == 0 or obj[1] > now):
            return obj[0]

    def set(self, key, value, time=0):
        """Sets the value for a key."""
        self.cache[key] = (value, time)
        return True

    def delete(self, key):
        """Removes the key from the cache; a missing key is not an error."""
        try:
            del self.cache[key]
        except KeyError:
            #NOTE(bcwaldon): python-memcached always returns the same value
            pass
class MemcacheToken(test.TestCase, test_backend.TokenTests):
    """Runs the shared token backend tests against the memcache backend.

    The backend is handed a ``MemcacheClient`` instance as its client
    instead of whatever client it would otherwise create.
    """

    def setUp(self):
        """Builds the token backend around a fresh fake client."""
        super(MemcacheToken, self).setUp()
        fake_client = MemcacheClient()
        self.token_api = token_memcache.Token(client=fake_client)

    def test_get_unicode(self):
        """Creates a token whose id is a ``unicode`` object."""
        token_id = unicode(uuid.uuid4().hex)
        data = {'id': token_id, 'a': 'b'}
        self.token_api.create_token(token_id, data)
        # NOTE(review): despite the test name, nothing visible here reads
        # the token back -- confirm whether a follow-up lookup on
        # self.token_api was lost from this copy of the file.
Jump to Line
Something went wrong with that request. Please try again.