Skip to content

Commit

Permalink
Fix unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jamadden committed Aug 17, 2019
1 parent a2166bd commit 635d9df
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 85 deletions.
2 changes: 1 addition & 1 deletion src/relstorage/adapters/connections.py
Expand Up @@ -206,7 +206,7 @@ def call(self, f, can_reconnect, *args, **kwargs):
set this to false.
"""
fresh_connection = False
if not self:
if self.connection is None or self._cursor is None:
# We're closed or disconnected. Start a new connection entirely.
self.drop()
self._open_connection()
Expand Down
4 changes: 4 additions & 0 deletions src/relstorage/adapters/poller.py
Expand Up @@ -154,6 +154,10 @@ def list_changes(self, cursor, after_tid, last_tid):
"""
See ``IPoller``.
"""
if after_tid == last_tid:
# small optimization in case we're asked for the same.
# there can be no changes where change > X and change <= X
return ()
params = {'min_tid': after_tid, 'max_tid': last_tid}
self._list_changes_range_query.execute(cursor, params)
# Return the cursor: let it be its own iterable. This could be a
Expand Down
2 changes: 1 addition & 1 deletion src/relstorage/adapters/tests/test_connections.py
Expand Up @@ -120,7 +120,7 @@ def f(_conn, _cur, fresh):
if not fresh:
raise manager.connmanager.driver.disconnected_exceptions[0]

manager.call(f, True)
manager.call(f, can_reconnect=True)

self.assertEqual(called, [False, True])

Expand Down
65 changes: 49 additions & 16 deletions src/relstorage/cache/storage_cache.py
Expand Up @@ -232,9 +232,14 @@ def replace_checkpoints(

# # We got it, we're going to do it.
try:
self.__rebuild_checkpoints(cache, cursor, desired_checkpoints, new_tid_int)
_, da0, da1 = self.__rebuild_checkpoints(cache, cursor,
desired_checkpoints, new_tid_int)
finally:
self._checkpoint_lock.release()
cache.delta_after0 = da0
cache.delta_after1 = da1
cache.checkpoints = desired_checkpoints
return desired_checkpoints


def after_normal_poll(self, cache): # type: StorageCache -> None
Expand Down Expand Up @@ -353,20 +358,32 @@ def __poll_and_update(self, cache, cursor, new_checkpoints, current_tid, new_tid
# This should be a very small, usual poll, so we don't hold a poll lock either.
new_da0 = self.delta_map_type()
new_da1 = self.delta_map_type()
assert new_tid_int >= new_checkpoints[0]
assert current_tid < new_tid_int

self.__poll_into(cache, cursor, new_checkpoints[0], current_tid, new_tid_int,
self.__poll_into(cache, cursor, new_checkpoints[0], new_tid_int, new_tid_int,
new_da0, new_da1)

assert not new_da1 # Nothing to add after the tid we just polled, that's impossible.
original_new_da0 = self.delta_map_type(new_da0) # we mutate it.
# Nothing to add *after* the tid we just polled; that's impossible
# because this connection is locked to that tid.
if new_da0:
raise CacheConsistencyError(
"After polling for changes between (%s, %s] found changes above the limit: %s" % (
new_checkpoints[0], new_tid_int,
dict(new_da0)
)
)

original_new_da1 = self.delta_map_type(new_da1) # we mutate it.

with self._da0_lock:
# Ok, time has marched on.
# We just need to merge anything we've got, letting other updates take precedence.
# (Our data might be old)
self.current_tid = max(new_tid_int, self.current_tid)
new_da0.update(self.delta_after0)
self.delta_after0 = new_da0

new_da1.update(self.delta_after1)
self.delta_after1 = new_da1

if self.current_tid <= new_tid_int:
# Cool, everything is good to return.
Expand All @@ -379,7 +396,7 @@ def __poll_and_update(self, cache, cursor, new_checkpoints, current_tid, new_tid
# The current data could contain info that's out of range for us,
# so we can't use it.
# But we can update our older snapshot and return that.
original_new_da0.update(da0)
original_new_da1.update(da1)
return new_checkpoints, da0, da1

def __rebuild_checkpoints(self, cache, cursor, new_checkpoints, new_tid_int):
Expand Down Expand Up @@ -477,6 +494,11 @@ def release():
current_tid = self.current_tid

try:
if not checkpoints and not current_tid:
self.checkpoints = checkpoints
self.current_tid = new_tid_int
return checkpoints, self.delta_map_type(), self.delta_map_type()

if checkpoints == new_checkpoints:
if new_tid_int < current_tid:
# We just return empty maps and hope it catches up.
Expand All @@ -496,6 +518,8 @@ def release():

# It's greater. We need to catch up. But only between our
# tid and the new one.
assert new_tid_int > current_tid
assert new_checkpoints[0] <= new_tid_int
# Discard the lock; it's not necessary to hold while we poll and update
# if we do it carefully.
da0 = self.delta_map_type(self.delta_after0)
Expand Down Expand Up @@ -1074,6 +1098,12 @@ def tpc_begin(self):
self.store_temp = q.store_temp
self.read_temp = q.read_temp

def _send_queue(self, tid):
# BWC For tests (cache_trace_analysis.rst) only.
tid_int = u64(tid)
self.cache.set_all_for_tid(tid_int, self.temp_objects)
self.temp_objects.reset()

def after_tpc_finish(self, tid):
"""
Flush queued changes.
Expand Down Expand Up @@ -1170,10 +1200,8 @@ def after_tpc_finish(self, tid):
store(oid_int, tid_int)

