SQLite fixes: resetting OIDs on zap, large prefetches, and making
copying transactions faster by avoiding an extra DELETE.
jamadden committed Oct 23, 2019
1 parent 47683d0 commit bdd04aa
Showing 12 changed files with 175 additions and 140 deletions.
7 changes: 6 additions & 1 deletion CHANGES.rst
@@ -5,8 +5,13 @@
 3.0b2 (unreleased)
 ==================
 
-- Nothing changed yet.
+- SQLite: Fix resetting OIDs when zapping a storage. This could be a
+  problem for benchmarks.
+
+- SQLite: Fix large prefetches resulting in ``OperationalError``.
+
+- SQLite: Improve the speed of copying transactions into a SQLite
+  storage (e.g., with zodbconvert).
 
 3.0b1 (2019-10-22)
 ==================
83 changes: 63 additions & 20 deletions src/relstorage/adapters/batch.py
@@ -34,7 +34,12 @@ class RowBatcher(object):
     can be set in the ``delete_placeholder`` attribute.
     """
 
+    # How many total rows can be sent at once. Also used as
+    # ``bind_limit`` if that is 0 or None.
     row_limit = 1024
+    # The total number of available bind variables a single statement
+    # can use.
+    bind_limit = None
     # The default max_allowed_packet in MySQL is 4MB,
     # so the data, including encoding and the rest of the query structure,
     # must be less than that (unless we dynamically query to find out
@@ -48,14 +53,17 @@ class RowBatcher(object):

     def __init__(self, cursor, row_limit=None,
                  delete_placeholder=None,
-                 insert_placeholder=None):
+                 insert_placeholder=None,
+                 bind_limit=None):
         self.cursor = cursor
         if delete_placeholder is not None:
             self.delete_placeholder = delete_placeholder
         if insert_placeholder is not None:
             self.insert_placeholder = insert_placeholder
         if row_limit is not None:
             self.row_limit = row_limit
+        if bind_limit is not None:
+            self.bind_limit = bind_limit
 
         # These are cumulative
         self.total_rows_inserted = 0
@@ -65,6 +73,7 @@ def __init__(self, cursor, row_limit=None,
         # These all get reset at each flush()
         self.rows_added = 0
         self.size_added = 0
+        self.bind_params_added = 0
 
         self.deletes = defaultdict(set) # {(table, columns_tuple): set([(column_value,)])}
         self.inserts = defaultdict(dict) # {(command, header, row_schema, suffix): {rowkey: [row]}}
@@ -78,29 +87,50 @@ def __repr__(self):
             self.rows_added,
         )
 
+    def _flush_if_needed(self):
+        if self.rows_added >= self.row_limit:
+            return self.flush()
+        if self.bind_limit and self.bind_params_added >= self.bind_limit:
+            return self.flush()
+        if self.size_added >= self.size_limit:
+            return self.flush()
+
+    def _flush_if_would_exceed_bind(self, addition):
+        # The bind limit is a hard limit we cannot exceed.
+        # If adding *addition* params would cause us to exceed,
+        # flush now.
+        if self.bind_limit and self.bind_params_added + addition >= self.bind_limit:
+            self.flush()
+            return True

     def delete_from(self, table, **kw):
         if not kw:
             raise AssertionError("Need at least one column value")
         columns = tuple(sorted(kw))
         key = (table, columns)
-        rows = self.deletes[key]
         row = tuple(kw[column] for column in columns)
-        rows.add(row)
+        bind_params_added = len(row) if row not in self.deletes[key] else 0
+        self._flush_if_would_exceed_bind(bind_params_added)
+
+        self.deletes[key].add(row)
         self.rows_added += 1
-        if self.rows_added >= self.row_limit:
-            self.flush()
+        self.bind_params_added += bind_params_added
+        self._flush_if_needed()

     def insert_into(self, header, row_schema, row, rowkey, size,
                     command='INSERT', suffix=''):
         key = (command, header, row_schema, suffix)
-        rows = self.inserts[key]
-        rows[rowkey] = row # note that this may replace a row
+        bind_params_added = len(row) if rowkey not in self.inserts[key] else 0
+        self._flush_if_would_exceed_bind(bind_params_added)
+
+        # If we flushed, self.inserts has started all over.
+        self.inserts[key][rowkey] = row # note that this may replace a row
 
         self.rows_added += 1
+        self.bind_params_added += bind_params_added
         self.size_added += size
 
-        if (self.rows_added >= self.row_limit
-                or self.size_added >= self.size_limit):
-            self.flush()
+        self._flush_if_needed()

     def row_schema_of_length(self, param_count):
         # Use as the *row_schema* parameter to insert_into
@@ -115,21 +145,33 @@ def row_schema_of_length(self, param_count):

     def select_from(self, columns, table, suffix='', **kw):
         """
-        Handles a query of the ``WHERE col IN (?, ?,)`` type.
+        Handles a query of the ``WHERE col IN (?, ?,)`` type::
+
+            SELECT columns FROM table WHERE col IN (?, ...)
 
-        The keyword arguments should be of length 1, containing
-        an iterable of the values to check: ``col=(1, 2)`` or
-        in the dynamic case ``**{indirect_var: [1, 2]}``.
+        The keyword arguments should be of length 1, containing an
+        iterable of the values to check: ``col=(1, 2)`` or in the
+        dynamic case ``**{indirect_var: [1, 2]}``.
+
+        The number of batches needed is determined by the length of
+        the iterator divided by this object's ``bind_limit``,
+        or, if that's not set, by the ``row_limit``.
+
+        Returns an iterator of matching rows.
         """
         assert len(kw) == 1
         filter_column, filter_values = kw.popitem()
-        filter_values = list(filter_values)
+        filter_values = iter(filter_values)
 
         command = 'SELECT %s' % (','.join(columns),)
-        while filter_values:
-            filter_subset = filter_values[:self.row_limit]
-            del filter_values[:self.row_limit]
+
+        chunk_size = self.bind_limit or self.row_limit
+        chunk_size -= 1
+
+        for head in filter_values:
+            filter_subset = list(itertools.islice(filter_values, chunk_size))
+            filter_subset.append(head)
+
             descriptor = [[(table, (filter_column,)), filter_subset]]
             self._do_batch(command, descriptor, rows_need_flattened=False, suffix=suffix)
             for row in self.cursor.fetchall():
@@ -143,8 +185,10 @@ def flush(self):
             self.total_rows_inserted += self._do_inserts()
             self.inserts.clear()
             self.total_size_inserted += self.size_added
+
         self.rows_added = 0
         self.size_added = 0
+        self.bind_params_added = 0
 
     def _do_deletes(self):
         return self._do_batch('DELETE', sorted(iteritems(self.deletes)))
@@ -176,7 +220,6 @@ def _do_batch(self, command, descriptors, rows_need_flattened=True, suffix=''):
             if these_params_need_flattened:
                 params = self._flatten_params(params)
             stmt += suffix
-            __traceback_info__ = stmt, params
             self.cursor.execute(stmt, params)
 
         return count
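The head-plus-``islice`` loop in the new ``select_from`` is the heart of the prefetch fix, and it is easier to see outside the diff. A minimal standalone sketch of the same chunking pattern (the ``chunks`` helper and the sample input are illustrative, not RelStorage API):

import itertools

def chunks(values, bind_limit=999):
    # Mirror select_from: pull one "head" item to drive the loop, then
    # slice off up to bind_limit - 1 more, so no batch ever binds more
    # than bind_limit parameters and the input iterable is never fully
    # materialized as a list.
    values = iter(values)
    chunk_size = bind_limit - 1  # the head item consumes one parameter
    for head in values:
        chunk = list(itertools.islice(values, chunk_size))
        chunk.append(head)
        yield chunk

for batch in chunks(range(2500)):
    print(len(batch))  # 999, 999, 502 -- each fits SQLite's default limit

Letting the head element drive the ``for`` loop is what terminates the iteration cleanly: when the iterator is exhausted the loop simply ends, with no sentinel or length check.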
42 changes: 23 additions & 19 deletions src/relstorage/adapters/mover.py
@@ -270,13 +270,17 @@ def replace_temps(self, cursor, state_oid_tid_iter):

     @metricmethod_sampled
     def _generic_restore(self, batcher, oid, tid, data,
-                         command='INSERT', suffix=''):
-        """Store an object directly, without conflict detection.
+                         command, suffix):
+        """
+        Store an object directly, without conflict detection.
 
         Used for copying transactions into this database.
-        """
-        md5sum = self._compute_md5sum(data)
 
+        Either the *command* or the *suffix* must be capable of
+        handling conflicts in a single query. For example,
+        ``command='INSERT OR REPLACE'``
+        or ``command='INSERT', suffix='ON CONFLICT (zoid) DO ...'``.
+        """
         if data is not None:
             encoded = self.driver.Binary(data)
             size = len(data)
@@ -285,8 +289,8 @@ def _generic_restore(self, batcher, oid, tid, data,
             size = 0
 
         if self.keep_history:
-            if command == 'INSERT' and not suffix:
-                batcher.delete_from("object_state", zoid=oid, tid=tid)
+            # We can record deletion/un-creation via a null state.
+            md5sum = self._compute_md5sum(data)
             row_schema = """
                 %s, %s,
                 COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
@@ -301,20 +305,20 @@ def _generic_restore(self, batcher, oid, tid, data,
                 command=command,
                 suffix=suffix
             )
-        elif data:
-            if command == 'INSERT' and not suffix:
-                batcher.delete_from('object_state', zoid=oid)
-            batcher.insert_into(
-                "object_state (zoid, tid, state_size, state)",
-                "%s, %s, %s, %s",
-                (oid, tid, size, encoded),
-                rowkey=oid,
-                size=size,
-                command=command,
-                suffix=suffix
-            )
         else:
-            batcher.delete_from('object_state', zoid=oid)
+            # history free can only delete the entire record.
+            if data:
+                batcher.insert_into(
+                    "object_state (zoid, tid, state_size, state)",
+                    "%s, %s, %s, %s",
+                    (oid, tid, size, encoded),
+                    rowkey=oid,
+                    size=size,
+                    command=command,
+                    suffix=suffix
+                )
+            else:
+                batcher.delete_from('object_state', zoid=oid)
 
     def restore(self, cursor, batcher, oid, tid, data):
         raise NotImplementedError()
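The new docstring names two equivalent conflict-handling styles, and relying on them is what lets ``_generic_restore`` drop the extra DELETE that the old ``command == 'INSERT' and not suffix`` branches issued. A minimal sketch of both styles against a simplified ``object_state`` table, using Python's built-in ``sqlite3`` (the second style assumes a SQLite build >= 3.24, which added ``ON CONFLICT``):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE object_state"
             " (zoid INTEGER PRIMARY KEY, tid INTEGER, state BLOB)")

# Style 1: the command itself resolves the conflict.
conn.execute("INSERT OR REPLACE INTO object_state (zoid, tid, state)"
             " VALUES (?, ?, ?)", (1, 10, b"state-a"))

# Style 2: a plain INSERT plus a conflict-handling suffix (the
# PostgreSQL mover passes an ON CONFLICT suffix like this one;
# MySQL uses ON DUPLICATE KEY UPDATE instead).
conn.execute("INSERT INTO object_state (zoid, tid, state) VALUES (?, ?, ?)"
             " ON CONFLICT (zoid) DO UPDATE SET"
             " tid = excluded.tid, state = excluded.state", (1, 11, b"state-b"))

print(conn.execute("SELECT tid, state FROM object_state").fetchall())
# [(11, b'state-b')] -- one row, upserted twice, no separate DELETE issued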
3 changes: 2 additions & 1 deletion src/relstorage/adapters/mysql/mover.py
@@ -111,7 +111,8 @@ def restore(self, cursor, batcher, oid, tid, data):
                 state_size = VALUES(state_size),
                 state = VALUES(state)
         """
-        self._generic_restore(batcher, oid, tid, data, suffix=suffix)
+        self._generic_restore(batcher, oid, tid, data,
+                              command='INSERT', suffix=suffix)
 
     # Override this query from the superclass. The MySQL optimizer, up
     # through at least 5.7.17 doesn't like actual subqueries in a DELETE
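For reference, here is a rough sketch of how the pieces ``restore`` hands to the batcher compose into one multi-row MySQL statement. The assembly below is approximate (the real logic lives in ``RowBatcher._do_inserts``/``_do_batch``); the point is that the ``ON DUPLICATE KEY UPDATE`` suffix lets a single round trip upsert every batched row:

command = 'INSERT'
header = 'object_state (zoid, tid, state_size, state)'
row_schema = '%s, %s, %s, %s'
suffix = ('ON DUPLICATE KEY UPDATE'
          ' tid = VALUES(tid),'
          ' state_size = VALUES(state_size),'
          ' state = VALUES(state)')

rows = [(0, 5, 3, b'abc'), (1, 5, 3, b'def')]
# Repeat the row schema once per batched row: (%s, %s, %s, %s), ...
values = ', '.join('(%s)' % (row_schema,) for _ in rows)
stmt = '%s INTO %s VALUES %s %s' % (command, header, values, suffix)
print(stmt)
# cursor.execute(stmt, [param for row in rows for param in row])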
3 changes: 2 additions & 1 deletion src/relstorage/adapters/postgresql/mover.py
@@ -106,7 +106,8 @@ def restore(self, cursor, batcher, oid, tid, data):
                 state_size = excluded.state_size,
                 state = excluded.state
         """
-        self._generic_restore(batcher, oid, tid, data, suffix=suffix)
+        self._generic_restore(batcher, oid, tid, data,
+                              command='INSERT', suffix=suffix)
 
     @metricmethod_sampled
     def download_blob(self, cursor, oid, tid, filename):
4 changes: 2 additions & 2 deletions src/relstorage/adapters/sqlite/batch.py
@@ -20,13 +20,13 @@
 from ..batch import RowBatcher
 
 class Sqlite3RowBatcher(RowBatcher):
-    # The batch size depends on how many params a stored proc can
+    # The batch size depends on how many params a statement can
     # have; if we go too big we get OperationalError: too many SQL
     # variables. The default allowed is 999.
     # Note that the multiple-value syntax was added in
     # 3.7.11, 2012-03-20.
 
-    row_limit = 998
+    bind_limit = 998
 
     # sqlite only supports ? as a param.
     delete_placeholder = '?'
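The error this class guards against is easy to reproduce with the standard library, on SQLite builds where SQLITE_MAX_VARIABLE_NUMBER still has its old default of 999 (builds since 3.32 default to 32766 and will not raise here). The limit of 998 rather than 999 presumably leaves one variable of headroom:

import sqlite3

conn = sqlite3.connect(":memory:")
params = list(range(1000))
placeholders = ','.join(['?'] * len(params))
try:
    # Bind 1000 parameters to a single statement.
    conn.execute('SELECT %s' % (placeholders,), params)
    print('this SQLite build allows more than 999 variables')
except sqlite3.OperationalError as e:
    print(e)  # too many SQL variables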
20 changes: 2 additions & 18 deletions src/relstorage/adapters/sqlite/mover.py
@@ -67,21 +67,5 @@ def on_store_opened(self, cursor, restart=False):

     @metricmethod_sampled
     def restore(self, cursor, batcher, oid, tid, data):
-        # TODO: Use the update syntax when available.
-        # if self.keep_history:
-        #     suffix = """
-        #     ON DUPLICATE KEY UPDATE
-        #     tid = VALUES(tid),
-        #     prev_tid = VALUES(prev_tid),
-        #     md5 = VALUES(md5),
-        #     state_size = VALUES(state_size),
-        #     state = VALUES(state)
-        #     """
-        # else:
-        #     suffix = """
-        #     ON DUPLICATE KEY UPDATE
-        #     tid = VALUES(tid),
-        #     state_size = VALUES(state_size),
-        #     state = VALUES(state)
-        #     """
-        self._generic_restore(batcher, oid, tid, data, suffix='')
+        self._generic_restore(batcher, oid, tid, data,
+                              command='INSERT OR REPLACE', suffix='')
2 changes: 1 addition & 1 deletion src/relstorage/adapters/sqlite/schema.py
@@ -71,7 +71,7 @@ def _create_pack_lock(self, cursor):

     def _reset_oid(self, cursor):
         from .oidallocator import Sqlite3OIDAllocator
-        with contextlib.closing(Sqlite3OIDAllocator(self.driver, self.connmanager)) as oids:
+        with contextlib.closing(Sqlite3OIDAllocator(self.driver, self.oid_connmanager)) as oids:
             oids.reset_oid(cursor)
 
     def drop_all(self):
def drop_all(self):
Expand Down
