diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp
index 7dbaa0edcc59..dad58f1e6053 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp
@@ -45,8 +45,32 @@ ISyncPoint::ESourceAction TSyncPointLimitControl::OnSourceReady(
     const auto& rk = *source->GetSourceSchema()->GetIndexInfo().GetReplaceKey();
     const auto& g = source->GetStageResult().GetBatch();
     AFL_VERIFY(Iterators.size());
-    AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("front", Iterators.front().DebugString())("source",
-        source->GetAs<IDataSource>()->GetStart().DebugString())("source_id", source->GetSourceId());
+    if (Iterators.front().GetSourceId() != source->GetSourceId()) {
+        for (auto it : Iterators) {
+            AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("Iterator", it.DebugString());
+        }
+        for (auto it : DebugOrder) {
+            AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("DebugOrder", it);
+        }
+        if (FindIf(Iterators, [&](const auto& item) { return item.GetSourceId() == source->GetSourceId(); }) != Iterators.end()) {
+            AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "portion is in heap")
+                ("front", Iterators.front().DebugString())
+                ("source", source->GetAs<IDataSource>()->GetStart().DebugString())
+                ("source_id", source->GetSourceId());
+        }
+        else if (Find(DebugOrder, source->GetSourceId()) != DebugOrder.end()) {
+            AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "known portion, not in heap")
+                ("front", Iterators.front().DebugString())
+                ("source", source->GetAs<IDataSource>()->GetStart().DebugString())
+                ("source_id", source->GetSourceId());
+        }
+        else {
+            AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "unknown portion")
+                ("front", Iterators.front().DebugString())
+                ("source", source->GetAs<IDataSource>()->GetStart().DebugString())
+                ("source_id", source->GetSourceId());
+        }
+    }
     std::pop_heap(Iterators.begin(), Iterators.end());
     if (!g || !g->GetRecordsCount()) {
         Iterators.pop_back();
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h
index 65ba79521266..1ac6b1a274ab 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h
@@ -117,6 +117,7 @@ class TSyncPointLimitControl: public ISyncPoint {
     };
 
     std::vector<TSourceIterator> Iterators;
+    std::vector<ui64> DebugOrder;
 
    virtual bool IsFinished() const override {
        return FetchedCount >= Limit || TBase::IsFinished();
@@ -125,6 +126,7 @@ class TSyncPointLimitControl: public ISyncPoint {
    virtual std::shared_ptr<NCommon::IDataSource> OnAddSource(const std::shared_ptr<NCommon::IDataSource>& source) override {
        AFL_VERIFY(FetchedCount < Limit)("fetched", FetchedCount)("limit", Limit);
        Iterators.emplace_back(TSourceIterator(source));
+       DebugOrder.emplace_back(source->GetSourceId());
        std::push_heap(Iterators.begin(), Iterators.end());
        return TBase::OnAddSource(source);
    }
diff --git a/ydb/tests/olap/order_by_with_limit.py b/ydb/tests/olap/order_by_with_limit.py
index 9d04900c8525..06a1d328a0f3 100644
--- a/ydb/tests/olap/order_by_with_limit.py
+++ b/ydb/tests/olap/order_by_with_limit.py
@@ -3,6 +3,8 @@
 import yatest.common
 import ydb
 import random
+import string
+import datetime
 
 from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
 from ydb.tests.library.harness.kikimr_runner import KiKiMR
@@ -21,6 +23,9 @@ def setup_class(cls):
         ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY"))
         logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8"))
         config = KikimrConfigGenerator(
+            extra_feature_flags={
+                "enable_columnshard_bool": True
+            },
             column_shard_config={
                 "compaction_enabled": False,
                 "deduplication_enabled": True,
@@ -53,7 +58,7 @@ def write_data(self, table: str):
                 row,
             )
 
-    def test(self):
+    def test_random(self):
         test_dir = f"{self.ydb_client.database}/{self.test_name}"
         table_path = f"{test_dir}/table"
 
@@ -100,3 +105,144 @@ def test(self):
         keys = [row['id'] for result_set in result_sets for row in result_set.rows]
 
         assert keys == answer, keys
+
+    def random_char(self):
+        characters = string.ascii_letters + string.digits
+        return random.choice(characters)
+
+    def random_string(self, size: int):
+        result_string = ''.join(self.random_char() for i in range(size))
+        return result_string
+
+    def gen_portion(self, start_idx: int, portion_size: int, start: str):
+        return [{"id": i, "value": start + self.random_string(1000)} for i in range(start_idx, start_idx + portion_size)]
+
+    def test_fetch_race(self):
+        test_dir = f"{self.ydb_client.database}/{self.test_name}"
+        table_path = f"{test_dir}/table"
+
+        self.ydb_client.query(
+            f"""
+            CREATE TABLE `{table_path}` (
+                id Uint64 NOT NULL,
+                value Utf8 NOT NULL,
+                PRIMARY KEY(id, value),
+            )
+            WITH (
+                STORE = COLUMN,
+                AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1
+            )
+            """
+        )
+
+        column_types = ydb.BulkUpsertColumns()
+        column_types.add_column("id", ydb.PrimitiveType.Uint64)
+        column_types.add_column("value", ydb.PrimitiveType.Utf8)
+
+        big_portion1 = self.gen_portion(1, 10000, "3")
+        small_portion = self.gen_portion(1, 1, "2")
+        big_portion2 = self.gen_portion(1, 10000, "1")
+
+        self.ydb_client.bulk_upsert(table_path, column_types, big_portion1)
+        self.ydb_client.bulk_upsert(table_path, column_types, small_portion)
+        self.ydb_client.bulk_upsert(table_path, column_types, big_portion2)
+
+        result_sets = self.ydb_client.query(
+            f"""
+            select id, value from `{table_path}`
+            order by id, value limit 1000
+            """
+        )
+
+        keys = [row['id'] for result_set in result_sets for row in result_set.rows]
+
+        assert len(keys) == 1000
+        assert max(keys) == 500
+
+    def test_filtered_portion(self):
+        test_dir = f"{self.ydb_client.database}/{self.test_name}"
+        table_path = f"{test_dir}/table"
+
+        self.ydb_client.query(
+            f"""
+            CREATE TABLE `{table_path}` (
+                id Uint64 NOT NULL,
+                value bool NOT NULL,
+                PRIMARY KEY(id),
+            )
+            WITH (
+                STORE = COLUMN,
+                AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1
+            )
+            """
+        )
+
+        column_types = ydb.BulkUpsertColumns()
+        column_types.add_column("id", ydb.PrimitiveType.Uint64)
+        column_types.add_column("value", ydb.PrimitiveType.Bool)
+
+        portion1 = [{"id" : 1, "value" : True}]
+        portion2 = [{"id" : 2, "value" : False}, {"id" : 3, "value" : False}]
+        portion3 = [{"id" : 2, "value" : True}]
+        portion4 = [{"id" : 2, "value" : True}, {"id" : 3, "value" : True}]
+
+        self.ydb_client.bulk_upsert(table_path, column_types, portion1)
+        self.ydb_client.bulk_upsert(table_path, column_types, portion2)
+        self.ydb_client.bulk_upsert(table_path, column_types, portion3)
+        self.ydb_client.bulk_upsert(table_path, column_types, portion4)
+
+        result_sets = self.ydb_client.query(
+            f"""
+            select id, value from `{table_path}`
+            order by id limit 10
+            """
+        )
+
+        result = [(row['id'], row['value']) for result_set in result_sets for row in result_set.rows]
+
+        assert len(result) == 3
+        assert result == [(1, True), (2, True), (3, True)]
+
+    def test_stress_sorting(self):
+        test_dir = f"{self.ydb_client.database}/{self.test_name}"
+        table_path = f"{test_dir}/table"
+
+        self.ydb_client.query(
+            f"""
+            CREATE TABLE `{table_path}` (
+                time Timestamp NOT NULL,
+                uniq Utf8 NOT NULL,
+                class Utf8 NOT NULL,
+                value Utf8,
+                PRIMARY KEY(time, class, uniq),
+            )
+            WITH (
+                STORE = COLUMN,
+                AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1
+            )
+            """
+        )
+
+        column_types = ydb.BulkUpsertColumns()
+        column_types.add_column("time", ydb.PrimitiveType.Timestamp)
+        column_types.add_column("class", ydb.PrimitiveType.Utf8)
+        column_types.add_column("uniq", ydb.PrimitiveType.Utf8)
+        column_types.add_column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+
+        portions = [[{"time" : datetime.datetime.fromtimestamp(1234567890),
+                      "class" : self.random_char(),
+                      "uniq" : self.random_string(20),
+                      "value" : self.random_char()
+                      }] for _ in range(10000)]
+
+        for portion in portions:
+            self.ydb_client.bulk_upsert(table_path, column_types, portion)
+
+        result_sets = self.ydb_client.query(
+            f"""
+            select * from `{table_path}`
+            order by time, class, uniq limit 1000
+            """
+        )
+
+        assert len(result_sets[0].rows) == 1000
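
The diagnostic branch added to OnSourceReady probes the heap discipline that TSyncPointLimitControl keeps over Iterators: std::push_heap on add and std::pop_heap on consume are supposed to leave the source that the merge must consume next at Iterators.front(), so a ready source whose id differs from the front means fetch-completion order diverged from merge order, and the FindIf/Find chain classifies the mismatch (still in the heap, known but already popped, or never registered). Below is a minimal sketch of that invariant, not the real implementation: TFakeSourceIterator, SourceId and StartKey are illustrative stand-ins, and the inverted operator< is an assumption that the real TSourceIterator compares in reverse so the smallest start key sits at the front (the std heap algorithms maintain a max-heap with respect to operator<).

    #include <algorithm>
    #include <cassert>
    #include <cstdint>
    #include <vector>

    // Hypothetical stand-in for TSourceIterator: a source id plus the start
    // key of its portion.
    struct TFakeSourceIterator {
        uint64_t SourceId;
        uint64_t StartKey;

        // Inverted comparison: std::push_heap/std::pop_heap maintain a
        // max-heap w.r.t. operator<, so reversing it keeps the iterator with
        // the *smallest* start key at front().
        bool operator<(const TFakeSourceIterator& other) const {
            return other.StartKey < StartKey;
        }
    };

    int main() {
        std::vector<TFakeSourceIterator> iterators;

        // OnAddSource: append, then restore the heap property.
        for (const auto& it : {TFakeSourceIterator{7, 30}, TFakeSourceIterator{3, 10}, TFakeSourceIterator{5, 20}}) {
            iterators.push_back(it);
            std::push_heap(iterators.begin(), iterators.end());
        }

        // OnSourceReady for the source with the minimal start key: the sync
        // point expects exactly this source at the heap front; any other id
        // is the situation the new diagnostics in limit.cpp classify.
        const uint64_t readySourceId = 3;
        assert(iterators.front().SourceId == readySourceId);

        // Consume the front element the same way the patched code does.
        std::pop_heap(iterators.begin(), iterators.end());
        iterators.pop_back();
        assert(iterators.front().SourceId == 5);  // next-smallest start key
        return 0;
    }

With this ordering, front() always names the unique source the merge may legally consume next, which is exactly what the AFL_VERIFY asserts.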
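The expected answer in test_fetch_race can be derived from the data layout: with PRIMARY KEY(id, value), id 1 gets three distinct rows (value prefixes "1", "2", "3") while every other id gets two (prefixes "1" and "3"), so in (id, value) order the rows for ids 1..499 account for 3 + 2 * 498 = 999 rows, and the 1000th row is the first row of id 500, hence max(keys) == 500. A back-of-the-envelope check of that arithmetic (plain C++, no YDB dependencies):

    #include <cstdint>
    #include <iostream>

    int main() {
        // Count rows in (id, value) order: 3 rows for id 1, 2 for each id >= 2.
        uint64_t rows = 0;
        uint64_t id = 0;
        while (rows < 1000) {
            ++id;
            rows += (id == 1) ? 3 : 2;
        }
        std::cout << "LIMIT 1000 is reached inside id " << id << "\n";  // prints 500
        return 0;
    }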