Skip to content

Commit

Permalink
Merge 3800e3c into 50ab16f
Browse files Browse the repository at this point in the history
  • Loading branch information
jamadden committed Jul 13, 2016
2 parents 50ab16f + 3800e3c commit 3736683
Show file tree
Hide file tree
Showing 7 changed files with 458 additions and 11 deletions.
2 changes: 1 addition & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ recursive-include .travis *

recursive-include notes *.ods *.py *.sql *.txt

recursive-include relstorage *.py *.xml *.txt *.test *.conf
recursive-include relstorage *.py *.xml *.txt *.test *.conf *.rst

recursive-include doc *.py *.rst Makefile
recursive-exclude doc changelog.rst relstorage.*.rst
Expand Down
133 changes: 130 additions & 3 deletions relstorage/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from relstorage.autotemp import AutoTemporaryFile
from ZODB.utils import p64
from ZODB.utils import u64
from ZODB.utils import z64
from ZODB.POSException import ReadConflictError
from persistent.timestamp import TimeStamp
from persistent.ring import Ring
Expand All @@ -29,6 +30,7 @@
import time
import zlib
import bz2
import struct

from ._compat import string_types
from ._compat import iteritems
Expand All @@ -51,6 +53,52 @@ class _UsedAfterRelease(object):
pass
_UsedAfterRelease = _UsedAfterRelease()

class _ZEOTracer(object):
    """Serializes ZEO-format cache trace records to an open binary file.

    A single lock makes one instance safe to share among threads.
    """

    def __init__(self, trace_file):
        self._trace_file = trace_file
        # One trace record: 4-byte time, 4-byte (dlen << 8 | code),
        # 2-byte oid length, 8-byte tid, 8-byte end tid; the variable-length
        # oid bytes follow the fixed-size header.
        self._pack = struct.Struct(">iiH8s8s").pack
        self._lock = threading.Lock()

    def _trace(self, code, oid_int=0, tid_int=0, end_tid_int=0, dlen=0, now=None):
        # This method was originally part of ZEO.cache.ClientCache. The below
        # comment is verbatim:
        # The code argument is two hex digits; bits 0 and 7 must be zero.
        # The first hex digit shows the operation, the second the outcome.
        # ...
        # Note: when tracing is disabled, this method is hidden by a dummy.
        encoded = (dlen << 8) + code
        if tid_int:
            tid = p64(tid_int)
        else:
            tid = z64
        if end_tid_int:
            end_tid = p64(end_tid_int)
        else:
            end_tid = z64
        oid = p64(oid_int) if oid_int else b''

        if not now:
            now = time.time()
        try:
            record = self._pack(int(now), encoded, len(oid), tid, end_tid)
            self._trace_file.write(record + oid)
        except: # pragma: no cover
            log.exception("Problem writing trace info for %r at tid %r and end tid %r",
                          oid, tid, end_tid)
            raise

    def __call__(self, *args, **kwargs):
        # Public entry point: emit one record under the lock.
        with self._lock:
            self._trace(*args, **kwargs)

    def trace_store_current(self, tid_int, items):
        # Locking optimization: record a whole batch of stores under a
        # single lock acquisition, all stamped with the same time.
        with self._lock:
            stamp = time.time()
            for startpos, endpos, oid_int in items:
                # 0x52 == store (current, non-version).
                self._trace(0x52, oid_int, tid_int,
                            dlen=endpos - startpos, now=stamp)

    def close(self):
        self._trace_file.close()


class StorageCache(object):
"""RelStorage integration with memcached or similar.
Expand Down Expand Up @@ -81,7 +129,8 @@ class StorageCache(object):
# self.checkpoints[0] < tid <= self.current_tid
current_tid = 0

def __init__(self, adapter, options, prefix, local_client=None):
def __init__(self, adapter, options, prefix, local_client=None,
_create_trace=True):
self.adapter = adapter
self.options = options
self.prefix = prefix or ''
Expand Down Expand Up @@ -118,14 +167,26 @@ def __init__(self, adapter, options, prefix, local_client=None):
# entries in the delta_after maps.
self.delta_size_limit = options.cache_delta_size_limit

if _create_trace:
tracefile = _Loader.trace_file(options, self.prefix)
if tracefile:
self._trace = _ZEOTracer(tracefile)
self._trace_store_current = self._trace.trace_store_current
self._trace(0x00)

def new_instance(self):
    """Return a copy of this instance sharing the same local client"""
    # Share the local client only when the options request it.
    shared_client = (self.clients_local_first[0]
                     if self.options.share_local_cache
                     else None)

    clone = type(self)(self.adapter, self.options, self.prefix,
                       shared_client,
                       _create_trace=False)
    # When this instance is actively tracing, the clone writes to the same
    # (lock-protected) tracer instead of opening a second trace file.
    if isinstance(self._trace, _ZEOTracer):
        clone._trace = self._trace
        clone._trace_store_current = self._trace.trace_store_current
    return clone

def release(self):
"""
Expand Down Expand Up @@ -158,6 +219,12 @@ def close(self):
else:
save()
self.release()
try:
self._trace.close()
del self._trace
del self._trace_store_current
except AttributeError:
pass

def clear(self):
"""Remove all data from the cache. Called by speed tests."""
Expand All @@ -168,6 +235,16 @@ def clear(self):
self.delta_after1 = {}
self.current_tid = 0

@staticmethod
def _trace(*_args, **_kwargs): # pylint:disable=method-hidden
    """Do nothing: tracing is off unless __init__ installs a real tracer."""

@staticmethod
def _trace_store_current(_tid_int, _items): # pylint:disable=method-hidden
    """Do nothing: shadowed by the tracer's bulk method when tracing is on."""

