Skip to content

Commit

Permalink
Fix for KeyErrors emanating from TemporaryStorage unexpectedly. A
Browse files Browse the repository at this point in the history
connection can be holding on to an object with references to an object
that has been recently garbage collected.  When the object is unghosted in
the reference-holding thread, the storage cannot find the oid (because it's
out of storage due to gc).  We work around this by turning KeyErrors that
we believe are due to this scenario into ConflictErrors.
  • Loading branch information
mcdonc committed May 16, 2004
1 parent 040018e commit 33ced44
Showing 1 changed file with 39 additions and 19 deletions.
58 changes: 39 additions & 19 deletions TemporaryStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
non-cyclic garbage and it does rudimentary conflict resolution. This is a
ripoff of Jim's Packless bsddb3 storage.
$Id: TemporaryStorage.py,v 1.5 2004/02/19 18:35:23 jeremy Exp $
$Id: TemporaryStorage.py,v 1.1.2.2 2004/05/16 01:41:34 chrism Exp $
"""

__version__ ='$Revision: 1.5 $'[11:-2]
__version__ ='$Revision: 1.1.2.2 $'[11:-2]

from zLOG import LOG, BLATHER
from ZODB.serialize import referencesf
from ZODB.referencesf import referencesf
from ZODB import POSException
from ZODB.BaseStorage import BaseStorage
from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
Expand All @@ -33,6 +33,8 @@
CONFLICT_CACHE_MAXAGE = 60
# garbage collect conflict cache every CONFLICT_CACHE_GCEVERY seconds
CONFLICT_CACHE_GCEVERY = 60
# keep history of recently gc'ed oids of length RECENTLY_GC_OIDS_LEN
RECENTLY_GC_OIDS_LEN = 200

class ReferenceCountError(POSException.POSError):
""" An error occured while decrementing a reference to an object in
Expand All @@ -55,6 +57,7 @@ def __init__(self, name='TemporaryStorage'):
_tmp -- used by 'store' to collect changes before finalization
_conflict_cache -- cache of recently-written object revisions
_last_cache_gc -- last time that conflict cache was garbage collected
_recently_gc_oids -- a queue of recently gc'ed oids
"""
BaseStorage.__init__(self, name)

Expand All @@ -65,6 +68,7 @@ def __init__(self, name='TemporaryStorage'):
self._tmp = []
self._conflict_cache = {}
self._last_cache_gc = 0
self._recently_gc_oids = [None for x in range (RECENTLY_GC_OIDS_LEN)]
self._oid = '\0\0\0\0\0\0\0\0'

def __len__(self):
Expand All @@ -91,16 +95,26 @@ def close(self):
def load(self, oid, version):
self._lock_acquire()
try:
s=self._index[oid]
p=self._opickle[oid]
return p, s # pickle, serial
try:
s=self._index[oid]
p=self._opickle[oid]
return p, s # pickle, serial
except KeyError:
# this oid was probably garbage collected while a thread held
# on to an object that had a reference to it; we can probably
# force the loader to sync their connection by raising a
# ConflictError (at least if Zope is the loader, because it
# will resync its connection on a retry). This isn't
# perfect because the length of the recently gc'ed oids list
# is finite and could be overrun through a mass gc, but it
# should be adequate in common-case usage.
if oid in self._recently_gc_oids:
raise POSException.ConflictError(oid=oid)
else:
raise
finally:
self._lock_release()

def loadEx(self, oid, version):
p, s = self.load(oid, version)
return p, s, s

def loadSerial(self, oid, serial, marker=[]):
""" this is only useful to make conflict resolution work. It
does not actually implement all the semantics that a revisioning
Expand Down Expand Up @@ -132,16 +146,13 @@ def store(self, oid, serial, data, version, transaction):
if self._index.has_key(oid):
oserial=self._index[oid]
if serial != oserial:
rdata = self.tryToResolveConflict(oid, oserial,
serial, data)
if rdata is None:
raise POSException.ConflictError(
oid=oid, serials=(oserial, serial), data=data)
else:
data = rdata
data=self.tryToResolveConflict(oid, oserial, serial, data)
if not data:
raise POSException.ConflictError(oid=oid,
serials=(oserial, serial))
else:
oserial = serial
newserial=self._tid
newserial=self._serial
self._tmp.append((oid, data))
now = time.time()
self._conflict_cache[(oid, newserial)] = data, now
Expand All @@ -154,7 +165,7 @@ def _finish(self, tid, u, d, e):
referenceCount=self._referenceCount
referenceCount_get=referenceCount.get
oreferences=self._oreferences
serial=self._tid
serial=self._serial
index=self._index
opickle=self._opickle

Expand Down Expand Up @@ -229,13 +240,22 @@ def _takeOutGarbage(self, oid):
# take out the garbage.
referenceCount=self._referenceCount
referenceCount_get=referenceCount.get

self._recently_gc_oids.pop()
self._recently_gc_oids.insert(0, oid)

try: del referenceCount[oid]
except: pass
try: del self._opickle[oid]
except: pass
try: del self._index[oid]
except: pass

# remove this object from the conflict cache if it exists there
for k in self._conflict_cache.keys():
if k[0] == oid:
del self._conflict_cache[k]

# Remove/decref references
roids = self._oreferences.get(oid, [])
while roids:
Expand Down

0 comments on commit 33ced44

Please sign in to comment.