Skip to content

Commit

Permalink
Merge 8c09b4e into 50ab16f
Browse files Browse the repository at this point in the history
  • Loading branch information
jamadden committed Jul 14, 2016
2 parents 50ab16f + 8c09b4e commit 1eff6d7
Show file tree
Hide file tree
Showing 7 changed files with 479 additions and 12 deletions.
2 changes: 1 addition & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ recursive-include .travis *

recursive-include notes *.ods *.py *.sql *.txt

recursive-include relstorage *.py *.xml *.txt *.test *.conf
recursive-include relstorage *.py *.xml *.txt *.test *.conf *.rst

recursive-include doc *.py *.rst Makefile
recursive-exclude doc changelog.rst relstorage.*.rst
Expand Down
155 changes: 151 additions & 4 deletions relstorage/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from relstorage.autotemp import AutoTemporaryFile
from ZODB.utils import p64
from ZODB.utils import u64
from ZODB.utils import z64
from ZODB.POSException import ReadConflictError
from persistent.timestamp import TimeStamp
from persistent.ring import Ring
Expand All @@ -29,6 +30,7 @@
import time
import zlib
import bz2
import struct

from ._compat import string_types
from ._compat import iteritems
Expand All @@ -51,6 +53,65 @@ class _UsedAfterRelease(object):
pass
_UsedAfterRelease = _UsedAfterRelease()

class _ZEOTracer(object):
# Knows how to write ZEO trace files.

def __init__(self, trace_file):
self._trace_file = trace_file
self._lock = threading.Lock()

# closure variables are faster than self/global dict lookups
# (going off example in ZEO code; in one test locally this gets us a
# ~15% improvement)
_now = time.time
_pack = struct.Struct(">iiH8s8s").pack
_trace_file_write = trace_file.write
_p64 = p64
_z64 = z64
_int = int
_len = len

def trace(code, oid_int=0, tid_int=0, end_tid_int=0, dlen=0, now=None):
# This method was originally part of ZEO.cache.ClientCache. The below
# comment is verbatim:
# The code argument is two hex digits; bits 0 and 7 must be zero.
# The first hex digit shows the operation, the second the outcome.
# ...
# Note: when tracing is disabled, this method is hidden by a dummy.
encoded = (dlen << 8) + code
tid = _p64(tid_int) if tid_int else _z64
end_tid = _p64(end_tid_int) if end_tid_int else _z64
oid = b'' if not oid_int else _p64(oid_int)

now = now or _now()
try:
_trace_file_write(
_pack(
_int(now), encoded, _len(oid), tid, end_tid) + oid,
)
except: # pragma: no cover
log.exception("Problem writing trace info for %r at tid %r and end tid %r",
oid, tid, end_tid)
raise

self._trace = trace

def trace(self, code, oid_int=0, tid_int=0, end_tid_int=0, dlen=0):
with self._lock:
self._trace(code, oid_int, tid_int, end_tid_int, dlen)

def trace_store_current(self, tid_int, items):
# As a locking optimization, we accept this in bulk
with self._lock:
now = time.time()
for startpos, endpos, oid_int in items:
self._trace(0x52, oid_int, tid_int, dlen=endpos-startpos, now=now)

def close(self):
self._trace_file.close()
del self._trace


class StorageCache(object):
"""RelStorage integration with memcached or similar.
Expand Down Expand Up @@ -81,7 +142,10 @@ class StorageCache(object):
# self.checkpoints[0] < tid <= self.current_tid
current_tid = 0

def __init__(self, adapter, options, prefix, local_client=None):
_tracer = None

def __init__(self, adapter, options, prefix, local_client=None,
_tracer=None):
self.adapter = adapter
self.options = options
self.prefix = prefix or ''
Expand Down Expand Up @@ -118,14 +182,28 @@ def __init__(self, adapter, options, prefix, local_client=None):
# entries in the delta_after maps.
self.delta_size_limit = options.cache_delta_size_limit

if _tracer is None:
tracefile = _Loader.trace_file(options, self.prefix)
if tracefile:
_tracer = _ZEOTracer(tracefile)
_tracer.trace(0x00)

self._tracer = _tracer
if hasattr(self._tracer, 'trace_store_current'):
self._trace = self._tracer.trace
self._trace_store_current = self._tracer.trace_store_current


def new_instance(self):
"""Return a copy of this instance sharing the same local client"""
local_client = None
if self.options.share_local_cache:
local_client = self.clients_local_first[0]

return type(self)(self.adapter, self.options, self.prefix,
local_client)
cache = type(self)(self.adapter, self.options, self.prefix,
local_client,
_tracer=self._tracer or False)
return cache

def release(self):
"""
Expand Down Expand Up @@ -159,6 +237,12 @@ def close(self):
save()
self.release()

if self._tracer:
self._tracer.close()
del self._trace
del self._trace_store_current
del self._tracer

def clear(self):
"""Remove all data from the cache. Called by speed tests."""
for client in self.clients_local_first:
Expand All @@ -168,6 +252,16 @@ def clear(self):
self.delta_after1 = {}
self.current_tid = 0

@staticmethod
def _trace(*_args, **_kwargs): # pylint:disable=method-hidden
# Dummy method for when we don't do tracing
return

@staticmethod
def _trace_store_current(_tid_int, _items): # pylint:disable=method-hidden
# Dummy method for when we don't do tracing
return

