Skip to content

Commit

Permalink
Merge 69d3900 into fbe3847
Browse files Browse the repository at this point in the history
  • Loading branch information
jamadden committed Aug 7, 2019
2 parents fbe3847 + 69d3900 commit 21e3395
Show file tree
Hide file tree
Showing 20 changed files with 221 additions and 83 deletions.
7 changes: 7 additions & 0 deletions CHANGES.rst
Expand Up @@ -63,6 +63,13 @@
release and then acquire the GIL while holding global locks. See
:issue:`304`.

- Make conflict resolution require fewer database round trips,
especially on PostgreSQL and MySQL, at the expense of using more
memory. In the ideal case it now only needs one (MySQL) or two
(PostgreSQL) queries. Previously it needed at least two round
trips for each conflicting object. On both databases, the
benchmarks are 40% to 80% faster (depending on cache configuration).

3.0a6 (2019-07-29)
==================

Expand Down
27 changes: 16 additions & 11 deletions src/relstorage/adapters/interfaces.py
Expand Up @@ -179,10 +179,13 @@ def lock_objects_and_detect_conflicts(
*read_current_oids* (for read).
Returns an iterable of ``(oid_int, committed_tid_int,
tid_this_txn_saw_int)`` for all OIDs that were locked (that
is, the OIDs that we're modifying plus the OIDs in
*required_tids*). If the ``tid_this_txn_saw_int`` is None,
that was an object we only read, not modified.
tid_this_txn_saw_int, committed_state)`` for all OIDs that
were locked (that is, the OIDs that we're modifying plus the
OIDs in *required_tids*) and which had conflicts.
If the ``tid_this_txn_saw_int`` is None, that was an object we
only read, not modified. ``committed_state`` is allowed to be None if
there isn't an efficient way to query that in bulk from the database.
Implementations are encouraged to do all this work in as few
calls to the database as possible with a stored procedure. The
Expand Down Expand Up @@ -793,7 +796,7 @@ def store_temps(cursor, state_oid_tid_iter):
``oid_int`` values will be distinct. It is further guaranteed that
this method will not be called more than once in a given transaction;
further updates to the temporary table will be made using
``replace_temp``, one at a time.
``replace_temps``, which is also only called once.
"""

def restore(cursor, batcher, oid, tid, data, stmt_buf):
Expand All @@ -809,20 +812,22 @@ def detect_conflict(cursor):
Find all conflicts in the data about to be committed.
If there is a conflict, returns a sequence of
``(oid, committed_tid, attempted_committed_tid)``.
``(oid, committed_tid, attempted_committed_tid, committed_state)``.
This method should be called during the ``tpc_vote`` phase of a transaction,
with :meth:`ILocker.lock_current_objects` held.
"""

def replace_temp(cursor, oid, prev_tid, data):
def replace_temps(cursor, state_oid_tid_iter):
"""
Replace an object in the temporary table.
Replace all objects in the temporary table with new data from
*state_oid_tid_iter*.
This happens after conflict resolution.
This happens after conflict resolution. The *state_oid_tid_iter*
parameter has the same structure as for ``store_temps``.
TODO: This method needs to go away and use the regular
row batcher, so we can take advantage of bulk optimizations.
Implementations should try to perform this in as few database operations
as possible.
"""

