From bccee68e24fa004affe84b37a8677f44a55d8df8 Mon Sep 17 00:00:00 2001 From: Iurii Kravchenko Date: Wed, 12 Nov 2025 14:35:47 +0300 Subject: [PATCH 1/5] tests --- ydb/tests/olap/order_by_with_limit.py | 102 +++++++++++++++++++++++++- 1 file changed, 100 insertions(+), 2 deletions(-) diff --git a/ydb/tests/olap/order_by_with_limit.py b/ydb/tests/olap/order_by_with_limit.py index 9d04900c8525..201eac1e1493 100644 --- a/ydb/tests/olap/order_by_with_limit.py +++ b/ydb/tests/olap/order_by_with_limit.py @@ -3,6 +3,7 @@ import yatest.common import ydb import random +import string from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator from ydb.tests.library.harness.kikimr_runner import KiKiMR @@ -17,10 +18,13 @@ class TestOrderBy(object): @classmethod def setup_class(cls): - random.seed(0xBEDA) + random.seed(0xBEDD) ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY")) logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8")) config = KikimrConfigGenerator( + extra_feature_flags={ + "enable_columnshard_bool": True + }, column_shard_config={ "compaction_enabled": False, "deduplication_enabled": True, @@ -53,7 +57,7 @@ def write_data(self, table: str): row, ) - def test(self): + def test_random(self): test_dir = f"{self.ydb_client.database}/{self.test_name}" table_path = f"{test_dir}/table" @@ -100,3 +104,97 @@ def test(self): keys = [row['id'] for result_set in result_sets for row in result_set.rows] assert keys == answer, keys + + def random_string(self): + characters = string.ascii_letters + string.digits + result_string = ''.join(random.choice(characters) for i in range(1000)) + return result_string + + def gen_portion(self, start_idx: int, portion_size: int, start: str): + return [{"id": i, "value": start + self.random_string()} for i in range(start_idx, start_idx + portion_size)] + + def test_fetch_race(self): + test_dir = f"{self.ydb_client.database}/{self.test_name}" + table_path = f"{test_dir}/table" + + self.ydb_client.query( + f""" + CREATE TABLE `{table_path}` ( + id Uint64 NOT NULL, + value Utf8 NOT NULL, + PRIMARY KEY(id, value), + ) + WITH ( + STORE = COLUMN, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1 + ) + """ + ) + + column_types = ydb.BulkUpsertColumns() + column_types.add_column("id", ydb.PrimitiveType.Uint64) + column_types.add_column("value", ydb.PrimitiveType.Utf8) + + big_portion1 = self.gen_portion(1, 10000, "3") + small_portion = self.gen_portion(1, 1, "2") + big_portion2 = self.gen_portion(1, 10000, "1") + + self.ydb_client.bulk_upsert(table_path, column_types, big_portion1) + self.ydb_client.bulk_upsert(table_path, column_types, small_portion) + self.ydb_client.bulk_upsert(table_path, column_types, big_portion2) + + result_sets = self.ydb_client.query( + f""" + select id, value from `{table_path}` + order by id, value limit 1000 + """ + ) + + keys = [row['id'] for result_set in result_sets for row in result_set.rows] + + assert len(keys) == 1000 + assert max(keys) == 500 + + def test_filtered_portion(self): + test_dir = f"{self.ydb_client.database}/{self.test_name}" + table_path = f"{test_dir}/table" + + self.ydb_client.query( + f""" + CREATE TABLE `{table_path}` ( + id Uint64 NOT NULL, + value bool NOT NULL, + PRIMARY KEY(id), + ) + WITH ( + STORE = COLUMN, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1 + ) + """ + ) + + column_types = ydb.BulkUpsertColumns() + column_types.add_column("id", ydb.PrimitiveType.Uint64) + column_types.add_column("value", ydb.PrimitiveType.Bool) + + portion1 = [{"id" : 1, "value" : True}] + portion2 = [{"id" : 2, "value" : False}, {"id" : 3, "value" : False}] + portion3 = [{"id" : 2, "value" : True}] + portion4 = [{"id" : 2, "value" : True}, {"id" : 3, "value" : True}] + + self.ydb_client.bulk_upsert(table_path, column_types, portion1) + self.ydb_client.bulk_upsert(table_path, column_types, portion2) + self.ydb_client.bulk_upsert(table_path, column_types, portion3) + self.ydb_client.bulk_upsert(table_path, column_types, portion4) + + result_sets = self.ydb_client.query( + f""" + select id, value from `{table_path}` + order by id limit 10 + """ + ) + + result = [(row['id'], row['value']) for result_set in result_sets for row in result_set.rows] + + assert len(result) == 3 + assert result == [(1, True), (2, True), (3, True)] From 978de9284995f1eac0869e206ff22af07f049ed8 Mon Sep 17 00:00:00 2001 From: Iurii Kravchenko Date: Fri, 14 Nov 2025 16:18:17 +0300 Subject: [PATCH 2/5] one more test --- ydb/tests/olap/order_by_with_limit.py | 56 +++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 4 deletions(-) diff --git a/ydb/tests/olap/order_by_with_limit.py b/ydb/tests/olap/order_by_with_limit.py index 201eac1e1493..9902566c4f12 100644 --- a/ydb/tests/olap/order_by_with_limit.py +++ b/ydb/tests/olap/order_by_with_limit.py @@ -4,6 +4,7 @@ import ydb import random import string +import datetime from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator from ydb.tests.library.harness.kikimr_runner import KiKiMR @@ -18,7 +19,7 @@ class TestOrderBy(object): @classmethod def setup_class(cls): - random.seed(0xBEDD) + random.seed(0xBEDA) ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY")) logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8")) config = KikimrConfigGenerator( @@ -105,13 +106,16 @@ def test_random(self): assert keys == answer, keys - def random_string(self): + def random_char(self): characters = string.ascii_letters + string.digits - result_string = ''.join(random.choice(characters) for i in range(1000)) + return random.choice(characters) + + def random_string(self, size: int): + result_string = ''.join(self.random_char() for i in range(size)) return result_string def gen_portion(self, start_idx: int, portion_size: int, start: str): - return [{"id": i, "value": start + self.random_string()} for i in range(start_idx, start_idx + portion_size)] + return [{"id": i, "value": start + self.random_string(1000)} for i in range(start_idx, start_idx + portion_size)] def test_fetch_race(self): test_dir = f"{self.ydb_client.database}/{self.test_name}" @@ -198,3 +202,47 @@ def test_filtered_portion(self): assert len(result) == 3 assert result == [(1, True), (2, True), (3, True)] + + def test_stress_sorting(self): + test_dir = f"{self.ydb_client.database}/{self.test_name}" + table_path = f"{test_dir}/table" + + self.ydb_client.query( + f""" + CREATE TABLE `{table_path}` ( + time Timestamp NOT NULL, + uniq Utf8 NOT NULL, + class Utf8 NOT NULL, + value Utf8, + PRIMARY KEY(time, class, uniq), + ) + WITH ( + STORE = COLUMN, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1 + ) + """ + ) + + column_types = ydb.BulkUpsertColumns() + column_types.add_column("time", ydb.PrimitiveType.Timestamp) + column_types.add_column("class", ydb.PrimitiveType.Utf8) + column_types.add_column("uniq", ydb.PrimitiveType.Utf8) + column_types.add_column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8)) + + portions = [[{ "time" : datetime.datetime.fromtimestamp(1234567890) + , "class" : self.random_char() + , "uniq" : self.random_string(20) + , "value" : self.random_char() + }] for _ in range(10000)] + + for portion in portions: + self.ydb_client.bulk_upsert(table_path, column_types, portion) + + result_sets = self.ydb_client.query( + f""" + select * from `{table_path}` + order by time, class, uniq limit 1000 + """ + ) + + assert len(result_sets[0].rows) == 1000 From c5c86940c563ee324f724f12aea744b028640923 Mon Sep 17 00:00:00 2001 From: Iurii Kravchenko Date: Fri, 14 Nov 2025 19:11:59 +0300 Subject: [PATCH 3/5] tmp fix --- .../iterator/sync_points/limit.cpp | 29 +++++++++++++++++-- .../iterator/sync_points/limit.h | 2 ++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp index 7dbaa0edcc59..52830d5bd226 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp @@ -1,6 +1,7 @@ #include "limit.h" #include +#include namespace NKikimr::NOlap::NReader::NSimple { @@ -45,8 +46,32 @@ ISyncPoint::ESourceAction TSyncPointLimitControl::OnSourceReady( const auto& rk = *source->GetSourceSchema()->GetIndexInfo().GetReplaceKey(); const auto& g = source->GetStageResult().GetBatch(); AFL_VERIFY(Iterators.size()); - AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("front", Iterators.front().DebugString())("source", - source->GetAs()->GetStart().DebugString())("source_id", source->GetSourceId()); + if (Iterators.front().GetSourceId() != source->GetSourceId()) { + for (auto it : Iterators) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("Iterator", it.DebugString()); + } + for (auto it : DebugOrder) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("DebugOrder", it); + } + if (std::ranges::find(Iterators, source->GetSourceId(), &TSourceIterator::GetSourceId) != Iterators.end()) { + AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "portion is in heap") + ("front", Iterators.front().DebugString()) + ("source", source->GetAs()->GetStart().DebugString()) + ("source_id", source->GetSourceId()); + } + else if (std::ranges::find(DebugOrder, source->GetSourceId()) != DebugOrder.end()) { + AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "known portion, not in heap") + ("front", Iterators.front().DebugString()) + ("source", source->GetAs()->GetStart().DebugString()) + ("source_id", source->GetSourceId()); + } + else { + AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "unknown portion") + ("front", Iterators.front().DebugString()) + ("source", source->GetAs()->GetStart().DebugString()) + ("source_id", source->GetSourceId()); + } + } std::pop_heap(Iterators.begin(), Iterators.end()); if (!g || !g->GetRecordsCount()) { Iterators.pop_back(); diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h index 65ba79521266..1ac6b1a274ab 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h @@ -117,6 +117,7 @@ class TSyncPointLimitControl: public ISyncPoint { }; std::vector Iterators; + std::vector DebugOrder; virtual bool IsFinished() const override { return FetchedCount >= Limit || TBase::IsFinished(); @@ -125,6 +126,7 @@ class TSyncPointLimitControl: public ISyncPoint { virtual std::shared_ptr OnAddSource(const std::shared_ptr& source) override { AFL_VERIFY(FetchedCount < Limit)("fetched", FetchedCount)("limit", Limit); Iterators.emplace_back(TSourceIterator(source)); + DebugOrder.emplace_back(source->GetSourceId()); std::push_heap(Iterators.begin(), Iterators.end()); return TBase::OnAddSource(source); } From a63f004b92f26100351e0a52140793453e0fb865 Mon Sep 17 00:00:00 2001 From: Iurii Kravchenko Date: Fri, 14 Nov 2025 21:13:40 +0300 Subject: [PATCH 4/5] flake8 fix --- ydb/tests/olap/order_by_with_limit.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ydb/tests/olap/order_by_with_limit.py b/ydb/tests/olap/order_by_with_limit.py index 9902566c4f12..06a1d328a0f3 100644 --- a/ydb/tests/olap/order_by_with_limit.py +++ b/ydb/tests/olap/order_by_with_limit.py @@ -229,11 +229,11 @@ class Utf8 NOT NULL, column_types.add_column("uniq", ydb.PrimitiveType.Utf8) column_types.add_column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8)) - portions = [[{ "time" : datetime.datetime.fromtimestamp(1234567890) - , "class" : self.random_char() - , "uniq" : self.random_string(20) - , "value" : self.random_char() - }] for _ in range(10000)] + portions = [[{"time" : datetime.datetime.fromtimestamp(1234567890), + "class" : self.random_char(), + "uniq" : self.random_string(20), + "value" : self.random_char() + }] for _ in range(10000)] for portion in portions: self.ydb_client.bulk_upsert(table_path, column_types, portion) From 18c29b035242fcd34bda0678e9f9f1c51eb631da Mon Sep 17 00:00:00 2001 From: Iurii Kravchenko Date: Fri, 14 Nov 2025 21:56:54 +0300 Subject: [PATCH 5/5] fix --- .../reader/simple_reader/iterator/sync_points/limit.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp index 52830d5bd226..dad58f1e6053 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp @@ -1,7 +1,6 @@ #include "limit.h" #include -#include namespace NKikimr::NOlap::NReader::NSimple { @@ -53,13 +52,13 @@ ISyncPoint::ESourceAction TSyncPointLimitControl::OnSourceReady( for (auto it : DebugOrder) { AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("DebugOrder", it); } - if (std::ranges::find(Iterators, source->GetSourceId(), &TSourceIterator::GetSourceId) != Iterators.end()) { + if (FindIf(Iterators, [&](const auto& item) { return item.GetSourceId() == source->GetSourceId(); }) != Iterators.end()) { AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "portion is in heap") ("front", Iterators.front().DebugString()) ("source", source->GetAs()->GetStart().DebugString()) ("source_id", source->GetSourceId()); } - else if (std::ranges::find(DebugOrder, source->GetSourceId()) != DebugOrder.end()) { + else if (Find(DebugOrder, source->GetSourceId()) != DebugOrder.end()) { AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "known portion, not in heap") ("front", Iterators.front().DebugString()) ("source", source->GetAs()->GetStart().DebugString())