Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 21 additions & 31 deletions ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

namespace NKikimr::NKqp::NScheduler {

Y_UNIT_TEST_SUITE(TKqpScheduler) {

namespace {
// hardcoded from ydb/core/protos/table_service_config.proto
constexpr TDelayParams kDefaultDelayParams{
Expand All @@ -35,10 +33,13 @@ namespace {
return tasks;
}

void UpdateDemand(std::vector<TSchedulableTaskPtr>& tasks, ui64 demand) {
void ShrinkDemand(std::vector<TSchedulableTaskPtr>& tasks, ui64 demand) {
Y_ENSURE(demand * 2 < tasks.size());
tasks.resize(2 * demand);
}
}
} // namespace

Y_UNIT_TEST_SUITE(TKqpScheduler) {

Y_UNIT_TEST(SingleDatabasePoolQueryStructure) {
/*
Expand Down Expand Up @@ -713,40 +714,29 @@ namespace {
tasks.emplace_back(CreateDemandTasks(query, kQueryDemand));
}

scheduler.UpdateFairShare();
auto CheckFairShare = [&](const std::vector<ui64>& expectedFairShare) {
scheduler.UpdateFairShare();

std::vector<ui64> fairShares = {5, 4, 1};
for (size_t queryId = 0; queryId < queries.size(); ++queryId) {
auto query = queries[queryId];
auto querySnapshot = query->GetSnapshot();
UNIT_ASSERT(querySnapshot);
UNIT_ASSERT_VALUES_EQUAL(querySnapshot->FairShare, fairShares[queryId]);
}
for (size_t queryId = 0; queryId < queries.size(); ++queryId) {
auto querySnapshot = queries.at(queryId)->GetSnapshot();
UNIT_ASSERT(querySnapshot);
UNIT_ASSERT_VALUES_EQUAL_C(querySnapshot->FairShare, expectedFairShare.at(queryId),
"Wrong fair-share for query " << queryId);
}
};

CheckFairShare({5, 4, 1});

// Add one more query
NHdrf::NDynamic::TQueryPtr new_query = queries.emplace_back(scheduler.AddOrUpdateQuery(databaseId, poolId, 4, {}));
tasks.emplace_back(CreateDemandTasks(new_query, kQueryDemand));

scheduler.UpdateFairShare();
CheckFairShare({5, 3, 1, 1});

// distribution in FIFO ordering
fairShares = {5, 3, 1, 1};
for (size_t queryId = 0; queryId < queries.size(); ++queryId) {
auto query = queries[queryId];
auto querySnapshot = query->GetSnapshot();
UNIT_ASSERT(querySnapshot);
UNIT_ASSERT_VALUES_EQUAL(querySnapshot->FairShare, fairShares[queryId]);
}
// Shrink demand of the first query
ShrinkDemand(tasks[0], 2);

UpdateDemand(tasks[0], 2);
scheduler.UpdateFairShare();

fairShares = {2, 5, 2, 1};
for (size_t queryId = 0; queryId < queries.size(); ++queryId) {
auto query = queries[queryId];
auto querySnapshot = query->GetSnapshot();
UNIT_ASSERT(querySnapshot);
UNIT_ASSERT_VALUES_EQUAL(querySnapshot->FairShare, fairShares[queryId]);
}
CheckFairShare({2, 5, 2, 1});

auto* poolSnapshot = queries[0]->GetSnapshot()->GetParent();
UNIT_ASSERT(poolSnapshot);
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/kqp/runtime/scheduler/kqp_schedulable_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ void TSchedulableTask::DecreaseUsage(const TDuration& burstUsage, bool forcedRes
}

size_t TSchedulableTask::GetSpareUsage() const {
if (const auto snapshot = Query->GetSnapshot()) {
auto usage = Query->GetParent()->Usage.load();
const auto snapshot = Query->GetSnapshot();
const auto* parent = Query->GetParent();
if (snapshot && parent) {
// TODO: check this code when the pool removal will be implemented, since the `parent` may be gone.
auto usage = parent->Usage.load();
auto fairShare = snapshot->GetParent()->FairShare;
return fairShare >= usage ? (fairShare - usage) : 0;
}
Expand Down
28 changes: 17 additions & 11 deletions ydb/core/kqp/runtime/scheduler/tree/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <util/generic/hash.h>

#include <optional>
#include <set>

template <typename Base, typename Derived>
concept CStaticallyDowncastable = requires(Base* b) {
Expand Down Expand Up @@ -76,21 +77,14 @@ namespace NKikimr::NKqp::NScheduler::NHdrf {

void AddChild(const TPtr& element) {
Y_ENSURE(Y_LIKELY(element));
Children.push_back(element);
Y_ENSURE(Children.insert(element).second);
element->Parent = this;
}

void RemoveChild(const TPtr& element) {
Y_ENSURE(Y_LIKELY(element));
for (auto it = Children.begin(); it != Children.end(); ++it) {
if (*it == element) {
element->Parent = nullptr;
Children.erase(it);
return;
}
}

// TODO: throw exception that child not found.
Y_ENSURE(Children.erase(element));
element->Parent = nullptr;
}

inline size_t ChildrenSize() const {
Expand Down Expand Up @@ -139,9 +133,20 @@ namespace NKikimr::NKqp::NScheduler::NHdrf {
TTreeElementBase* Parent = nullptr;

private:
std::vector<TPtr> Children;
struct TCompareChildren {
bool operator() (const TPtr& left, const TPtr& right) const;
};

std::set<TPtr, TCompareChildren> Children;
};

template <ETreeType T>
bool TTreeElementBase<T>::TCompareChildren::operator() (
const TTreeElementBase<T>::TPtr& left, const TTreeElementBase<T>::TPtr& right
) const {
return left->GetId() < right->GetId();
}

template <ETreeType T>
struct TQuery : public virtual TTreeElementBase<T> {
using TBase = TTreeElementBase<T>;
Expand Down Expand Up @@ -171,6 +176,7 @@ namespace NKikimr::NKqp::NScheduler::NHdrf {
NMonitoring::TDynamicCounters::TCounterPtr FairShare;
NMonitoring::TDynamicCounters::TCounterPtr InFlight;
NMonitoring::TDynamicCounters::TCounterPtr Waiting;
NMonitoring::TDynamicCounters::TCounterPtr Queries;
NMonitoring::TDynamicCounters::TCounterPtr Satisfaction;
NMonitoring::TDynamicCounters::TCounterPtr AdjustedSatisfaction;
NMonitoring::TDynamicCounters::TCounterPtr InFlightExtra;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/runtime/scheduler/tree/dynamic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ TPool::TPool(const TPoolId& id, const TIntrusivePtr<TKqpCounters>& counters, con
Counters->Demand = group->GetCounter("Demand", false); // snapshot
Counters->InFlight = group->GetCounter("InFlight", false);
Counters->Waiting = group->GetCounter("Waiting", false);
Counters->Queries = group->GetCounter("Queries", false);
Counters->Usage = group->GetCounter("Usage", true);
Counters->UsageResume = group->GetCounter("UsageResume", true);
Counters->Throttle = group->GetCounter("Throttle", true);
Expand Down Expand Up @@ -143,6 +144,9 @@ NSnapshot::TPool* TPool::TakeSnapshot() {
}

if (IsLeaf()) {
if (Counters) {
Counters->Queries->Set(ChildrenSize());
}
ForEachChild<TQuery>([&](TQuery* query, size_t) {
newPool->AddQuery(NSnapshot::TQueryPtr(query->TakeSnapshot()));
});
Expand Down
Loading