From 0bb2b29660ceceb24f5fba03d9992255b55d40ba Mon Sep 17 00:00:00 2001 From: Daniil Timizhev Date: Wed, 28 May 2025 20:16:55 +0300 Subject: [PATCH 1/4] Impl compatibility tests --- .../compatibility/test_batch_operations.py | 172 ++++++++++++++++++ ydb/tests/compatibility/ya.make | 2 + ydb/tests/library/harness/kikimr_config.py | 4 + 3 files changed, 178 insertions(+) create mode 100644 ydb/tests/compatibility/test_batch_operations.py diff --git a/ydb/tests/compatibility/test_batch_operations.py b/ydb/tests/compatibility/test_batch_operations.py new file mode 100644 index 000000000000..8b1ca1c71f5d --- /dev/null +++ b/ydb/tests/compatibility/test_batch_operations.py @@ -0,0 +1,172 @@ +# -*- coding: utf-8 -*- +import pytest +import itertools + +from ydb.tests.library.compatibility.fixtures import RollingUpgradeAndDowngradeFixture +from ydb.tests.oss.ydb_sdk_import import ydb + + +class TestBatchOperations(RollingUpgradeAndDowngradeFixture): + table_name = "batch_table" + pk_1 = list(range(0, 10)) + pk_2 = ["NULL"] + list(range(0, 10)) + groups_cnt = 3 + + @pytest.fixture(autouse=True, scope="function") + def setup(self): + yield from self.setup_cluster() + + def test_batch_update(self): + self._fill_table() + + value = 0 + for _ in self.roll(): + self._assert_batch( + self._q_batch_update(set=[f"v1 = {value}", f"v2 = \"Value{value}\""], where_and=[], where_or=[]), + self._q_select(where_and=[], where_or=[f"v1 != {value}", f"v2 != \"Value{value}\""]) + ) + + value += 1 + + self._assert_batch( + self._q_batch_update(set=[f"v1 = {value}", f"v2 = \"Value{value}\""], where_and=[f"id = {value % self.groups_cnt}"], where_or=[]), + self._q_select(where_and=[f"id = {value % self.groups_cnt}"], where_or=[f"v1 != {value}", f"v2 != \"Value{value}\""]) + ) + + value += 1 + + self._assert_batch( + self._q_batch_update(set=[f"v1 = {value}", f"v2 = \"Value{value}\""], where_and=[], where_or=[f"id = {value % self.groups_cnt}", "k1 % 2 = 0"]), + self._q_select(where_and=[f"id = {value % self.groups_cnt} OR k1 % 2 = 0"], where_or=[f"v1 != {value}", f"v2 != \"Value{value}\""]) + ) + + value += 1 + + self._assert_batch( + self._q_batch_update(set=[f"v1 = {value}", f"v2 = \"Value{value}\""], where_and=[f"id = {value % self.groups_cnt}", f"k2 IS NOT NULL AND k2 <= {value % 5}"], where_or=[]), + self._q_select(where_and=[f"id = {value % self.groups_cnt}", f"k2 IS NOT NULL AND k2 <= {value % 5}"], where_or=[f"v1 != {value}", f"v2 != \"Value{value}\""]) + ) + + value += 1 + + def test_batch_delete(self): + self._fill_table() + + value = 0 + for _ in self.roll(): + self._assert_batch( + self._q_batch_delete(where_and=[f"id = {value % self.groups_cnt}"], where_or=[]), + self._q_select(where_and=[f"id = {value % self.groups_cnt}"], where_or=[]) + ) + + value += 1 + + self._assert_batch( + self._q_batch_delete(where_and=[f"id = {value % self.groups_cnt}", f"k2 IS NULL OR k2 >= {value % 5}"], where_or=[]), + self._q_select(where_and=[f"id = {value % self.groups_cnt}", f"k2 IS NULL OR k2 >= {value % 5}"], where_or=[]) + ) + + value += 1 + + self._assert_batch( + self._q_batch_delete(where_and=[], where_or=[f"id = {value % self.groups_cnt}", "k1 % 2 = 0"]), + self._q_select(where_and=[f"id = {value % self.groups_cnt} OR k1 % 2 = 0"], where_or=[]) + ) + + value += 1 + + self._assert_batch( + self._q_batch_delete(where_and=[]), + self._q_select(where_and=[], where_or=[]) + ) + + value += 1 + self._fill_table(False) + + + # Create and fill the table + def _fill_table(self, create=True): + rows = [] + id_value = 0 + + for k1, k2 in itertools.product(self.pk_1, self.pk_2): + v1 = k1 + k2 if k2 != "NULL" else k1 + v2 = f"\"Value{str(k1) + str(k2)}\"" + rows.append([k1, k2, v1, v2, id_value % self.groups_cnt]) + id_value += 1 + + with ydb.QuerySessionPool(self.driver) as session_pool: + if create: + session_pool.execute_with_retries(self._q_create()) + session_pool.execute_with_retries(self._q_upsert(rows)) + + + # Execute BATCH query and check new updates + def _assert_batch(self, batch_query, select_query): + with ydb.QuerySessionPool(self.driver) as session_pool: + session_pool.execute_with_retries(batch_query) + result_sets = session_pool.execute_with_retries(select_query) + assert result_sets[0].rows[0]["cnt"] == 0 + + + # Queries + def _q_create(self): + return f""" + CREATE TABLE `{self.table_name}` ( + k1 Uint64 NOT NULL, + k2 Uint64, + v1 Uint64, + v2 String, + id Int64, + PRIMARY KEY (k2, k1) + ) WITH ( + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 3, + PARTITION_AT_KEYS = ((2, 2), (4), (5, 6), (8)) + ); + """ + + def _q_upsert(self, rows): + return f""" + UPSERT INTO `{self.table_name}` (k1, k2, v1, v2, id) + VALUES {", ".join(["(" + ", ".join(map(str, row)) + ")" for row in rows])}; + """ + + def _q_batch_update(self, set, where_and, where_or): + where_list = [] + if len(where_and) > 0: + where_list.append(" AND ".join(where_and)) + if len(where_or) > 0: + where_list.append("(" + " OR ".join(where_or) + ")") + where = " AND ".join(where_list) + + return f""" + BATCH UPDATE `{self.table_name}` + SET {", ".join(set)} + {("WHERE " + where) if len(where) > 0 else ""}; + """ + + def _q_batch_delete(self, where_and, where_or): + where_list = [] + if len(where_and) > 0: + where_list.append(" AND ".join(where_and)) + if len(where_or) > 0: + where_list.append("(" + " OR ".join(where_or) + ")") + where = " AND ".join(where_list) + + return f""" + BATCH DELETE FROM `{self.table_name}` + {("WHERE " + where) if len(where) > 0 else ""}; + """ + + def _q_select(self, where_and, where_or): + where_list = [] + if len(where_and) > 0: + where_list.append(" AND ".join(where_and)) + if len(where_or) > 0: + where_list.append("(" + " OR ".join(where_or) + ")") + where = " AND ".join(where_list) + + return f""" + SELECT COUNT(*) AS cnt FROM `{self.table_name}` + {("WHERE " + where) if len(where) > 0 else ""}; + """ diff --git a/ydb/tests/compatibility/ya.make b/ydb/tests/compatibility/ya.make index e90c4150b280..59d46e46b133 100644 --- a/ydb/tests/compatibility/ya.make +++ b/ydb/tests/compatibility/ya.make @@ -1,6 +1,7 @@ PY3TEST() ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd") ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb") +ENV(YDB_KQP_ENABLE_BATCH_UPDATES="true") FORK_TEST_FILES() FORK_TESTS() @@ -16,6 +17,7 @@ TEST_SRCS( test_statistics.py test_rolling.py test_datetime2.py + test_batch_operations.py ) SIZE(LARGE) diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py index 9b010ac42902..e694073fe20b 100644 --- a/ydb/tests/library/harness/kikimr_config.py +++ b/ydb/tests/library/harness/kikimr_config.py @@ -273,6 +273,10 @@ def __init__( if os.getenv('YDB_KQP_ENABLE_IMMEDIATE_EFFECTS', 'false').lower() == 'true': self.yaml_config["table_service_config"]["enable_kqp_immediate_effects"] = True + if os.getenv('YDB_KQP_ENABLE_BATCH_UPDATES', 'false').lower() == 'true': + self.yaml_config["table_service_config"]["enable_oltp_sink"] = True + self.yaml_config["table_service_config"]["enable_batch_updates"] = True + if os.getenv('PGWIRE_LISTENING_PORT', ''): self.yaml_config["local_pg_wire_config"] = {} self.yaml_config["local_pg_wire_config"]["listening_port"] = os.getenv('PGWIRE_LISTENING_PORT') From 7704904599e3093fa679244691b2f1556cd6d1b2 Mon Sep 17 00:00:00 2001 From: Daniil Timizhev Date: Thu, 29 May 2025 12:42:31 +0300 Subject: [PATCH 2/4] Add table_service_config to kikimr config --- ydb/tests/compatibility/test_batch_operations.py | 5 ++++- ydb/tests/compatibility/ya.make | 1 - ydb/tests/library/harness/kikimr_config.py | 8 ++++---- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/ydb/tests/compatibility/test_batch_operations.py b/ydb/tests/compatibility/test_batch_operations.py index 8b1ca1c71f5d..05f5163484a5 100644 --- a/ydb/tests/compatibility/test_batch_operations.py +++ b/ydb/tests/compatibility/test_batch_operations.py @@ -14,7 +14,10 @@ class TestBatchOperations(RollingUpgradeAndDowngradeFixture): @pytest.fixture(autouse=True, scope="function") def setup(self): - yield from self.setup_cluster() + yield from self.setup_cluster(table_service_config={ + "enable_oltp_sink": True, + "enable_batch_updates": True, + }) def test_batch_update(self): self._fill_table() diff --git a/ydb/tests/compatibility/ya.make b/ydb/tests/compatibility/ya.make index 59d46e46b133..ada69246d68d 100644 --- a/ydb/tests/compatibility/ya.make +++ b/ydb/tests/compatibility/ya.make @@ -1,7 +1,6 @@ PY3TEST() ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd") ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb") -ENV(YDB_KQP_ENABLE_BATCH_UPDATES="true") FORK_TEST_FILES() FORK_TESTS() diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py index e694073fe20b..237441a94a2d 100644 --- a/ydb/tests/library/harness/kikimr_config.py +++ b/ydb/tests/library/harness/kikimr_config.py @@ -171,6 +171,7 @@ def __init__( breakpad_minidumps_path=None, breakpad_minidumps_script=None, explicit_hosts_and_host_configs=False, + table_service_config=None, # dict[name]=value ): if extra_feature_flags is None: extra_feature_flags = [] @@ -270,13 +271,12 @@ def __init__( if "table_service_config" not in self.yaml_config: self.yaml_config["table_service_config"] = {} + if table_service_config: + self.yaml_config["table_service_config"].update(table_service_config) + if os.getenv('YDB_KQP_ENABLE_IMMEDIATE_EFFECTS', 'false').lower() == 'true': self.yaml_config["table_service_config"]["enable_kqp_immediate_effects"] = True - if os.getenv('YDB_KQP_ENABLE_BATCH_UPDATES', 'false').lower() == 'true': - self.yaml_config["table_service_config"]["enable_oltp_sink"] = True - self.yaml_config["table_service_config"]["enable_batch_updates"] = True - if os.getenv('PGWIRE_LISTENING_PORT', ''): self.yaml_config["local_pg_wire_config"] = {} self.yaml_config["local_pg_wire_config"]["listening_port"] = os.getenv('PGWIRE_LISTENING_PORT') From e56b4bc818c62890f200dd97cb4f4b2252393092 Mon Sep 17 00:00:00 2001 From: Daniil Timizhev Date: Mon, 9 Jun 2025 20:13:59 +0300 Subject: [PATCH 3/4] Pretty queries --- .../compatibility/test_batch_operations.py | 114 ++++++++---------- 1 file changed, 48 insertions(+), 66 deletions(-) diff --git a/ydb/tests/compatibility/test_batch_operations.py b/ydb/tests/compatibility/test_batch_operations.py index 05f5163484a5..81910d638e25 100644 --- a/ydb/tests/compatibility/test_batch_operations.py +++ b/ydb/tests/compatibility/test_batch_operations.py @@ -8,8 +8,8 @@ class TestBatchOperations(RollingUpgradeAndDowngradeFixture): table_name = "batch_table" - pk_1 = list(range(0, 10)) - pk_2 = ["NULL"] + list(range(0, 10)) + pk_1 = list(range(0, 15)) + pk_2 = list(range(0, 15)) groups_cnt = 3 @pytest.fixture(autouse=True, scope="function") @@ -20,75 +20,81 @@ def setup(self): }) def test_batch_update(self): - self._fill_table() + self.fill_table() value = 0 for _ in self.roll(): - self._assert_batch( - self._q_batch_update(set=[f"v1 = {value}", f"v2 = \"Value{value}\""], where_and=[], where_or=[]), - self._q_select(where_and=[], where_or=[f"v1 != {value}", f"v2 != \"Value{value}\""]) + self.assert_batch( + self.q_batch_update(f"v1 = {value}, v2 = \"String_{value}\""), + self.q_select(f"v1 != {value} OR v2 != \"String_{value}\"") ) value += 1 - self._assert_batch( - self._q_batch_update(set=[f"v1 = {value}", f"v2 = \"Value{value}\""], where_and=[f"id = {value % self.groups_cnt}"], where_or=[]), - self._q_select(where_and=[f"id = {value % self.groups_cnt}"], where_or=[f"v1 != {value}", f"v2 != \"Value{value}\""]) + where = f"id = {value % self.groups_cnt}" + self.assert_batch( + self.q_batch_update(f"v1 = {value}, v2 = \"String_{value}\"", where), + self.q_select(f"{where} AND (v1 != {value} OR v2 != \"String_{value}\")") ) value += 1 - self._assert_batch( - self._q_batch_update(set=[f"v1 = {value}", f"v2 = \"Value{value}\""], where_and=[], where_or=[f"id = {value % self.groups_cnt}", "k1 % 2 = 0"]), - self._q_select(where_and=[f"id = {value % self.groups_cnt} OR k1 % 2 = 0"], where_or=[f"v1 != {value}", f"v2 != \"Value{value}\""]) + where = f"id = {value % self.groups_cnt} OR k1 % 2 = 0" + self.assert_batch( + self.q_batch_update(f"v1 = {value}, v2 = \"String_{value}\"", where), + self.q_select(f"{where} AND (v1 != {value} OR v2 != \"String_{value}\")") ) value += 1 - self._assert_batch( - self._q_batch_update(set=[f"v1 = {value}", f"v2 = \"Value{value}\""], where_and=[f"id = {value % self.groups_cnt}", f"k2 IS NOT NULL AND k2 <= {value % 5}"], where_or=[]), - self._q_select(where_and=[f"id = {value % self.groups_cnt}", f"k2 IS NOT NULL AND k2 <= {value % 5}"], where_or=[f"v1 != {value}", f"v2 != \"Value{value}\""]) + where = f"id = {value % self.groups_cnt} AND k2 IS NOT NULL AND k2 <= {value % 5}" + self.assert_batch( + self.q_batch_update(f"v1 = {value}, v2 = \"String_{value}\"", where), + self.q_select(f"{where} AND (v1 != {value} OR v2 != \"String_{value}\")") ) value += 1 def test_batch_delete(self): - self._fill_table() - + self.fill_table() value = 0 + for _ in self.roll(): - self._assert_batch( - self._q_batch_delete(where_and=[f"id = {value % self.groups_cnt}"], where_or=[]), - self._q_select(where_and=[f"id = {value % self.groups_cnt}"], where_or=[]) + where = f"id = {value % self.groups_cnt}" + self.assert_batch( + self.q_batch_delete(where), + self.q_select(where) ) value += 1 - self._assert_batch( - self._q_batch_delete(where_and=[f"id = {value % self.groups_cnt}", f"k2 IS NULL OR k2 >= {value % 5}"], where_or=[]), - self._q_select(where_and=[f"id = {value % self.groups_cnt}", f"k2 IS NULL OR k2 >= {value % 5}"], where_or=[]) + where = f"id = {value % self.groups_cnt} AND (k2 IS NULL OR k2 >= {value % 5})" + self.assert_batch( + self.q_batch_delete(where), + self.q_select(where) ) value += 1 - self._assert_batch( - self._q_batch_delete(where_and=[], where_or=[f"id = {value % self.groups_cnt}", "k1 % 2 = 0"]), - self._q_select(where_and=[f"id = {value % self.groups_cnt} OR k1 % 2 = 0"], where_or=[]) + where = f"id = {value % self.groups_cnt} OR k1 % 2 = 0" + self.assert_batch( + self.q_batch_delete(where), + self.q_select(where) ) value += 1 - self._assert_batch( - self._q_batch_delete(where_and=[]), - self._q_select(where_and=[], where_or=[]) + self.assert_batch( + self.q_batch_delete(), + self.q_select() ) value += 1 - self._fill_table(False) + self.fill_table(False) # Create and fill the table - def _fill_table(self, create=True): + def fill_table(self, create=True): rows = [] id_value = 0 @@ -100,12 +106,12 @@ def _fill_table(self, create=True): with ydb.QuerySessionPool(self.driver) as session_pool: if create: - session_pool.execute_with_retries(self._q_create()) - session_pool.execute_with_retries(self._q_upsert(rows)) + session_pool.execute_with_retries(self.q_create()) + session_pool.execute_with_retries(self.q_upsert(rows)) # Execute BATCH query and check new updates - def _assert_batch(self, batch_query, select_query): + def assert_batch(self, batch_query, select_query): with ydb.QuerySessionPool(self.driver) as session_pool: session_pool.execute_with_retries(batch_query) result_sets = session_pool.execute_with_retries(select_query) @@ -113,7 +119,7 @@ def _assert_batch(self, batch_query, select_query): # Queries - def _q_create(self): + def q_create(self): return f""" CREATE TABLE `{self.table_name}` ( k1 Uint64 NOT NULL, @@ -128,48 +134,24 @@ def _q_create(self): ); """ - def _q_upsert(self, rows): + def q_upsert(self, rows): return f""" UPSERT INTO `{self.table_name}` (k1, k2, v1, v2, id) VALUES {", ".join(["(" + ", ".join(map(str, row)) + ")" for row in rows])}; """ - def _q_batch_update(self, set, where_and, where_or): - where_list = [] - if len(where_and) > 0: - where_list.append(" AND ".join(where_and)) - if len(where_or) > 0: - where_list.append("(" + " OR ".join(where_or) + ")") - where = " AND ".join(where_list) - + def q_batch_update(self, values, where=None): return f""" BATCH UPDATE `{self.table_name}` - SET {", ".join(set)} - {("WHERE " + where) if len(where) > 0 else ""}; + SET {values}{(" WHERE " + where) if where else ""}; """ - def _q_batch_delete(self, where_and, where_or): - where_list = [] - if len(where_and) > 0: - where_list.append(" AND ".join(where_and)) - if len(where_or) > 0: - where_list.append("(" + " OR ".join(where_or) + ")") - where = " AND ".join(where_list) - + def q_batch_delete(self, where=None): return f""" - BATCH DELETE FROM `{self.table_name}` - {("WHERE " + where) if len(where) > 0 else ""}; + BATCH DELETE FROM `{self.table_name}`{(" WHERE " + where) if where else ""}; """ - def _q_select(self, where_and, where_or): - where_list = [] - if len(where_and) > 0: - where_list.append(" AND ".join(where_and)) - if len(where_or) > 0: - where_list.append("(" + " OR ".join(where_or) + ")") - where = " AND ".join(where_list) - + def q_select(self, where=None): return f""" - SELECT COUNT(*) AS cnt FROM `{self.table_name}` - {("WHERE " + where) if len(where) > 0 else ""}; + SELECT COUNT(*) AS cnt FROM `{self.table_name}`{(" WHERE " + where) if where else ""}; """ From 258fcdb2a1208674ddfaecc189806f444a609685 Mon Sep 17 00:00:00 2001 From: Daniil Timizhev Date: Tue, 10 Jun 2025 11:11:19 +0300 Subject: [PATCH 4/4] Rm blank lines and comments --- ydb/tests/compatibility/test_batch_operations.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/ydb/tests/compatibility/test_batch_operations.py b/ydb/tests/compatibility/test_batch_operations.py index 81910d638e25..06eccb293136 100644 --- a/ydb/tests/compatibility/test_batch_operations.py +++ b/ydb/tests/compatibility/test_batch_operations.py @@ -92,8 +92,6 @@ def test_batch_delete(self): value += 1 self.fill_table(False) - - # Create and fill the table def fill_table(self, create=True): rows = [] id_value = 0 @@ -109,16 +107,12 @@ def fill_table(self, create=True): session_pool.execute_with_retries(self.q_create()) session_pool.execute_with_retries(self.q_upsert(rows)) - - # Execute BATCH query and check new updates def assert_batch(self, batch_query, select_query): with ydb.QuerySessionPool(self.driver) as session_pool: session_pool.execute_with_retries(batch_query) result_sets = session_pool.execute_with_retries(select_query) assert result_sets[0].rows[0]["cnt"] == 0 - - # Queries def q_create(self): return f""" CREATE TABLE `{self.table_name}` (