Skip to content

Commit 77e7e8e

Browse files
committed
fix
1 parent 7b591b5 commit 77e7e8e

File tree

3 files changed

+13
-6
lines changed

3 files changed

+13
-6
lines changed

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/constructors.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class TSourceConstructor: public ICursorEntity, public TMoveOnly {
6464
}
6565

6666
bool operator()(const TSourceConstructor& l, const TSourceConstructor& r) const {
67-
return r.Start < l.Start;
67+
return std::make_pair(r.Start, r.GetSourceId()) < std::make_pair(l.Start, l.GetSourceId());
6868
}
6969
};
7070

@@ -75,7 +75,7 @@ class TPortionsSources: public NCommon::TSourcesConstructorWithAccessors<TSource
7575
private:
7676
using TBase = NCommon::TSourcesConstructorWithAccessors<TSourceConstructor>;
7777
ui32 CurrentSourceIdx = 0;
78-
std::vector<TInsertWriteId> Uncommitted;
78+
std::vector<TInsertWriteId> Uncommitted;
7979

8080
virtual void DoFillReadStats(TReadStats& stats) const override {
8181
ui64 compactedPortionsBytes = 0;

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,23 +50,29 @@ ISyncPoint::ESourceAction TSyncPointLimitControl::OnSourceReady(
5050
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("Iterator", it.DebugString());
5151
}
5252
for (auto it : DebugOrder) {
53-
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("DebugOrder", it);
53+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("DebugOrder", it.DebugString());
54+
}
55+
for (auto it : SourcesSequentially) {
56+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("SourcesSequentially", it->GetSourceId());
5457
}
5558
if (FindIf(Iterators, [&](const auto& item) { return item.GetSourceId() == source->GetSourceId(); }) != Iterators.end()) {
5659
AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "portion is in heap")
5760
("front", Iterators.front().DebugString())
61+
("back", Iterators.back().DebugString())
5862
("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())
5963
("source_id", source->GetSourceId());
6064
}
61-
else if (Find(DebugOrder, source->GetSourceId()) != DebugOrder.end()) {
65+
else if (FindIf(DebugOrder, [&](const auto& item) { return item.GetSourceId() == source->GetSourceId(); }) != DebugOrder.end()) {
6266
AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "known portion, not in heap")
6367
("front", Iterators.front().DebugString())
68+
("back", Iterators.back().DebugString())
6469
("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())
6570
("source_id", source->GetSourceId());
6671
}
6772
else {
6873
AFL_VERIFY(Iterators.front().GetSourceId() == source->GetSourceId())("issue #28037", "unknown portion")
6974
("front", Iterators.front().DebugString())
75+
("back", Iterators.back().DebugString())
7076
("source", source->GetAs<TPortionDataSource>()->GetStart().DebugString())
7177
("source_id", source->GetSourceId());
7278
}
@@ -112,6 +118,7 @@ TString TSyncPointLimitControl::TSourceIterator::DebugString() const {
112118
sb << "f=" << IsFilled() << ";";
113119
sb << "record=" << SortableRecord->DebugJson() << ";";
114120
sb << "start=" << Source->GetAs<TPortionDataSource>()->GetStart().DebugString() << ";";
121+
sb << "finish=" << Source->GetAs<TPortionDataSource>()->GetFinish().DebugString() << ";";
115122
return sb;
116123
}
117124

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ class TSyncPointLimitControl: public ISyncPoint {
117117
};
118118

119119
std::vector<TSourceIterator> Iterators;
120-
std::vector<ui64> DebugOrder;
120+
std::vector<TSourceIterator> DebugOrder;
121121

122122
virtual bool IsFinished() const override {
123123
return FetchedCount >= Limit || TBase::IsFinished();
@@ -126,7 +126,7 @@ class TSyncPointLimitControl: public ISyncPoint {
126126
virtual std::shared_ptr<NCommon::IDataSource> OnAddSource(const std::shared_ptr<NCommon::IDataSource>& source) override {
127127
AFL_VERIFY(FetchedCount < Limit);
128128
Iterators.emplace_back(TSourceIterator(source));
129-
DebugOrder.emplace_back(source->GetSourceId());
129+
DebugOrder.emplace_back(TSourceIterator(source));
130130
std::push_heap(Iterators.begin(), Iterators.end());
131131
return TBase::OnAddSource(source);
132132
}

0 commit comments

Comments
 (0)