Merge pull request #356 from zodb/simplify-max-stored
Simplify and localize the way most temporary objects are stored.
jamadden committed Sep 27, 2019
2 parents 2c8997d + 1a3b83d commit 0e70ea5
Showing 15 changed files with 261 additions and 374 deletions.
10 changes: 10 additions & 0 deletions src/relstorage/_compat.py
@@ -110,6 +110,11 @@ def OidSet_discard(s, val):
s.remove(val)
except KeyError:
pass

def OidObjectMap_max_key(bt):
if not bt:
return 0
return bt.maxKey()
else:
OID_TID_MAP_TYPE = dict
OID_OBJECT_MAP_TYPE = dict
@@ -130,6 +135,11 @@ def OidSet_difference(c1, c2):

OidSet_discard = set.discard

def OidObjectMap_max_key(mapping):
if not mapping:
return 0
return max(iterkeys(mapping))

# Lists of OIDs or TIDs. These could be simple list() objects, or we
# can treat them as numbers and store them in array.array objects, if
# we have an unsigned 64-bit element type. array.array, just like the
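
The two OidObjectMap_max_key implementations added above are behaviorally identical: return the largest OID key in the mapping, or 0 when it is empty. Below is a minimal, self-contained sketch of that contract; the duck-typed dispatch is purely illustrative, since the real module picks one implementation at import time depending on whether BTrees is available.

# Sketch of the OidObjectMap_max_key contract added above; the
# duck-typed dispatch is for illustration only.
def oid_object_map_max_key(mapping):
    if not mapping:
        return 0
    max_key = getattr(mapping, 'maxKey', None)
    if max_key is not None:
        return max_key()  # BTrees mappings expose maxKey() directly
    return max(mapping)   # plain dicts: scan all of the keys

assert oid_object_map_max_key({}) == 0
assert oid_object_map_max_key({3: b'a', 7: b'b'}) == 7
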
27 changes: 0 additions & 27 deletions src/relstorage/autotemp.py

This file was deleted.

12 changes: 0 additions & 12 deletions src/relstorage/cache/interfaces.py
@@ -82,18 +82,6 @@ def poll(cache, conn, cursor):
in this method should establish the snapshot for the first time.
"""

def after_tpc_finish(viewer, tid_int, temp_objects):
"""
Let the coordinator know that the registered *viewer* is
between transactions and can release its last snapshot.
The transaction it just committed was at *tid_int*, and it contained
*temp_objects*.
If *tid_int* is None, then the transaction was aborted; the viewer can still
release its snapshot.
"""

def stats():
"""
Return a dictionary with interesting keys and values
16 changes: 0 additions & 16 deletions src/relstorage/cache/mvcc.py
@@ -578,22 +578,6 @@ def reset_viewer(self, cache):
cache.object_index = None
self.change(cache, None)

def after_tpc_finish(self, cache, tid_int, temp_objects):
# Note that unless we've polled, this data is not yet
# going to be visible to anyone.
cache.cache.set_all_for_tid(tid_int, temp_objects)

def afterCompletion(self, cache):
# We briefly thought this would be a good time to take the viewer
# out of consideration as far as minimum visible TID is concerned,
# but if we do that we wind up polling with different, overlapping, big
# polls for each client, instead of moving linearly forward with small
# polls. The small polls are much more predictable. We count on our
# _vacuum method to keep the history from growing too large.
pass

tpc_abort = afterCompletion

def poll(self, cache, conn, cursor):
with self._lock:
cur_ix = self.object_index
140 changes: 6 additions & 134 deletions src/relstorage/cache/storage_cache.py
@@ -26,12 +26,8 @@
from ZODB.utils import p64
from zope import interface


from relstorage.autotemp import AutoTemporaryFile
from relstorage._compat import OID_OBJECT_MAP_TYPE
from relstorage._compat import OID_SET_TYPE as OIDSet
from relstorage._compat import iteroiditems
from relstorage._compat import IN_TESTRUNNER
from relstorage._util import bytes8_to_int64
from relstorage._mvcc import DetachableMVCCDatabaseViewer

@@ -79,11 +75,6 @@ class StorageCache(DetachableMVCCDatabaseViewer):
'local_client',
'cache',
'object_index',

# Things used during commit
'temp_objects',
'store_temp',
'read_temp',
)


@@ -99,12 +90,6 @@ def __init__(self, adapter, options, prefix, _parent=None):
self.options = options
self.keep_history = options.keep_history
self.prefix = prefix or ''
# temp_objects is a _TemporaryStorage used during commit
self.temp_objects = None # type: _TemporaryStorage
# store_temp and read_temp are methods copied from the queue while
# we are committing.
self.store_temp = None
self.read_temp = None

if _parent is None:
# I must be the master!
@@ -533,51 +518,20 @@ def remove_all_cached_data_for_oids(self, oids):
# Remove the data too.
self.cache.invalidate_all(oids)

def tpc_begin(self):
"""Prepare temp space for objects to cache."""
q = self.temp_objects = _TemporaryStorage()
self.store_temp = q.store_temp
self.read_temp = q.read_temp

def after_tpc_finish(self, tid):
def after_tpc_finish(self, tid, temp_storage):
"""
Flush queued changes.
This is called after the database commit lock is released,
but before control is returned to the Connection.
Now that this tid is known, send all queued objects to the
cache. The cache will have an ``(oid, tid)`` entry for each object
we have been holding on to. (In a large transaction, some of them
may not actually make it into the cache, but we try.)
"""
try:
tid_int = bytes8_to_int64(tid)
# Let the coordinator know ASAP that we're between transactions,
# and give it the opportunity to update the object index and cache
# if possible. Note that *we* cannot update our index: this transaction isn't
# visible to us until we poll.
self.polling_state.after_tpc_finish(self, tid_int, self.temp_objects)
finally:
self.clear_temp()

def tpc_abort(self):
self.clear_temp()
self.polling_state.tpc_abort(self)

def afterCompletion(self, load_connection):
load_connection.rollback_quietly()
self.polling_state.afterCompletion(self)

def clear_temp(self):
"""Discard all transaction-specific temporary data.
Called after transaction finish or abort.
"""
if self.temp_objects is not None:
self.store_temp = None
self.read_temp = None
self.temp_objects.close()
self.temp_objects = None
tid_int = bytes8_to_int64(tid)
self.cache.set_all_for_tid(tid_int, temp_storage)
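
The new after_tpc_finish above is now the entire commit-time cache interaction: convert the 8-byte tid into an integer and hand the transaction's temporary storage to the cache in one call. A self-contained sketch of that flow follows; StubCache is illustrative and stands in for the real cache object, and bytes8_to_int64 is reimplemented with struct so the sketch runs without relstorage installed. The iteration protocol matches _TemporaryStorage.iter_for_oids in the deleted code below.

import struct

def bytes8_to_int64(tid_bytes):
    # 8-byte big-endian tid -> unsigned 64-bit integer
    return struct.unpack('>Q', tid_bytes)[0]

class StubCache(object):
    # Stands in for the real cache; set_all_for_tid receives the chosen
    # tid and an iterable of (state, oid_int, prev_tid_int) tuples.
    def set_all_for_tid(self, tid_int, temp_storage):
        for state, oid_int, _prev_tid_int in temp_storage:
            print('caching %d bytes for oid %d at tid %d'
                  % (len(state), oid_int, tid_int))

def after_tpc_finish(cache, tid, temp_storage):
    tid_int = bytes8_to_int64(tid)
    cache.set_all_for_tid(tid_int, temp_storage)

after_tpc_finish(StubCache(), b'\x00\x00\x00\x00\x00\x00\x04\xd2',
                 [(b'some state', 42, 0)])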

def poll(self, conn, cursor, ignore_tid):
try:
@@ -610,85 +564,3 @@ def poll(self, conn, cursor, _):
# We'll never try to poll again, so this is ok.
self.polling_state.unregister(self)
return ()

class _TemporaryStorage(object):
def __init__(self):
# start with a fresh in-memory buffer instead of reusing one that might
# already be spooled to disk.
# TODO: An alternate idea would be a temporary sqlite database.
self._queue = AutoTemporaryFile()
# {oid: (startpos, endpos, prev_tid_int)}
self._queue_contents = OID_OBJECT_MAP_TYPE()

def reset(self):
self._queue_contents.clear()
self._queue.seek(0)

def store_temp(self, oid_int, state, prev_tid_int=0):
"""
Queue an object for caching.
Typically, we can't actually cache the object yet, because its
transaction ID is not yet chosen.
"""
assert isinstance(state, bytes)
queue = self._queue
queue.seek(0, 2) # seek to end
startpos = queue.tell()
queue.write(state)
endpos = queue.tell()
self._queue_contents[oid_int] = (startpos, endpos, prev_tid_int)

def __len__(self):
# How many distinct OIDs have been stored?
return len(self._queue_contents)

def __bool__(self):
return True

__nonzero__ = __bool__

@property
def stored_oids(self):
return self._queue_contents

def _read_temp_state(self, startpos, endpos):
self._queue.seek(startpos)
length = endpos - startpos
state = self._queue.read(length)
if len(state) != length:
raise AssertionError("Queued cache data is truncated")
return state

def read_temp(self, oid_int):
"""
Return the bytes for a previously stored temporary item.
"""
startpos, endpos, _ = self._queue_contents[oid_int]
return self._read_temp_state(startpos, endpos)

def __iter__(self):
return self.iter_for_oids(None)

def iter_for_oids(self, oids):
read_temp_state = self._read_temp_state
for startpos, endpos, oid_int, prev_tid_int in self.items(oids):
state = read_temp_state(startpos, endpos)
yield state, oid_int, prev_tid_int

def items(self, oids=None):
# Order the queue by file position, which should help
# if the file is large and needs to be read
# sequentially from disk.
items = [
(startpos, endpos, oid_int, prev_tid_int)
for (oid_int, (startpos, endpos, prev_tid_int)) in iteroiditems(self._queue_contents)
if oids is None or oid_int in oids
]
items.sort()
return items

def close(self):
self._queue.close()
self._queue = None
self._queue_contents = None
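
For reference, the class removed above (now relstorage.storage.tpc.temporary_storage.TemporaryStorage, per the import in the test change below) spools object states into a single temporary file and keeps a small index into it. Here is a condensed, runnable sketch of the same technique, using the standard library's tempfile.SpooledTemporaryFile in place of the deleted AutoTemporaryFile (assumed here to be a similar buffer-in-memory-then-spill-to-disk wrapper).

# Condensed sketch of the spooled temp-object store removed above.
import tempfile

class TempObjectStore(object):
    def __init__(self):
        self._queue = tempfile.SpooledTemporaryFile(max_size=10 << 20)
        # {oid_int: (startpos, endpos, prev_tid_int)}
        self._queue_contents = {}

    def store_temp(self, oid_int, state, prev_tid_int=0):
        self._queue.seek(0, 2)  # append at the end of the file
        startpos = self._queue.tell()
        self._queue.write(state)
        self._queue_contents[oid_int] = (startpos, self._queue.tell(),
                                         prev_tid_int)

    def read_temp(self, oid_int):
        startpos, endpos, _ = self._queue_contents[oid_int]
        self._queue.seek(startpos)
        return self._queue.read(endpos - startpos)

    def __iter__(self):
        # Sort by file position so a large spooled file is read sequentially.
        for startpos, endpos, oid_int, prev_tid_int in sorted(
                (s, e, oid, p)
                for oid, (s, e, p) in self._queue_contents.items()):
            self._queue.seek(startpos)
            yield self._queue.read(endpos - startpos), oid_int, prev_tid_int

store = TempObjectStore()
store.store_temp(42, b'state-bytes')
assert store.read_temp(42) == b'state-bytes'
assert list(store) == [(b'state-bytes', 42, 0)]
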
20 changes: 10 additions & 10 deletions src/relstorage/cache/tests/test_cache_stats.py
@@ -30,7 +30,7 @@
import relstorage.cache

from relstorage.cache.tests import MockOptions, MockAdapter

from relstorage.storage.tpc.temporary_storage import TemporaryStorage

def _build_history():
random = random2.Random(42)
@@ -47,9 +47,9 @@ def _build_history():
b'x' * random.randint(200, 2000)))
return history

def _send_queue(self, tid_int):
self.cache.set_all_for_tid(tid_int, self.temp_objects)
self.temp_objects.reset()
def _send_queue(self, tid_int, temp_objects):
self.cache.set_all_for_tid(tid_int, temp_objects)
temp_objects.reset()

now = 0

@@ -89,16 +89,16 @@ def cache_run(name, size):
poller.poll_changes = []
new_cache.poll(None, None, None)

new_cache.tpc_begin()
cache.tpc_begin()
new_cache_ts = TemporaryStorage()
cache_ts = TemporaryStorage()
data_tot = 0
for action, oid, serial, data in history:
data_tot += len(data)
now += 1
if action == b's':
cache.store_temp(oid, data)
cache_ts.store_temp(oid, data)
#print("Storing", oid, serial)
_send_queue(cache, serial)
_send_queue(cache, serial, cache_ts)
cache.adapter.mover.data[oid] = (data, serial)
poller.poll_tid = serial
poller.poll_changes = [(oid, serial)]
@@ -110,8 +110,8 @@ def cache_run(name, size):
v = new_cache.load(None, oid)
if v[0] is None:
#print("Store after miss", oid, 1)
new_cache.store_temp(oid, data)
_send_queue(new_cache, 1)
new_cache_ts.store_temp(oid, data)
_send_queue(new_cache, 1, new_cache_ts)
cache.adapter.mover.data[oid] = (data, 1)
finally:
cache.close(close_async=False)
