Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions ydb/tests/compatibility/test_batch_operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# -*- coding: utf-8 -*-
import pytest
import itertools

from ydb.tests.library.compatibility.fixtures import RollingUpgradeAndDowngradeFixture
from ydb.tests.oss.ydb_sdk_import import ydb


class TestBatchOperations(RollingUpgradeAndDowngradeFixture):
table_name = "batch_table"
pk_1 = list(range(0, 15))
pk_2 = list(range(0, 15))
groups_cnt = 3

@pytest.fixture(autouse=True, scope="function")
def setup(self):
yield from self.setup_cluster(table_service_config={
"enable_oltp_sink": True,
"enable_batch_updates": True,
})

def test_batch_update(self):
self.fill_table()

value = 0
for _ in self.roll():
self.assert_batch(
self.q_batch_update(f"v1 = {value}, v2 = \"String_{value}\""),
self.q_select(f"v1 != {value} OR v2 != \"String_{value}\"")
)

value += 1

where = f"id = {value % self.groups_cnt}"
self.assert_batch(
self.q_batch_update(f"v1 = {value}, v2 = \"String_{value}\"", where),
self.q_select(f"{where} AND (v1 != {value} OR v2 != \"String_{value}\")")
)

value += 1

where = f"id = {value % self.groups_cnt} OR k1 % 2 = 0"
self.assert_batch(
self.q_batch_update(f"v1 = {value}, v2 = \"String_{value}\"", where),
self.q_select(f"{where} AND (v1 != {value} OR v2 != \"String_{value}\")")
)

value += 1

where = f"id = {value % self.groups_cnt} AND k2 IS NOT NULL AND k2 <= {value % 5}"
self.assert_batch(
self.q_batch_update(f"v1 = {value}, v2 = \"String_{value}\"", where),
self.q_select(f"{where} AND (v1 != {value} OR v2 != \"String_{value}\")")
)

value += 1

def test_batch_delete(self):
self.fill_table()
value = 0

for _ in self.roll():
where = f"id = {value % self.groups_cnt}"
self.assert_batch(
self.q_batch_delete(where),
self.q_select(where)
)

value += 1

where = f"id = {value % self.groups_cnt} AND (k2 IS NULL OR k2 >= {value % 5})"
self.assert_batch(
self.q_batch_delete(where),
self.q_select(where)
)

value += 1

where = f"id = {value % self.groups_cnt} OR k1 % 2 = 0"
self.assert_batch(
self.q_batch_delete(where),
self.q_select(where)
)

value += 1

self.assert_batch(
self.q_batch_delete(),
self.q_select()
)

value += 1
self.fill_table(False)

def fill_table(self, create=True):
rows = []
id_value = 0

for k1, k2 in itertools.product(self.pk_1, self.pk_2):
v1 = k1 + k2 if k2 != "NULL" else k1
v2 = f"\"Value{str(k1) + str(k2)}\""
rows.append([k1, k2, v1, v2, id_value % self.groups_cnt])
id_value += 1

with ydb.QuerySessionPool(self.driver) as session_pool:
if create:
session_pool.execute_with_retries(self.q_create())
session_pool.execute_with_retries(self.q_upsert(rows))

def assert_batch(self, batch_query, select_query):
with ydb.QuerySessionPool(self.driver) as session_pool:
session_pool.execute_with_retries(batch_query)
result_sets = session_pool.execute_with_retries(select_query)
assert result_sets[0].rows[0]["cnt"] == 0

def q_create(self):
return f"""
CREATE TABLE `{self.table_name}` (
k1 Uint64 NOT NULL,
k2 Uint64,
v1 Uint64,
v2 String,
id Int64,
PRIMARY KEY (k2, k1)
) WITH (
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 3,
PARTITION_AT_KEYS = ((2, 2), (4), (5, 6), (8))
);
"""

def q_upsert(self, rows):
return f"""
UPSERT INTO `{self.table_name}` (k1, k2, v1, v2, id)
VALUES {", ".join(["(" + ", ".join(map(str, row)) + ")" for row in rows])};
"""

def q_batch_update(self, values, where=None):
return f"""
BATCH UPDATE `{self.table_name}`
SET {values}{(" WHERE " + where) if where else ""};
"""

def q_batch_delete(self, where=None):
return f"""
BATCH DELETE FROM `{self.table_name}`{(" WHERE " + where) if where else ""};
"""

def q_select(self, where=None):
return f"""
SELECT COUNT(*) AS cnt FROM `{self.table_name}`{(" WHERE " + where) if where else ""};
"""
1 change: 1 addition & 0 deletions ydb/tests/compatibility/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ TEST_SRCS(
test_rolling.py
test_data_type.py
test_vector_index.py
test_batch_operations.py
udf/test_datetime2.py
udf/test_digest.py
udf/test_digest_regression.py
Expand Down
4 changes: 4 additions & 0 deletions ydb/tests/library/harness/kikimr_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def __init__(
breakpad_minidumps_path=None,
breakpad_minidumps_script=None,
explicit_hosts_and_host_configs=False,
table_service_config=None, # dict[name]=value
):
if extra_feature_flags is None:
extra_feature_flags = []
Expand Down Expand Up @@ -270,6 +271,9 @@ def __init__(
if "table_service_config" not in self.yaml_config:
self.yaml_config["table_service_config"] = {}

if table_service_config:
self.yaml_config["table_service_config"].update(table_service_config)

if os.getenv('YDB_KQP_ENABLE_IMMEDIATE_EFFECTS', 'false').lower() == 'true':
self.yaml_config["table_service_config"]["enable_kqp_immediate_effects"] = True

Expand Down
Loading