Merge b3a941b into fcfc77d
jamadden committed Jun 18, 2019
2 parents fcfc77d + b3a941b commit da83cd7
Showing 9 changed files with 351 additions and 108 deletions.
8 changes: 7 additions & 1 deletion CHANGES.rst
@@ -10,12 +10,18 @@

- Make MySQL and PostgreSQL use a prepared statement to get
transaction IDs. PostgreSQL also uses a prepared statement to set
them. This may be slightly faster. See :issue:`246`.
them. This can be slightly faster. See :issue:`246`.

- Make PostgreSQL use a prepared statement to move objects to their
final destination during commit (history free only). See
:issue:`246`.

- Fix an issue with persistent caches written to from multiple
instances sometimes getting stale data after a restart. Note: This
makes the persistent cache less useful for objects that rarely
change in a database that features other actively changing objects;
it is hoped this can be addressed in the future. See :issue:`249`.

3.0a1 (2019-06-12)
==================

17 changes: 14 additions & 3 deletions src/relstorage/_util.py
@@ -51,18 +51,29 @@ def f(*args, **kwargs):
return result
return f

def _thread_spawn(func, args):
import threading
t = threading.Thread(target=func, args=args)
t.name = t.name + '-spawn-' + func.__name__
t.start()
return t

def _gevent_pool_spawn(func, args):
import gevent
return gevent.get_hub().threadpool.spawn(func, *args)

def spawn(func, args=()):
"""Execute func in a different (real) thread"""
import threading
submit = lambda func, args: threading.Thread(target=func, args=args).start()

submit = _thread_spawn
try:
import gevent.monkey
import gevent
except ImportError:
pass
else:
if gevent.monkey.is_module_patched('threading'):
submit = lambda func, args: gevent.get_hub().threadpool.spawn(func, *args)
submit = _gevent_pool_spawn
submit(func, args)

def get_this_psutil_process():
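
The new spawn() helper above picks its backend at call time: a plain threading.Thread normally, or gevent's native thread pool when threading has been monkey-patched, so blocking work never stalls the event loop. A minimal usage sketch (the callable and its arguments are made up for illustration; only relstorage._util.spawn is real):

# Hypothetical caller; write_cache_file and its arguments are illustrative.
from relstorage._util import spawn

def write_cache_file(path, data):
    with open(path, 'wb') as f:
        f.write(data)

# Plain CPython: runs on a threading.Thread named '...-spawn-write_cache_file'.
# Under gevent monkey-patching: dispatched to gevent's thread pool instead.
spawn(write_cache_file, ('/tmp/example.cache', b'\x00' * 1024))
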
8 changes: 4 additions & 4 deletions src/relstorage/cache/local_client.py
@@ -128,11 +128,11 @@ def _compress(self, data): # pylint:disable=method-hidden
return data

@_log_timed
def save(self, overwrite=False, close_async=True):
def save(self, **sqlite_args):
options = self.options
if options.cache_local_dir and self.__bucket.size:
conn = sqlite_connect(options, self.prefix,
overwrite=overwrite, close_async=close_async)
**sqlite_args)
with closing(conn):
try:
self.write_to_sqlite(conn)
@@ -305,11 +305,11 @@ def data():
# storage for the blob state.
#
# We make one call into sqlite and let it handle the iterating.
# Items are (oid, key_tid, state, actual_tid). Currently,
# key_tid == actual_tid
cur = db.fetch_rows_by_priority()
items = cur.fetchall()
cur.close()
# Items are (oid, key_tid, state, actual_tid)
items = db.fetch_rows_by_priority().fetchall()
# row_filter produces the sequence ((oid, key_tid), (state, actual_tid))
if row_filter is not None:
row_iter = row_filter(checkpoints, items)
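
As a small aside on the row shape the comments above describe, this is how each raw sqlite row splits into the cache's (key, value) pair; the concrete values are invented purely for illustration:

# (oid, key_tid, state, actual_tid) -- made-up values.
row = (42, 5000, b'pickled state', 5000)
key, value = row[:2], row[2:]
assert key == (42, 5000)                    # (oid, key_tid)
assert value == (b'pickled state', 5000)    # (state, actual_tid)
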
61 changes: 32 additions & 29 deletions src/relstorage/cache/persistence.py
@@ -96,10 +96,11 @@ class Connection(sqlite3.Connection):
)

def __init__(self, *args, **kwargs):
__traceback_info__ = args, kwargs
super(Connection, self).__init__(*args, **kwargs)

self.rs_db_filename = None
self.rs_close_async = True
self.rs_close_async = DEFAULT_CLOSE_ASYNC
self._rs_has_closed = False

def __repr__(self):
@@ -125,7 +126,7 @@ def close(self):
self._rs_has_closed = True
from relstorage._util import spawn as _spawn
spawn = _spawn if self.rs_close_async else lambda f: f()
def c():
def optimize_and_close():
# Recommended best practice is to OPTIMIZE the database for
# each closed connection. OPTIMIZE needs to run in each connection
# so it can see what tables and indexes were used. It's usually fast,
@@ -137,7 +138,7 @@ def c():
logger.exception("Failed to optimize database; was it removed?")

super(Connection, self).close()
spawn(c)
spawn(optimize_and_close)


# PRAGMA statements don't allow ? placeholders
@@ -173,7 +174,31 @@ def _execute_pragmas(cur, **kwargs):
else:
logger.debug("Using %s = %s", k, orig_value)

def _connect_to_file(fname, factory=Connection, close_async=True,
_MB = 1024 * 1024
DEFAULT_MAX_WAL = 10 * _MB
DEFAULT_CLOSE_ASYNC = False
# Benchmarking on at least one system doesn't show an improvement to
# either reading or writing by forcing a large mmap_size.
DEFAULT_MMAP_SIZE = None
# 4096 is the page size in current releases of sqlite; older versions
# used 1024. A larger page makes sense as we have biggish values.
# Going larger doesn't make much difference in benchmarks.
DEFAULT_PAGE_SIZE = 4096
# Control where temporary data is stored:
#
# FILE = a deleted disk file (that sqlite never flushes so
# theoretically just exists in the operating system's filesystem
# cache)
#
# MEMORY = explicitly in memory only
#
# DEFAULT = compile time default. Benchmarking for large writes
# doesn't show much difference between FILE and MEMORY, so don't
# bother to change from the default.
DEFAULT_TEMP_STORE = None

def _connect_to_file(fname, factory=Connection,
close_async=DEFAULT_CLOSE_ASYNC,
pragmas=None):

connection = sqlite3.connect(
@@ -204,9 +229,11 @@ def _connect_to_file(fname, factory=Connection, close_async=True,
# the database so that we can verify that it's not corrupt.
pragmas.setdefault('journal_mode', 'wal')
cur = connection.cursor()
__traceback_info__ = cur, pragmas
try:
_execute_pragmas(cur, **pragmas)
except:
logger.exception("Failed to execute pragmas")
cur.close()
if hasattr(connection, 'rs_close_async'):
connection.rs_close_async = False
@@ -217,30 +244,6 @@ def _connect_to_file(fname, factory=Connection, close_async=True,

return connection

_MB = 1024 * 1024
DEFAULT_MAX_WAL = 10 * _MB
DEFAULT_CLOSE_ASYNC = True
# Benchmarking on at least one system doesn't show an improvement to
# either reading or writing by forcing a large mmap_size.
DEFAULT_MMAP_SIZE = None
# 4096 is the page size in current releases of sqlite; older versions
# used 1024. A larger page makes sense as we have biggish values.
# Going larger doesn't make much difference in benchmarks.
DEFAULT_PAGE_SIZE = 4096
# Control where temporary data is:
#
# FILE = a deleted disk file (that sqlite never flushes so
# theoretically just exists in the operating system's filesystem
# cache)
#
# MEMORY = explicitly in memory only
#
# DEFAULT = compile time default. Benchmarking for large writes
# doesn't show much difference between FILE and MEMORY, so don't
# bother to change from the default.
DEFAULT_TEMP_STORE = None


def sqlite_connect(options, prefix,
overwrite=False,
max_wal_size=DEFAULT_MAX_WAL,
@@ -255,7 +258,7 @@ def sqlite_connect(options, prefix,
result in the connection being closed, only committed or rolled back.
"""
parent_dir = getattr(options, 'cache_local_dir', options)
# Allow for memory and temporary databases:
# Allow for memory and temporary databases (empty string):
if parent_dir != ':memory:' and parent_dir:
parent_dir = _normalize_path(options)
try:
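
To make the PRAGMA tuning above concrete, here is a standalone sketch using only the stdlib sqlite3 module; the file path is hypothetical and the values echo the defaults in this diff (WAL journaling, a 4096-byte page size), plus the PRAGMA optimize that Connection.close() above runs before closing. This shows the underlying sqlite mechanics, not RelStorage's API:

import sqlite3

conn = sqlite3.connect('/tmp/example-cache.sqlite3')  # hypothetical path
cur = conn.cursor()
# PRAGMA statements don't accept ? placeholders, so interpolate the values.
for pragma, value in (('page_size', 4096), ('journal_mode', 'wal')):
    cur.execute('PRAGMA %s = %s' % (pragma, value))
    cur.execute('PRAGMA %s' % pragma)
    print(pragma, '->', cur.fetchone())  # read back what sqlite actually chose
# Recommended practice: optimize once per connection before closing.
cur.execute('PRAGMA optimize')
cur.close()
conn.close()
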
53 changes: 39 additions & 14 deletions src/relstorage/cache/storage_cache.py
@@ -81,7 +81,11 @@ class StorageCache(object):
# We assign to this *only* after executing a poll, or
# when reading data from the persistent cache (which happens at
# startup, and usually also when someone calls clear())
current_tid = 0
#
# Start with None so we can distinguish "never polled / no tid in
# the persistent cache" from a TID of 0, which can happen in tests.
current_tid = None

_tracer = None

@@ -221,6 +225,13 @@ def restore(self):
# No point keeping the delta maps otherwise,
# we have to poll. If there were no checkpoints, it means
# we saved without having ever completed a poll.
#
# We choose cp0 as the TID at which to resume
# polling. We have information on cached data as it
# relates to those checkpoints. (TODO: Are we sure that
# the delta maps we've just built are actually accurate
# as-of this particular TID we're choosing to poll from?)
#
self.current_tid = self.checkpoints[0]
self.delta_after0 = row_filter.delta_after0
self.delta_after1 = row_filter.delta_after1
@@ -255,8 +266,11 @@ def _reset(self, message=None):
method returns.
"""
# As if we've never polled
self.checkpoints = None
self.current_tid = 0
for name in ('checkpoints', 'current_tid'):
try:
delattr(self, name)
except AttributeError:
pass
self.delta_after0 = self._delta_map_type()
self.delta_after1 = self._delta_map_type()
if message:
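
The switch to delattr above relies on ordinary Python attribute lookup: deleting an instance attribute re-exposes the class-level default, so _reset genuinely returns to the "never polled" state. A tiny standalone illustration (the class and values are made up):

class Cache(object):            # hypothetical stand-in for StorageCache
    current_tid = None          # class-level default: never polled

c = Cache()
c.current_tid = 42              # instance attribute shadows the default
del c.current_tid               # what delattr(self, name) does above
assert c.current_tid is None    # back to the class default
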
@@ -419,7 +433,7 @@ def load(self, cursor, oid_int):
cache_data = cache[key]
if cache_data:
# Cache hit.
assert cache_data[1] == tid_int, cache_data[1]
assert cache_data[1] == tid_int, (cache_data[1], key)
return cache_data

# Cache miss.
@@ -544,7 +558,7 @@ def after_poll(self, cursor, prev_tid_int, new_tid_int, changes):
*prev_tid_int* can be None, in which case the *changes*
parameter will be ignored. *new_tid_int* cannot be None.
"""
my_prev_tid_int = self.current_tid
my_prev_tid_int = self.current_tid or 0
self.current_tid = new_tid_int

global_checkpoints = self.cache.get_checkpoints()
@@ -757,25 +771,36 @@ def __call__(self, checkpoints, row_iter):
delta_after0 = self.delta_after0
delta_after1 = self.delta_after1
cp0, cp1 = checkpoints

for row in row_iter:
# We don't care about the extra suggested key tid
suggested_key = row[:2]
# Rows are (oid, tid, state, tid), where the two tids
# are always equal.
key = row[:2]
value = row[2:]
oid = suggested_key[0]
oid = key[0]
actual_tid = value[1]

if actual_tid >= cp0:
# They must match in both these cases
key = suggested_key
delta_after0[oid] = actual_tid
elif actual_tid >= cp1:
key = suggested_key
# XXX: This might not be right. We'll poll for
# changes after cp0 (because we set that as our
# current_tid/the storage's prev_polled_tid), but
# we won't poll for changes after cp1.
delta_after1[oid] = actual_tid
else:
# Old generation, no delta.
# Even though this is old, it could be good to have it,
# This is too old; it falls outside the checkpoint range in
# which we track changes. It could still be good to have,
# since it might be something that doesn't change much.
key = (oid, cp0)
# Unfortunately, we can't just stick it in our fallback
# keys (oid, cp0) or (oid, cp1), because it might not be current,
# and we definitely don't poll this far back.
#
# Our options are to put it in with its natural key and hope
# we later wind up constructing that key (how could we do that?)
# or drop it; since we don't know what key it was actively being
# found under before, dropping it is our best option. Sadly.
continue
yield key, value


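Putting the branches of __call__ together: each row is partitioned against the two checkpoints into delta_after0, delta_after1, or nothing at all. The checkpoints and rows below are invented, but they mirror the expectations in test_deltas later in this commit:

cp0, cp1 = 5000, 4000            # hypothetical checkpoints; cp0 is the newer
delta_after0, delta_after1 = {}, {}
rows = [                          # (oid, key_tid, state, actual_tid)
    (1, 5000, b'1', 5000),        # >= cp0: recorded in delta_after0
    (2, 4000, b'2', 4000),        # between cp1 and cp0: delta_after1
    (3, 3999, b'3', 3999),        # older than cp1: dropped entirely
]
kept = []
for oid, key_tid, state, actual_tid in rows:
    if actual_tid >= cp0:
        delta_after0[oid] = actual_tid
    elif actual_tid >= cp1:
        delta_after1[oid] = actual_tid
    else:
        continue                  # too old to prove current, so drop it
    kept.append(((oid, key_tid), (state, actual_tid)))

assert kept == [((1, 5000), (b'1', 5000)), ((2, 4000), (b'2', 4000))]
assert delta_after0 == {1: 5000} and delta_after1 == {2: 4000}
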
9 changes: 5 additions & 4 deletions src/relstorage/cache/tests/test_storage_cache.py
@@ -433,7 +433,7 @@ def test_after_poll_new_checkpoints_bad_changes_duplicates(self):
with self.assertRaisesRegex(CacheConsistencyError, "to have total len"):
c.after_poll(None, 40, 50, None)
self.assertIsNone(c.checkpoints)
self.assertEqual(0, c.current_tid)
self.assertIsNone(c.current_tid)

def test_after_poll_new_checkpoints_bad_changes_out_of_order(self):
from relstorage.tests.fakecache import data
@@ -450,7 +450,7 @@ def test_after_poll_new_checkpoints_bad_changes_out_of_order(self):
with self.assertRaisesRegex(CacheConsistencyError, "out of range"):
c.after_poll(None, 40, 50, None)
self.assertIsNone(c.checkpoints)
self.assertEqual(0, c.current_tid)
self.assertIsNone(c.current_tid)

# Too low
c.checkpoints = (40, 30)
@@ -459,7 +459,7 @@ def test_after_poll_new_checkpoints_bad_changes_out_of_order(self):
with self.assertRaisesRegex(CacheConsistencyError, "out of range"):
c.after_poll(None, 40, 50, None)
self.assertIsNone(c.checkpoints)
self.assertEqual(0, c.current_tid)
self.assertIsNone(c.current_tid)

def test_after_poll_new_checkpoints(self):
from relstorage.tests.fakecache import data
@@ -579,6 +579,8 @@ def test_deltas(self):

tid_after0 = 5000
tid_after1 = 4000
# The old_tid, outside the checkpoint range,
# will get dropped.
old_tid = 3999

rows = [
@@ -592,5 +594,4 @@ def test_deltas(self):
self.assertEqual(results, [
((1, tid_after0), (b'1', tid_after0)),
((2, tid_after1), (b'2', tid_after1)),
((3, tid_after0), (b'3', old_tid))
])
2 changes: 1 addition & 1 deletion src/relstorage/storage.py
@@ -260,7 +260,7 @@ def __init__(self, adapter, name=None, create=None,

# Creating the storage cache may have loaded cache files, and if so,
# we have a previous tid state.
if self._cache.current_tid:
if self._cache.current_tid is not None:
self._prev_polled_tid = self._cache.current_tid


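The one-line storage.py change guards against a truthiness pitfall: a TID of 0 is falsy, but per the cache changes above it is now distinct from the "never polled" sentinel None. A small illustration with made-up names:

current_tid = 0                    # a real, if unusual, previously-polled TID
if current_tid:                    # old check: silently skips a TID of 0
    pass
if current_tid is not None:        # new check: only the never-polled case is skipped
    prev_polled_tid = current_tid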
