Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UPSERT many functionality #4644

Merged
merged 14 commits into from Feb 20, 2019
Copy path View file
@@ -13,7 +13,7 @@ cache:
- $HOME/.cache/pip/wheels

addons:
postgresql: "9.4"
postgresql: "9.5"
This conversation was marked as resolved by hawkowl

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 18, 2019

Member

what's the logic here, ooi? Given we have to pick one version of postgres to test against, it feels like we should be testing against the oldest one we support.

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 18, 2019

Member

oh I see, it's because postgres 9.4 doesn't support upsert. hrm, we need to figure out what to do here

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 18, 2019

Member

As I just wrote in #synapse-dev:

I am sorely tempted to say that we should do both [9.4 and 9.5], and the solution to the pain that it will cause is to either do our CI better or drop support for 9.4

This comment has been minimized.

Copy link
@hawkowl

hawkowl Feb 19, 2019

Author Contributor

The reason why I want 9.5 here is that we need to test the native upsert path, which can use the postgres-specific batch code.

We should do our CI better and support more postgreses, but if we have to pick one, 9.5 is the one that covers the most code.


# don't clone the whole repo history, one commit will do
git:
@@ -71,7 +71,7 @@ matrix:

install:
- pip install tox

# if we don't have python3.6 in this environment, travis unhelpfully gives us
# a `python3.6` on our path which does nothing but spit out a warning. Tox
# tries to run it (even if we're not running a py36 env), so the build logs
Copy path View file
@@ -0,0 +1 @@
Introduce upsert batching functionality in the database layer.
Copy path View file
@@ -100,6 +100,14 @@ def __setattr__(self, name, value):
def __iter__(self):
return self.txn.__iter__()

def execute_batch(self, sql, args):
if isinstance(self.database_engine, PostgresEngine):
from psycopg2.extras import execute_batch
self._do_execute(lambda *x: execute_batch(self.txn, *x), sql, args)
else:
for val in args:
self.execute(sql, val)

def execute(self, sql, *args):
self._do_execute(self.txn.execute, sql, *args)

@@ -693,18 +701,30 @@ def _getwhere(key):
else:
return "%s = ?" % (key,)

# First try to update.
sql = "UPDATE %s SET %s WHERE %s" % (
table,
", ".join("%s = ?" % (k,) for k in values),
" AND ".join(_getwhere(k) for k in keyvalues)
)
sqlargs = list(values.values()) + list(keyvalues.values())
if not values:
# Try and select.
This conversation was marked as resolved by hawkowl

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 18, 2019

Member

This comment really isn't doing anything to help me. How about something like:

If values is empty, then all of the values we care about are in the unique key, so the UPDATE would be a no-op. We can just do a SELECT instead.

sql = "SELECT 1 FROM %s WHERE %s" % (
table,
" AND ".join(_getwhere(k) for k in keyvalues)
)
sqlargs = list(keyvalues.values())
txn.execute(sql, sqlargs)
if txn.fetchall():
# We have an existing record.
return False
else:
# First try to update.
sql = "UPDATE %s SET %s WHERE %s" % (
table,
", ".join("%s = ?" % (k,) for k in values),
" AND ".join(_getwhere(k) for k in keyvalues)
)
sqlargs = list(values.values()) + list(keyvalues.values())

txn.execute(sql, sqlargs)
if txn.rowcount > 0:
# successfully updated at least one row.
return False
txn.execute(sql, sqlargs)
if txn.rowcount > 0:
# successfully updated at least one row.
return False

# We didn't update any rows so insert a new one
This conversation was marked as resolved by hawkowl

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 18, 2019

Member

s/update any rows/find an existing row/

allvalues = {}
@@ -753,6 +773,101 @@ def _simple_upsert_txn_native_upsert(
)
txn.execute(sql, list(allvalues.values()))

def _simple_upsert_many_txn(
self, txn, table, key_names, key_values, value_names, value_values
):
"""
Upsert, many times.
Args:
table (str): The table to upsert into
key_names (list[str]): The key column names.
key_values (list[list]): A list of each row's key column values.
value_names (list[str]): The value column names.
value_values (list[list]): A list of each row's value column values.
Returns:
None
"""
if (
self.database_engine.can_native_upsert
and table not in self._unsafe_to_upsert_tables
):
return self._simple_upsert_many_txn_native_upsert(
txn, table, key_names, key_values, value_names, value_values
)
else:
return self._simple_upsert_many_txn_emulated(
txn, table, key_names, key_values, value_names, value_values
)

def _simple_upsert_many_txn_emulated(
self, txn, table, key_names, key_values, value_names, value_values
):
"""
Upsert, many times, but without native UPSERT support or batching.
Args:
table (str): The table to upsert into
key_names (list[str]): The key column names.
key_values (list[list]): A list of each row's key column values.
value_names (list[str]): The value column names.
value_values (list[list]): A list of each row's value column values.
Returns:
None
"""
if not value_values:
This conversation was marked as resolved by hawkowl

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 18, 2019

Member

if an empty/absent value_values is special, can you say so in the docstring?

value_values = [() for x in range(len(key_values))]

for keyv, valv in zip(key_values, value_values):
_keys = {x: y for x, y in zip(key_names, keyv)}
_vals = {x: y for x, y in zip(value_names, valv)}

self._simple_upsert_txn_emulated(txn, table, _keys, _vals)

def _simple_upsert_many_txn_native_upsert(
self, txn, table, key_names, key_values, value_names, value_values
):
"""
Upsert, many times, using batching where possible.
Args:
table (str): The table to upsert into
key_names (list[str]): The key column names.
key_values (list[list]): A list of each row's key column values.
value_names (list[str]): The value column names.
value_values (list[list]): A list of each row's value column values.
Returns:
None
"""
allvalues = []
This conversation was marked as resolved by hawkowl

This comment has been minimized.

Copy link
@richvdh

richvdh Feb 18, 2019

Member

should this not be called allnames ?

allvalues.extend(key_names)
allvalues.extend(value_names)

if not value_values:
value_values = [[] for x in range(len(key_values))]

if not value_names:
latter = "NOTHING"
else:
latter = (
"UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in value_names)
)

sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s" % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues),
", ".join(key_names),
latter,
)

args = []

for x, y in zip(key_values, value_values):
args.append(tuple(x) + tuple(y))

return txn.execute_batch(sql, args)

def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False, desc="_simple_select_one"):
"""Executes a SELECT query on the named table, which is expected to
Copy path View file
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -314,3 +315,90 @@ def func2(self, key, cache_context):

self.assertEquals(callcount[0], 2)
self.assertEquals(callcount2[0], 3)


class UpsertManyTests(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs):
self.storage = hs.get_datastore()

self.table_name = "table_" + hs.get_secrets().token_hex(6)
self.get_success(
self.storage.runInteraction(
"create",
lambda x, *a: x.execute(*a),
"CREATE TABLE %s (id INTEGER, username TEXT, value TEXT)"
% (self.table_name,),
)
)
self.get_success(
self.storage.runInteraction(
"index",
lambda x, *a: x.execute(*a),
"CREATE UNIQUE INDEX %sindex ON %s(id, username)"
% (self.table_name, self.table_name),
)
)

def _dump_to_tuple(self, res):
for i in res:
yield (i["id"], i["username"], i["value"])

def test_upsert_many(self):
"""
Upsert_many will perform the upsert operation across a batch of data.
"""
# Add some data to an empty table
key_names = ["id", "username"]
value_names = ["value"]
key_values = [[1, "user1"], [2, "user2"]]
value_values = [["hello"], ["there"]]

self.get_success(
self.storage.runInteraction(
"test",
self.storage._simple_upsert_many_txn,
self.table_name,
key_names,
key_values,
value_names,
value_values,
)
)

# Check results are what we expect
res = self.get_success(
self.storage._simple_select_list(
self.table_name, None, ["id, username, value"]
)
)
self.assertEqual(
set(self._dump_to_tuple(res)),
set([(1, "user1", "hello"), (2, "user2", "there")]),
)

# Update only user2
key_values = [[2, "user2"]]
value_values = [["bleb"]]

self.get_success(
self.storage.runInteraction(
"test",
self.storage._simple_upsert_many_txn,
self.table_name,
key_names,
key_values,
value_names,
value_values,
)
)

# Check results are what we expect
res = self.get_success(
self.storage._simple_select_list(
self.table_name, None, ["id, username, value"]
)
)
self.assertEqual(
set(self._dump_to_tuple(res)),
set([(1, "user1", "hello"), (2, "user2", "bleb")]),
)
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.