self.polling_state.after_tpc_finish(tid_int, self.temp_objects.stored_oids)
# We only do this because cache_trace_analysis uses us
# in ways that aren't quite accurate. We'd prefer to call clear_temp()
# at this point.
self.temp_objects.reset()

self.clear_temp()

def clear_temp(self):
"""Discard all transaction-specific temporary data.
Expand Down Expand Up @@ -1421,7 +1449,8 @@ def __poll_establish_global_checkpoints(self, new_tid_int):
# Initialize the checkpoints; we've never polled before.
logger.debug("Initializing checkpoints: %s", new_tid_int)

# self.checkpoints = self.cache.store_checkpoints(new_tid_int, new_tid_int)
# Storing to the cache is here just for test BWC
self.cache.store_checkpoints(new_tid_int, new_tid_int)
self.checkpoints = (new_tid_int, new_tid_int)
self.polling_state.after_established_checkpoints(self)

Expand Down Expand Up @@ -1501,10 +1530,14 @@ def _suggest_shifted_checkpoints(self, cursor):
delta_size
)

# The poll code will later see the new checkpoints
# and update self.checkpoints and self.delta_after(0|1).
# return self.cache.replace_checkpoints(expect, change_to)
self.polling_state.replace_checkpoints(self, cursor, expect, change_to, tid_int)
# In the past, after setting this on the cache, the poll code
# will later see the new checkpoints and update
# self.checkpoints and self.delta_after(0|1).
self.cache.replace_checkpoints(expect, change_to)
# However, we no longer read from the cache (the set is only there
# for test compatibility). The polling state handles replacing things
# for us if necessary (no need to waste a trip around with bad checkpoints)
return self.polling_state.replace_checkpoints(self, cursor, expect, change_to, tid_int)


class _PersistentRowFilter(object):
Expand Down
127 changes: 63 additions & 64 deletions src/relstorage/cache/tests/test_storage_cache.py
Expand Up @@ -433,14 +433,6 @@ def test_after_poll_init_checkpoints(self):
self.assertEqual(c.checkpoints, (50, 50))
self.assertEqual(data['myprefix:checkpoints'], b'50 50')

def test_after_poll_ignore_garbage_checkpoints(self):
from relstorage.tests.fakecache import data
data['myprefix:checkpoints'] = 'baddata'
c = self._makeOne()
c.after_poll(None, 40, 50, [])
self.assertEqual(c.checkpoints, (50, 50))
self.assertEqual(data['myprefix:checkpoints'], b'50 50')

def test_after_poll_ignore_invalid_checkpoints(self):
from relstorage.tests.fakecache import data
data['myprefix:checkpoints'] = b'60 70' # bad: c0 < c1
Expand All @@ -453,6 +445,8 @@ def test_after_poll_future_checkpoints_when_cp_exist(self):
from relstorage.tests.fakecache import data
data['myprefix:checkpoints'] = b'90 80'
c = self._makeOne()
c.polling_state.checkpoints = (90, 80)
c.polling_state.current_tid = 90
c.checkpoints = (40, 30)
c.current_tid = 40
c.after_poll(None, 40, 50, [(2, 45)])
Expand All @@ -464,37 +458,38 @@ def test_after_poll_future_checkpoints_when_cp_exist(self):
self.assertEqual(dict(c.delta_after1), {})

def test_after_poll_future_checkpoints_when_cp_nonexistent(self):
from relstorage.tests.fakecache import data
data['myprefix:checkpoints'] = b'90 80'
c = self._makeOne()
c.polling_state.checkpoints = (90, 80)
c.polling_state.current_tid = 90

c.after_poll(None, 40, 50, [(2, 45)])
# This instance can't yet see txn 90, and there aren't any
# existing checkpoints, so fall back to the current tid.
self.assertEqual(c.checkpoints, (50, 50))
self.assertEqual(data['myprefix:checkpoints'], b'90 80')
self.assertEqual(c.polling_state.checkpoints, (90, 80))
self.assertEqual(dict(c.delta_after0), {})
self.assertEqual(dict(c.delta_after1), {})

def test_after_poll_retain_checkpoints(self):
from relstorage.tests.fakecache import data
data['myprefix:checkpoints'] = b'40 30'
c = self._makeOne()
c.polling_state.checkpoints = (40, 30)
c.polling_state.current_tid = 40
c.checkpoints = (40, 30)
c.current_tid = 40
c.delta_after1 = {1: 35}
c.after_poll(None, 40, 50, [(2, 45), (2, 41)])
self.assertEqual(c.checkpoints, (40, 30))
self.assertEqual(data['myprefix:checkpoints'], b'40 30')
self.assertEqual(c.polling_state.checkpoints, (40, 30))
self.assertEqual(dict(c.delta_after0), {2: 45})
self.assertEqual(dict(c.delta_after1), {1: 35})

def test_after_poll_new_checkpoints_bad_changes_out_of_order(self):
from relstorage.tests.fakecache import data
from relstorage.cache.interfaces import CacheConsistencyError
data['myprefix:checkpoints'] = b'50 40'

adapter = MockAdapter()
c = self.getClass()(adapter, MockOptionsWithFakeCache(), 'myprefix')
c.polling_state.checkpoints = (50, 40)
c.polling_state.current_tid = 40
c.checkpoints = (40, 30)
c.current_tid = 40

Expand All @@ -515,101 +510,105 @@ def test_after_poll_new_checkpoints_bad_changes_out_of_order(self):
self.assertIsNone(c.current_tid)

def test_after_poll_new_checkpoints(self):
from relstorage.tests.fakecache import data
data['myprefix:checkpoints'] = b'50 40'

# list_changes isn't required to provide changes in any particular
# order.
# list_changes isn't not required to provide a list of tuples;
# it could provide a list of lists. That turns out to matter
# to the BTree constructor.
changes = [(3, 42), (1, 35), (2, 45)]

for f in (tuple, list):
adapter = MockAdapter()
c = self.getClass()(adapter, MockOptionsWithFakeCache(), 'myprefix')
adapter.poller.changes = [f(t) for t in changes]
__traceback_info__ = adapter.poller.changes
c.checkpoints = (40, 30)
c.current_tid = 40
adapter = MockAdapter()
c = self.getClass()(adapter, MockOptionsWithFakeCache(), 'myprefix')
adapter.poller.changes = changes
__traceback_info__ = adapter.poller.changes
c.polling_state.checkpoints = (50, 40)
c.polling_state.current_tid = 40
c.checkpoints = (40, 30)
c.current_tid = 40

shifted_checkpoints = c.after_poll(None, 40, 50, None)
c.after_poll(None, 40, 50, None)

self.assertEqual(c.checkpoints, (50, 40))
self.assertIsNone(shifted_checkpoints)
self.assertEqual(data['myprefix:checkpoints'], b'50 40')
self.assertEqual(dict(c.delta_after0), {})
self.assertEqual(dict(c.delta_after1), {2: 45, 3: 42})
self.assertEqual(c.local_client.get_checkpoints(), shifted_checkpoints)
self.assertEqual(c.checkpoints, (50, 40))
self.assertEqual(c.polling_state.checkpoints, (50, 40))
# polling_state assumes it is in sync and doesn't poll history.
self.assertEqual(adapter.poller.last_requested_range, (50, 50))

def test_after_poll_gap(self):
from relstorage.tests.fakecache import data
data['myprefix:checkpoints'] = b'40 30'
adapter = MockAdapter()
c = self.getClass()(adapter, MockOptionsWithFakeCache(), 'myprefix')
c.polling_state.checkpoints = (40, 30)
c.polling_state.current_tid = 40
c.polling_state.delta_after0 = {2: 45, 3: 42}
c.polling_state.delta_after1 = {1: 35}
adapter.poller.changes = [(3, 42), (1, 35), (2, 45)]
c.checkpoints = (40, 30)
c.current_tid = 40
# provide a prev_tid_int that shows a gap in the polled
# transaction list, forcing a rebuild of delta_after(0|1).
c.after_poll(None, 43, 50, [(2, 45)])


c.after_poll(None, prev_tid_int=43, new_tid_int=50, changes=[(2, 45)])
self.assertEqual(c.checkpoints, (40, 30))
self.assertEqual(data['myprefix:checkpoints'], b'40 30')
self.assertEqual(c.polling_state.checkpoints, (40, 30))
self.assertEqual(dict(c.delta_after0), {2: 45, 3: 42})
self.assertEqual(dict(c.delta_after1), {1: 35})

def test_after_poll_shift_checkpoints(self):
from relstorage.tests.fakecache import data
data['myprefix:checkpoints'] = b'40 30'
def test_after_poll_shift_checkpoints_for_growth(self):
c = self._makeOne()
c.delta_size_limit = 2
c.polling_state.checkpoints = (40, 30)
c.polling_state.current_tid = 40
c.checkpoints = (40, 30)
c.delta_size_limit = 1

c.current_tid = 40
c.after_poll(None, 40, 314, [(1, 45), (2, 46)])
expected_checkpoints = (314, 40)
self.assertEqual(c.checkpoints, (40, 30))
self.assertEqual(c.local_client.get_checkpoints(), expected_checkpoints)
self.assertEqual(dict(c.delta_after0), {1: 45, 2: 46})
self.assertEqual(dict(c.delta_after1), {})
expected_checkpoints = (314, 314) # because we grew too much.
self.assertEqual(c.checkpoints, expected_checkpoints)
self.assertEqual(c.polling_state.checkpoints, expected_checkpoints)
self.assertIsEmpty(c.delta_after0)
self.assertIsEmpty(c.delta_after0)

def test_after_poll_shift_checkpoints_already_changed(self):
# We can arrange for the view to be inconsistent by
# interjecting some code to change things.
from relstorage.tests.fakecache import data
data['myprefix:checkpoints'] = b'40 30'
c = self._makeOne()
c.polling_state.checkpoints = (40, 30)
c.delta_size_limit = 2
c.checkpoints = (40, 30)
orig_checkpoints = c.checkpoints = (40, 30)
c.current_tid = 40
shifted_checkpoints = []
old_suggest = c._suggest_shifted_checkpoints
def suggest():
data['myprefix:checkpoints'] = b'1 1'
shifted_checkpoints.append(old_suggest())
def suggest(cur):
c.polling_state.checkpoints = (1, 1)
shifted_checkpoints.append(old_suggest(cur))
return shifted_checkpoints[-1]
c._suggest_shifted_checkpoints = suggest

c.after_poll(None, 40, 314, [(1, 45), (2, 46)])
shifted_checkpoints = shifted_checkpoints[0]
self.assertEqual(c.checkpoints, (40, 30))
self.assertIsNone(shifted_checkpoints)
self.assertEqual(c.local_client.get_checkpoints(), shifted_checkpoints)
self.assertIs(shifted_checkpoints, orig_checkpoints)
self.assertEqual(c.polling_state.checkpoints, (1, 1))
self.assertEqual(dict(c.delta_after0), {1: 45, 2: 46})
self.assertEqual(dict(c.delta_after1), {})

def test_after_poll_shift_checkpoints_huge(self):
from relstorage.tests.fakecache import data
data['myprefix:checkpoints'] = b'40 30'
c = self._makeOne()

c.polling_state.checkpoints = (40, 30)
c.polling_state.current_tid = 40
c.delta_size_limit = 0
c.checkpoints = (40, 30)
c.current_tid = 40

c.after_poll(None, 40, 314, [(1, 45), (2, 46)])
expected_checkpoints = (314, 314)
self.assertEqual(c.checkpoints, (40, 30))
self.assertEqual(c.local_client.get_checkpoints(), expected_checkpoints)
self.assertEqual(dict(c.delta_after0), {1: 45, 2: 46})
self.assertEqual(dict(c.delta_after1), {})
self.assertEqual(c.polling_state.checkpoints, expected_checkpoints)
self.assertEqual(c.checkpoints, expected_checkpoints)

# self.assertEqual(dict(c.delta_after0), {1: 45, 2: 46})
# self.assertEqual(dict(c.delta_after1), {})
# Unlike in the past, updating the checkpoints had immediate effect
self.assertIsEmpty(c.delta_after0)
self.assertIsEmpty(c.delta_after1)
self.assertIsEmpty(c.polling_state.delta_after0)
self.assertIsEmpty(c.polling_state.delta_after1)

def __not_called(self):
self.fail("Should not be called")
Expand Down

0 comments on commit 635d9df

Please sign in to comment.