diff --git a/ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_ut.cpp b/ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_ut.cpp index e264ddee8fe3..b166c5631c77 100644 --- a/ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_ut.cpp +++ b/ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_ut.cpp @@ -10,8 +10,6 @@ namespace NKikimr::NKqp::NScheduler { -Y_UNIT_TEST_SUITE(TKqpScheduler) { - namespace { // hardcoded from ydb/core/protos/table_service_config.proto constexpr TDelayParams kDefaultDelayParams{ @@ -35,10 +33,13 @@ namespace { return tasks; } - void UpdateDemand(std::vector& tasks, ui64 demand) { + void ShrinkDemand(std::vector& tasks, ui64 demand) { + Y_ENSURE(demand * 2 < tasks.size()); tasks.resize(2 * demand); } -} +} // namespace + +Y_UNIT_TEST_SUITE(TKqpScheduler) { Y_UNIT_TEST(SingleDatabasePoolQueryStructure) { /* @@ -713,40 +714,29 @@ namespace { tasks.emplace_back(CreateDemandTasks(query, kQueryDemand)); } - scheduler.UpdateFairShare(); + auto CheckFairShare = [&](const std::vector& expectedFairShare) { + scheduler.UpdateFairShare(); - std::vector fairShares = {5, 4, 1}; - for (size_t queryId = 0; queryId < queries.size(); ++queryId) { - auto query = queries[queryId]; - auto querySnapshot = query->GetSnapshot(); - UNIT_ASSERT(querySnapshot); - UNIT_ASSERT_VALUES_EQUAL(querySnapshot->FairShare, fairShares[queryId]); - } + for (size_t queryId = 0; queryId < queries.size(); ++queryId) { + auto querySnapshot = queries.at(queryId)->GetSnapshot(); + UNIT_ASSERT(querySnapshot); + UNIT_ASSERT_VALUES_EQUAL_C(querySnapshot->FairShare, expectedFairShare.at(queryId), + "Wrong fair-share for query " << queryId); + } + }; + + CheckFairShare({5, 4, 1}); + // Add one more query NHdrf::NDynamic::TQueryPtr new_query = queries.emplace_back(scheduler.AddOrUpdateQuery(databaseId, poolId, 4, {})); tasks.emplace_back(CreateDemandTasks(new_query, kQueryDemand)); - scheduler.UpdateFairShare(); + CheckFairShare({5, 3, 1, 1}); - // distribution in FIFO ordering - fairShares = {5, 3, 1, 1}; - for (size_t queryId = 0; queryId < queries.size(); ++queryId) { - auto query = queries[queryId]; - auto querySnapshot = query->GetSnapshot(); - UNIT_ASSERT(querySnapshot); - UNIT_ASSERT_VALUES_EQUAL(querySnapshot->FairShare, fairShares[queryId]); - } + // Shrink demand of the first query + ShrinkDemand(tasks[0], 2); - UpdateDemand(tasks[0], 2); - scheduler.UpdateFairShare(); - - fairShares = {2, 5, 2, 1}; - for (size_t queryId = 0; queryId < queries.size(); ++queryId) { - auto query = queries[queryId]; - auto querySnapshot = query->GetSnapshot(); - UNIT_ASSERT(querySnapshot); - UNIT_ASSERT_VALUES_EQUAL(querySnapshot->FairShare, fairShares[queryId]); - } + CheckFairShare({2, 5, 2, 1}); auto* poolSnapshot = queries[0]->GetSnapshot()->GetParent(); UNIT_ASSERT(poolSnapshot); diff --git a/ydb/core/kqp/runtime/scheduler/kqp_schedulable_task.cpp b/ydb/core/kqp/runtime/scheduler/kqp_schedulable_task.cpp index 149ab763e386..754b9dddb407 100644 --- a/ydb/core/kqp/runtime/scheduler/kqp_schedulable_task.cpp +++ b/ydb/core/kqp/runtime/scheduler/kqp_schedulable_task.cpp @@ -75,8 +75,11 @@ void TSchedulableTask::DecreaseUsage(const TDuration& burstUsage, bool forcedRes } size_t TSchedulableTask::GetSpareUsage() const { - if (const auto snapshot = Query->GetSnapshot()) { - auto usage = Query->GetParent()->Usage.load(); + const auto snapshot = Query->GetSnapshot(); + const auto* parent = Query->GetParent(); + if (snapshot && parent) { + // TODO: check this code when the pool removal will be implemented, since the `parent` may be gone. + auto usage = parent->Usage.load(); auto fairShare = snapshot->GetParent()->FairShare; return fairShare >= usage ? (fairShare - usage) : 0; } diff --git a/ydb/core/kqp/runtime/scheduler/tree/common.h b/ydb/core/kqp/runtime/scheduler/tree/common.h index c987a55c105f..f499f9f9927a 100644 --- a/ydb/core/kqp/runtime/scheduler/tree/common.h +++ b/ydb/core/kqp/runtime/scheduler/tree/common.h @@ -7,6 +7,7 @@ #include #include +#include template concept CStaticallyDowncastable = requires(Base* b) { @@ -76,21 +77,14 @@ namespace NKikimr::NKqp::NScheduler::NHdrf { void AddChild(const TPtr& element) { Y_ENSURE(Y_LIKELY(element)); - Children.push_back(element); + Y_ENSURE(Children.insert(element).second); element->Parent = this; } void RemoveChild(const TPtr& element) { Y_ENSURE(Y_LIKELY(element)); - for (auto it = Children.begin(); it != Children.end(); ++it) { - if (*it == element) { - element->Parent = nullptr; - Children.erase(it); - return; - } - } - - // TODO: throw exception that child not found. + Y_ENSURE(Children.erase(element)); + element->Parent = nullptr; } inline size_t ChildrenSize() const { @@ -139,9 +133,20 @@ namespace NKikimr::NKqp::NScheduler::NHdrf { TTreeElementBase* Parent = nullptr; private: - std::vector Children; + struct TCompareChildren { + bool operator() (const TPtr& left, const TPtr& right) const; + }; + + std::set Children; }; + template + bool TTreeElementBase::TCompareChildren::operator() ( + const TTreeElementBase::TPtr& left, const TTreeElementBase::TPtr& right + ) const { + return left->GetId() < right->GetId(); + } + template struct TQuery : public virtual TTreeElementBase { using TBase = TTreeElementBase; @@ -171,6 +176,7 @@ namespace NKikimr::NKqp::NScheduler::NHdrf { NMonitoring::TDynamicCounters::TCounterPtr FairShare; NMonitoring::TDynamicCounters::TCounterPtr InFlight; NMonitoring::TDynamicCounters::TCounterPtr Waiting; + NMonitoring::TDynamicCounters::TCounterPtr Queries; NMonitoring::TDynamicCounters::TCounterPtr Satisfaction; NMonitoring::TDynamicCounters::TCounterPtr AdjustedSatisfaction; NMonitoring::TDynamicCounters::TCounterPtr InFlightExtra; diff --git a/ydb/core/kqp/runtime/scheduler/tree/dynamic.cpp b/ydb/core/kqp/runtime/scheduler/tree/dynamic.cpp index 482f6f31fd36..d508eb300e73 100644 --- a/ydb/core/kqp/runtime/scheduler/tree/dynamic.cpp +++ b/ydb/core/kqp/runtime/scheduler/tree/dynamic.cpp @@ -112,6 +112,7 @@ TPool::TPool(const TPoolId& id, const TIntrusivePtr& counters, con Counters->Demand = group->GetCounter("Demand", false); // snapshot Counters->InFlight = group->GetCounter("InFlight", false); Counters->Waiting = group->GetCounter("Waiting", false); + Counters->Queries = group->GetCounter("Queries", false); Counters->Usage = group->GetCounter("Usage", true); Counters->UsageResume = group->GetCounter("UsageResume", true); Counters->Throttle = group->GetCounter("Throttle", true); @@ -143,6 +144,9 @@ NSnapshot::TPool* TPool::TakeSnapshot() { } if (IsLeaf()) { + if (Counters) { + Counters->Queries->Set(ChildrenSize()); + } ForEachChild([&](TQuery* query, size_t) { newPool->AddQuery(NSnapshot::TQueryPtr(query->TakeSnapshot())); });