
Make the local cache smart enough to handle changes in compression settings in pickled files.
jamadden committed Jul 8, 2016
1 parent f96d277 commit b25bff0
Showing 4 changed files with 72 additions and 28 deletions.
6 changes: 6 additions & 0 deletions CHANGES.rst
@@ -73,6 +73,12 @@
 for a discussion, and see the options ``cache-local-dir`` and
 ``cache-local-dir-count``.

+- The in-memory cache is now smart enough not to store compressed
+  objects that grow during compression, and it uses the same
+  compression markers as zc.zlibstorage to avoid double-compression.
+  It can also gracefully handle changes to the compression format in
+  persistent files.
+
 2.0.0b1 (2016-06-28)
 ====================

8 changes: 4 additions & 4 deletions doc/relstorage-options.rst
@@ -310,10 +310,10 @@ cache-local-compression

 The default is ``zlib``.

-If you use a compressing storage wrapper, such as
-`zc.zlibstorage
-<https://pypi.python.org/pypi/zc.zlibstorage>`_, you should
-set this option to none.
+If you use the compressing storage wrapper `zc.zlibstorage
+<https://pypi.python.org/pypi/zc.zlibstorage>`_, this option
+is ignored automatically. With other compressing storage
+wrappers it should be set to ``none``.

 .. versionadded:: 1.6
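
As an illustration only, not part of this commit: the ``cache-local-compression`` value reaches ``LocalClient`` as the ``cache_local_compression`` attribute of the options object, as the cache.py diff below shows. A minimal in-process sketch, assuming ``relstorage.options.Options`` accepts keyword overrides the way the test suite's ``MockOptions`` does:

    from relstorage.cache import LocalClient
    from relstorage.options import Options  # assumed import path

    # 'zlib' is the default and 'bz2' is also accepted; per the new
    # test below, anything else now raises ValueError. 'none' defers
    # compression to a wrapper such as zc.zlibstorage.
    options = Options(cache_local_compression='none')
    client = LocalClient(options)
    client.set('key', b'x' * 1000)
    assert client.get('key') == b'x' * 1000  # stored verbatim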

53 changes: 43 additions & 10 deletions relstorage/cache.py
@@ -19,14 +19,16 @@
 from ZODB.POSException import ReadConflictError
 from persistent.timestamp import TimeStamp
 from persistent.ring import Ring
-if Ring.__name__ == '_DequeRing':
+if Ring.__name__ == '_DequeRing': # pragma: no cover
     import warnings
     warnings.warn("Install CFFI for best cache performance")

 import importlib
 import logging
 import threading
 import time
+import zlib
+import bz2

 from ._compat import string_types
 from ._compat import iteritems
@@ -779,8 +781,17 @@ def write_to_file(self, cache_file):
 class LocalClient(object):
     """A memcache-like object that stores in Python dictionaries."""

-    _compress = None
-    _decompress = None
+    # Use the same markers as zc.zlibstorage (well, one marker)
+    # to automatically avoid double-compression
+    _compression_markers = {
+        'zlib': (b'.z', zlib.compress),
+        'bz2': (b'.b', bz2.compress),
+        'none': (None, None)
+    }
+    _decompression_functions = {
+        b'.z': zlib.decompress,
+        b'.b': bz2.decompress
+    }

     def __init__(self, options, prefix=None):
         self._lock = threading.Lock()
@@ -792,10 +803,33 @@ def __init__(self, options, prefix=None):
         self.flush_all()

         compression_module = options.cache_local_compression
-        if compression_module and compression_module != 'none':
-            module = importlib.import_module(compression_module)
-            self._compress = module.compress
-            self._decompress = module.decompress
+        try:
+            compression_markers = self._compression_markers[compression_module]
+        except KeyError:
+            raise ValueError("Unknown compression module")
+        else:
+            self.__compression_marker = compression_markers[0]
+            self.__compress = compression_markers[1]
+            if self.__compress is None:
+                self._compress = None
+
+    def _decompress(self, data):
+        pfx = data[:2]
+        if pfx not in self._decompression_functions:
+            return data
+        return self._decompression_functions[pfx](data[2:])
+
+    def _compress(self, data): # pylint:disable=method-hidden
+        # We override this if we're disabling compression
+        # altogether.
+        # Use the same basic rule as zc.zlibstorage, but bump the object size up from 20;
+        # many smaller objects (under 100 bytes), like you get with small BTrees,
+        # tend not to compress well, so don't bother.
+        if data and (len(data) > 100) and data[:2] not in self._decompression_functions:
+            compressed = self.__compression_marker + self.__compress(data)
+            if len(compressed) < len(data):
+                return compressed
+        return data
+
     def save(self):
         options = self.options
@@ -832,9 +866,8 @@ def get_multi(self, keys):
             res = get(keys)

         # Finally, while not holding the lock, decompress if needed
-        if decompress:
-            res = {k: decompress(v)
-                   for k, v in iteritems(res)}
+        res = {k: decompress(v)
+               for k, v in iteritems(res)}

         return res

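To make the marker scheme concrete, here is a standalone sketch. The names ``maybe_compress`` and ``maybe_decompress`` are hypothetical, but the logic mirrors ``_compress`` and ``_decompress`` above: payloads of 100 bytes or fewer, and values already carrying a marker, are stored verbatim; compression is kept only when it actually shrinks the data; and unrecognized prefixes pass through decompression untouched, which is what lets a cache written under a different compression setting still be read:

    import bz2
    import zlib

    COMPRESSORS = {'zlib': (b'.z', zlib.compress),
                   'bz2': (b'.b', bz2.compress)}
    DECOMPRESSORS = {b'.z': zlib.decompress,
                     b'.b': bz2.decompress}

    def maybe_compress(data, kind='zlib'):
        marker, compress = COMPRESSORS[kind]
        # Skip tiny payloads and anything already marker-prefixed
        # (avoids double-compressing zc.zlibstorage output).
        if data and len(data) > 100 and data[:2] not in DECOMPRESSORS:
            compressed = marker + compress(data)
            if len(compressed) < len(data):  # keep only real wins
                return compressed
        return data

    def maybe_decompress(data):
        # Unknown prefixes fall through unchanged, so values written
        # with another (or no) compression setting read back safely.
        fn = DECOMPRESSORS.get(data[:2])
        return fn(data[2:]) if fn else data

    repetitive = b'spam and eggs ' * 20   # 280 bytes, compresses well
    assert maybe_compress(repetitive).startswith(b'.z')
    assert maybe_decompress(maybe_compress(repetitive)) == repetitive
    assert maybe_compress(b'tiny') == b'tiny'  # under the size floor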
33 changes: 19 additions & 14 deletions relstorage/tests/test_cache.py
@@ -478,6 +478,10 @@ def test_ctor(self):
         c.reset_stats()
         c.disconnect_all()

+        self.assertRaises(ValueError,
+                          self._makeOne,
+                          cache_local_compression='unsup')
+
     def test_set_and_get_string_compressed(self):
         c = self._makeOne(cache_local_compression='zlib')
         c.set('abc', b'def')
@@ -560,28 +564,29 @@ def test_bucket_sizes_without_compression(self):

     def test_bucket_sizes_with_compression(self):
         c = self._makeOne(cache_local_compression='zlib')
-        c._bucket_limit = 21 * 2 + 1
+        c._bucket_limit = 23 * 2 + 1
         c.flush_all()

-        c.set('k0', b'01234567' * 10)
-        self.assertEqual(c._bucket0.size, 21)
+        c.set('k0', b'01234567' * 15)
+        self.assertEqual(c._bucket0.size, 23)

-        c.set('k1', b'76543210' * 10)
-        self.assertEqual(c._bucket0.size, 21 * 2)
+        c.set('k1', b'76543210' * 15)
+        self.assertEqual(len(c._bucket0), 2)
+        self.assertEqual(c._bucket0.size, 23 * 2)

-        c.set('k2', b'abcdefgh' * 10)
-        self.assertEqual(c._bucket0.size, 21 * 2)
+        c.set('k2', b'abcdefgh' * 15)
+        self.assertEqual(c._bucket0.size, 23 * 2)

         v = c.get('k0')
         self.assertEqual(v, None) # This one got evicted :(

         v = c.get('k1')
-        self.assertEqual(v, b'76543210' * 10)
-        self.assertEqual(c._bucket0.size, 42)
+        self.assertEqual(v, b'76543210' * 15)
+        self.assertEqual(c._bucket0.size, 46)

         v = c.get('k2')
-        self.assertEqual(v, b'abcdefgh' * 10)
-        self.assertEqual(c._bucket0.size, 42)
+        self.assertEqual(v, b'abcdefgh' * 15)
+        self.assertEqual(c._bucket0.size, 46)

     def test_add(self):
         c = self._makeOne()
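For reference, the updated numbers follow from the marker scheme: each entry is charged roughly ``len(key) + len(stored value)``, and 120 bytes of a repeating 8-byte pattern deflates to about 19 bytes, so marker (2) plus compressed payload (~19) plus a 2-character key comes to about 23. A quick sanity check, with the caveat that the exact compressed length can vary across zlib builds:

    import zlib
    value = b'01234567' * 15               # 120 bytes, highly repetitive
    stored = b'.z' + zlib.compress(value)  # marker + compressed payload
    print(len('k0') + len(stored))         # ~23 with a typical zlib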
@@ -697,13 +702,13 @@ def list_changes(self, cursor, after_tid, last_tid):
 def local_benchmark():
     options = MockOptions()
     options.cache_local_mb = 100
-    options.cache_local_compression = 'none'
+    #options.cache_local_compression = 'none'

     from relstorage.cache import LocalClient
     import time
     client = LocalClient(options)
     with open('/dev/urandom', 'rb') as f:
-        random_data = f.read(1024*1024)
+        random_data = f.read(1024)

     key_groups = []
     key_groups.append([str(i) for i in range(10)])
@@ -723,7 +728,7 @@ def read():
     import timeit
     import statistics
     #import cProfile, pstats
-    number = 10000
+    number = 100
     pop_timer = timeit.Timer(populate)
     #pr = cProfile.Profile()
     #pr.enable()
