Optimize the cache reading and writing format, backed up by substantial benchmarks and experiments (comments in source). Things are fast enough now that even though it's possible to provide a time limit, I'm going to skip that for now. Fixes #116.
jamadden committed Aug 24, 2016
1 parent e58126a commit 2b3cbe0
Showing 3 changed files with 212 additions and 82 deletions.
7 changes: 4 additions & 3 deletions CHANGES.rst
@@ -18,14 +18,15 @@
will need a patched version, such as the one provided in
`this pull request
<https://github.com/esnme/ultramysql/pull/61>`_.
- The local persistent cache file format has been changed to improve
reading and writing speed. Old files will be cleaned up
automatically. Users of the default settings could see improvements
of up to 3x or more on reading and writing.
- Compression of local persistent cache files has been disabled by
default (but there is still an option to turn it back on).
Operational experience showed that it didn't actually save that much
disk space, while substantially slowing down the reading and writing
process (2-4x). (A configuration sketch for turning it back on
follows this hunk.)
- Writing persistent cache files to disk is substantially (3x) faster for
any Python version less than 3.4 (including 2.7). However, it now
requires an amount of memory equal to the local cache size.
- Add an option, ``cache-local-dir-read-count``, to limit the maximum
number of persistent local cache files that will be used to populate a
storage's cache. This can be useful to reduce startup time if cache
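A minimal sketch of exercising the two options above programmatically, in the
same style as the local_benchmark() code in relstorage/tests/test_cache.py
further down. MockOptions, cache_local_mb, and cache_local_compression appear
verbatim in this commit; the import path, the value 'zlib', and the attribute
spelling cache_local_dir_read_count (the underscore form of
cache-local-dir-read-count) are assumptions, not confirmed by the diff.

from relstorage.cache import LocalClient
from relstorage.tests.test_cache import MockOptions  # import path assumed

options = MockOptions()
options.cache_local_mb = 100
# Compression is now off by default; turn it back on (value assumed).
options.cache_local_compression = 'zlib'
# Cap how many persistent cache files are read at startup (name assumed).
options.cache_local_dir_read_count = 2

client = LocalClient(options)
client.set('a-key', b'a-value')
print(client.get_multi(['a-key']))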
142 changes: 100 additions & 42 deletions relstorage/cache.py
@@ -738,7 +738,6 @@ def key(self):
def __reduce__(self):
return _RingEntry, (self._p_oid, self.value)


class LocalClientBucket(object):
"""
A map that keeps a record of its approx. size.
@@ -750,6 +749,15 @@ class LocalClientBucket(object):
"""

def __init__(self, limit):
# We experimented with using OOBTree and LOBTree
# for the type of self._dict. The OOBTree has a performance
# profile similar to, but slightly slower than, a dict (as would
# be expected given the big-O complexity), but very large ones
# can't be pickled in a single shot! The LOBTree works faster and
# uses less memory than the OOBTree or the dict *if* all the keys
# are integers, which they currently are not. Plus the LOBTrees
# are slower on PyPy than its own dict specializations. We were
# hoping to be able to write faster pickles with large BTrees, but
# since that's not the case, we abandoned the idea.
self._dict = {}
self._ring = Ring()
self._hits = 0
@@ -843,44 +851,94 @@ def __getitem__(self, key):
# Testing only
return self._dict[key].value

# Benchmark for the general approach:

# Pickle is about 3x faster than marshal if we write single large
# objects, surprisingly. If we stick to writing smaller objects, the
# difference narrows to almost negligible.

# Writing 525MB of data, 655K keys (no compression):
# - code as of commit e58126a (the previous major optimizations for the version 1 format)
# version 1 format, solid dict under 3.4: write: 3.8s/read 7.09s
# 2.68s to update ring, 2.6s to read pickle
#
# -in a btree under 3.4: write: 4.8s/read 8.2s
# written as single list of the items
# 3.1s to load the pickle, 2.6s to update the ring
#
# -in a dict under 3.4: write: 3.7s/read 7.6s
# written as the dict and updated into the dict
# 2.7s loading the pickle, 2.9s to update the dict
# - in a dict under 3.4: write: 3.0s/read 12.8s
# written by iterating the ring and writing one key/value pair
# at a time, so this is the only solution that
# automatically preserves the LRU property (and would be amenable to
# capping read based on time, and written file size); this format also lets us avoid the
# full write buffer for HIGHEST_PROTOCOL < 4
# 2.5s spent in pickle.load, 8.9s spent in __setitem__, 5.7s in ring.add
# - in a dict: write 3.2/read 9.1s
# same as above, but custom code to set the items
# 1.9s in pickle.load, 4.3s in ring.add
# - same as above, but in a btree: write 2.76s/read 10.6
# 1.8s in pickle.load, 3.8s in ring.add,
#
# For the final version with optimizations, the write time is 2.3s/read is 6.4s

_FILE_VERSION = 2

def load_from_file(self, cache_file):
now = time.time()
# Unlike write_to_file, using the raw stream
# is fine for both Py 2 and 3.
unpick = Unpickler(cache_file)
version = unpick.load()
if version != 1: # pragma: no cover

# Local optimizations
load = unpick.load

version = load()
if version != self._FILE_VERSION: # pragma: no cover
raise ValueError("Incorrect version of cache_file")
count = unpick.load()
count = load()
entries_oldest_first = [load() for _ in range(count)]
assert len(entries_oldest_first) == count

def _insert_entries(entries):
stored = 0

# local optimizations
RE = _RingEntry
ring_add = self._ring.add
data = self._dict
limit = self.limit
size = self.size # update locally, copy back at end

for k, v in entries:
if k in data:
continue

if size >= limit:
break

ring_entry = data[k] = RE(k, v)
size += len(k) + len(v)
ring_add(ring_entry)
stored += 1

self.size = size # copy back
return stored

stored = 0
loaded_dict = unpick.load()
if not self._dict:
# bulk-update in C for speed
stored = len(loaded_dict)
self._dict.update(loaded_dict)
for ring_entry in itervalues(loaded_dict):
if self.size < self.limit:
self._ring.add(ring_entry)
self.size += len(ring_entry.key) + len(ring_entry.value)
else:
# We're too big! ignore these things from now on.
# This is unlikely.
del self._dict[ring_entry.key]
# Empty, so quickly take everything they give us,
# oldest first so that the result is actually LRU
stored = _insert_entries(entries_oldest_first)
else:
new_keys = set(loaded_dict.keys()) - set(self._dict.keys())
stored += len(new_keys)
# Loading more data into an existing bucket.
# Load only the *new* keys, but don't care about LRU,
# it's all screwed up anyway at this point
for new_key in new_keys:
new_ring_entry = loaded_dict[new_key]
self._dict[new_key] = new_ring_entry
self._ring.add(new_ring_entry)

self.size += len(new_key) + len(new_ring_entry.value)
if self.size >= self.limit: # pragma: no cover
break
# Load only the *new* keys, trying to get the newest ones
# because LRU is going to get messed up anyway.

entries_newest_first = reversed(entries_oldest_first)
stored = _insert_entries(entries_newest_first)

then = time.time()
log.info("Examined %d and stored %d items from %s in %s",
@@ -891,28 +949,28 @@ def write_to_file(self, cache_file):
now = time.time()
# pickling the items is about 3x faster than marshal


# Under Python 2, (or generally, under any pickle protocol
# less than 4, when framing was introduced) whether we are
# writing to an io.BufferedWriter, a <file> opened by name or
# fd, with default buffer or a large (16K) buffer, putting the
# Pickler directly on top of that stream is SLOW. Writing a
# 512MB dict takes ~40-50 seconds. If instead we use a BytesIO
# to buffer in memory, that time goes down to about 7s.
if HIGHEST_PROTOCOL >= 4:
bio = cache_file
else:
bio = BytesIO()
# Pickler directly on top of that stream is SLOW for large
# single objects. Writing a 512MB dict takes ~40-50 seconds. If
# instead we use a BytesIO to buffer in memory, that time goes
# down to about 7s. However, since we switched to writing many
# smaller objects, that need goes away.

pickler = Pickler(bio, -1) # Highest protocol
pickler = Pickler(cache_file, -1) # Highest protocol
dump = pickler.dump

pickler.dump(1) # Version marker
dump(self._FILE_VERSION) # Version marker
assert len(self._dict) == len(self._ring)
pickler.dump(len(self._dict)) # How many pairs we write
# We lose the order. We'll have to build it up again as we go.
pickler.dump(self._dict)
dump(len(self._dict)) # How many pairs we write

if bio is not cache_file:
cache_file.write(bio.getvalue())
# Dump from oldest to newest, so when we read, we
# wind up with the correct order
for entry in self._ring:
dump((entry._p_oid, entry.value))

then = time.time()
stats = self.stats()
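For reference, a minimal standalone sketch of the version-2 layout that
write_to_file and load_from_file above implement: a version marker, a count,
then one small (key, value) pickle per entry, written oldest-first so a reader
can rebuild LRU order and stop once its byte budget is spent. Plain functions
over byte strings, not RelStorage's LocalClientBucket API.

import pickle
from io import BytesIO

FILE_VERSION = 2

def write_cache(f, entries_oldest_first):
    # entries_oldest_first: (key, value) byte-string pairs,
    # least- to most-recently used.
    entries = list(entries_oldest_first)
    pickler = pickle.Pickler(f, pickle.HIGHEST_PROTOCOL)
    dump = pickler.dump
    dump(FILE_VERSION)           # version marker
    dump(len(entries))           # how many pairs follow
    for key, value in entries:   # one small object per entry, oldest first
        dump((key, value))

def read_cache(f, byte_limit):
    unpickler = pickle.Unpickler(f)
    load = unpickler.load
    if load() != FILE_VERSION:
        raise ValueError("Incorrect version of cache file")
    count = load()
    entries = [load() for _ in range(count)]
    # Accept entries oldest-first until the byte budget is exhausted,
    # mirroring _insert_entries() above.
    kept, size = [], 0
    for key, value in entries:
        if size >= byte_limit:
            break
        kept.append((key, value))
        size += len(key) + len(value)
    return kept

bio = BytesIO()
write_cache(bio, [(b'oldest', b'1'), (b'newest', b'22')])
bio.seek(0)
print(read_cache(bio, byte_limit=1024))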
145 changes: 108 additions & 37 deletions relstorage/tests/test_cache.py
@@ -443,16 +443,17 @@ def test_load_and_store(self, options=None):

client1.reset_stats()
client1['def'] = b'123'
self.assertEqual(2, len(client1))
client1_max_size = client1.size
self._save(bio, client1, options)

# This time there's too much data, so an arbitrary
# entry gets dropped
client2 = self.getClass()(3)
count, stored = self._load(bio, client2, options)
self.assertEqual(count, stored)
self.assertEqual(count, 2)
self.assertEqual(1, len(client2))
self.assertEqual(count, 2)
self.assertEqual(stored, 1)


# Duplicate keys ignored.
@@ -743,62 +744,132 @@ def list_changes(self, cursor, after_tid, last_tid):
if tid > after_tid and tid <= last_tid)

def local_benchmark():
from relstorage.cache import LocalClient, LocalClientBucket
options = MockOptions()
options.cache_local_mb = 100
options.cache_local_mb = 500
#options.cache_local_compression = 'none'

from relstorage.cache import LocalClient
import time
client = LocalClient(options)
REPEAT_COUNT = 4

KEY_GROUP_SIZE = 400
DATA_SIZE = 1024

# With 1000 in a key group, and 1024 bytes of data, we produce
# 909100 keys, and 930918400 = 887MB of data, which will overflow
# a cache of 500 MB.

# A group size of 100 produces 9100 keys with 9318400 = 8.8MB of data.
# Likewise, group of 200 produces 36380 keys with 35.5MB of data.

# Most of our time is spent in compression, it seems.
# In the 8.8mb case, populating all the data with default compression
# takes about 2.5-2.8s. Using no compression, it takes 0.38 to 0.42s.
# Reading is the same at about 0.2s.

with open('/dev/urandom', 'rb') as f:
random_data = f.read(1024)
random_data = f.read(DATA_SIZE)

key_groups = []
key_groups.append([str(i) for i in range(10)])
for i in range(1, 10): # 1 - 9
keys = [str(i) + str(j) for j in range(10)]
key_groups.append([int(str(i)) for i in range(KEY_GROUP_SIZE)])
for i in range(1, KEY_GROUP_SIZE):
keys = [int(str(i) + str(j)) for j in range(KEY_GROUP_SIZE)]
key_groups.append(keys)

def populate():
data = {str(k): random_data for k in range(120)}
for k, v in data.items():
client.set(k, v)
ALL_DATA = {}
for group in key_groups:
for key in group:
ALL_DATA[key] = random_data
print(len(ALL_DATA), sum((len(v) for v in ALL_DATA.values()))/1024/1024)

def read():
for keys in key_groups:
client.get_multi(keys)
class DLocalBucket(LocalClientBucket):
CACHE_TYPE = dict

import timeit
import statistics
#import cProfile, pstats
number = 100
pop_timer = timeit.Timer(populate)
#pr = cProfile.Profile()
#pr.enable()
pop_times = pop_timer.repeat(number=number)
#pr.disable()
#ps = pstats.Stats(pr).sort_stats('cumulative')
#ps.print_stats()
class DLocalClient(LocalClient):
bucket_type = DLocalBucket

read_timer = timeit.Timer(read)
#pr = cProfile.Profile()
#pr.enable()
read_times = read_timer.repeat(number=number)
#pr.disable()
#ps = pstats.Stats(pr).sort_stats('cumulative')
#ps.print_stats()
from BTrees.OOBTree import OOBTree
from BTrees.LOBTree import LOBTree
class BLocalBucket(LocalClientBucket):
CACHE_TYPE = OOBTree

class BLocalClient(LocalClient):
bucket_type = BLocalBucket

def do_times(client_type):
client = client_type(options)
print("Testing", type(client._bucket0._dict))

print("pop average", statistics.mean(pop_times), "stddev", statistics.stdev(pop_times))
print("read average", statistics.mean(read_times), "stddev", statistics.stdev(read_times))
def populate():
for k, v in ALL_DATA.items():
client.set(k, v)

def populate_empty():
c = LocalClient(options)
for k, v in ALL_DATA.items():
c.set(k, v)

def read():
for keys in key_groups:
res = client.get_multi(keys)
assert len(res) == len(keys)
assert res.popitem()[1] == random_data



import timeit
import statistics
try:
import cProfile, pstats
raise ImportError  # force the no-op stubs below; delete this line to enable profiling
except ImportError:
class cProfile(object):
class Profile(object):
def enable(self): pass
def disable(self): pass
class pstats(object):
class Stats(object):
def __init__(self, *args): pass
def sort_stats(self, *args): return self
def print_stats(self, *args): pass


number = REPEAT_COUNT
pop_timer = timeit.Timer(populate)
pr = cProfile.Profile()
pr.enable()
pop_times = pop_timer.repeat(number=number)
pr.disable()
ps = pstats.Stats(pr).sort_stats('cumulative')
ps.print_stats(.4)

read_timer = timeit.Timer(read)
pr = cProfile.Profile()
pr.enable()
read_times = read_timer.repeat(number=number)
pr.disable()
ps = pstats.Stats(pr).sort_stats('cumulative')
ps.print_stats(.4)

empty_pop = timeit.Timer(populate_empty)
epop_times = empty_pop.repeat(number=number)

print("pop average", statistics.mean(pop_times), "stddev", statistics.stdev(pop_times))
print("epop average", statistics.mean(epop_times), "stddev", statistics.stdev(epop_times))
print("read average", statistics.mean(read_times), "stddev", statistics.stdev(read_times))

do_times(DLocalClient)
do_times(BLocalClient)

def save_load_benchmark():
from relstorage.cache import LocalClientBucket, _Loader
from io import BytesIO
import os
import itertools

import sys
sys.setrecursionlimit(500000)
bucket = LocalClientBucket(500*1024*1024)
print("Testing", type(bucket._dict))


size_dists = [100] * 800 + [300] * 500 + [1024] * 300 + [2048] * 200 + [4096] * 150
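The save_load_benchmark() function is cut off by the diff; as a usage
illustration only, the bucket save/load round trip it exercises can be timed
through memory like this. LocalClientBucket, item assignment, write_to_file,
and load_from_file are the APIs shown in relstorage/cache.py above; the sizes
and keys here are illustrative, not the benchmark's actual workload.

import time
from io import BytesIO
from relstorage.cache import LocalClientBucket

bucket = LocalClientBucket(20 * 1024 * 1024)
for i in range(10000):
    bucket[str(i)] = b'x' * 1024      # ~10MB of values, well under the limit

bio = BytesIO()
begin = time.time()
bucket.write_to_file(bio)
print("write:", time.time() - begin, "bytes:", len(bio.getvalue()))

bio.seek(0)
fresh = LocalClientBucket(20 * 1024 * 1024)
begin = time.time()
fresh.load_from_file(bio)
print("read: ", time.time() - begin, "stored:", len(fresh))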
