diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 43859f56f3d0..de245127f6cd 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2410,7 +2410,7 @@ class TKqpDataExecuter : public TKqpExecuterBase TKqpPlanner::AssignTasksToNodes() { return nullptr; } + if (ResourcesSnapshot.empty()) { + ResourcesSnapshot = std::move(GetKqpResourceManager()->GetClusterResources()); + } + if (ResourcesSnapshot.empty() || (ResourcesSnapshot.size() == 1 && ResourcesSnapshot[0].GetNodeId() == ExecuterId.NodeId())) { // try to run without memory overflow settings if (LocalRunMemoryEst <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] && @@ -407,6 +411,8 @@ std::unique_ptr TKqpPlanner::PlanExecution() { nComputeTasks = ComputeTasks.size(); + // explicit requirement to execute task on the same node because it has dependencies + // on datashard tx. if (LocalComputeTasks) { bool shareMailbox = (ComputeTasks.size() <= 1); for (ui64 taskId : ComputeTasks) { diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index fabda6bf1bf9..29507ae895fd 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -104,7 +104,7 @@ class TKqpPlanner { const bool WithSpilling; const TMaybe RlPath; THashSet TrackingNodes; - const TVector ResourcesSnapshot; + TVector ResourcesSnapshot; NWilson::TSpan& ExecuterSpan; const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig; ui64 LocalRunMemoryEst = 0; diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index 319d13495fa7..b7b0491e4665 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -599,6 +599,23 @@ class TKqpResourceManager : public IKqpResourceManager { FireResourcesPublishing(); } + TVector GetClusterResources() const override { + TVector resources; + Y_ABORT_UNLESS(PublishResourcesByExchanger); + + if (PublishResourcesByExchanger) { + std::shared_ptr> infos; + with_lock (ResourceSnapshotState->Lock) { + infos = ResourceSnapshotState->Snapshot; + } + if (infos != nullptr) { + resources = *infos; + } + } + + return resources; + } + void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override { LOG_AS_D("Schedule Snapshot request"); if (PublishResourcesByExchanger) { diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h index f21a2ccb5a48..079690faec61 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.h +++ b/ydb/core/kqp/rm_service/kqp_rm_service.h @@ -91,6 +91,7 @@ class IKqpResourceManager : private TNonCopyable { virtual void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) = 0; + virtual TVector GetClusterResources() const = 0; virtual TKqpLocalNodeResources GetLocalResources() const = 0; virtual NKikimrConfig::TTableServiceConfig::TResourceManager GetConfig() = 0; diff --git a/ydb/core/tx/datashard/datashard_ut_trace.cpp b/ydb/core/tx/datashard/datashard_ut_trace.cpp index 272928a4eaad..13a8df250854 100644 --- a/ydb/core/tx/datashard/datashard_ut_trace.cpp +++ b/ydb/core/tx/datashard/datashard_ut_trace.cpp @@ -87,10 +87,10 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { auto [runtime, server, sender] = TestCreateServer(); CreateShardedTable(server, sender, "/Root", "table-1", 1, false); - + TFakeWilsonUploader *uploader = new TFakeWilsonUploader(); TActorId uploaderId = runtime.Register(uploader, 0); - runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); + runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); runtime.SimulateSleep(TDuration::Seconds(10)); const bool usesVolatileTxs = runtime.GetAppData(0).FeatureFlags.GetEnableDataShardVolatileTransactions(); @@ -129,7 +129,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { CheckTxHasDatashardUnits(propose, 3); auto progress = tabletTxs[1]; - CheckTxHasWriteLog(progress); + CheckTxHasWriteLog(progress); CheckTxHasDatashardUnits(progress, usesVolatileTxs ? 6 : 11); } @@ -166,12 +166,12 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { Y_UNIT_TEST(TestTraceDistributedSelect) { auto [runtime, server, sender] = TestCreateServer(); bool bTreeIndex = runtime.GetAppData().FeatureFlags.GetEnableLocalDBBtreeIndex(); - + CreateShardedTable(server, sender, "/Root", "table-1", 1, false); - + TFakeWilsonUploader *uploader = new TFakeWilsonUploader(); TActorId uploaderId = runtime.Register(uploader, 0); - runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); + runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); runtime.SimulateSleep(TDuration::Seconds(10)); SplitTable(runtime, server, 5); @@ -230,7 +230,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { UNIT_ASSERT_VALUES_EQUAL(dsReads.size(), 2); canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) " - ", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) " + ", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) " ", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read " "-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) " ", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) " @@ -239,18 +239,18 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { "-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) " ", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) " ", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog " - "-> [(Tablet.WriteLog.LogEntry)])])])])])])])"; + "-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor)])])"; if (bTreeIndex) { // no index nodes (levels = 0) canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) " - ", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) " + ", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) " ", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read " "-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) " ", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) " ", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read -> [(Tablet.Transaction " "-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) " ", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog " - "-> [(Tablet.WriteLog.LogEntry)])])])])])])])"; + "-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor)])])"; } } else { @@ -281,7 +281,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { } canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) " - ", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) , (RunTasks) , " + ", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (RunTasks) , " "(Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , " "(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " "(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " @@ -290,10 +290,10 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { "[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " "(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " "(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> " - "[(Tablet.WriteLog.LogEntry)])])])])])"; + "[(Tablet.WriteLog.LogEntry)])])]) , (ComputeActor)])])"; } - - + + UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString()); } @@ -345,13 +345,13 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { UNIT_ASSERT_VALUES_EQUAL(dsReads.size(), 2); std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , " - "(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) , " + "(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , " "(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , " "(Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , " "(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])" "]) , (Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> " "[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> " - "[(Tablet.WriteLog.LogEntry)])])])])])])])"; + "[(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor)])])"; UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString()); } @@ -363,7 +363,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { TFakeWilsonUploader *uploader = new TFakeWilsonUploader(); TActorId uploaderId = runtime.Register(uploader, 0); - runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); + runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); runtime.SimulateSleep(TDuration::Seconds(10)); NWilson::TTraceId traceId = NWilson::TTraceId::NewTraceId(15, 4095); @@ -380,16 +380,16 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { UNIT_ASSERT_VALUES_EQUAL(1, uploader->Traces.size()); TFakeWilsonUploader::Trace &trace = uploader->Traces.begin()->second; - + auto wtSpan = trace.Root.BFSFindOne("Datashard.WriteTransaction"); UNIT_ASSERT(wtSpan); - + auto tabletTxs = wtSpan->get().FindAll("Tablet.Transaction"); UNIT_ASSERT_VALUES_EQUAL(1, tabletTxs.size()); auto writeTx = tabletTxs[0]; - CheckTxHasWriteLog(writeTx); - CheckTxHasDatashardUnits(writeTx, 5); + CheckTxHasWriteLog(writeTx); + CheckTxHasDatashardUnits(writeTx, 5); std::string canon = "(Datashard.WriteTransaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> " "[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "