From 59478397fd4e4bdada4a0f669f4a61d2f69f9520 Mon Sep 17 00:00:00 2001 From: Andrey Zaspa Date: Wed, 5 Nov 2025 12:43:44 +0000 Subject: [PATCH] Filled database name in Scheme Cache Resolve shards requests --- ydb/core/grpc_services/rpc_import_data.cpp | 2 +- ydb/core/grpc_services/rpc_kh_describe.cpp | 2 +- ydb/core/grpc_services/rpc_object_storage.cpp | 2 +- ydb/core/grpc_services/rpc_read_columns.cpp | 2 +- ydb/core/grpc_services/rpc_read_rows.cpp | 1 + ydb/core/kqp/compute_actor/kqp_compute_actor.cpp | 9 +++++---- ydb/core/kqp/compute_actor/kqp_compute_actor.h | 3 ++- ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp | 7 +++++-- ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h | 7 ++++--- ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp | 1 + ydb/core/kqp/executer_actor/kqp_table_resolver.cpp | 4 +++- ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 3 +++ ydb/core/kqp/node_service/kqp_node_service.cpp | 2 +- ydb/core/kqp/runtime/kqp_read_actor.cpp | 1 + ydb/core/kqp/runtime/kqp_scan_fetcher_ut.cpp | 2 +- ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 3 +++ ydb/core/kqp/runtime/kqp_vector_actor.cpp | 1 + ydb/core/kqp/runtime/kqp_write_actor.cpp | 1 + ydb/core/protos/kqp.proto | 1 + ydb/core/protos/tx_datashard.proto | 5 +++++ ydb/core/statistics/aggregator/aggregator_impl.cpp | 3 ++- ydb/core/statistics/aggregator/tx_resolve.cpp | 7 ++++--- .../aggregator/tx_response_tablet_distribution.cpp | 7 ++++--- ydb/core/tx/replication/service/base_table_writer.cpp | 1 + ydb/core/tx/tx_proxy/read_table_impl.cpp | 1 + ydb/core/tx/tx_proxy/upload_rows_common_impl.h | 4 ++-- 26 files changed, 56 insertions(+), 26 deletions(-) diff --git a/ydb/core/grpc_services/rpc_import_data.cpp b/ydb/core/grpc_services/rpc_import_data.cpp index ffd36322f5da..52df200c934e 100644 --- a/ydb/core/grpc_services/rpc_import_data.cpp +++ b/ydb/core/grpc_services/rpc_import_data.cpp @@ -179,7 +179,7 @@ class TImportDataRPC: public TRpcRequestActor(); - request->DatabaseName = NKikimr::CanonizePath(GetDatabaseName()); + request->DatabaseName = GetDatabaseName(); request->ResultSet.emplace_back(std::move(KeyDesc)); request->ResultSet.back().Access = NACLib::UpdateRow; diff --git a/ydb/core/grpc_services/rpc_kh_describe.cpp b/ydb/core/grpc_services/rpc_kh_describe.cpp index dc780a74b53d..9c0fcfa4d708 100644 --- a/ydb/core/grpc_services/rpc_kh_describe.cpp +++ b/ydb/core/grpc_services/rpc_kh_describe.cpp @@ -261,7 +261,7 @@ class TKikhouseDescribeTableRPC : public TActorBootstrapped request(new NSchemeCache::TSchemeCacheRequest()); - + request->DatabaseName = Request->GetDatabaseName().GetOrElse(""); request->ResultSet.emplace_back(std::move(KeyRange)); TAutoPtr resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request)); diff --git a/ydb/core/grpc_services/rpc_object_storage.cpp b/ydb/core/grpc_services/rpc_object_storage.cpp index 0024c8eb1075..6ee9d7a8daf6 100644 --- a/ydb/core/grpc_services/rpc_object_storage.cpp +++ b/ydb/core/grpc_services/rpc_object_storage.cpp @@ -444,7 +444,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped request(new NSchemeCache::TSchemeCacheRequest()); - + request->DatabaseName = GrpcRequest->GetDatabaseName().GetOrElse(""); request->ResultSet.emplace_back(std::move(KeyRange)); TAutoPtr resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request)); diff --git a/ydb/core/grpc_services/rpc_read_columns.cpp b/ydb/core/grpc_services/rpc_read_columns.cpp index e6dd4056672f..b1f0bfd4d539 100644 --- a/ydb/core/grpc_services/rpc_read_columns.cpp +++ b/ydb/core/grpc_services/rpc_read_columns.cpp @@ -590,7 +590,7 @@ class TReadColumnsRPC : public TActorBootstrapped { << " fromInclusive: " << true); TAutoPtr request(new NSchemeCache::TSchemeCacheRequest()); - + request->DatabaseName = Request->GetDatabaseName().GetOrElse(""); request->ResultSet.emplace_back(std::move(KeyRange)); TAutoPtr resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request)); diff --git a/ydb/core/grpc_services/rpc_read_rows.cpp b/ydb/core/grpc_services/rpc_read_rows.cpp index 0a943f3f1383..6ec206e24c6b 100644 --- a/ydb/core/grpc_services/rpc_read_rows.cpp +++ b/ydb/core/grpc_services/rpc_read_rows.cpp @@ -445,6 +445,7 @@ class TReadRowsRPC : public TActorBootstrapped { auto keyRange = MakeHolder(entry.TableId, range, TKeyDesc::ERowOperation::Read, KeyColumnTypes, columns); auto request = std::make_unique(); + request->DatabaseName = GetDatabase(); request->ResultSet.emplace_back(std::move(keyRange)); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request.release()), 0, 0, Span.GetTraceId()); } diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index 611c07a37f2a..5e5f421e743a 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -183,11 +183,12 @@ IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, } IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector&& computeActors, - const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings, const ui64 txId, - TMaybe lockTxId, ui32 lockNodeId, TMaybe lockMode, const TShardsScanningPolicy& shardsScanningPolicy, + const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings, + const TString& database, const ui64 txId, TMaybe lockTxId, ui32 lockNodeId, + TMaybe lockMode, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr counters, NWilson::TTraceId traceId, const TCPULimits& cpuLimits) { - return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, lockMode, meta, - shardsScanningPolicy, counters, std::move(traceId), cpuLimits); + return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, lockMode, + database, meta, shardsScanningPolicy, counters, std::move(traceId), cpuLimits); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index fa8937c53c36..d422fdf58a47 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -75,7 +75,8 @@ IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, NYql::N IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector&& computeActors, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings, - const ui64 txId, TMaybe lockTxId, ui32 lockNodeId, TMaybe lockMode, const TShardsScanningPolicy& shardsScanningPolicy, + const TString& database, const ui64 txId, TMaybe lockTxId, ui32 lockNodeId, + TMaybe lockMode, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr counters, NWilson::TTraceId traceId, const TCPULimits& cpuLimits); NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory( diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp index f48628ed19d2..d83960a19803 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp @@ -24,12 +24,14 @@ constexpr TDuration PING_PERIOD = TDuration::Seconds(30); TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TComputeRuntimeSettings& settings, std::vector&& computeActors, const ui64 txId, const TMaybe lockTxId, const ui32 lockNodeId, - const TMaybe lockMode, const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, - const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr counters, NWilson::TTraceId traceId, + const TMaybe lockMode, const TString& database, + const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, const TShardsScanningPolicy& shardsScanningPolicy, + TIntrusivePtr counters, NWilson::TTraceId traceId, const TCPULimits& cpuLimits) : Meta(meta) , ScanDataMeta(Meta) , RuntimeSettings(settings) + , Database(database) , TxId(txId) , LockTxId(lockTxId) , LockNodeId(lockNodeId) @@ -648,6 +650,7 @@ void TKqpScanFetcherActor::ResolveShard(TShardState& state) { << ", attempt #" << state.ResolveAttempt); auto request = MakeHolder(); + request->DatabaseName = Database; request->ResultSet.emplace_back(std::move(keyDesc)); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); } diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h index 38b3b207e9dd..60014cfa5283 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h @@ -50,6 +50,7 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped LockTxId; const ui32 LockNodeId; @@ -63,9 +64,9 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped&& computeActors, const ui64 txId, const TMaybe lockTxId, const ui32 lockNodeId, - const TMaybe lockMode, const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, - const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr counters, NWilson::TTraceId traceId, - const TCPULimits& cpuLimits); + const TMaybe lockMode, const TString& database, + const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, const TShardsScanningPolicy& shardsScanningPolicy, + TIntrusivePtr counters, NWilson::TTraceId traceId, const TCPULimits& cpuLimits); static TVector BuildSerializedTableRanges( const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& readData); diff --git a/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp b/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp index d98ac736549f..6e08e9eec539 100644 --- a/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp @@ -426,6 +426,7 @@ class TKqpPartitionedExecuter : public TActorBootstrapped(TableId, range, OperationType, KeyColumnTypes, TVector{}); TAutoPtr request(new NSchemeCache::TSchemeCacheRequest()); + request->DatabaseName = Database; request->ResultSet.emplace_back(std::move(keyRange)); TAutoPtr resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request)); diff --git a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp index 33f1533663ad..214796f398e5 100644 --- a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp +++ b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp @@ -325,7 +325,9 @@ class TKqpTableResolver : public TActorBootstrapped { void ResolveKeys() { auto requestNavigate = std::make_unique(); auto request = MakeHolder(); - request->DatabaseName = TasksGraph.GetMeta().Database; + const auto& databaseName = TasksGraph.GetMeta().Database; + requestNavigate->DatabaseName = databaseName; + request->DatabaseName = databaseName; request->ResultSet.reserve(TasksGraph.GetStagesInfo().size()); if (UserToken && !UserToken->GetSerializedToken().empty()) { request->UserToken = UserToken; diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 2cd4042e7e32..112d9d0fb177 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -398,6 +398,7 @@ void TKqpTasksGraph::BuildStreamLookupChannels(const TStageInfo& stageInfo, ui32 NKikimrKqp::TKqpStreamLookupSettings* settings = GetMeta().Allocate(); + settings->SetDatabase(GetMeta().Database); settings->MutableTable()->CopyFrom(streamLookup.GetTable()); auto columnToProto = [] (TString columnName, @@ -470,6 +471,7 @@ void TKqpTasksGraph::BuildVectorResolveChannels(const TStageInfo& stageInfo, ui3 YQL_ENSURE(stageInfo.Meta.IndexMetas.size() == 1); const auto& levelTableInfo = stageInfo.Meta.IndexMetas.back().TableConstInfo; + settings->SetDatabase(GetMeta().Database); auto* levelMeta = settings->MutableLevelTable(); auto& kqpMeta = vectorResolve.GetLevelTable(); levelMeta->SetTablePath(kqpMeta.GetPath()); @@ -2506,6 +2508,7 @@ TMaybe TKqpTasksGraph::BuildScanTasksFromSource(TStageInfo& stageInfo, b input.Meta.SourceSettings = GetMeta().Allocate(); NKikimrTxDataShard::TKqpReadRangesSourceSettings* settings = input.Meta.SourceSettings; + settings->SetDatabase(GetMeta().Database); FillTableMeta(stageInfo, settings->MutableTable()); settings->SetIsTableImmutable(source.GetIsTableImmutable()); diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 07e88d79b359..85c2b49173bc 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -309,7 +309,7 @@ class TKqpNodeService : public TActorBootstrapped { for (auto&& i : computesByStage) { for (auto&& m : i.second.MutableMetaInfo()) { Register(CreateKqpScanFetcher(msg.GetSnapshot(), std::move(m.MutableActorIds()), - m.GetMeta(), runtimeSettingsBase, txId, lockTxId, lockNodeId, lockMode, + m.GetMeta(), runtimeSettingsBase, msg.GetDatabase(), txId, lockTxId, lockNodeId, lockMode, scanPolicy, Counters, NWilson::TTraceId(ev->TraceId), cpuLimits)); } } diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 6f7fd0ca4467..94015cff1bbf 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -520,6 +520,7 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq << ", attempt #" << state->ResolveAttempt); auto request = MakeHolder(); + request->DatabaseName = Settings->GetDatabase(); request->ResultSet.emplace_back(std::move(keyDesc)); request->ResultSet.front().UserData = ResolveShardId; diff --git a/ydb/core/kqp/runtime/kqp_scan_fetcher_ut.cpp b/ydb/core/kqp/runtime/kqp_scan_fetcher_ut.cpp index 90397194ee65..040d5c73d654 100644 --- a/ydb/core/kqp/runtime/kqp_scan_fetcher_ut.cpp +++ b/ydb/core/kqp/runtime/kqp_scan_fetcher_ut.cpp @@ -31,7 +31,7 @@ Y_UNIT_TEST_SUITE(TKqpScanFetcher) { NWilson::TTraceId traceId(0); NKikimr::NKqp::TCPULimits cpuLimits; NMonitoring::TDynamicCounterPtr counters = MakeIntrusive(); - auto scanFetcher = runtime.Register(CreateKqpScanFetcher(snapshot, { compute }, meta, settings, + auto scanFetcher = runtime.Register(CreateKqpScanFetcher(snapshot, { compute }, meta, settings, "/Root", 0, TMaybe(), 0, TMaybe(), shardsScanningPolicy, MakeIntrusive(counters), 0, cpuLimits) ); diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index fb3a5e77a8e8..dd1961db2a5d 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -48,6 +48,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped(); + request->DatabaseName = Database; auto keyColumnTypes = StreamLookupWorker->GetKeyColumnTypes(); @@ -828,6 +830,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped(); auto src = arena->Allocate(); + src->SetDatabase(Settings.GetDatabase()); *src->MutableTable() = Settings.GetLevelTable(); range.Serialize(*src->MutableFullRange()); src->SetDataFormat(NKikimrDataEvents::FORMAT_CELLVEC); diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 3a10733738f6..96a686e5f54c 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -640,6 +640,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { TVector{}); TAutoPtr request(new NSchemeCache::TSchemeCacheRequest()); + request->DatabaseName = Database; request->ResultSet.emplace_back(std::move(keyRange)); TAutoPtr resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request)); diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index bbf3e8fe0aa4..e51c3c963085 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -887,6 +887,7 @@ message TKqpStreamLookupSettings { optional bool AllowUseFollowers = 15; optional bool IsTableImmutable = 16; optional EIsolationLevel IsolationLevel = 17 [default = ISOLATION_LEVEL_UNDEFINED]; + optional string Database = 18; } message TKqpSequencerSettings { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index ddc2fd279764..0b0f169e085f 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -274,6 +274,8 @@ message TKqpReadRangesSourceSettings { optional bool IsTableImmutable = 23 [default = false]; optional NKikimrKqp.EIsolationLevel IsolationLevel = 24 [default = ISOLATION_LEVEL_UNDEFINED]; + + optional string Database = 25; } // Takes input rows with a vector column, resolves the leaf cluster ID using the given @@ -307,6 +309,9 @@ message TKqpVectorResolveSettings { // Input root cluster ID column (for the prefixed index) optional uint32 RootClusterColumnIndex = 16; + + // Database name for KqpReadActor + optional string Database = 17; } message TKqpTaskInfo { diff --git a/ydb/core/statistics/aggregator/aggregator_impl.cpp b/ydb/core/statistics/aggregator/aggregator_impl.cpp index 404f351e0679..7460e8eb5a39 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.cpp +++ b/ydb/core/statistics/aggregator/aggregator_impl.cpp @@ -559,7 +559,7 @@ void TStatisticsAggregator::Navigate() { } void TStatisticsAggregator::Resolve() { - Y_ABORT_UNLESS(NavigateType == ENavigateType::Traversal && !NavigateAnalyzeOperationId + Y_ABORT_UNLESS(NavigateType == ENavigateType::Traversal && !NavigateAnalyzeOperationId || NavigateType == ENavigateType::Analyze && NavigateAnalyzeOperationId); Y_ABORT_UNLESS(NavigatePathId); @@ -571,6 +571,7 @@ void TStatisticsAggregator::Resolve() { NavigatePathId, range, TKeyDesc::ERowOperation::Read, KeyColumnTypes, Columns); auto request = std::make_unique(); + request->DatabaseName = NavigateDatabase; request->ResultSet.emplace_back(std::move(keyDesc)); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request.release())); diff --git a/ydb/core/statistics/aggregator/tx_resolve.cpp b/ydb/core/statistics/aggregator/tx_resolve.cpp index 36d9d561144b..131fd9a5954c 100644 --- a/ydb/core/statistics/aggregator/tx_resolve.cpp +++ b/ydb/core/statistics/aggregator/tx_resolve.cpp @@ -46,8 +46,8 @@ struct TStatisticsAggregator::TTxResolve : public TTxBase { }); } } - - SA_LOG_D("[" << Self->TabletID() << "] TTxResolve::ExecuteAnalyze. Table OperationId " << Self->NavigateAnalyzeOperationId << ", PathId " << Self->NavigatePathId + + SA_LOG_D("[" << Self->TabletID() << "] TTxResolve::ExecuteAnalyze. Table OperationId " << Self->NavigateAnalyzeOperationId << ", PathId " << Self->NavigatePathId << ", AnalyzedShards " << forceTraversalTable->AnalyzedShards.size()); Self->UpdateForceTraversalTableStatus(TForceTraversalTable::EStatus::AnalyzeStarted, Self->NavigateAnalyzeOperationId, *forceTraversalTable, db); @@ -112,7 +112,7 @@ struct TStatisticsAggregator::TTxResolve : public TTxBase { case ENavigateType::Traversal: return ExecuteTraversal(entry, db); }; - + } void CompleteTraversal(const TActorContext& ctx) { @@ -141,6 +141,7 @@ struct TStatisticsAggregator::TTxResolve : public TTxBase { }; Self->NavigateAnalyzeOperationId.clear(); + Self->NavigateDatabase = ""; Self->NavigatePathId = {}; } }; diff --git a/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp b/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp index cf3ea094bec4..f470649f929d 100644 --- a/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp +++ b/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp @@ -30,11 +30,11 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase { bool ExecuteStartForceTraversal(TTransactionContext& txc) { ++Self->TraversalRound; ++Self->GlobalTraversalRound; - + NIceDb::TNiceDb db(txc.DB); Self->PersistGlobalTraversalRound(db); - AggregateStatisticsRequest = std::make_unique(); + AggregateStatisticsRequest = std::make_unique(); auto& outRecord = AggregateStatisticsRequest->Record; outRecord.SetRound(Self->GlobalTraversalRound); Self->TraversalPathId.ToProto(outRecord.MutablePathId()); @@ -51,7 +51,7 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase { outNode.MutableTabletIds()->CopyFrom(inNode.GetTabletIds()); } - return true; + return true; } bool Execute(TTransactionContext& txc, const TActorContext&) override { @@ -79,6 +79,7 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase { if (!distribution.empty() && Self->ResolveRound < Self->MaxResolveRoundCount) { SA_LOG_W("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute. Some tablets do not exist in Hive anymore; tablet count = " << distribution.size()); // these tablets do not exist in Hive anymore + Self->NavigateDatabase = Self->TraversalDatabase; Self->NavigatePathId = Self->TraversalPathId; Action = EAction::ScheduleResolve; return true; diff --git a/ydb/core/tx/replication/service/base_table_writer.cpp b/ydb/core/tx/replication/service/base_table_writer.cpp index aa9a98eef08c..848b71e90561 100644 --- a/ydb/core/tx/replication/service/base_table_writer.cpp +++ b/ydb/core/tx/replication/service/base_table_writer.cpp @@ -378,6 +378,7 @@ class TLocalTableWriter void ResolveKeys() { auto request = MakeHolder(); + request->DatabaseName = Database; request->ResultSet.emplace_back(std::move(KeyDesc)); Send(MakeSchemeCacheID(), new TEvResolve(request.Release())); } diff --git a/ydb/core/tx/tx_proxy/read_table_impl.cpp b/ydb/core/tx/tx_proxy/read_table_impl.cpp index a6f2773f8ce4..c82b959deee7 100644 --- a/ydb/core/tx/tx_proxy/read_table_impl.cpp +++ b/ydb/core/tx/tx_proxy/read_table_impl.cpp @@ -697,6 +697,7 @@ class TReadTableWorker : public TActorBootstrapped { Y_ABORT_UNLESS(!ResolveInProgress, "Only one resolve request may be active at a time"); auto request = MakeHolder(); + request->DatabaseName = Settings.DatabaseName; request->DomainOwnerId = DomainInfo->ExtractSchemeShard(); request->ResultSet.emplace_back(std::move(KeyDesc)); diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 9e93207a0f0d..419a7423c515 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -623,7 +623,7 @@ class TUploadRowsBase : public TActorBootstrapped request(new NSchemeCache::TSchemeCacheNavigate()); - request->DatabaseName = std::move(GetDatabase()); + request->DatabaseName = GetDatabase(); NSchemeCache::TSchemeCacheNavigate::TEntry entry; entry.Path = ::NKikimr::SplitPath(table); @@ -1049,7 +1049,7 @@ class TUploadRowsBase : public TActorBootstrapped(entry.TableId, range, TKeyDesc::ERowOperation::Update, KeyColumnTypes, columns); TAutoPtr request(new NSchemeCache::TSchemeCacheRequest()); - + request->DatabaseName = GetDatabase(); request->ResultSet.emplace_back(std::move(keyRange)); TAutoPtr resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request));