Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_import_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class TImportDataRPC: public TRpcRequestActor<TImportDataRPC, TEvImportDataReque

void ResolveKeys() {
auto request = MakeHolder<TResolve>();
request->DatabaseName = NKikimr::CanonizePath(GetDatabaseName());
request->DatabaseName = GetDatabaseName();

request->ResultSet.emplace_back(std::move(KeyDesc));
request->ResultSet.back().Access = NACLib::UpdateRow;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_kh_describe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class TKikhouseDescribeTableRPC : public TActorBootstrapped<TKikhouseDescribeTab
KeyRange.Reset(new TKeyDesc(entry.TableId, range, TKeyDesc::ERowOperation::Read, KeyColumnTypes, columns));

TAutoPtr<NSchemeCache::TSchemeCacheRequest> request(new NSchemeCache::TSchemeCacheRequest());

request->DatabaseName = Request->GetDatabaseName().GetOrElse("");
request->ResultSet.emplace_back(std::move(KeyRange));

TAutoPtr<TEvTxProxySchemeCache::TEvResolveKeySet> resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request));
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag

void ResolveShards(const NActors::TActorContext& ctx) {
TAutoPtr<NSchemeCache::TSchemeCacheRequest> request(new NSchemeCache::TSchemeCacheRequest());

request->DatabaseName = GrpcRequest->GetDatabaseName().GetOrElse("");
request->ResultSet.emplace_back(std::move(KeyRange));

TAutoPtr<TEvTxProxySchemeCache::TEvResolveKeySet> resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request));
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_read_columns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ class TReadColumnsRPC : public TActorBootstrapped<TReadColumnsRPC> {
<< " fromInclusive: " << true);

TAutoPtr<NSchemeCache::TSchemeCacheRequest> request(new NSchemeCache::TSchemeCacheRequest());

request->DatabaseName = Request->GetDatabaseName().GetOrElse("");
request->ResultSet.emplace_back(std::move(KeyRange));

TAutoPtr<TEvTxProxySchemeCache::TEvResolveKeySet> resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request));
Expand Down
1 change: 1 addition & 0 deletions ydb/core/grpc_services/rpc_read_rows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
auto keyRange = MakeHolder<TKeyDesc>(entry.TableId, range, TKeyDesc::ERowOperation::Read, KeyColumnTypes, columns);

auto request = std::make_unique<NSchemeCache::TSchemeCacheRequest>();
request->DatabaseName = GetDatabase();
request->ResultSet.emplace_back(std::move(keyRange));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request.release()), 0, 0, Span.GetTraceId());
}
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,12 @@ IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
}

IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings, const ui64 txId,
TMaybe<ui64> lockTxId, ui32 lockNodeId, TMaybe<NKikimrDataEvents::ELockMode> lockMode, const TShardsScanningPolicy& shardsScanningPolicy,
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
const TString& database, const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
TMaybe<NKikimrDataEvents::ELockMode> lockMode, const TShardsScanningPolicy& shardsScanningPolicy,
TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId, const TCPULimits& cpuLimits) {
return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, lockMode, meta,
shardsScanningPolicy, counters, std::move(traceId), cpuLimits);
return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, lockMode,
database, meta, shardsScanningPolicy, counters, std::move(traceId), cpuLimits);
}

} // namespace NKikimr::NKqp
3 changes: 2 additions & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, NYql::N

IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId, TMaybe<NKikimrDataEvents::ELockMode> lockMode, const TShardsScanningPolicy& shardsScanningPolicy,
const TString& database, const ui64 txId, TMaybe<ui64> lockTxId, ui32 lockNodeId,
TMaybe<NKikimrDataEvents::ELockMode> lockMode, const TShardsScanningPolicy& shardsScanningPolicy,
TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId, const TCPULimits& cpuLimits);

NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ constexpr TDuration PING_PERIOD = TDuration::Seconds(30);

TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TComputeRuntimeSettings& settings,
std::vector<NActors::TActorId>&& computeActors, const ui64 txId, const TMaybe<ui64> lockTxId, const ui32 lockNodeId,
const TMaybe<NKikimrDataEvents::ELockMode> lockMode, const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta,
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId,
const TMaybe<NKikimrDataEvents::ELockMode> lockMode, const TString& database,
const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, const TShardsScanningPolicy& shardsScanningPolicy,
TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId,
const TCPULimits& cpuLimits)
: Meta(meta)
, ScanDataMeta(Meta)
, RuntimeSettings(settings)
, Database(database)
, TxId(txId)
, LockTxId(lockTxId)
, LockNodeId(lockNodeId)
Expand Down Expand Up @@ -648,6 +650,7 @@ void TKqpScanFetcherActor::ResolveShard(TShardState& state) {
<< ", attempt #" << state.ResolveAttempt);

auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
request->DatabaseName = Database;
request->ResultSet.emplace_back(std::move(keyDesc));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
}
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped<TKqpScanFetcherAc
NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta;
const NMiniKQL::TScanDataMetaFull ScanDataMeta;
const NYql::NDq::TComputeRuntimeSettings RuntimeSettings;
const TString Database;
const NYql::NDq::TTxId TxId;
const TMaybe<ui64> LockTxId;
const ui32 LockNodeId;
Expand All @@ -63,9 +64,9 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped<TKqpScanFetcherAc

TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot, const NYql::NDq::TComputeRuntimeSettings& settings,
std::vector<NActors::TActorId>&& computeActors, const ui64 txId, const TMaybe<ui64> lockTxId, const ui32 lockNodeId,
const TMaybe<NKikimrDataEvents::ELockMode> lockMode, const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta,
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId,
const TCPULimits& cpuLimits);
const TMaybe<NKikimrDataEvents::ELockMode> lockMode, const TString& database,
const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, const TShardsScanningPolicy& shardsScanningPolicy,
TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId, const TCPULimits& cpuLimits);

static TVector<TSerializedTableRange> BuildSerializedTableRanges(
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& readData);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ class TKqpPartitionedExecuter : public TActorBootstrapped<TKqpPartitionedExecute
auto keyRange = MakeHolder<TKeyDesc>(TableId, range, OperationType, KeyColumnTypes, TVector<TKeyDesc::TColumnOp>{});

TAutoPtr<NSchemeCache::TSchemeCacheRequest> request(new NSchemeCache::TSchemeCacheRequest());
request->DatabaseName = Database;
request->ResultSet.emplace_back(std::move(keyRange));

