Use persistent.ring for true LRU semantics.
Drop the complicated bucket scheme.

This does require us to go back to locking for both reads and writes, but
it doesn't hurt us on read benchmarks. It may slightly slow down writes,
but the difference is within the margin of error.

Fixes #103. Fixes #102.
jamadden committed Jul 7, 2016
1 parent 9946350 commit 1568538
Showing 3 changed files with 116 additions and 147 deletions.
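
To make the commit message concrete, here is a minimal sketch of the LRU bookkeeping that persistent.ring.Ring enables, using only the Ring calls that appear in the diff below: add(), move_to_head(), delete(), and iteration starting from the eldest entry. The Entry and TinyLRU names and the item-count capacity are illustrative, not taken from RelStorage; the real cache below tracks byte sizes and guards access with a lock. The sketch assumes a persistent release contemporary with this commit (circa 4.2) that exposes Ring with this interface.

    from persistent.ring import Ring

    class Entry(object):
        # The ring implementations expect these slots; _p_oid doubles as the
        # cache key, and the CFFI ring keeps its node in _Persistent__ring.
        __slots__ = ('_p_oid', '_Persistent__ring', 'value')

        def __init__(self, key, value):
            self._p_oid = key
            self.value = value

    class TinyLRU(object):
        def __init__(self, capacity):
            self.capacity = capacity
            self._dict = {}
            self._ring = Ring()

        def __setitem__(self, key, value):
            entry = self._dict.get(key)
            if entry is not None:
                entry.value = value
                self._ring.move_to_head(entry)   # refresh recency on overwrite
                return
            entry = Entry(key, value)
            self._dict[key] = entry
            self._ring.add(entry)                # new entries become most recently used
            while len(self._dict) > self.capacity:
                eldest = next(iter(self._ring))  # iteration begins at the LRU end
                del self._dict[eldest._p_oid]
                self._ring.delete(eldest)

        def get(self, key):
            entry = self._dict.get(key)
            if entry is None:
                return None
            self._ring.move_to_head(entry)       # a hit makes the entry newest
            return entry.value

    cache = TinyLRU(2)
    cache['a'] = b'1'
    cache['b'] = b'2'
    cache.get('a')       # 'a' is now the most recently used key
    cache['c'] = b'3'    # evicts 'b', the least recently used key
    assert cache.get('b') is None and cache.get('a') == b'1'
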
6 changes: 5 additions & 1 deletion CHANGES.rst
@@ -58,10 +58,14 @@
longer has the side-effect of registering ``PyMySQL`` as ``MySQLdb`` and
``psycopg2cffi`` as ``psycopg2``.) See :issue:`86`.

- The in-memory cache allows for much higher levels of concurrent
- The in-memory cache allows for higher levels of concurrent
operation via finer-grained locks. For example, compression and
decompression are no longer done while holding a lock.

- The in-memory cache now uses a better approximation of an LRU
algorithm with less overhead, so more data should fit in the same
size cache. (For best performance, CFFI should be installed.)

2.0.0b1 (2016-06-28)
====================

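The changelog note above recommends installing CFFI for best performance. One illustrative way to confirm which ring implementation persistent selected (the CFFI-backed ring or its pure-Python fallback) is to inspect the class bound to the module-level name Ring; the underlying class names are internal to persistent and may vary by release.

    from persistent import ring

    # Ring is bound to the CFFI-backed implementation when the cffi extension
    # is available, and to a pure-Python fallback otherwise; the class name
    # shows which one was picked.
    print(ring.Ring.__name__)
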
178 changes: 71 additions & 107 deletions relstorage/cache.py
@@ -18,6 +18,7 @@
from ZODB.utils import u64
from ZODB.POSException import ReadConflictError
from persistent.TimeStamp import TimeStamp
from persistent.ring import Ring

import importlib
import logging
@@ -571,32 +572,38 @@ def _suggest_shifted_checkpoints(self, tid_int, oversize):
"len(delta_after0) == %d.", old_value, len(self.delta_after0))


class SizeOverflow(Exception):
"""Too much memory would be consumed by a new key"""

class _RingEntry(object):
__slots__ = ('_p_oid', '_Persistent__ring', 'value')

def __init__(self, key, value):
self._p_oid = key
self.value = value


class LocalClientBucket(object):
"""
A map that keeps a record of its approx. size.
keys must be ``str`` and values must be bytestrings.
keys must be ``str`` and values must be byte strings.
This class is not threadsafe; accesses to __setitem__ and get_and_bubble_all
must be protected by a lock.
"""
__slots__ = ('size', 'limit', '_dict', '_next_bucket')
__slots__ = ('size', 'limit', '_dict', '_ring')

def __init__(self, limit, next_bucket=None):
def __init__(self, limit):
self._dict = {}
self._next_bucket = next_bucket
self._ring = Ring()
self.size = 0
self.limit = limit

def set_ignore_size(self, key, value):
self.__setitem__(key, value, False)

def __setitem__(self, key, value, check_size=True):
def __setitem__(self, key, value):
"""
Set an item.
Throws SizeOverflow if the new item would cause this map to
surpass its memory limit.
If the memory limit would be exceeded, remove old items until
that is no longer the case.
"""
# These types are gated by LocalClient, we don't need to double
# check.
@@ -606,42 +613,54 @@ def __setitem__(self, key, value, check_size=True):
sizedelta = len(value)

if key in self._dict:
oldvalue = self._dict[key]
entry = self._dict[key]
oldvalue = entry.value
sizedelta -= len(oldvalue)
entry.value = value
self._ring.move_to_head(entry)
else:
sizedelta += len(key)
entry = _RingEntry(key, value)
self._ring.add(entry)
self._dict[key] = entry

while self._dict and self.size + sizedelta > self.limit:
oldest = next(iter(self._ring))
if oldest._p_oid is key:
break
self.__delitem__(oldest._p_oid)

if check_size and self.size + sizedelta > self.limit:
raise SizeOverflow()
self._dict[key] = value
self.size += sizedelta
return True

def __getitem__(self, key):
return self._dict[key]

def __contains__(self, key):
return key in self._dict

def __delitem__(self, key):
oldvalue = self._dict[key]
entry = self._dict[key]
oldvalue = entry.value
del self._dict[key]
self._ring.delete(entry)
sizedelta = len(key)
sizedelta += len(oldvalue)
self.size -= sizedelta

def get_and_bubble(self, key, parent):
if key in self._dict:
res = self._dict[key]
if parent is not None:
self.__delitem__(key)
parent.__setitem__(key, res, False)
return res
elif self._next_bucket is not None:
return self._next_bucket.get_and_bubble(key, self)
def get_and_bubble_all(self, keys):
dct = self._dict
rng = self._ring
res = {}
for key in keys:
entry = dct.get(key)
if entry is not None:
rng.move_to_head(entry)
res[key] = entry.value
return res

def has_key(self, key):
return self.get_and_bubble(key, None) is not None
def get(self, key):
# Testing only. Does not bubble.
entry = self._dict.get(key)
if entry is not None:
return entry.value


class LocalClient(object):
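
A short usage sketch of the LocalClientBucket defined above, assuming it is imported from relstorage.cache as of this commit. Each entry is charged roughly len(key) + len(value) bytes against the limit, writes evict the least recently used entries once the limit would be exceeded, and get_and_bubble_all refreshes recency, so recently read keys survive eviction. The keys and sizes are made up for illustration, and real callers (LocalClient below) hold a lock around these operations.

    from relstorage.cache import LocalClientBucket

    bucket = LocalClientBucket(100)     # byte budget of roughly 100

    bucket['key-a'] = b'x' * 40         # charged 5 + 40 = 45 bytes
    bucket['key-b'] = b'y' * 40         # total is now 90 bytes

    # Reading through get_and_bubble_all marks 'key-a' as recently used.
    bucket.get_and_bubble_all(('key-a',))

    # This write needs another 45 bytes, so 'key-b' (now the eldest) is evicted.
    bucket['key-c'] = b'z' * 40

    assert 'key-a' in bucket
    assert 'key-b' not in bucket
    assert bucket.get('key-c') == b'z' * 40
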
@@ -650,21 +669,11 @@ class LocalClient(object):
_compress = None
_decompress = None

MAX_CLIENT_BUCKETS = 5

def __init__(self, options):
self._lock = threading.Lock()
# We need at least 2 buckets to make this algorithm work, but will
# shift through up to MAX_CLIENT_BUCKETS of them. This is a balance between how much we
# throw-away when we GC vs how much iteration we have to do to find
# any given key.
self._bucket_count = max(options.cache_local_mb // 10, 2)
self._bucket_count = min(self._bucket_count, self.MAX_CLIENT_BUCKETS)
self._bucket_limit = int(1000000 * options.cache_local_mb // self._bucket_count)
self._bucket_limit = int(1000000 * options.cache_local_mb)
self._value_limit = options.cache_local_object_max
# We store the buckets together in a tuple so they can be atomically
# switched out together.
self.__buckets = ()
self.__bucket = None
self.flush_all()

compression_module = options.cache_local_compression
@@ -675,81 +684,34 @@ def __init__(self, options):

@property
def _bucket0(self):
return self.__buckets[0]

@property
def _bucket1(self):
return self.__buckets[1]
# For testing only.
return self.__bucket

def flush_all(self):
with self._lock:
self.__buckets = (LocalClientBucket(self._bucket_limit),)
for _i in range(self._bucket_count - 1):
self.__buckets = (LocalClientBucket(self._bucket_limit, self.__buckets[0]),) + self.__buckets
self.__bucket = LocalClientBucket(self._bucket_limit)

def get(self, key):
return self.get_multi([key]).get(key)

def get_multi(self, keys):
res = {}
decompress = self._decompress
# Read from a self-consistent set of buckets.
buckets = self.__buckets
bucket0 = buckets[0]
get = self.__bucket.get_and_bubble_all

for key in keys:
v = bucket0.get_and_bubble(key, None)
if v is not None:
res[key] = v
with self._lock:
res = get(keys)

# Finally, while not holding the lock, decompress if needed
if decompress:
res = {k: decompress(v) for k, v in iteritems(res)}
res = {k: decompress(v)
for k, v in iteritems(res)}

return res

def _set_one(self, key, cvalue, buckets):
bucket0 = buckets[0]
try:
bucket0[key] = cvalue
# In case thread 1 is writing little tiny keys that will never
# split, but thread 2 writes one massive key that does split,
# always return current buckets.
buckets = self.__buckets
except SizeOverflow:
# Shift bucket0 to bucket1.
new_buckets = (LocalClientBucket(self._bucket_limit, bucket0),) + buckets[:-1]
with self._lock:
if self.__buckets is buckets:
# We won
self.__buckets = new_buckets
new_buckets[-1]._next_bucket = None
# Watch for the log message below to decide whether the
# cache_local_mb parameter is set to a reasonable value.
# The log message indicates that old cache data has
# been garbage collected.
log.debug("LocalClient buckets shifted")

buckets = new_buckets = self.__buckets

bucket0 = buckets[0]

try:
bucket0[key] = cvalue
except SizeOverflow:
# The value doesn't fit in the cache at all, apparently.
pass

return buckets

def set(self, key, value):
self.set_multi({key: value})

def _has_key(self, key, buckets):
for bucket in buckets:
if key in bucket:
return True

def set_multi(self, d, allow_replace=True):
if not self._bucket_limit:
# don't bother
@@ -770,16 +732,18 @@ def set_multi(self, d, allow_replace=True):
continue
items.append((key, cvalue))

buckets = self.__buckets
bucket0 = self.__bucket
has_key = bucket0.__contains__
set_key = bucket0.__setitem__

for key, cvalue in items:
if not allow_replace and buckets[0].has_key(key):
continue
# Bucket0 could be shifted out at any
# point during this operation, and that's ok
# because it would mean the key still goes away

buckets = self._set_one(key, cvalue, buckets)
with self._lock:
for key, cvalue in items:
if not allow_replace and has_key(key):
continue
# Bucket0 could be shifted out at any
# point during this operation, and that's ok
# because it would mean the key still goes away
set_key(key, cvalue)

def add(self, key, value):
self.set_multi({key: value}, allow_replace=False)
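
The locking policy the commit message and changelog describe — take the lock only around the shared dict/ring bookkeeping and keep compression and decompression outside it — is sketched below in isolation. CompressingCache is an illustrative name, not a RelStorage class; it simply wraps a bucket with the interface defined above and uses zlib directly, whereas the real LocalClient resolves its compressor from the cache-local-compression option.

    import threading
    import zlib

    from relstorage.cache import LocalClientBucket

    class CompressingCache(object):
        def __init__(self, bucket):
            self._lock = threading.Lock()
            self._bucket = bucket

        def set_multi(self, d):
            # Compress before taking the lock so other threads aren't blocked
            # while a large value is being compressed.
            items = [(key, zlib.compress(value)) for key, value in d.items()]
            with self._lock:
                for key, cvalue in items:
                    self._bucket[key] = cvalue

        def get_multi(self, keys):
            with self._lock:
                res = self._bucket.get_and_bubble_all(keys)
            # Decompress after releasing the lock, for the same reason.
            return {key: zlib.decompress(cvalue) for key, cvalue in res.items()}

    cache = CompressingCache(LocalClientBucket(1000000))
    cache.set_multi({'oid:1': b'some pickled object state'})
    print(cache.get_multi(['oid:1']))   # {'oid:1': b'some pickled object state'}
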
