@@ -45,8 +45,32 @@ ISyncPoint::ESourceAction TSyncPointLimitControl::OnSourceReady(
const auto& rk = *source->GetSourceSchema()->GetIndexInfo().GetReplaceKey();
const auto& g = source->GetStageResult().GetBatch();
AFL_VERIFY(Iterators.size());
AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("front", Iterators.front().DebugString())("source",
source->GetAs<TPortionDataSource>()->GetStart().DebugString())("source_id", source->GetSourceId());
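// Diagnostics for issue #28037: the ready source must be at the heap front. If it is not,
// dump every iterator and the arrival order, then fail with a tag classifying the mismatch
// (source still in the heap / seen before but already popped / never seen at all).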
if (Iterators.front().GetSourceId() != source->GetSourceId()) {
for (const auto& it : Iterators) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("Iterator", it.DebugString());
}
for (const auto sourceId : DebugOrder) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("DebugOrder", sourceId);
}
if (FindIf(Iterators, [&](const auto& item) { return item.GetSourceId() == source->GetSourceId(); }) != Iterators.end()) {
AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "portion is in heap")
("front", Iterators.front().DebugString())
("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())
("source_id", source->GetSourceId());
}
else if (Find(DebugOrder, source->GetSourceId()) != DebugOrder.end()) {
AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "known portion, not in heap")
("front", Iterators.front().DebugString())
("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())
("source_id", source->GetSourceId());
}
else {
AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "unknown portion")
("front", Iterators.front().DebugString())
("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())
("source_id", source->GetSourceId());
}
}
std::pop_heap(Iterators.begin(), Iterators.end());
if (!g || !g->GetRecordsCount()) {
Iterators.pop_back();
@@ -117,6 +117,7 @@ class TSyncPointLimitControl: public ISyncPoint {
};

std::vector<TSourceIterator> Iterators;
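// Source ids in arrival order; kept only for the issue #28037 diagnostics in OnSourceReady.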
std::vector<ui64> DebugOrder;

virtual bool IsFinished() const override {
return FetchedCount >= Limit || TBase::IsFinished();
@@ -125,6 +126,7 @@ class TSyncPointLimitControl: public ISyncPoint {
virtual std::shared_ptr<NCommon::IDataSource> OnAddSource(const std::shared_ptr<NCommon::IDataSource>& source) override {
AFL_VERIFY(FetchedCount < Limit)("fetched", FetchedCount)("limit", Limit);
Iterators.emplace_back(TSourceIterator(source));
DebugOrder.emplace_back(source->GetSourceId());
std::push_heap(Iterators.begin(), Iterators.end());
return TBase::OnAddSource(source);
}
148 changes: 147 additions & 1 deletion ydb/tests/olap/order_by_with_limit.py
@@ -3,6 +3,8 @@
import yatest.common
import ydb
import random
import string
import datetime

from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
from ydb.tests.library.harness.kikimr_runner import KiKiMR
@@ -21,6 +23,9 @@ def setup_class(cls):
ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY"))
logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8"))
config = KikimrConfigGenerator(
extra_feature_flags={
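# Bool columns in column tables are gated by this feature flag (needed by test_filtered_portion).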
"enable_columnshard_bool": True
},
column_shard_config={
"compaction_enabled": False,
"deduplication_enabled": True,
@@ -53,7 +58,7 @@ def write_data(self, table: str):
row,
)

def test(self):
def test_random(self):
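"""Randomized ORDER BY ... LIMIT check: the returned keys must match the precomputed answer."""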
test_dir = f"{self.ydb_client.database}/{self.test_name}"
table_path = f"{test_dir}/table"

@@ -100,3 +105,144 @@ def test(self):
keys = [row['id'] for result_set in result_sets for row in result_set.rows]

assert keys == answer, keys

def random_char(self):
characters = string.ascii_letters + string.digits
return random.choice(characters)

def random_string(self, size: int):
return ''.join(self.random_char() for _ in range(size))

def gen_portion(self, start_idx: int, portion_size: int, start: str):
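"""Rows with ids [start_idx, start_idx + portion_size) whose values all share the `start` prefix, so value order across portions is controlled by the prefix."""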
return [{"id": i, "value": start + self.random_string(1000)} for i in range(start_idx, start_idx + portion_size)]

def test_fetch_race(self):
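"""Three overlapping portions over the same id range, with value prefixes making the newest portion sort first. ORDER BY (id, value) LIMIT 1000 must merge all three: ids 2..10000 contribute two rows each and id 1 contributes three, so the 1000th row lands exactly on id 500."""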
test_dir = f"{self.ydb_client.database}/{self.test_name}"
table_path = f"{test_dir}/table"

self.ydb_client.query(
f"""
CREATE TABLE `{table_path}` (
id Uint64 NOT NULL,
value Utf8 NOT NULL,
PRIMARY KEY(id, value),
)
WITH (
STORE = COLUMN,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1
)
"""
)

column_types = ydb.BulkUpsertColumns()
column_types.add_column("id", ydb.PrimitiveType.Uint64)
column_types.add_column("value", ydb.PrimitiveType.Utf8)

big_portion1 = self.gen_portion(1, 10000, "3")
small_portion = self.gen_portion(1, 1, "2")
big_portion2 = self.gen_portion(1, 10000, "1")
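# The prefixes put the newest portion ("1") first in value order, so the merge starts from the last-written data.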

self.ydb_client.bulk_upsert(table_path, column_types, big_portion1)
self.ydb_client.bulk_upsert(table_path, column_types, small_portion)
self.ydb_client.bulk_upsert(table_path, column_types, big_portion2)

result_sets = self.ydb_client.query(
f"""
select id, value from `{table_path}`
order by id, value limit 1000
"""
)

keys = [row['id'] for result_set in result_sets for row in result_set.rows]

assert len(keys) == 1000
assert max(keys) == 500

def test_filtered_portion(self):
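"""Deduplication must keep only the newest row per key: the False values written for ids 2 and 3 are shadowed by later upserts, leaving an all-True snapshot of three rows."""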
test_dir = f"{self.ydb_client.database}/{self.test_name}"
table_path = f"{test_dir}/table"

self.ydb_client.query(
f"""
CREATE TABLE `{table_path}` (
id Uint64 NOT NULL,
value bool NOT NULL,
PRIMARY KEY(id),
)
WITH (
STORE = COLUMN,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1
)
"""
)

column_types = ydb.BulkUpsertColumns()
column_types.add_column("id", ydb.PrimitiveType.Uint64)
column_types.add_column("value", ydb.PrimitiveType.Bool)

portion1 = [{"id" : 1, "value" : True}]
portion2 = [{"id" : 2, "value" : False}, {"id" : 3, "value" : False}]
portion3 = [{"id" : 2, "value" : True}]
portion4 = [{"id" : 2, "value" : True}, {"id" : 3, "value" : True}]

self.ydb_client.bulk_upsert(table_path, column_types, portion1)
self.ydb_client.bulk_upsert(table_path, column_types, portion2)
self.ydb_client.bulk_upsert(table_path, column_types, portion3)
self.ydb_client.bulk_upsert(table_path, column_types, portion4)

result_sets = self.ydb_client.query(
f"""
select id, value from `{table_path}`
order by id limit 10
"""
)

result = [(row['id'], row['value']) for result_set in result_sets for row in result_set.rows]

assert len(result) == 3
assert result == [(1, True), (2, True), (3, True)]

def test_stress_sorting(self):
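"""10000 single-row portions all share one timestamp, so ORDER BY (time, class, uniq) is decided by the random class/uniq columns; merging that many tiny portions must still honor LIMIT exactly."""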
test_dir = f"{self.ydb_client.database}/{self.test_name}"
table_path = f"{test_dir}/table"

self.ydb_client.query(
f"""
CREATE TABLE `{table_path}` (
time Timestamp NOT NULL,
uniq Utf8 NOT NULL,
class Utf8 NOT NULL,
value Utf8,
PRIMARY KEY(time, class, uniq),
)
WITH (
STORE = COLUMN,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1
)
"""
)

column_types = ydb.BulkUpsertColumns()
column_types.add_column("time", ydb.PrimitiveType.Timestamp)
column_types.add_column("class", ydb.PrimitiveType.Utf8)
column_types.add_column("uniq", ydb.PrimitiveType.Utf8)
column_types.add_column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8))

portions = [[{"time" : datetime.datetime.fromtimestamp(1234567890),
"class" : self.random_char(),
"uniq" : self.random_string(20),
"value" : self.random_char()
}] for _ in range(10000)]
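# With compaction disabled (see setup_class), each bulk_upsert below lands as its own tiny portion.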

for portion in portions:
self.ydb_client.bulk_upsert(table_path, column_types, portion)

result_sets = self.ydb_client.query(
f"""
select * from `{table_path}`
order by time, class, uniq limit 1000
"""
)

assert len(result_sets[0].rows) == 1000