From c987ac550bc687e5b49d454545bbecfe77e15d47 Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Sun, 2 Nov 2025 17:41:26 +0300 Subject: [PATCH 1/3] Add metric for query count per pool --- ydb/core/kqp/runtime/scheduler/tree/common.h | 16 +++++----------- ydb/core/kqp/runtime/scheduler/tree/dynamic.cpp | 4 ++++ 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/ydb/core/kqp/runtime/scheduler/tree/common.h b/ydb/core/kqp/runtime/scheduler/tree/common.h index c987a55c105f..5f21fa2f5c02 100644 --- a/ydb/core/kqp/runtime/scheduler/tree/common.h +++ b/ydb/core/kqp/runtime/scheduler/tree/common.h @@ -76,21 +76,14 @@ namespace NKikimr::NKqp::NScheduler::NHdrf { void AddChild(const TPtr& element) { Y_ENSURE(Y_LIKELY(element)); - Children.push_back(element); + Y_ENSURE(Children.insert(element).second); element->Parent = this; } void RemoveChild(const TPtr& element) { Y_ENSURE(Y_LIKELY(element)); - for (auto it = Children.begin(); it != Children.end(); ++it) { - if (*it == element) { - element->Parent = nullptr; - Children.erase(it); - return; - } - } - - // TODO: throw exception that child not found. + Y_ENSURE(Children.erase(element)); + element->Parent = nullptr; } inline size_t ChildrenSize() const { @@ -139,7 +132,7 @@ namespace NKikimr::NKqp::NScheduler::NHdrf { TTreeElementBase* Parent = nullptr; private: - std::vector Children; + std::unordered_set Children; }; template @@ -171,6 +164,7 @@ namespace NKikimr::NKqp::NScheduler::NHdrf { NMonitoring::TDynamicCounters::TCounterPtr FairShare; NMonitoring::TDynamicCounters::TCounterPtr InFlight; NMonitoring::TDynamicCounters::TCounterPtr Waiting; + NMonitoring::TDynamicCounters::TCounterPtr Queries; NMonitoring::TDynamicCounters::TCounterPtr Satisfaction; NMonitoring::TDynamicCounters::TCounterPtr AdjustedSatisfaction; NMonitoring::TDynamicCounters::TCounterPtr InFlightExtra; diff --git a/ydb/core/kqp/runtime/scheduler/tree/dynamic.cpp b/ydb/core/kqp/runtime/scheduler/tree/dynamic.cpp index 482f6f31fd36..d508eb300e73 100644 --- a/ydb/core/kqp/runtime/scheduler/tree/dynamic.cpp +++ b/ydb/core/kqp/runtime/scheduler/tree/dynamic.cpp @@ -112,6 +112,7 @@ TPool::TPool(const TPoolId& id, const TIntrusivePtr& counters, con Counters->Demand = group->GetCounter("Demand", false); // snapshot Counters->InFlight = group->GetCounter("InFlight", false); Counters->Waiting = group->GetCounter("Waiting", false); + Counters->Queries = group->GetCounter("Queries", false); Counters->Usage = group->GetCounter("Usage", true); Counters->UsageResume = group->GetCounter("UsageResume", true); Counters->Throttle = group->GetCounter("Throttle", true); @@ -143,6 +144,9 @@ NSnapshot::TPool* TPool::TakeSnapshot() { } if (IsLeaf()) { + if (Counters) { + Counters->Queries->Set(ChildrenSize()); + } ForEachChild([&](TQuery* query, size_t) { newPool->AddQuery(NSnapshot::TQueryPtr(query->TakeSnapshot())); }); From b5c7dd67bcb8e4007bed42d5766cac95da0ed29e Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Thu, 6 Nov 2025 15:36:22 +0300 Subject: [PATCH 2/3] Fix concurrent parent reset --- ydb/core/kqp/runtime/scheduler/kqp_schedulable_task.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ydb/core/kqp/runtime/scheduler/kqp_schedulable_task.cpp b/ydb/core/kqp/runtime/scheduler/kqp_schedulable_task.cpp index 149ab763e386..754b9dddb407 100644 --- a/ydb/core/kqp/runtime/scheduler/kqp_schedulable_task.cpp +++ b/ydb/core/kqp/runtime/scheduler/kqp_schedulable_task.cpp @@ -75,8 +75,11 @@ void TSchedulableTask::DecreaseUsage(const TDuration& burstUsage, bool forcedRes } size_t TSchedulableTask::GetSpareUsage() const { - if (const auto snapshot = Query->GetSnapshot()) { - auto usage = Query->GetParent()->Usage.load(); + const auto snapshot = Query->GetSnapshot(); + const auto* parent = Query->GetParent(); + if (snapshot && parent) { + // TODO: check this code when the pool removal will be implemented, since the `parent` may be gone. + auto usage = parent->Usage.load(); auto fairShare = snapshot->GetParent()->FairShare; return fairShare >= usage ? (fairShare - usage) : 0; } From 8c4ec8900600d6533fd1ef784df860851794cca9 Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Thu, 6 Nov 2025 19:22:54 +0300 Subject: [PATCH 3/3] Sort children by id --- .../scheduler/kqp_compute_scheduler_ut.cpp | 52 ++++++++----------- ydb/core/kqp/runtime/scheduler/tree/common.h | 14 ++++- 2 files changed, 34 insertions(+), 32 deletions(-) diff --git a/ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_ut.cpp b/ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_ut.cpp index e264ddee8fe3..b166c5631c77 100644 --- a/ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_ut.cpp +++ b/ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_ut.cpp @@ -10,8 +10,6 @@ namespace NKikimr::NKqp::NScheduler { -Y_UNIT_TEST_SUITE(TKqpScheduler) { - namespace { // hardcoded from ydb/core/protos/table_service_config.proto constexpr TDelayParams kDefaultDelayParams{ @@ -35,10 +33,13 @@ namespace { return tasks; } - void UpdateDemand(std::vector& tasks, ui64 demand) { + void ShrinkDemand(std::vector& tasks, ui64 demand) { + Y_ENSURE(demand * 2 < tasks.size()); tasks.resize(2 * demand); } -} +} // namespace + +Y_UNIT_TEST_SUITE(TKqpScheduler) { Y_UNIT_TEST(SingleDatabasePoolQueryStructure) { /* @@ -713,40 +714,29 @@ namespace { tasks.emplace_back(CreateDemandTasks(query, kQueryDemand)); } - scheduler.UpdateFairShare(); + auto CheckFairShare = [&](const std::vector& expectedFairShare) { + scheduler.UpdateFairShare(); - std::vector fairShares = {5, 4, 1}; - for (size_t queryId = 0; queryId < queries.size(); ++queryId) { - auto query = queries[queryId]; - auto querySnapshot = query->GetSnapshot(); - UNIT_ASSERT(querySnapshot); - UNIT_ASSERT_VALUES_EQUAL(querySnapshot->FairShare, fairShares[queryId]); - } + for (size_t queryId = 0; queryId < queries.size(); ++queryId) { + auto querySnapshot = queries.at(queryId)->GetSnapshot(); + UNIT_ASSERT(querySnapshot); + UNIT_ASSERT_VALUES_EQUAL_C(querySnapshot->FairShare, expectedFairShare.at(queryId), + "Wrong fair-share for query " << queryId); + } + }; + + CheckFairShare({5, 4, 1}); + // Add one more query NHdrf::NDynamic::TQueryPtr new_query = queries.emplace_back(scheduler.AddOrUpdateQuery(databaseId, poolId, 4, {})); tasks.emplace_back(CreateDemandTasks(new_query, kQueryDemand)); - scheduler.UpdateFairShare(); + CheckFairShare({5, 3, 1, 1}); - // distribution in FIFO ordering - fairShares = {5, 3, 1, 1}; - for (size_t queryId = 0; queryId < queries.size(); ++queryId) { - auto query = queries[queryId]; - auto querySnapshot = query->GetSnapshot(); - UNIT_ASSERT(querySnapshot); - UNIT_ASSERT_VALUES_EQUAL(querySnapshot->FairShare, fairShares[queryId]); - } + // Shrink demand of the first query + ShrinkDemand(tasks[0], 2); - UpdateDemand(tasks[0], 2); - scheduler.UpdateFairShare(); - - fairShares = {2, 5, 2, 1}; - for (size_t queryId = 0; queryId < queries.size(); ++queryId) { - auto query = queries[queryId]; - auto querySnapshot = query->GetSnapshot(); - UNIT_ASSERT(querySnapshot); - UNIT_ASSERT_VALUES_EQUAL(querySnapshot->FairShare, fairShares[queryId]); - } + CheckFairShare({2, 5, 2, 1}); auto* poolSnapshot = queries[0]->GetSnapshot()->GetParent(); UNIT_ASSERT(poolSnapshot); diff --git a/ydb/core/kqp/runtime/scheduler/tree/common.h b/ydb/core/kqp/runtime/scheduler/tree/common.h index 5f21fa2f5c02..f499f9f9927a 100644 --- a/ydb/core/kqp/runtime/scheduler/tree/common.h +++ b/ydb/core/kqp/runtime/scheduler/tree/common.h @@ -7,6 +7,7 @@ #include #include +#include template concept CStaticallyDowncastable = requires(Base* b) { @@ -132,9 +133,20 @@ namespace NKikimr::NKqp::NScheduler::NHdrf { TTreeElementBase* Parent = nullptr; private: - std::unordered_set Children; + struct TCompareChildren { + bool operator() (const TPtr& left, const TPtr& right) const; + }; + + std::set Children; }; + template + bool TTreeElementBase::TCompareChildren::operator() ( + const TTreeElementBase::TPtr& left, const TTreeElementBase::TPtr& right + ) const { + return left->GetId() < right->GetId(); + } + template struct TQuery : public virtual TTreeElementBase { using TBase = TTreeElementBase;