Skip to content

Commit

Permalink
Merge 127c5c2 into 60f4b8b
Browse files Browse the repository at this point in the history
  • Loading branch information
jamadden committed Aug 27, 2020
2 parents 60f4b8b + 127c5c2 commit 4f1d745
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 8 deletions.
4 changes: 4 additions & 0 deletions CHANGES.rst
Expand Up @@ -23,6 +23,10 @@
making a load query for the first time in a particular connection.
See :issue:`411`.

- Add some DEBUG level logging around forced invalidations of
persistent object caches due to exceeding the cache MVCC limits. See
:issue:`338`.

3.2.0 (2020-07-20)
==================

Expand Down
17 changes: 14 additions & 3 deletions src/relstorage/_mvcc.py
Expand Up @@ -7,6 +7,7 @@
from __future__ import print_function

import threading
from logging import DEBUG as LDEBUG

from zope import interface

Expand All @@ -15,6 +16,8 @@
from .interfaces import IMVCCDatabaseCoordinator
from .interfaces import IDetachableMVCCDatabaseViewer

logger = __import__('logging').getLogger(__name__)

@interface.implementer(IDetachableMVCCDatabaseViewer)
class DetachableMVCCDatabaseViewer(object):
__slots__ = (
Expand Down Expand Up @@ -53,6 +56,7 @@ def __init__(self):
self._by_tid = family64.UO.Bucket()
self._registered_viewers = set()
self.is_registered = self._registered_viewers.__contains__
self.log = logger.log

@property
def _viewer_count_at_min(self):
Expand Down Expand Up @@ -204,7 +208,14 @@ def detach_viewers_at_minimum(self):
"""
with self._lock:
if self._by_tid:
at_min = self._by_tid.pop(self._by_tid.minKey())
for viewer in at_min:
viewer.detached = True
min_tid = self._by_tid.minKey()
at_min = self._by_tid.pop(min_tid)
if at_min:
self.log(
LDEBUG,
"Detaching %d viewers at oldest TID 0x%x",
len(at_min), min_tid,
)
for viewer in at_min:
viewer.detached = True
self.__set_tids()
22 changes: 17 additions & 5 deletions src/relstorage/cache/mvcc.py
Expand Up @@ -20,6 +20,7 @@
from __future__ import print_function

import os
from logging import DEBUG as LDEBUG

from zope.interface import implementer

Expand All @@ -33,7 +34,7 @@
from relstorage._compat import IN_TESTRUNNER
from relstorage._util import log_timed
from relstorage._util import positive_integer
from relstorage._util import TRACE
from relstorage._util import TRACE as LTRACE
from relstorage._util import get_time_from_environ
from relstorage._mvcc import DetachableMVCCDatabaseCoordinator
from relstorage.options import Options
Expand Down Expand Up @@ -671,7 +672,7 @@ def _poll(self, cache, conn, cursor,
# So we do that now.
change_iter = list(change_iter)
self.log(
TRACE,
LTRACE,
"Polled new tid %s since %s with %s changes",
polled_tid, polling_since, len(change_iter)
)
Expand Down Expand Up @@ -769,6 +770,10 @@ def _find_changes_for_viewer(viewer, object_index):
# Whelp, it needs to invalidate all its cached objects (so
# we must return None), but it can still use our index and
# poll state going forward; we don't need to go backwards.
logger.debug(
"Invalidating all persistent objects for viewer %r (detached? %s)",
viewer, viewer.detached
)
return None

# Somewhere in the index is a map with the highest visible tid
Expand Down Expand Up @@ -839,12 +844,19 @@ def _vacuum(self, cache, object_index):
object_index.depth > self.max_allowed_index_depth
or object_index.total_size > self.max_allowed_index_size
):
self.log(
LDEBUG,
"Detaching oldest viewers because depth (%s) > max depth (%s) "
"or total size (%s) > max size (%s)",
object_index.depth, self.max_allowed_index_depth,
object_index.total_size, self.max_allowed_index_size,
)
self.detach_viewers_at_minimum()

required_tid = self.minimum_highest_visible_tid # This won't change during our invocation
local_client = cache.local_client
self.log(
TRACE,
LTRACE,
"Attempting vacuum from %s up to %s",
object_index.minimum_highest_visible_tid,
required_tid,
Expand Down Expand Up @@ -874,7 +886,7 @@ def _vacuum(self, cache, object_index):
in_both = OidTMap_intersection(newer_oids, obsolete_bucket)

self.log(
TRACE,
LTRACE,
"Examining %d old OIDs to see if they've been replaced",
len(in_both)
)
Expand Down Expand Up @@ -907,7 +919,7 @@ def _vacuum(self, cache, object_index):
# we do *not* remove the index entries; they're needed to keep
# the CST in sync for newer transactions that might still be open.
if obsolete_bucket:
self.log(TRACE, "Vacuum: Freezing %s old OIDs", len(obsolete_bucket))
self.log(LTRACE, "Vacuum: Freezing %s old OIDs", len(obsolete_bucket))
local_client.freeze(obsolete_bucket)

if oids_tids_to_del:
Expand Down

0 comments on commit 4f1d745

Please sign in to comment.