Skip to content

Commit

Permalink
persistent.picklecache: 100% branch coverage.
Browse files Browse the repository at this point in the history
  • Loading branch information
tseaver committed Dec 16, 2014
1 parent 1462ee2 commit 6568869
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 1 deletion.
4 changes: 3 additions & 1 deletion persistent/picklecache.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ def _invalidate(self, oid):
if value is not None and value._p_state != GHOST:
value._p_invalidate()
node = self.ring.next
while node is not self.ring:
while True:
if node is self.ring:
break # pragma: no cover belt-and-suspenders
if node.object is value:
node.prev.next, node.next.prev = node.next, node.prev
break
Expand Down
56 changes: 56 additions & 0 deletions persistent/tests/test_picklecache.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ def test___setitem___non_string_oid_raises_ValueError(self):
else:
self.fail("Didn't raise ValueError with non-string OID.")

def test___setitem___duplicate_oid_same_obj(self):
from persistent._compat import _b
KEY = _b('original')
cache = self._makeOne()
original = self._makePersist()
cache[KEY] = original
cache[KEY] = original

def test___setitem___duplicate_oid_raises_KeyError(self):
from persistent._compat import _b
KEY = _b('original')
Expand Down Expand Up @@ -448,6 +456,54 @@ def test_full_sweep(self):
for oid in oids:
self.assertTrue(cache.get(oid) is None)

def test_full_sweep_w_sticky(self):
import gc
from persistent.interfaces import UPTODATE
from persistent.interfaces import STICKY
from persistent._compat import _b
cache = self._makeOne()
oids = []
for i in range(100):
oid = _b('oid_%04d' % i)
oids.append(oid)
state = UPTODATE if i > 0 else STICKY
cache[oid] = self._makePersist(oid=oid, state=state)
self.assertEqual(cache.cache_non_ghost_count, 100)

cache.full_sweep()
gc.collect() # banish the ghosts who are no longer in the ring

self.assertEqual(cache.cache_non_ghost_count, 1)
self.assertTrue(cache.ring.next is not cache.ring)

self.assertTrue(cache.get(oids[0]) is not None)
for oid in oids[1:]:
self.assertTrue(cache.get(oid) is None)

def test_full_sweep_w_changed(self):
import gc
from persistent.interfaces import UPTODATE
from persistent.interfaces import CHANGED
from persistent._compat import _b
cache = self._makeOne()
oids = []
for i in range(100):
oid = _b('oid_%04d' % i)
oids.append(oid)
state = UPTODATE if i > 0 else CHANGED
cache[oid] = self._makePersist(oid=oid, state=state)
self.assertEqual(cache.cache_non_ghost_count, 100)

cache.full_sweep()
gc.collect() # banish the ghosts who are no longer in the ring

self.assertEqual(cache.cache_non_ghost_count, 1)
self.assertTrue(cache.ring.next is not cache.ring)

self.assertTrue(cache.get(oids[0]) is not None)
for oid in oids[1:]:
self.assertTrue(cache.get(oid) is None)

def test_minimize(self):
import gc
from persistent.interfaces import UPTODATE
Expand Down

0 comments on commit 6568869

Please sign in to comment.