Skip to content

Commit

Permalink
Merge ae83708 into 6c22632
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit committed May 3, 2024
2 parents 6c22632 + ae83708 commit bcb5eb4
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 23 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2410,7 +2410,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

const bool singlePartitionOptAllowed = !HasOlapTable && !UnknownAffectedShardCount && !HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty();
const bool useDataQueryPool = !(HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty());
const bool localComputeTasks = !((HasExternalSources || HasOlapTable || HasDatashardSourceScan) && DatashardTxs.empty());
const bool localComputeTasks = !DatashardTxs.empty();

Planner = CreateKqpPlanner({
.TasksGraph = TasksGraph,
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
return nullptr;
}

if (ResourcesSnapshot.empty()) {
ResourcesSnapshot = std::move(GetKqpResourceManager()->GetClusterResources());
}

if (ResourcesSnapshot.empty() || (ResourcesSnapshot.size() == 1 && ResourcesSnapshot[0].GetNodeId() == ExecuterId.NodeId())) {
// try to run without memory overflow settings
if (LocalRunMemoryEst <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] &&
Expand Down Expand Up @@ -407,6 +411,8 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {

nComputeTasks = ComputeTasks.size();

// Explicit requirement: the tasks must be executed on the same node because
// they depend on datashard txs.
if (LocalComputeTasks) {
bool shareMailbox = (ComputeTasks.size() <= 1);
for (ui64 taskId : ComputeTasks) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class TKqpPlanner {
const bool WithSpilling;
const TMaybe<NKikimrKqp::TRlPath> RlPath;
THashSet<ui32> TrackingNodes;
const TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
NWilson::TSpan& ExecuterSpan;
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig;
ui64 LocalRunMemoryEst = 0;
Expand Down
17 changes: 17 additions & 0 deletions ydb/core/kqp/rm_service/kqp_rm_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,23 @@ class TKqpResourceManager : public IKqpResourceManager {
FireResourcesPublishing();
}

// Returns a copy of the node resources snapshot currently held in
// ResourceSnapshotState, or an empty vector when no snapshot is stored.
//
// Must only be called when PublishResourcesByExchanger is set; this is
// enforced by Y_ABORT_UNLESS below, so no further check of the flag is needed.
TVector<NKikimrKqp::TKqpNodeResources> GetClusterResources() const override {
    Y_ABORT_UNLESS(PublishResourcesByExchanger);

    std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> infos;
    with_lock (ResourceSnapshotState->Lock) {
        // Only the shared_ptr is copied while the lock is held; the vector
        // itself is copied after the lock is released.
        infos = ResourceSnapshotState->Snapshot;
    }

    if (infos == nullptr) {
        return {};
    }

    return *infos;
}

void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override {
LOG_AS_D("Schedule Snapshot request");
if (PublishResourcesByExchanger) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/rm_service/kqp_rm_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class IKqpResourceManager : private TNonCopyable {

virtual void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) = 0;

virtual TVector<NKikimrKqp::TKqpNodeResources> GetClusterResources() const = 0;
virtual TKqpLocalNodeResources GetLocalResources() const = 0;
virtual NKikimrConfig::TTableServiceConfig::TResourceManager GetConfig() = 0;

Expand Down
42 changes: 21 additions & 21 deletions ydb/core/tx/datashard/datashard_ut_trace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
auto [runtime, server, sender] = TestCreateServer();

CreateShardedTable(server, sender, "/Root", "table-1", 1, false);

TFakeWilsonUploader *uploader = new TFakeWilsonUploader();
TActorId uploaderId = runtime.Register(uploader, 0);
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
runtime.SimulateSleep(TDuration::Seconds(10));

const bool usesVolatileTxs = runtime.GetAppData(0).FeatureFlags.GetEnableDataShardVolatileTransactions();
Expand Down Expand Up @@ -129,7 +129,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
CheckTxHasDatashardUnits(propose, 3);

auto progress = tabletTxs[1];
CheckTxHasWriteLog(progress);
CheckTxHasWriteLog(progress);
CheckTxHasDatashardUnits(progress, usesVolatileTxs ? 6 : 11);
}

Expand Down Expand Up @@ -166,12 +166,12 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
Y_UNIT_TEST(TestTraceDistributedSelect) {
auto [runtime, server, sender] = TestCreateServer();
bool bTreeIndex = runtime.GetAppData().FeatureFlags.GetEnableLocalDBBtreeIndex();

CreateShardedTable(server, sender, "/Root", "table-1", 1, false);

TFakeWilsonUploader *uploader = new TFakeWilsonUploader();
TActorId uploaderId = runtime.Register(uploader, 0);
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
runtime.SimulateSleep(TDuration::Seconds(10));

SplitTable(runtime, server, 5);
Expand Down Expand Up @@ -230,7 +230,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
UNIT_ASSERT_VALUES_EQUAL(dsReads.size(), 2);

canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) "
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) "
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) "
", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) "
", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) "
Expand All @@ -239,18 +239,18 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
"-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) "
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) "
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog "
"-> [(Tablet.WriteLog.LogEntry)])])])])])])])";
"-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor)])])";

if (bTreeIndex) { // no index nodes (levels = 0)
canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) "
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) "
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) "
", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) "
", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) "
", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read -> [(Tablet.Transaction "
"-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) "
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog "
"-> [(Tablet.WriteLog.LogEntry)])])])])])])])";
"-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor)])])";
}

} else {
Expand Down Expand Up @@ -281,7 +281,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
}

canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) "
", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) , (RunTasks) , "
", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (RunTasks) , "
"(Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
"(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
"(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
Expand All @@ -290,10 +290,10 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
"(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
"(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
"[(Tablet.WriteLog.LogEntry)])])])])])";
"[(Tablet.WriteLog.LogEntry)])])]) , (ComputeActor)])])";
}


UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
}

Expand Down Expand Up @@ -345,13 +345,13 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
UNIT_ASSERT_VALUES_EQUAL(dsReads.size(), 2);

std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , "
"(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) , "
"(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , "
"(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , "
"(Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
"(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])"
"]) , (Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
"[(Tablet.WriteLog.LogEntry)])])])])])])])";
"[(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor)])])";
UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
}

Expand All @@ -363,7 +363,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {

TFakeWilsonUploader *uploader = new TFakeWilsonUploader();
TActorId uploaderId = runtime.Register(uploader, 0);
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
runtime.SimulateSleep(TDuration::Seconds(10));

NWilson::TTraceId traceId = NWilson::TTraceId::NewTraceId(15, 4095);
Expand All @@ -380,16 +380,16 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
UNIT_ASSERT_VALUES_EQUAL(1, uploader->Traces.size());

TFakeWilsonUploader::Trace &trace = uploader->Traces.begin()->second;

auto wtSpan = trace.Root.BFSFindOne("Datashard.WriteTransaction");
UNIT_ASSERT(wtSpan);

auto tabletTxs = wtSpan->get().FindAll("Tablet.Transaction");
UNIT_ASSERT_VALUES_EQUAL(1, tabletTxs.size());
auto writeTx = tabletTxs[0];

CheckTxHasWriteLog(writeTx);
CheckTxHasDatashardUnits(writeTx, 5);
CheckTxHasWriteLog(writeTx);
CheckTxHasDatashardUnits(writeTx, 5);

std::string canon = "(Datashard.WriteTransaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
Expand Down

0 comments on commit bcb5eb4

Please sign in to comment.