def move_from_temp(cursor, tid, txn_has_blobs):
Expand Down
12 changes: 10 additions & 2 deletions src/relstorage/adapters/mover.py
Expand Up @@ -313,7 +313,10 @@ def restore(self, cursor, batcher, oid, tid, data):
_detect_conflict_query = Schema.temp_store.natural_join(
Schema.all_current_object
).select(
Schema.temp_store.c.zoid, Schema.all_current_object.c.tid, Schema.temp_store.c.prev_tid
Schema.temp_store.c.zoid, Schema.all_current_object.c.tid, Schema.temp_store.c.prev_tid,
# We don't get the state here; no particularly good reason except that I need to write
# that into our query framework.
None
).where(
Schema.temp_store.c.prev_tid != Schema.all_current_object.c.tid
).order_by(
Expand All @@ -330,7 +333,6 @@ def detect_conflict(self, cursor):
rows = cursor.fetchall()
return rows

@metricmethod_sampled
def replace_temp(self, cursor, oid, prev_tid, data):
"""Replace an object in the temporary table.
Expand All @@ -347,6 +349,12 @@ def replace_temp(self, cursor, oid, prev_tid, data):
"""
cursor.execute(stmt, (prev_tid, md5sum, self.driver.Binary(data), oid))

@metricmethod_sampled
def replace_temps(self, cursor, state_oid_tid_iter):
    """
    Replace objects in the temporary table, one row at a time.

    Generic fallback that delegates each ``(state, oid, tid)`` triple
    to ``replace_temp``; subclasses override this with a bulk
    implementation where the database makes one possible.
    """
    for state, zoid, tid in state_oid_tid_iter:
        self.replace_temp(cursor, zoid, tid, state)


# Subclasses may override any of these queries if there is a
# more optimal form.

Expand Down
6 changes: 5 additions & 1 deletion src/relstorage/adapters/mysql/connmanager.py
Expand Up @@ -72,7 +72,11 @@ def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
params = self._params

while True:
__traceback_info__ = params
__traceback_info__ = {
k: v if k != 'passwd' else '<*****>'
for k, v in params.items()
}

try:
conn = self._db_connect(**params)
cursor = self._db_driver.cursor(conn)
Expand Down
7 changes: 6 additions & 1 deletion src/relstorage/adapters/mysql/mover.py
Expand Up @@ -96,7 +96,6 @@ def on_store_opened(self, cursor, restart=False):
# more similar to what PostgreSQL uses, possibly allowing more
# query sharing (with a smarter query runner/interpreter).

@metricmethod_sampled
def store_temp(self, cursor, batcher, oid, prev_tid, data):
suffix = """
ON DUPLICATE KEY UPDATE
Expand All @@ -106,6 +105,12 @@ def store_temp(self, cursor, batcher, oid, prev_tid, data):
"""
self._generic_store_temp(batcher, oid, prev_tid, data, suffix=suffix)

@metricmethod_sampled
def replace_temps(self, cursor, state_oid_tid_iter):
    """
    Replace rows in the temporary table.

    Our ``store_temp`` query ends with ``ON DUPLICATE KEY UPDATE``,
    turning the store into an upsert, so simply storing again through
    the regular batching path performs the replacement.
    """
    self.store_temps(cursor, state_oid_tid_iter)

@metricmethod_sampled
def restore(self, cursor, batcher, oid, tid, data):
"""Store an object directly, without conflict detection.
Expand Down
Expand Up @@ -58,15 +58,16 @@ BEGIN
-- readCurrent conflicts first so we don't waste time resolving
-- state conflicts if we are going to fail the transaction.

SELECT zoid, {CURRENT_OBJECT}.tid, NULL
SELECT zoid, {CURRENT_OBJECT}.tid, NULL, NULL
FROM {CURRENT_OBJECT}
INNER JOIN temp_read_current USING (zoid)
WHERE temp_read_current.tid <> {CURRENT_OBJECT}.tid
UNION ALL
SELECT zoid, tid, prev_tid
FROM {CURRENT_OBJECT}
SELECT cur.zoid, cur.tid, temp_store.prev_tid, {OBJECT_STATE_NAME}.state
FROM {CURRENT_OBJECT} cur
INNER JOIN temp_store USING (zoid)
WHERE temp_store.prev_tid <> {CURRENT_OBJECT}.tid;
{OBJECT_STATE_JOIN}
WHERE temp_store.prev_tid <> cur.tid;


END;
12 changes: 12 additions & 0 deletions src/relstorage/adapters/mysql/schema.py
Expand Up @@ -215,6 +215,16 @@ def create_procedures(self, cursor):
# an appropriate value for both of those installed here.
installed = self.list_procedures(cursor)
current_object = 'current_object' if self.keep_history else 'object_state'
if self.keep_history:
object_state_join = """
INNER JOIN object_state ON (object_state.zoid = cur.zoid
AND object_state.tid = cur.tid)
"""
object_state_name = 'object_state'
else:
object_state_join = ""
object_state_name = 'cur'

major_version = self.version_detector.get_major_version(cursor)
if self.version_detector.supports_nowait(cursor):
set_lock_timeout = ''
Expand All @@ -232,6 +242,8 @@ def create_procedures(self, cursor):
create_stmt = create_stmt.format(
CHECKSUM=checksum,
CURRENT_OBJECT=current_object,
OBJECT_STATE_NAME=object_state_name,
OBJECT_STATE_JOIN=object_state_join,
SET_LOCK_TIMEOUT=set_lock_timeout,
FOR_SHARE=for_share
)
Expand Down
46 changes: 38 additions & 8 deletions src/relstorage/adapters/postgresql/mover.py
Expand Up @@ -66,15 +66,19 @@ def on_store_opened(self, cursor, restart=False):
# DELETE ROWS and not DROP TABLE, but that didn't seem to be true (it's possible
# an ANALYZE would still be helpful before using the temp table, but we
# haven't benchmarked that).
ddl_stmts = [
"""
CREATE TEMPORARY TABLE IF NOT EXISTS temp_store (

temp_store_table_tmpl = """
CREATE TEMPORARY TABLE IF NOT EXISTS {NAME} (
zoid BIGINT NOT NULL PRIMARY KEY,
prev_tid BIGINT NOT NULL,
md5 CHAR(32),
state BYTEA
) ON COMMIT DELETE ROWS;
""",
"""

ddl_stmts = [
temp_store_table_tmpl.format(NAME='temp_store'),
temp_store_table_tmpl.format(NAME='temp_store_replacements'),
"""
CREATE TEMPORARY TABLE IF NOT EXISTS temp_blob_chunk (
zoid BIGINT NOT NULL,
Expand Down Expand Up @@ -248,16 +252,41 @@ def upload_blob(self, cursor, oid, tid, filename):
params['tid'] = tid
cursor.execute(insert_stmt, params)

def store_temps(self, cursor, state_oid_tid_iter):

def _do_store_temps(self, cursor, state_oid_tid_iter, table_name):
    """
    Bulk-upload object states into *table_name* using a binary COPY.

    Does nothing when *state_oid_tid_iter* is empty.
    """
    # History-preserving storages need the md5 to compare states.
    # We could calculate that on the server using pgcrypto, if it's
    # available. Or we could just compare states directly instead of
    # comparing md5s; that's fast on PostgreSQL.
    if not state_oid_tid_iter:
        return
    digester = self._compute_md5sum if self.keep_history else None
    buf = TempStoreCopyBuffer(table_name, state_oid_tid_iter, digester)
    cursor.copy_expert(buf.COPY_COMMAND, buf)

@metricmethod_sampled
def _store_temps(self, cursor, state_oid_tid_iter):
    # Bulk-insert the initial object states into ``temp_store``.
    # NOTE(review): the IObjectMover interface declares this method as
    # ``store_temps`` (no leading underscore) — confirm how callers
    # reach this, or whether the underscore is unintentional.
    self._do_store_temps(cursor, state_oid_tid_iter, 'temp_store')

@metricmethod_sampled
def replace_temps(self, cursor, state_oid_tid_iter):
    """
    Replace the states in ``temp_store`` after conflict resolution.

    The replacement rows are first bulk-uploaded (COPY) into the
    scratch table ``temp_store_replacements`` and then applied with a
    single UPDATE. We *could* write straight into ``temp_store`` if we
    deleted the old rows first, but that would require either iterating
    the input twice and/or buffering all the state data in memory;
    that's fine when it's small, but it could be large.
    """
    self._do_store_temps(cursor, state_oid_tid_iter, 'temp_store_replacements')
    # TODO: Prepare this query.
    cursor.execute(
        """
        UPDATE temp_store
        SET prev_tid = r.prev_tid,
            md5 = r.md5,
            state = r.state
        FROM temp_store_replacements r
        WHERE temp_store.zoid = r.zoid
        """
    )


class TempStoreCopyBuffer(io.BufferedIOBase):
"""
Expand All @@ -267,10 +296,11 @@ class TempStoreCopyBuffer(io.BufferedIOBase):

# pg8000 uses readinto(); psycopg2 uses read().

COPY_COMMAND = "COPY temp_store (zoid, prev_tid, md5, state) FROM STDIN WITH (FORMAT binary)"
COPY_COMMAND_TMPL = "COPY {NAME} (zoid, prev_tid, md5, state) FROM STDIN WITH (FORMAT binary)"

def __init__(self, state_oid_tid_iterable, digester):
def __init__(self, table, state_oid_tid_iterable, digester):
super(TempStoreCopyBuffer, self).__init__()
self.COPY_COMMAND = self.COPY_COMMAND_TMPL.format(NAME=table)
self.state_oid_tid_iterable = state_oid_tid_iterable
self._iter = iter(state_oid_tid_iterable)
self._digester = digester
Expand Down
Expand Up @@ -2,7 +2,7 @@ CREATE OR REPLACE FUNCTION lock_objects_and_detect_conflicts(
read_current_oids BIGINT[],
read_current_tids BIGINT[]
)
RETURNS TABLE(zoid BIGINT, tid BIGINT, prev_tid BIGINT)
RETURNS TABLE(zoid BIGINT, tid BIGINT, prev_tid BIGINT, committed_state BYTEA)
AS
$$
BEGIN
Expand Down Expand Up @@ -47,17 +47,18 @@ BEGIN
ORDER BY zoid
FOR SHARE NOWAIT
)
SELECT locked.zoid, locked.tid, NULL::BIGINT
SELECT locked.zoid, locked.tid, NULL::BIGINT, NULL::BYTEA
FROM locked WHERE locked.tid <> locked.desired;
END IF;



RETURN QUERY
SELECT {CURRENT_OBJECT}.zoid, {CURRENT_OBJECT}.tid, temp_store.prev_tid
FROM {CURRENT_OBJECT}
SELECT cur.zoid, cur.tid,
temp_store.prev_tid, {OBJECT_STATE_NAME}.state
FROM {CURRENT_OBJECT} cur
INNER JOIN temp_store USING (zoid)
WHERE temp_store.prev_tid <> {CURRENT_OBJECT}.tid;
{OBJECT_STATE_JOIN}
WHERE temp_store.prev_tid <> cur.tid;

RETURN;

Expand Down
18 changes: 16 additions & 2 deletions src/relstorage/adapters/postgresql/schema.py
Expand Up @@ -92,13 +92,24 @@ def _read_proc_files(self):
# Convert from bare strings into _StoredFunction objects
# (which are missing their signatures at this point).
current_object = 'current_object' if self.keep_history else 'object_state'
if self.keep_history:
object_state_join = """
INNER JOIN object_state ON (object_state.zoid = cur.zoid
AND object_state.tid = cur.tid)
"""
object_state_name = 'object_state'
else:
object_state_join = ""
object_state_name = 'cur'
return {
name: _StoredFunction(
name,
None,
self._checksum_for_str(value),
value.format(
CURRENT_OBJECT=current_object
CURRENT_OBJECT=current_object,
OBJECT_STATE_JOIN=object_state_join,
OBJECT_STATE_NAME=object_state_name
)
)
for name, value
Expand Down Expand Up @@ -247,7 +258,8 @@ def __install_procedures(self, cursor):
__traceback_info__ = proc_name, self.keep_history
proc_source = stored_func.create
# All definitions should be written with 'CREATE OR REPLACE'
# so we don't need to bother with 'DROP'
# so we don't need to bother with 'DROP'. Though, if the return
# type changes, we can't REPLACE.
cursor.execute(proc_source)

# Update checksums
Expand Down Expand Up @@ -315,6 +327,8 @@ def _create_new_oid(self, cursor):
WHERE keep = true;
"""

DROP_TABLE_TMPL = 'DROP TABLE IF EXISTS {table}'

def _reset_oid(self, cursor):
stmt = "ALTER SEQUENCE zoid_seq RESTART WITH 1;"
self.runner.run_script(cursor, stmt)
Expand Down
3 changes: 2 additions & 1 deletion src/relstorage/adapters/schema.py
Expand Up @@ -764,6 +764,7 @@ def _after_zap_all_tables(self, cursor, slow=False):
self._init_after_create(cursor)
log.debug("Done running init script.")

DROP_TABLE_TMPL = 'DROP TABLE {table}'

def drop_all(self):
"""Drop all tables and sequences."""
Expand All @@ -773,7 +774,7 @@ def drop_all(_conn, cursor):
todo.reverse()
for table in todo:
if table in existent:
cursor.execute("DROP TABLE %s" % table)
cursor.execute(self.DROP_TABLE_TMPL.format(table=table))
for sequence in self.list_sequences(cursor):
cursor.execute("DROP SEQUENCE %s" % sequence)
self.connmanager.open_and_call(drop_all)
11 changes: 11 additions & 0 deletions src/relstorage/adapters/sql/ast.py
Expand Up @@ -27,6 +27,15 @@ def __compile_visit__(self, compiler):
def resolve_against(self, table):
return self

class NullNode(LiteralNode):
    """The SQL literal ``NULL``."""

    __slots__ = ()

    def __init__(self):
        # NULL's Python-side value is None.
        super(NullNode, self).__init__(None)

    def __compile_visit__(self, compiler):
        # The dialect's compiler decides how to spell a null literal.
        compiler.emit_null()

class BooleanNode(LiteralNode):

__slots__ = ()
Expand All @@ -35,6 +44,8 @@ class TextNode(LiteralNode):
__slots__ = ()

def as_node(c):
if c is None:
return NullNode()
if isinstance(c, bool):
return BooleanNode(c)
if isinstance(c, int):
Expand Down
3 changes: 3 additions & 0 deletions src/relstorage/adapters/sql/dialect.py
Expand Up @@ -229,6 +229,9 @@ def emit(self, *contents):
for content in contents:
self.buf.write(content)

def emit_null(self):
    """Write the SQL ``NULL`` keyword to the output buffer."""
    self.emit('NULL')

def emit_w_padding_space(self, value):
ended_in_space = self.buf.getvalue().endswith(' ')
value = value.strip()
Expand Down

0 comments on commit 21e3395

Please sign in to comment.