Skip to content

Commit

Permalink
Merge pull request #23 from zopefoundation/load-uses-loadBefore
Browse files Browse the repository at this point in the history
Load uses load before
  • Loading branch information
jimfulton committed Jun 7, 2016
2 parents 5069266 + 14adf5a commit 0bde326
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 70 deletions.
9 changes: 9 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ Changelog
4.2.0 (unreleased)
------------------

- Changed loadBefore to operate more like load behaved, especially
with regard to the load lock. This allowes ZEO to work with the
upcoming ZODB 5, which used loadbefore rather than load.

Reimplemented load using loadBefore, thus testing loadBefore
extensively via existing tests.

- Fixed: the ZEO cache loadBefore method failed to utilize current data.

- Drop support for Python 2.6 and 3.2.

4.2.0b1 (2015-06-05)
Expand Down
82 changes: 23 additions & 59 deletions src/ZEO/ClientStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@

logger = logging.getLogger(__name__)

# max signed 64-bit value ~ infinity :) Signed cuz LBTree and TimeStamp
m64 = b'\x7f\xff\xff\xff\xff\xff\xff\xff'

try:
from ZODB.ConflictResolution import ResolvedSerial
except ImportError:
Expand Down Expand Up @@ -819,75 +822,36 @@ def load(self, oid, version=''):
otherwise a KeyError is raised.
"""
self._lock.acquire() # for atomic processing of invalidations
try:
t = self._cache.load(oid)
if t:
return t
finally:
self._lock.release()
result = self.loadBefore(oid, m64)
if result is None:
raise POSException.POSKeyError(oid)
return result[:2]

def loadBefore(self, oid, tid):
"""Load the object data written before a transaction id
"""
with self._lock: # for atomic processing of invalidations
result = self._cache.loadBefore(oid, tid)
if result:
return result

if self._server is None:
raise ClientDisconnected()

self._load_lock.acquire()
try:
self._lock.acquire()
try:
with self._load_lock:
with self._lock:
self._load_oid = oid
self._load_status = 1
finally:
self._lock.release()

data, tid = self._server.loadEx(oid)
result = self._server.loadBefore(oid, tid)

self._lock.acquire() # for atomic processing of invalidations
try:
if self._load_status:
self._cache.store(oid, tid, None, data)
with self._lock: # for atomic processing of invalidations
if result and self._load_status:
data, tid, end = result
self._cache.store(oid, tid, end, data)
self._load_oid = None
finally:
self._lock.release()
finally:
self._load_lock.release()

return data, tid

def loadBefore(self, oid, tid):
self._lock.acquire()
try:
t = self._cache.loadBefore(oid, tid)
if t is not None:
return t
finally:
self._lock.release()

t = self._server.loadBefore(oid, tid)
if t is None:
return None
data, start, end = t
if end is None:
# This method should not be used to get current data. It
# doesn't use the _load_lock, so it is possble to overlap
# this load with an invalidation for the same object.

# If we call again, we're guaranteed to get the
# post-invalidation data. But if the data is still
# current, we'll still get end == None.

# Maybe the best thing to do is to re-run the test with
# the load lock in the case. That's slow performance, but
# I don't think real application code will ever care about
# it.

return data, start, end
self._lock.acquire()
try:
self._cache.store(oid, start, end, data)
finally:
self._lock.release()

return data, start, end
return result

def new_oid(self):
"""Storage API: return a new object identifier."""
Expand Down
30 changes: 23 additions & 7 deletions src/ZEO/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ def getLastTid(self):
# @defreturn 3-tuple: (string, string, string)

@locked
def load(self, oid):
def load(self, oid, before_tid=None):
ofs = self.current.get(oid)
if ofs is None:
self._trace(0x20, oid)
Expand All @@ -509,6 +509,9 @@ def load(self, oid):
assert end_tid == z64, (ofs, self.f.tell(), oid, tid, end_tid)
assert lver == 0, "Versions aren't supported"

if before_tid and tid >= before_tid:
return None

data = read(ldata)
assert len(data) == ldata, (ofs, self.f.tell(), oid, len(data), ldata)

Expand Down Expand Up @@ -550,13 +553,22 @@ def load(self, oid):
def loadBefore(self, oid, before_tid):
noncurrent_for_oid = self.noncurrent.get(u64(oid))
if noncurrent_for_oid is None:
self._trace(0x24, oid, "", before_tid)
return None
result = self.load(oid, before_tid)
if result:
return result[0], result[1], None
else:
self._trace(0x24, oid, "", before_tid)
return result

items = noncurrent_for_oid.items(None, u64(before_tid)-1)
if not items:
self._trace(0x24, oid, "", before_tid)
return None
result = self.load(oid, before_tid)
if result:
return result[0], result[1], None
else:
self._trace(0x24, oid, "", before_tid)
return result

tid, ofs = items[-1]

self.f.seek(ofs)
Expand All @@ -577,8 +589,12 @@ def loadBefore(self, oid, before_tid):
assert read(8) == oid, (ofs, self.f.tell(), oid)

if end_tid < before_tid:
self._trace(0x24, oid, "", before_tid)
return None
result = self.load(oid, before_tid)
if result:
return result[0], result[1], None
else:
self._trace(0x24, oid, "", before_tid)
return result

self._n_accesses += 1
self._trace(0x26, oid, "", saved_tid)
Expand Down
19 changes: 18 additions & 1 deletion src/ZEO/tests/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,9 @@ def testChangingCacheSize(self):
# We use large-2 for the same reason we used small-1 above.
expected_len = large-2
self.assertEquals(len(cache), expected_len)
expected_oids = set(list(range(11, 50))+list(range(106, 110))+list(range(200, 305)))
expected_oids = set(list(range(11, 50)) +
list(range(106, 110)) +
list(range(200, 305)))
self.assertEquals(set(u64(oid) for (oid, tid) in cache.contents()),
expected_oids)

Expand All @@ -336,6 +338,21 @@ def testSetAnyLastTidOnEmptyCache(self):
self.cache.setLastTid(p64(3))
self.cache.setLastTid(p64(4))

def test_loadBefore_doesnt_miss_current(self):
# Make sure that loadBefore get's current data if there
# isn't non-current data

cache = self.cache
oid = n1
cache.store(oid, n1, None, b'first')
self.assertEqual(cache.loadBefore(oid, n1), None)
self.assertEqual(cache.loadBefore(oid, n2), (b'first', n1, None))
self.cache.invalidate(oid, n2)
cache.store(oid, n2, None, b'second')
self.assertEqual(cache.loadBefore(oid, n1), None)
self.assertEqual(cache.loadBefore(oid, n2), (b'first', n1, n2))
self.assertEqual(cache.loadBefore(oid, n3), (b'second', n2, None))

def kill_does_not_cause_cache_corruption():
r"""
Expand Down
11 changes: 8 additions & 3 deletions src/ZEO/zrpc/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,19 +242,24 @@ class Connection(smac.SizedMessageAsyncConnection, object):
# Undone oid info returned by vote.
#
# Z3101 -- checkCurrentSerialInTransaction
#
# Z4 -- checkCurrentSerialInTransaction
# No-longer call load.

# Protocol variables:
# Our preferred protocol.
current_protocol = b"Z3101"
current_protocol = b"Z4"

# If we're a client, an exhaustive list of the server protocols we
# can accept.
servers_we_can_talk_to = [b"Z308", b"Z309", b"Z310", current_protocol]
servers_we_can_talk_to = [b"Z308", b"Z309", b"Z310", b"Z3101",
current_protocol]

# If we're a server, an exhaustive list of the client protocols we
# can accept.
clients_we_can_talk_to = [
b"Z200", b"Z201", b"Z303", b"Z308", b"Z309", b"Z310", current_protocol]
b"Z200", b"Z201", b"Z303", b"Z308", b"Z309", b"Z310", b"Z3101",
current_protocol]

# This is pretty excruciating. Details:
#
Expand Down

0 comments on commit 0bde326

Please sign in to comment.