From db7f56afd2f9177c463b5c4b09809e572634fd10 Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Fri, 21 Nov 2025 21:44:36 +0300 Subject: [PATCH] increase and make configurable some slj params (#29319) --- ydb/core/kqp/counters/kqp_counters.cpp | 2 + ydb/core/kqp/counters/kqp_counters.h | 2 + .../kqp/node_service/kqp_node_service.cpp | 2 + .../kqp/runtime/kqp_read_iterator_common.cpp | 10 ++++- .../kqp/runtime/kqp_read_iterator_common.h | 4 ++ .../kqp/runtime/kqp_stream_lookup_actor.cpp | 40 ++++++++++++++++--- .../kqp/runtime/kqp_stream_lookup_worker.cpp | 11 +++-- .../kqp/runtime/kqp_stream_lookup_worker.h | 2 +- ydb/core/kqp/ut/scan/kqp_scan_ut.cpp | 3 +- ydb/core/protos/table_service_config.proto | 3 ++ 10 files changed, 65 insertions(+), 14 deletions(-) diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index 5a23f677dc01..e7042b2cf824 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -815,6 +815,8 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co DataShardIteratorFails = KqpGroup->GetCounter("IteratorReads/DatashardFails", true); DataShardIteratorMessages = KqpGroup->GetCounter("IteratorReads/DatashardMessages", true); IteratorDeliveryProblems = KqpGroup->GetCounter("IteratorReads/DeliveryProblems", true); + StreamLookupIteratorTotalQuotaBytesInFlight = KqpGroup->GetCounter("IteratorReads/StreamLookupIteratorTotalQuotaBytesInFlight", false); + StreamLookupIteratorTotalQuotaBytesExceeded = KqpGroup->GetCounter("IteratorReads/StreamLookupIteratorTotalQuotaBytesExceeded", true); /* sink writes */ WriteActorsShardResolve = KqpGroup->GetCounter("SinkWrites/WriteActorShardResolve", true); diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index 8dbcc3b49e17..91032ee8ea87 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -409,6 +409,8 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter ::NMonitoring::TDynamicCounters::TCounterPtr ReadActorRetries; ::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorFails; ::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages; + ::NMonitoring::TDynamicCounters::TCounterPtr StreamLookupIteratorTotalQuotaBytesInFlight; + ::NMonitoring::TDynamicCounters::TCounterPtr StreamLookupIteratorTotalQuotaBytesExceeded; ::NMonitoring::TDynamicCounters::TCounterPtr IteratorDeliveryProblems; // Sink write counters diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 6df7a37eda10..5770a4dd4eb3 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -434,6 +434,8 @@ class TKqpNodeService : public TActorBootstrapped { ptr->ReadResponseTimeout = TDuration::MilliSeconds(settings.GetIteratorResponseTimeoutMs()); } ptr->MaxRetryDelay = TDuration::MilliSeconds(settings.GetMaxDelayMs()); + ptr->MaxRowsProcessingStreamLookup = settings.GetMaxRowsProcessingStreamLookup(); + ptr->MaxTotalBytesQuotaStreamLookup = settings.GetMaxTotalBytesQuotaStreamLookup(); SetReadIteratorBackoffSettings(ptr); } diff --git a/ydb/core/kqp/runtime/kqp_read_iterator_common.cpp b/ydb/core/kqp/runtime/kqp_read_iterator_common.cpp index 0227d21419d6..387e9d55246a 100644 --- a/ydb/core/kqp/runtime/kqp_read_iterator_common.cpp +++ b/ydb/core/kqp/runtime/kqp_read_iterator_common.cpp @@ -91,7 +91,7 @@ size_t MaxShardResolves() { return Singleton()->SettingsPtr.AtomicLoad()->MaxShardResolves; } -size_t MaxShardRetries() { +size_t MaxShardRetries() { return Singleton()->SettingsPtr.AtomicLoad()->MaxShardAttempts; } @@ -103,5 +103,13 @@ TMaybe ShardTimeout() { return Singleton()->SettingsPtr.AtomicLoad()->ReadResponseTimeout; } +size_t MaxRowsProcessingStreamLookup() { + return Singleton()->SettingsPtr.AtomicLoad()->MaxRowsProcessingStreamLookup; +} + +ui64 MaxTotalBytesQuotaStreamLookup() { + return Singleton()->SettingsPtr.AtomicLoad()->MaxTotalBytesQuotaStreamLookup; +} + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_read_iterator_common.h b/ydb/core/kqp/runtime/kqp_read_iterator_common.h index e95964821641..d6b068b5b2a6 100644 --- a/ydb/core/kqp/runtime/kqp_read_iterator_common.h +++ b/ydb/core/kqp/runtime/kqp_read_iterator_common.h @@ -17,6 +17,8 @@ struct TIteratorReadBackoffSettings : TAtomicRefCount MaxTotalRetries; TMaybe ReadResponseTimeout; + size_t MaxRowsProcessingStreamLookup = 65536; + ui64 MaxTotalBytesQuotaStreamLookup = 5_MB * 512; }; struct TEvReadSettings : public TAtomicRefCount { @@ -38,6 +40,8 @@ size_t MaxShardResolves(); size_t MaxShardRetries(); TMaybe MaxTotalRetries(); TMaybe ShardTimeout(); +size_t MaxRowsProcessingStreamLookup(); +ui64 MaxTotalBytesQuotaStreamLookup(); void SetDefaultIteratorQuotaSettings(ui32 rows, ui32 bytes); THolder GetDefaultReadSettings(); diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 6f8999d78fa1..629dce3895e8 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -47,6 +47,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedIsOverloaded(); + auto overloaded = StreamLookupWorker->IsOverloaded(MaxRowsProcessing); if (!overloaded.has_value()) { FetchInputRows(); } else { @@ -445,6 +447,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedStreamLookupIteratorTotalQuotaBytesInFlight->Sub(MaxBytesDefaultQuota); + if (!Snapshot.IsValid()) { Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId()); } @@ -502,6 +507,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedRecord.SetSeqNo(record.GetSeqNo()); auto defaultSettings = GetDefaultReadAckSettings()->Record; - request->Record.SetMaxRows(defaultSettings.GetMaxRows()); - request->Record.SetMaxBytes(defaultSettings.GetMaxBytes()); + request->Record.SetMaxRows(MaxRowsDefaultQuota); + request->Record.SetMaxBytes(MaxBytesDefaultQuota); + + TotalBytesQuota += MaxBytesDefaultQuota; + Counters->StreamLookupIteratorTotalQuotaBytesInFlight->Add(MaxBytesDefaultQuota); + if (TotalBytesQuota > MaxTotalBytesQuota) { + Counters->StreamLookupIteratorTotalQuotaBytesExceeded->Inc(); + } const bool needToCreatePipe = Reads.NeedToCreatePipe(read.ShardId); @@ -657,10 +669,22 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedRecord; - record.SetMaxRows(defaultSettings.GetMaxRows()); - record.SetMaxBytes(defaultSettings.GetMaxBytes()); + if (!MaxRowsDefaultQuota || !MaxBytesDefaultQuota) { + MaxRowsDefaultQuota = defaultSettings.GetMaxRows(); + MaxBytesDefaultQuota = defaultSettings.GetMaxBytes(); + } + + record.SetMaxRows(MaxRowsDefaultQuota); + record.SetMaxBytes(MaxBytesDefaultQuota); record.SetResultFormat(NKikimrDataEvents::FORMAT_CELLVEC); + TotalBytesQuota += MaxBytesDefaultQuota; + Counters->StreamLookupIteratorTotalQuotaBytesInFlight->Add(MaxBytesDefaultQuota); + + if (TotalBytesQuota > MaxTotalBytesQuota) { + Counters->StreamLookupIteratorTotalQuotaBytesExceeded->Inc(); + } + CA_LOG_D(TStringBuilder() << "Send EvRead (stream lookup) to shardId=" << shardId << ", readId = " << record.GetReadId() << ", tablePath: " << StreamLookupWorker->GetTablePath() @@ -828,6 +852,12 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped Counters; NWilson::TSpan LookupActorSpan; NWilson::TSpan LookupActorStateSpan; diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index 445725e487b9..2efabf157090 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -15,7 +15,6 @@ namespace NKikimr { namespace NKqp { -constexpr ui64 MAX_IN_FLIGHT_LIMIT = 500; constexpr ui64 SEQNO_SPACE = 40; constexpr ui64 MaxTaskId = (1ULL << (64 - SEQNO_SPACE)); @@ -323,7 +322,7 @@ class TKqpLookupRows : public TKqpStreamLookupWorker { ReadResults.emplace_back(std::move(result)); } - std::optional IsOverloaded() final { + std::optional IsOverloaded(size_t) final { return std::nullopt; } @@ -596,10 +595,10 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { YQL_ENSURE(false); } - std::optional IsOverloaded() final { - if (UnprocessedRows.size() >= MAX_IN_FLIGHT_LIMIT || - PendingLeftRowsByKey.size() >= MAX_IN_FLIGHT_LIMIT || - ResultRowsBySeqNo.size() >= MAX_IN_FLIGHT_LIMIT) + std::optional IsOverloaded(size_t maxRowsProcessing) final { + if (UnprocessedRows.size() >= maxRowsProcessing || + PendingLeftRowsByKey.size() >= maxRowsProcessing || + ResultRowsBySeqNo.size() >= maxRowsProcessing) { TStringBuilder overloadDescriptor; overloadDescriptor << "unprocessed rows: " << UnprocessedRows.size() diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h index f0e8b8ba3b7c..2c4fd6d84ab6 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h @@ -80,7 +80,7 @@ class TKqpStreamLookupWorker { virtual bool AllRowsProcessed() = 0; virtual bool HasPendingResults() = 0; virtual void ResetRowsProcessing(ui64 readId) = 0; - virtual std::optional IsOverloaded() = 0; + virtual std::optional IsOverloaded(size_t maxRowsProcessing) = 0; protected: const NMiniKQL::TTypeEnvironment& TypeEnv; diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp index f2bdd403dbbc..06f46b1990a4 100644 --- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp @@ -2579,7 +2579,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetUnsertaintyRatio(0.5); appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMultiplier(2.0); appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMaxTotalRetries(100); - + appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMaxRowsProcessingStreamLookup(500); + appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMaxTotalBytesQuotaStreamLookup(100); TPortManager tp; ui16 mbusport = tp.GetPort(2134); diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index b725a0d027c2..e7a197919a79 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -206,6 +206,9 @@ message TTableServiceConfig { optional double Multiplier = 5; optional uint32 IteratorResponseTimeoutMs = 6; optional uint32 MaxTotalRetries = 7; + + optional uint64 MaxRowsProcessingStreamLookup = 9 [default = 65536]; + optional uint64 MaxTotalBytesQuotaStreamLookup = 10 [default = 2684354560]; // 5_MB * 512 } message TIteratorReadQuotaSettings {