TAutoPtr<TEvTxProxySchemeCache::TEvResolveKeySet> resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request));
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_table_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,9 @@ class TKqpTableResolver : public TActorBootstrapped<TKqpTableResolver> {
void ResolveKeys() {
auto requestNavigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
request->DatabaseName = TasksGraph.GetMeta().Database;
const auto& databaseName = TasksGraph.GetMeta().Database;
requestNavigate->DatabaseName = databaseName;
request->DatabaseName = databaseName;
request->ResultSet.reserve(TasksGraph.GetStagesInfo().size());
if (UserToken && !UserToken->GetSerializedToken().empty()) {
request->UserToken = UserToken;
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ void TKqpTasksGraph::BuildStreamLookupChannels(const TStageInfo& stageInfo, ui32

NKikimrKqp::TKqpStreamLookupSettings* settings = GetMeta().Allocate<NKikimrKqp::TKqpStreamLookupSettings>();

settings->SetDatabase(GetMeta().Database);
settings->MutableTable()->CopyFrom(streamLookup.GetTable());

auto columnToProto = [] (TString columnName,
Expand Down Expand Up @@ -470,6 +471,7 @@ void TKqpTasksGraph::BuildVectorResolveChannels(const TStageInfo& stageInfo, ui3
YQL_ENSURE(stageInfo.Meta.IndexMetas.size() == 1);
const auto& levelTableInfo = stageInfo.Meta.IndexMetas.back().TableConstInfo;

settings->SetDatabase(GetMeta().Database);
auto* levelMeta = settings->MutableLevelTable();
auto& kqpMeta = vectorResolve.GetLevelTable();
levelMeta->SetTablePath(kqpMeta.GetPath());
Expand Down Expand Up @@ -2506,6 +2508,7 @@ TMaybe<size_t> TKqpTasksGraph::BuildScanTasksFromSource(TStageInfo& stageInfo, b

input.Meta.SourceSettings = GetMeta().Allocate<NKikimrTxDataShard::TKqpReadRangesSourceSettings>();
NKikimrTxDataShard::TKqpReadRangesSourceSettings* settings = input.Meta.SourceSettings;
settings->SetDatabase(GetMeta().Database);
FillTableMeta(stageInfo, settings->MutableTable());

settings->SetIsTableImmutable(source.GetIsTableImmutable());
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/node_service/kqp_node_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
for (auto&& i : computesByStage) {
for (auto&& m : i.second.MutableMetaInfo()) {
Register(CreateKqpScanFetcher(msg.GetSnapshot(), std::move(m.MutableActorIds()),
m.GetMeta(), runtimeSettingsBase, txId, lockTxId, lockNodeId, lockMode,
m.GetMeta(), runtimeSettingsBase, msg.GetDatabase(), txId, lockTxId, lockNodeId, lockMode,
scanPolicy, Counters, NWilson::TTraceId(ev->TraceId), cpuLimits));
}
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/runtime/kqp_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
<< ", attempt #" << state->ResolveAttempt);

auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
request->DatabaseName = Settings->GetDatabase();
request->ResultSet.emplace_back(std::move(keyDesc));

request->ResultSet.front().UserData = ResolveShardId;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/runtime/kqp_scan_fetcher_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Y_UNIT_TEST_SUITE(TKqpScanFetcher) {
NWilson::TTraceId traceId(0);
NKikimr::NKqp::TCPULimits cpuLimits;
NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>();
auto scanFetcher = runtime.Register(CreateKqpScanFetcher(snapshot, { compute }, meta, settings,
auto scanFetcher = runtime.Register(CreateKqpScanFetcher(snapshot, { compute }, meta, settings, "/Root",
0, TMaybe<ui64>(), 0, TMaybe<NKikimrDataEvents::ELockMode>(), shardsScanningPolicy,
MakeIntrusive<NKikimr::NKqp::TKqpCounters>(counters), 0, cpuLimits)
);
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
, LookupStrategy(settings.GetLookupStrategy())
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TaskId, args.TypeEnv, args.HolderFactory, args.InputDesc))
, IsolationLevel(settings.GetIsolationLevel())
, Database(settings.GetDatabase())
, Counters(counters)
, LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), "LookupActor")
{
Expand Down Expand Up @@ -754,6 +755,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
Partitioning.reset();

auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
request->DatabaseName = Database;

auto keyColumnTypes = StreamLookupWorker->GetKeyColumnTypes();

Expand Down Expand Up @@ -828,6 +830,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
size_t TotalResolveShardsAttempts = 0;
bool ResolveShardsInProgress = false;
NKikimrKqp::EIsolationLevel IsolationLevel;
const TString Database;

// stats
ui64 ReadRowsCount = 0;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/runtime/kqp_vector_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ class TKqpVectorResolveActor : public NActors::TActorBootstrapped<TKqpVectorReso
auto range = ParentRange(parent);
auto arena = MakeIntrusive<NActors::TProtoArenaHolder>();
auto src = arena->Allocate<NKikimrTxDataShard::TKqpReadRangesSourceSettings>();
src->SetDatabase(Settings.GetDatabase());
*src->MutableTable() = Settings.GetLevelTable();
range.Serialize(*src->MutableFullRange());
src->SetDataFormat(NKikimrDataEvents::FORMAT_CELLVEC);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
TVector<TKeyDesc::TColumnOp>{});

TAutoPtr<NSchemeCache::TSchemeCacheRequest> request(new NSchemeCache::TSchemeCacheRequest());
request->DatabaseName = Database;
request->ResultSet.emplace_back(std::move(keyRange));

TAutoPtr<TEvTxProxySchemeCache::TEvResolveKeySet> resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request));
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@ message TKqpStreamLookupSettings {
optional bool AllowUseFollowers = 15;
optional bool IsTableImmutable = 16;
optional EIsolationLevel IsolationLevel = 17 [default = ISOLATION_LEVEL_UNDEFINED];
optional string Database = 18;
}

message TKqpSequencerSettings {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ message TKqpReadRangesSourceSettings {

optional bool IsTableImmutable = 23 [default = false];
optional NKikimrKqp.EIsolationLevel IsolationLevel = 24 [default = ISOLATION_LEVEL_UNDEFINED];

optional string Database = 25;
}

// Takes input rows with a vector column, resolves the leaf cluster ID using the given
Expand Down Expand Up @@ -307,6 +309,9 @@ message TKqpVectorResolveSettings {

// Input root cluster ID column (for the prefixed index)
optional uint32 RootClusterColumnIndex = 16;

// Database name for KqpReadActor
optional string Database = 17;
}

message TKqpTaskInfo {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/statistics/aggregator/aggregator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ void TStatisticsAggregator::Navigate() {
}

void TStatisticsAggregator::Resolve() {
Y_ABORT_UNLESS(NavigateType == ENavigateType::Traversal && !NavigateAnalyzeOperationId
Y_ABORT_UNLESS(NavigateType == ENavigateType::Traversal && !NavigateAnalyzeOperationId
|| NavigateType == ENavigateType::Analyze && NavigateAnalyzeOperationId);
Y_ABORT_UNLESS(NavigatePathId);

Expand All @@ -571,6 +571,7 @@ void TStatisticsAggregator::Resolve() {
NavigatePathId, range, TKeyDesc::ERowOperation::Read, KeyColumnTypes, Columns);

auto request = std::make_unique<NSchemeCache::TSchemeCacheRequest>();
request->DatabaseName = NavigateDatabase;
request->ResultSet.emplace_back(std::move(keyDesc));

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request.release()));
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/statistics/aggregator/tx_resolve.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ struct TStatisticsAggregator::TTxResolve : public TTxBase {
});
}
}
SA_LOG_D("[" << Self->TabletID() << "] TTxResolve::ExecuteAnalyze. Table OperationId " << Self->NavigateAnalyzeOperationId << ", PathId " << Self->NavigatePathId

SA_LOG_D("[" << Self->TabletID() << "] TTxResolve::ExecuteAnalyze. Table OperationId " << Self->NavigateAnalyzeOperationId << ", PathId " << Self->NavigatePathId
<< ", AnalyzedShards " << forceTraversalTable->AnalyzedShards.size());

Self->UpdateForceTraversalTableStatus(TForceTraversalTable::EStatus::AnalyzeStarted, Self->NavigateAnalyzeOperationId, *forceTraversalTable, db);
Expand Down Expand Up @@ -112,7 +112,7 @@ struct TStatisticsAggregator::TTxResolve : public TTxBase {
case ENavigateType::Traversal:
return ExecuteTraversal(entry, db);
};

}

void CompleteTraversal(const TActorContext& ctx) {
Expand Down Expand Up @@ -141,6 +141,7 @@ struct TStatisticsAggregator::TTxResolve : public TTxBase {
};

Self->NavigateAnalyzeOperationId.clear();
Self->NavigateDatabase = "";
Self->NavigatePathId = {};
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase {
bool ExecuteStartForceTraversal(TTransactionContext& txc) {
++Self->TraversalRound;
++Self->GlobalTraversalRound;

NIceDb::TNiceDb db(txc.DB);
Self->PersistGlobalTraversalRound(db);

AggregateStatisticsRequest = std::make_unique<TEvStatistics::TEvAggregateStatistics>();
AggregateStatisticsRequest = std::make_unique<TEvStatistics::TEvAggregateStatistics>();
auto& outRecord = AggregateStatisticsRequest->Record;
outRecord.SetRound(Self->GlobalTraversalRound);
Self->TraversalPathId.ToProto(outRecord.MutablePathId());
Expand All @@ -51,7 +51,7 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase {
outNode.MutableTabletIds()->CopyFrom(inNode.GetTabletIds());
}

return true;
return true;
}

bool Execute(TTransactionContext& txc, const TActorContext&) override {
Expand Down Expand Up @@ -79,6 +79,7 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase {
if (!distribution.empty() && Self->ResolveRound < Self->MaxResolveRoundCount) {
SA_LOG_W("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute. Some tablets do not exist in Hive anymore; tablet count = " << distribution.size());
// these tablets do not exist in Hive anymore
Self->NavigateDatabase = Self->TraversalDatabase;
Self->NavigatePathId = Self->TraversalPathId;
Action = EAction::ScheduleResolve;
return true;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/replication/service/base_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ class TLocalTableWriter

void ResolveKeys() {
auto request = MakeHolder<TResolve>();
request->DatabaseName = Database;
request->ResultSet.emplace_back(std::move(KeyDesc));
Send(MakeSchemeCacheID(), new TEvResolve(request.Release()));
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/tx_proxy/read_table_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,7 @@ class TReadTableWorker : public TActorBootstrapped<TReadTableWorker> {
Y_ABORT_UNLESS(!ResolveInProgress, "Only one resolve request may be active at a time");

auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
request->DatabaseName = Settings.DatabaseName;
request->DomainOwnerId = DomainInfo->ExtractSchemeShard();
request->ResultSet.emplace_back(std::move(KeyDesc));

Expand Down
Loading
Loading