def _check_tid_after_load(self, oid_int, actual_tid_int,
expect_tid_int=None):
"""Verify the tid of an object loaded from the database is sane."""
Expand Down Expand Up @@ -243,6 +320,7 @@ def load(self, cursor, oid_int):
"""
if not self.checkpoints:
# No poll has occurred yet. For safety, don't use the cache.
self._trace(0x20, oid_int)
return self.adapter.mover.load_current(cursor, oid_int)

prefix = self.prefix
Expand Down Expand Up @@ -275,9 +353,14 @@ def load(self, cursor, oid_int):
cache_data = client.get(cachekey)
if cache_data and len(cache_data) >= 8:
# Cache hit.
# Note that we trace all cache hits, not just the local cache hit.
# This makes the simulation less useful, but the stats might still have
# value to people trying different tuning options manually.
self._trace(0x22, oid_int, tid_int, dlen=len(cache_data) - 8)
assert cache_data[:8] == p64(tid_int)
return cache_data[8:], tid_int
# Cache miss.
self._trace(0x20, oid_int)
state, actual_tid_int = self.adapter.mover.load_current(
cursor, oid_int)
self._check_tid_after_load(oid_int, actual_tid_int, tid_int)
Expand Down Expand Up @@ -315,6 +398,7 @@ def load(self, cursor, oid_int):
if client is not local_client:
# Copy to the local client.
local_client.set(cp0_key, cache_data)
self._trace(0x22, oid_int, u64(cache_data[:8]), dlen=len(cache_data) - 8)
return cache_data[8:], u64(cache_data[:8])

if da1_key:
Expand All @@ -324,11 +408,13 @@ def load(self, cursor, oid_int):
if cache_data and len(cache_data) >= 8:
# Cache hit, but copy the state to
# the currently preferred key.
self._trace(0x22, oid_int, u64(cache_data[:8]), dlen=len(cache_data) - 8)
for client_to_set in self.clients_local_first:
client_to_set.set(cp0_key, cache_data)
return cache_data[8:], u64(cache_data[:8])

# Cache miss.
self._trace(0x20, oid_int)
state, tid_int = self.adapter.mover.load_current(cursor, oid_int)
if tid_int:
self._check_tid_after_load(oid_int, tid_int)
Expand Down Expand Up @@ -359,6 +445,7 @@ def store_temp(self, oid_int, state):
endpos = queue.tell()
self.queue_contents[oid_int] = (startpos, endpos)


def _read_temp_state(self, startpos, endpos):
self.queue.seek(startpos)
length = endpos - startpos
Expand Down Expand Up @@ -389,6 +476,7 @@ def send_queue(self, tid):
]
items.sort()

self._trace_store_current(tid_int, items)
for startpos, endpos, oid_int in items:
state, length = self._read_temp_state(startpos, endpos)
cachekey = '%s:state:%d:%d' % (prefix, tid_int, oid_int)
Expand Down Expand Up @@ -518,6 +606,8 @@ def after_poll(self, cursor, prev_tid_int, new_tid_int, changes):
my_tid_int = m_get(oid_int)
if my_tid_int is None or tid_int > my_tid_int:
m[oid_int] = tid_int
# 0x1C = invalidate (hit, saving non-current)
self._trace(0x1C, oid_int, tid_int)
self.current_tid = new_tid_int
else:
# We have to replace the checkpoints.
Expand All @@ -544,6 +634,8 @@ def after_poll(self, cursor, prev_tid_int, new_tid_int, changes):

# Put the changes in new_delta_after*.
for oid_int, tid_int in iteritems(change_dict):
# 0x1C = invalidate (hit, saving non-current)
self._trace(0x1C, oid_int, tid_int)
if tid_int > cp0:
new_delta_after0[oid_int] = tid_int
elif tid_int > cp1:
Expand Down Expand Up @@ -963,6 +1055,41 @@ def _list_cache_files(cls, options, prefix):
possible_caches = glob.glob(os.path.join(path, 'relstorage-cache-' + prefix + '.*' + cls._gz_ext))
return possible_caches

@classmethod
def trace_file(cls, options, prefix):
    """Return an open binary file for trace output, or None.

    Tracing is controlled by the ZEO_CACHE_TRACE environment variable:
    unset or empty disables tracing entirely; the value 'single' uses a
    single shared file (records client opens/closes, but is not suitable
    for multiple processes); any other value embeds the pid in the name.
    """
    trace = os.environ.get("ZEO_CACHE_TRACE")
    if not trace:
        return None

    if trace != 'single': # pragma: no cover
        pid = os.getpid()
    else:
        pid = 0

    name = 'relstorage-trace-' + prefix + '.' + str(pid) + '.trace'

    parent_dir = cls._normalize_path(options)
    # Best-effort: the directory may already exist.
    try:
        os.makedirs(parent_dir)
    except os.error:
        pass

    fname = os.path.join(parent_dir, name)
    try:
        tf = open(fname, 'ab')
    except IOError as e: # pragma: no cover
        log.warning("Cannot write tracefile %r (%s)", fname, e)
        return None
    log.info("opened tracefile %r", fname)
    return tf

@classmethod
def _stat_cache_files(cls, options, prefix):
fds = []
Expand Down
1 change: 1 addition & 0 deletions relstorage/tests/alltests.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def make_suite():
'relstorage.tests.test_autotemp',
'relstorage.tests.test_blobhelper',
'relstorage.tests.test_cache',
'relstorage.tests.test_cache_stats',
'relstorage.tests.test_treemark',
'relstorage.tests.test_zodbconvert',
'relstorage.tests.test_zodbpack',
Expand Down

0 comments on commit 3736683

Please sign in to comment.