def _check_tid_after_load(self, oid_int, actual_tid_int,
expect_tid_int=None):
"""Verify the tid of an object loaded from the database is sane."""
Expand Down Expand Up @@ -243,6 +337,7 @@ def load(self, cursor, oid_int):
"""
if not self.checkpoints:
# No poll has occurred yet. For safety, don't use the cache.
self._trace(0x20, oid_int)
return self.adapter.mover.load_current(cursor, oid_int)

prefix = self.prefix
Expand Down Expand Up @@ -275,9 +370,14 @@ def load(self, cursor, oid_int):
cache_data = client.get(cachekey)
if cache_data and len(cache_data) >= 8:
# Cache hit.
# Note that we trace all cache hits, not just the local cache hit.
# This makes the simulation less useful, but the stats might still have
# value to people trying different tuning options manually.
self._trace(0x22, oid_int, tid_int, dlen=len(cache_data) - 8)
assert cache_data[:8] == p64(tid_int)
return cache_data[8:], tid_int
# Cache miss.
self._trace(0x20, oid_int)
state, actual_tid_int = self.adapter.mover.load_current(
cursor, oid_int)
self._check_tid_after_load(oid_int, actual_tid_int, tid_int)
Expand Down Expand Up @@ -315,6 +415,7 @@ def load(self, cursor, oid_int):
if client is not local_client:
# Copy to the local client.
local_client.set(cp0_key, cache_data)
self._trace(0x22, oid_int, u64(cache_data[:8]), dlen=len(cache_data) - 8)
return cache_data[8:], u64(cache_data[:8])

if da1_key:
Expand All @@ -324,15 +425,19 @@ def load(self, cursor, oid_int):
if cache_data and len(cache_data) >= 8:
# Cache hit, but copy the state to
# the currently preferred key.
self._trace(0x22, oid_int, u64(cache_data[:8]), dlen=len(cache_data) - 8)
for client_to_set in self.clients_local_first:
client_to_set.set(cp0_key, cache_data)
return cache_data[8:], u64(cache_data[:8])

# Cache miss.
self._trace(0x20, oid_int)
state, tid_int = self.adapter.mover.load_current(cursor, oid_int)
if tid_int:
self._check_tid_after_load(oid_int, tid_int)
cache_data = p64(tid_int) + (state or b'')
# Record this as a store into the cache, ZEO does.
self._trace(0x52, oid_int, tid_int, dlen=len(state) if state else 0)
for client in self.clients_local_first:
client.set(cp0_key, cache_data)
return state, tid_int
Expand All @@ -359,6 +464,7 @@ def store_temp(self, oid_int, state):
endpos = queue.tell()
self.queue_contents[oid_int] = (startpos, endpos)


def _read_temp_state(self, startpos, endpos):
self.queue.seek(startpos)
length = endpos - startpos
Expand Down Expand Up @@ -388,7 +494,9 @@ def send_queue(self, tid):
for (oid_int, (startpos, endpos)) in iteritems(self.queue_contents)
]
items.sort()

# Trace these. This is the equivalent of ZEOs
# ClientStorage._update_cache.
self._trace_store_current(tid_int, items)
for startpos, endpos, oid_int in items:
state, length = self._read_temp_state(startpos, endpos)
cachekey = '%s:state:%d:%d' % (prefix, tid_int, oid_int)
Expand Down Expand Up @@ -518,6 +626,8 @@ def after_poll(self, cursor, prev_tid_int, new_tid_int, changes):
my_tid_int = m_get(oid_int)
if my_tid_int is None or tid_int > my_tid_int:
m[oid_int] = tid_int
# 0x1E = invalidate (hit, saving non-current)
self._trace(0x1C, oid_int, tid_int)
self.current_tid = new_tid_int
else:
# We have to replace the checkpoints.
Expand All @@ -544,6 +654,8 @@ def after_poll(self, cursor, prev_tid_int, new_tid_int, changes):

# Put the changes in new_delta_after*.
for oid_int, tid_int in iteritems(change_dict):
# 0x1E = invalidate (hit, saving non-current)
self._trace(0x1C, oid_int, tid_int)
if tid_int > cp0:
new_delta_after0[oid_int] = tid_int
elif tid_int > cp1:
Expand Down Expand Up @@ -963,6 +1075,41 @@ def _list_cache_files(cls, options, prefix):
possible_caches = glob.glob(os.path.join(path, 'relstorage-cache-' + prefix + '.*' + cls._gz_ext))
return possible_caches

@classmethod
def trace_file(cls, options, prefix):
# Return an open file for tracing to, if that is set up.
# Otherwise, return nothing.

# We choose a trace file based on ZEO_CACHE_TRACE. If it is
# set to 'single', then we use a single file (not suitable for multiple
# process, but records client opens/closes). If it is set to any other value,
# we include a pid. If it is not set, we do nothing.
trace = os.environ.get("ZEO_CACHE_TRACE")
if not trace:
return None

if trace == 'single':
pid = 0
else: # pragma: no cover
pid = os.getpid()

name = 'relstorage-trace-' + prefix + '.' + str(pid) + '.trace'

parent_dir = cls._normalize_path(options)
try:
os.makedirs(parent_dir)
except os.error:
pass
fname = os.path.join(parent_dir, name)
try:
tf = open(fname, 'ab')
except IOError as e: # pragma: no cover
log.warning("Cannot write tracefile %r (%s)", fname, e)
tf = None
else:
log.info("opened tracefile %r", fname)
return tf

@classmethod
def _stat_cache_files(cls, options, prefix):
fds = []
Expand Down
1 change: 1 addition & 0 deletions relstorage/tests/alltests.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def make_suite():
'relstorage.tests.test_autotemp',
'relstorage.tests.test_blobhelper',
'relstorage.tests.test_cache',
'relstorage.tests.test_cache_stats',
'relstorage.tests.test_treemark',
'relstorage.tests.test_zodbconvert',
'relstorage.tests.test_zodbpack',
Expand Down

0 comments on commit 1eff6d7

Please sign in to comment.