Commit 828b948

Merge 456fee0 into 3de9606

jamadden committed Sep 25, 2019
2 parents 3de9606 + 456fee0
Showing 7 changed files with 102 additions and 101 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
@@ -63,6 +63,8 @@

 - Avoid attempting to lock objects being created. See :issue:`329`.

+- Make cache vacuuming faster.
+
 3.0a10 (2019-09-04)
 ===================

17 changes: 5 additions & 12 deletions src/relstorage/cache/interfaces.py
@@ -298,20 +298,13 @@ def __delitem__(key):
     # Cache-specific operations.
     ###

-    def get_from_key_or_backup_key(pref_key, backup_key):
+    def replace_or_remove_smaller_value(key, value):
         """
-        Get a value stored at either *pref_key* or, failing that,
-        *backup_key*.
-
-        *backup_key* may be None if there is no second key.
-
-        If a value is found at *backup_key*, then it is
-        moved to be stored at *pref_key*, while retaining its
-        frequency information.
-
-        Counts as a hit on whichever key matches.
-
-        This is used to implement ``IStateCache.__call__``.
+        Given a key that's already present, either removes it
+        if *value* is None, or changes its stored value to be *value*.
+
+        Does this without counting as a hit or otherwise changing the MRU
+        status of the key.
         """

     def peek(key):
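The replacement method has a much narrower contract than `get_from_key_or_backup_key`: the key must already be present, and the operation must not count as a hit or disturb the entry's LRU position. A minimal sketch of that contract, using a hypothetical dict-backed `ToyCache` (not RelStorage code; a real implementation also keeps the entry's position and weight bookkeeping intact):

```python
class ToyCache(dict):
    # Hypothetical illustration of the interface contract only.
    def replace_or_remove_smaller_value(self, key, value):
        assert key in self          # contract: key is already present
        if value is None:
            del self[key]           # remove the entry outright
        else:
            dict.__setitem__(self, key, value)  # swap the value in place

cache = ToyCache({(1, 0): b'old-state'})
cache.replace_or_remove_smaller_value((1, 0), b'new')  # shrink in place
cache.replace_or_remove_smaller_value((1, 0), None)    # remove
assert (1, 0) not in cache
```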
115 changes: 71 additions & 44 deletions src/relstorage/cache/local_client.py
@@ -572,84 +572,111 @@ def __setitem__(self, oid_tid, state_bytes_tid):
         # A state of 'None' happens for undone transactions.
         oid, key_tid = oid_tid
         state_bytes, actual_tid = state_bytes_tid
-
-        assert isinstance(state_bytes, bytes) or state_bytes is None, type(state_bytes)
-        compress = self._compress
-        cvalue = compress(state_bytes) if compress else state_bytes # pylint:disable=not-callable
-        del state_bytes
+        # Really key_tid should be > 0; we allow >= for tests.
+        assert key_tid == actual_tid and key_tid >= 0
+        self.__set_many([
+            (oid, key_tid, state_bytes)
+        ])

-        if cvalue and len(cvalue) >= self._value_limit:
-            # This value is too big, so don't cache it.
+    def __set_many(self, oid_tid_state_iter):
+        if not self.limit:
+            # don't bother
             return

-        # Really key_tid should be > 0; we allow >= for tests.
-        assert key_tid == actual_tid and key_tid >= 0
-        value = _SingleValue(cvalue, actual_tid)
+        compress = self._compress
+        peek = self._peek
+        value_limit = self._value_limit
+        min_allowed = self._min_allowed_writeback
+        lock = self._lock
+        store = self._cache.__setitem__
+        sets = 0

-        with self._lock:
-            existing = self._peek(oid)
-            if existing:
-                existing += value
-                value = existing
+        for oid, tid, state_bytes in oid_tid_state_iter:
+            # A state of 'None' happens for undone transactions.
+            state_bytes = compress(state_bytes) if compress else state_bytes # pylint:disable=not-callable
+
+            if state_bytes and len(state_bytes) >= value_limit:
+                # This value is too big, so don't cache it.
+                continue

-            self._cache[oid] = value # possibly evicts
-            if actual_tid > self._min_allowed_writeback.get(oid, MAX_TID):
-                self._min_allowed_writeback[oid] = actual_tid
+            value = _SingleValue(state_bytes, tid)

-            self._sets += 1
+            with lock:
+                existing = peek(oid)
+                if existing:
+                    existing += value
+                    value = existing
+
+                store(oid, value) # possibly evicts
+
+                if tid > min_allowed.get(oid, MAX_TID):
+                    min_allowed[oid] = tid
+
+                sets += 1
+
+        with lock:
+            self._sets += sets
             # Do we need to move this up above the eviction choices?
             # Inline some of the logic about whether to age or not; avoiding the
             # call helps speed
             if self._hits + self._sets > self._next_age_at:
                 self._age()

     def set_all_for_tid(self, tid_int, state_oid_iter):
-        for state, oid_int, _ in state_oid_iter:
-            self[(oid_int, tid_int)] = (state, tid_int)
+        self.__set_many((
+            (oid_int, tid_int, state)
+            for (state, oid_int, _)
+            in state_oid_iter
+        ))

     def __delitem__(self, oid_tid):
-        oid, expected_tid = oid_tid
+        self.delitems({oid_tid[0]: oid_tid[1]})
+
+    def delitems(self, oids_tids):
+        """
+        For each OID/TID pair in the items, remove all cached values
+        for OID that are older than TID.
+        """
+        peek = self._peek
+        replace = self._cache.replace_or_remove_smaller_value
+        min_allowed = self._min_allowed_writeback
         with self._lock:
-            entry = self._peek(oid)
-            if entry is not None:
-                entry -= expected_tid
-                if not entry:
-                    del self._cache[oid]
-                else:
-                    # XXX: Messing with LRU. We just want to update the
-                    # value and size calculation.
-                    self._cache[oid] = entry
-            if expected_tid > self._min_allowed_writeback.get(oid, MAX_TID):
-                self._min_allowed_writeback[oid] = expected_tid
+            for oid, expected_tid in iteroiditems(oids_tids):
+                entry = peek(oid)
+                if entry is not None:
+                    entry -= expected_tid
+                    replace(oid, entry)
+                if expected_tid > min_allowed.get(oid, MAX_TID):
+                    min_allowed[oid] = expected_tid

     def invalidate_all(self, oids):
+        min_allowed = self._min_allowed_writeback
+        peek = self._peek
+        delitem = self._cache.__delitem__
         with self._lock:
-            min_allowed = self._min_allowed_writeback
             for oid in oids:
-                entry = self._peek(oid)
-                if entry:
-                    del self._cache[oid]
+                entry = peek(oid)
+                if entry is not None:
+                    delitem(oid)
                     tid = entry.max_tid
                     if tid > min_allowed.get(oid, MAX_TID):
                         min_allowed[oid] = tid

     def freeze(self, oids_tids):
         # The idea is to *move* the data, or make it available,
         # *without* copying it.
+        replace = self._cache.replace_or_remove_smaller_value
+        peek = self._peek
         with self._lock:
-            # This shuffles them around the LRU order. We probably don't actually
-            # want to do that.
-            store = self._cache.__setitem__
             delitem = self._cache.__delitem__
-            peek = self._peek
-            for oid, tid in oids_tids.items():
-                orig = entry = peek(oid)
+            for oid, tid in iteroiditems(oids_tids):
+                entry = peek(oid)
                 if entry is not None:
                     entry <<= tid
                 if entry is None:
                     delitem(oid)
-                elif entry is not orig:
-                    store(oid, entry)
+                else:
+                    replace(oid, entry)

     def close(self):
         pass
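The net effect of this file's changes is that both write paths funnel through the private `__set_many` batch method: attribute lookups and limits are hoisted out of the loop, and shared counters are updated in one locked section per batch instead of per item. A sketch of that shape under simplified assumptions (no compression, eviction, or aging; all names are illustrative):

```python
import threading

class BatchingCache:
    """Illustrative only: mirrors the shape of __set_many, not its details."""

    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}
        self._sets = 0

    def __setitem__(self, key, value):
        # The single-key path is just a one-element batch.
        self._set_many([(key, value)])

    def _set_many(self, items):
        store = self._data.__setitem__   # hoist lookups out of the loop
        sets = 0
        for key, value in items:
            # Per-item work (compression, size checks) would happen here,
            # outside the lock; only the store itself needs the lock.
            with self._lock:
                store(key, value)
            sets += 1
        with self._lock:
            # Shared counters are touched once per batch, not once per item.
            self._sets += sets
```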
29 changes: 15 additions & 14 deletions src/relstorage/cache/lru_cffiring.py
@@ -178,18 +178,15 @@ def __getitem__(self, key):

     # Cache-specific operations.

-    def get_from_key_or_backup_key(self, pref_key, backup_key):
-        entry = self.get(pref_key)
-        if entry is None:
-            entry = self.get(backup_key)
-            if entry is not None:
-                # Swap the key (which we assume has the same weight).
-                entry.key = pref_key
-                del self.data[backup_key]
-                self.data[pref_key] = entry
-        if entry is not None:
-            self.on_hit(entry)
-            return entry.value
+    def replace_or_remove_smaller_value(self, key, value):
+        # We're shrinking a value, or at least not making it bigger.
+        assert key in self.data
+        if value is None:
+            del self[key]
+            return
+        # values can be mutable lists
+        entry = self.get(key)
+        self.generations[entry.cffi_entry.r_parent].change_value(entry, value)

     def peek(self, key):
         entry = self.get(key)
@@ -434,11 +431,15 @@ def make_MRU(self, entry):
         # Only for testing
         _ring_move_to_head(self.ring_home, entry.cffi_ring_node)

-    @_mutates_free_list
-    def update_MRU(self, entry, value):
+    def change_value(self, entry, value):
         old_size = entry.weight
         new_size = self.key_weight(entry.key) + self.value_weight(value)
         entry.set_value(value, new_size)
+        return old_size, new_size
+
+    @_mutates_free_list
+    def update_MRU(self, entry, value):
+        old_size, new_size = self.change_value(entry, value)

         if old_size == new_size:
             # Treat it as a simple hit; nothing could get evicted.
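Factoring `change_value` out of `update_MRU` is what lets `replace_or_remove_smaller_value` re-weigh an entry without promoting it to the head of the ring. A toy model of the split, assuming a simplified generation that only tracks total weight (plain dicts stand in for the CFFI ring entries):

```python
class ToyGeneration:
    # Toy model; the real class manipulates a CFFI ring, not dicts.
    def __init__(self):
        self.weight = 0   # total weight of all entries in this generation

    def change_value(self, entry, value):
        # Re-weigh the entry in place. Crucially, nothing moves, so a
        # caller can shrink a value without touching LRU order.
        old_size = entry['weight']
        new_size = len(entry['key']) + len(value)
        entry['value'], entry['weight'] = value, new_size
        self.weight += new_size - old_size
        return old_size, new_size

    def update_MRU(self, entry, value):
        old_size, new_size = self.change_value(entry, value)
        if old_size == new_size:
            return  # simple hit; nothing could get evicted
        # A real implementation would move the entry to the ring head
        # here, and evict from the tail if over the size limit.

gen = ToyGeneration()
entry = {'key': (1, 0), 'value': b'abcdef', 'weight': 8}
gen.weight = entry['weight']
gen.change_value(entry, b'abc')   # weight drops; position is untouched
```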
15 changes: 8 additions & 7 deletions src/relstorage/cache/mvcc.py
@@ -827,9 +827,6 @@ def _vacuum(self, cache, object_index):
         # This is called for every transaction. It needs to be fast, and mindful
         # of what it logs.
-        #
-        # TODO: In production, we've seen this function sometimes take
-        # up to 1.9s. Profile and figure out where that is and fix it.

         # MVCC can easily develop "gaps", where one lone reader is at
         # the back and all the other readers are up front somewhere,
         # with that chain of maps in between doing no one any good. We
@@ -852,7 +849,8 @@ def _vacuum(self, cache, object_index):
             object_index.minimum_highest_visible_tid,
             required_tid,
         )
-        while True:
+        oids_tids_to_del = OidTMap()
+        while 1:
             if object_index.depth == 1:
                 # Nothing left to vacuum
                 break
@@ -880,6 +878,7 @@
"Examining %d old OIDs to see if they've been replaced",
len(in_both)
)

for oid in in_both:
old_tid = obsolete_bucket[oid]
newer_tid = object_index[oid]
@@ -893,12 +892,11 @@
                    # data, because the object changed in the future.
                    # This particular transaction chunk won't be complete, but
                    # it's inaccessible.
-                    # TODO: A bulk call for this.
                    # This is where we should hook in the 'invalidation' tracing.
                    del obsolete_bucket[oid]
+                    oids_tids_to_del[oid] = old_tid # These will just keep going up
                    # If we have a shared memcache, we can't be sure everyone
                    # else is done with this key, so we just leave it alone.
-                    del local_client[(oid, old_tid)]

        # Now at this point, the obsolete_bucket contains data that we know is
        # either not present in a future map, or is present with exactly the
@@ -908,10 +906,13 @@
        # useful to have in this last bucket and we can throw it away. Note that
        # we do *not* remove the index entries; they're needed to keep
        # the CST in sync for newer transactions that might still be open.
-        self.log(TRACE, "Vacuum: Freezing %s old OIDs", len(obsolete_bucket))
        if obsolete_bucket:
+            self.log(TRACE, "Vacuum: Freezing %s old OIDs", len(obsolete_bucket))
            local_client.freeze(obsolete_bucket)

+        if oids_tids_to_del:
+            local_client.delitems(oids_tids_to_del)
+

    def flush_all(self):
        with self._lock:
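Taken together, the vacuum changes replace one cache `__delitem__` per obsolete key (each paying a lock acquisition and LRU update) with a single bulk `delitems` call at the end of the sweep. Roughly, with illustrative names and a plain dict standing in for `OidTMap`:

```python
def sweep_obsolete(obsolete_bucket, object_index, local_client):
    # Illustrative sketch of the batching, not the real _vacuum loop.
    oids_tids_to_del = {}   # stands in for OidTMap()
    for oid in list(obsolete_bucket):
        old_tid = obsolete_bucket[oid]
        newer_tid = object_index.get(oid)
        if newer_tid is not None and newer_tid != old_tid:
            # The object changed in the future; the old entry is dead.
            # Just record it; the cache itself is touched once, below.
            del obsolete_bucket[oid]
            oids_tids_to_del[oid] = old_tid
    if obsolete_bucket:
        # Whatever survived stays available, without copying it.
        local_client.freeze(obsolete_bucket)
    if oids_tids_to_del:
        # One bulk removal instead of a del per key.
        local_client.delitems(oids_tids_to_del)
```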
2 changes: 1 addition & 1 deletion src/relstorage/cache/tests/test_local_client.py
@@ -301,7 +301,7 @@ def test_set_and_get_string_uncompressed(self):

     def test_set_and_get_object_too_large(self):
         c = self._makeOne(cache_local_compression='none')
-        c[self.key] = (b'abcdefgh' * 10000, 1)
+        c[self.key] = (b'abcdefgh' * 10000, self.key_tid)
         self.assertEqual(c[self.key], None)

     def test_set_with_zero_space(self):
23 changes: 0 additions & 23 deletions src/relstorage/cache/tests/test_lru_cffiring.py
@@ -200,29 +200,6 @@ def test_delete(self):
         self.assertIsNone(cache[(1, 0)])
         self.assertEqual(list(cache), [])

-    def test_get_backup_not_found(self):
-        c = self._makeOne(100)
-        r = c.get_from_key_or_backup_key((1, 0), None)
-        self.assertIsNone(r)
-
-    def test_get_backup_at_pref(self):
-        c = self._makeOne(100)
-        c[(1, 0)] = (b'1', 0)
-        c[(1, 1)] = (b'2', 0)
-
-        result = c.get_from_key_or_backup_key((1, 0), None)
-        self.assertEqual(result, (b'1', 0))
-
-    def test_get_backup_at_backup(self):
-        c = self._makeOne(100)
-        c[(1, 1)] = (b'2', 0)
-
-        result = c.get_from_key_or_backup_key((1, 0), (1, 1))
-        self.assertEqual(result, (b'2', 0))
-        self.assertEqual(len(c), 1)
-        self.assertIn((1, 0), c)
-        self.assertNotIn((1, 1), c)
-
     def test_entries(self):
         cache = self._makeOne(20)
         cache[(1, 0)] = (b'abc', 0)
