diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index 95a16c43adcd..6a68588397e5 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -825,6 +825,8 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co DataShardIteratorFails = KqpGroup->GetCounter("IteratorReads/DatashardFails", true); DataShardIteratorMessages = KqpGroup->GetCounter("IteratorReads/DatashardMessages", true); IteratorDeliveryProblems = KqpGroup->GetCounter("IteratorReads/DeliveryProblems", true); + StreamLookupIteratorTotalQuotaBytesInFlight = KqpGroup->GetCounter("IteratorReads/StreamLookupIteratorTotalQuotaBytesInFlight", false); + StreamLookupIteratorTotalQuotaBytesExceeded = KqpGroup->GetCounter("IteratorReads/StreamLookupIteratorTotalQuotaBytesExceeded", true); /* sink writes */ WriteActorsShardResolve = KqpGroup->GetCounter("SinkWrites/WriteActorShardResolve", true); diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index 940517454c26..3f9dd035d46a 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -415,6 +415,8 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter ::NMonitoring::TDynamicCounters::TCounterPtr ReadActorRetries; ::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorFails; ::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages; + ::NMonitoring::TDynamicCounters::TCounterPtr StreamLookupIteratorTotalQuotaBytesInFlight; + ::NMonitoring::TDynamicCounters::TCounterPtr StreamLookupIteratorTotalQuotaBytesExceeded; ::NMonitoring::TDynamicCounters::TCounterPtr IteratorDeliveryProblems; // Sink write counters diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 719c91e1008d..d6075cabc2e5 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -455,6 +455,8 @@ class TKqpNodeService : public TActorBootstrapped { ptr->ReadResponseTimeout = TDuration::MilliSeconds(settings.GetIteratorResponseTimeoutMs()); } ptr->MaxRetryDelay = TDuration::MilliSeconds(settings.GetMaxDelayMs()); + ptr->MaxRowsProcessingStreamLookup = settings.GetMaxRowsProcessingStreamLookup(); + ptr->MaxTotalBytesQuotaStreamLookup = settings.GetMaxTotalBytesQuotaStreamLookup(); SetReadIteratorBackoffSettings(ptr); } diff --git a/ydb/core/kqp/runtime/kqp_buffer_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_buffer_lookup_actor.cpp index d77a342f4cc3..073358e1deb1 100644 --- a/ydb/core/kqp/runtime/kqp_buffer_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_buffer_lookup_actor.cpp @@ -258,7 +258,7 @@ class TKqpBufferLookupActor : public NActors::TActorBootstrappedBuildRequests(Partitioning, ReadId); // lookup can't be overloaded - AFL_ENSURE(!worker->IsOverloaded()); + AFL_ENSURE(!worker->IsOverloaded(std::numeric_limits::max())); for (auto& [shardId, read] : reads) { ++state.ReadsInflight; diff --git a/ydb/core/kqp/runtime/kqp_read_iterator_common.cpp b/ydb/core/kqp/runtime/kqp_read_iterator_common.cpp index 0227d21419d6..387e9d55246a 100644 --- a/ydb/core/kqp/runtime/kqp_read_iterator_common.cpp +++ b/ydb/core/kqp/runtime/kqp_read_iterator_common.cpp @@ -91,7 +91,7 @@ size_t MaxShardResolves() { return Singleton()->SettingsPtr.AtomicLoad()->MaxShardResolves; } -size_t MaxShardRetries() { +size_t MaxShardRetries() { return Singleton()->SettingsPtr.AtomicLoad()->MaxShardAttempts; } @@ -103,5 +103,13 @@ TMaybe ShardTimeout() { return Singleton()->SettingsPtr.AtomicLoad()->ReadResponseTimeout; } +size_t MaxRowsProcessingStreamLookup() { + return Singleton()->SettingsPtr.AtomicLoad()->MaxRowsProcessingStreamLookup; +} + +ui64 MaxTotalBytesQuotaStreamLookup() { + return Singleton()->SettingsPtr.AtomicLoad()->MaxTotalBytesQuotaStreamLookup; +} + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_read_iterator_common.h b/ydb/core/kqp/runtime/kqp_read_iterator_common.h index e95964821641..d6b068b5b2a6 100644 --- a/ydb/core/kqp/runtime/kqp_read_iterator_common.h +++ b/ydb/core/kqp/runtime/kqp_read_iterator_common.h @@ -17,6 +17,8 @@ struct TIteratorReadBackoffSettings : TAtomicRefCount MaxTotalRetries; TMaybe ReadResponseTimeout; + size_t MaxRowsProcessingStreamLookup = 65536; + ui64 MaxTotalBytesQuotaStreamLookup = 5_MB * 512; }; struct TEvReadSettings : public TAtomicRefCount { @@ -38,6 +40,8 @@ size_t MaxShardResolves(); size_t MaxShardRetries(); TMaybe MaxTotalRetries(); TMaybe ShardTimeout(); +size_t MaxRowsProcessingStreamLookup(); +ui64 MaxTotalBytesQuotaStreamLookup(); void SetDefaultIteratorQuotaSettings(ui32 rows, ui32 bytes); THolder GetDefaultReadSettings(); diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 8be8a6f23163..1ad7475cdf4c 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -50,6 +50,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedIsOverloaded(); + auto overloaded = StreamLookupWorker->IsOverloaded(MaxRowsProcessing); if (!overloaded.has_value()) { FetchInputRows(); } else { @@ -451,6 +453,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedStreamLookupIteratorTotalQuotaBytesInFlight->Sub(MaxBytesDefaultQuota); + if (!Snapshot.IsValid()) { Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId()); } @@ -508,6 +513,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedRecord.SetSeqNo(record.GetSeqNo()); auto defaultSettings = GetDefaultReadAckSettings()->Record; - request->Record.SetMaxRows(defaultSettings.GetMaxRows()); - request->Record.SetMaxBytes(defaultSettings.GetMaxBytes()); + request->Record.SetMaxRows(MaxRowsDefaultQuota); + request->Record.SetMaxBytes(MaxBytesDefaultQuota); + + TotalBytesQuota += MaxBytesDefaultQuota; + Counters->StreamLookupIteratorTotalQuotaBytesInFlight->Add(MaxBytesDefaultQuota); + if (TotalBytesQuota > MaxTotalBytesQuota) { + Counters->StreamLookupIteratorTotalQuotaBytesExceeded->Inc(); + } const bool needToCreatePipe = Reads.NeedToCreatePipe(read.ShardId); @@ -663,10 +675,22 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedRecord; - record.SetMaxRows(defaultSettings.GetMaxRows()); - record.SetMaxBytes(defaultSettings.GetMaxBytes()); + if (!MaxRowsDefaultQuota || !MaxBytesDefaultQuota) { + MaxRowsDefaultQuota = defaultSettings.GetMaxRows(); + MaxBytesDefaultQuota = defaultSettings.GetMaxBytes(); + } + + record.SetMaxRows(MaxRowsDefaultQuota); + record.SetMaxBytes(MaxBytesDefaultQuota); record.SetResultFormat(NKikimrDataEvents::FORMAT_CELLVEC); + TotalBytesQuota += MaxBytesDefaultQuota; + Counters->StreamLookupIteratorTotalQuotaBytesInFlight->Add(MaxBytesDefaultQuota); + + if (TotalBytesQuota > MaxTotalBytesQuota) { + Counters->StreamLookupIteratorTotalQuotaBytesExceeded->Inc(); + } + CA_LOG_D(TStringBuilder() << "Send EvRead (stream lookup) to shardId=" << shardId << ", readId = " << record.GetReadId() << ", tablePath: " << StreamLookupWorker->GetTablePath() @@ -838,6 +862,12 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped Counters; NWilson::TSpan LookupActorSpan; NWilson::TSpan LookupActorStateSpan; diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index c46a5d735bdc..d50dd4e78c33 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -16,7 +16,6 @@ namespace NKikimr { namespace NKqp { -constexpr ui64 MAX_IN_FLIGHT_LIMIT = 500; constexpr ui64 SEQNO_SPACE = 40; constexpr ui64 MaxTaskId = (1ULL << (64 - SEQNO_SPACE)); @@ -324,7 +323,7 @@ class TKqpLookupRows : public TKqpStreamLookupWorker { ReadResults.emplace_back(std::move(result)); } - std::optional IsOverloaded() final { + std::optional IsOverloaded(size_t) final { return std::nullopt; } @@ -608,10 +607,10 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { YQL_ENSURE(false); } - std::optional IsOverloaded() final { - if (UnprocessedRows.size() >= MAX_IN_FLIGHT_LIMIT || - PendingLeftRowsByKey.size() >= MAX_IN_FLIGHT_LIMIT || - ResultRowsBySeqNo.size() >= MAX_IN_FLIGHT_LIMIT) + std::optional IsOverloaded(size_t maxRowsProcessing) final { + if (UnprocessedRows.size() >= maxRowsProcessing || + PendingLeftRowsByKey.size() >= maxRowsProcessing || + ResultRowsBySeqNo.size() >= maxRowsProcessing) { TStringBuilder overloadDescriptor; overloadDescriptor << "unprocessed rows: " << UnprocessedRows.size() diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h index 5e750e0c7b72..c5f324b36134 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h @@ -81,7 +81,7 @@ class TKqpStreamLookupWorker { virtual bool AllRowsProcessed() = 0; virtual bool HasPendingResults() = 0; virtual void ResetRowsProcessing(ui64 readId) = 0; - virtual std::optional IsOverloaded() = 0; + virtual std::optional IsOverloaded(size_t maxRowsProcessing) = 0; protected: const NMiniKQL::TTypeEnvironment& TypeEnv; diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp index f7a9a98b2f4e..0712f52adbd4 100644 --- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp @@ -2580,7 +2580,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetUnsertaintyRatio(0.5); appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMultiplier(2.0); appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMaxTotalRetries(100); - + appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMaxRowsProcessingStreamLookup(500); + appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMaxTotalBytesQuotaStreamLookup(100); TPortManager tp; ui16 mbusport = tp.GetPort(2134); diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index 78c340671885..981ac52f3928 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -209,6 +209,9 @@ message TTableServiceConfig { optional double Multiplier = 5; optional uint32 IteratorResponseTimeoutMs = 6; optional uint32 MaxTotalRetries = 7; + + optional uint64 MaxRowsProcessingStreamLookup = 9 [default = 65536]; + optional uint64 MaxTotalBytesQuotaStreamLookup = 10 [default = 2684354560]; // 5_MB * 512 } message TIteratorReadQuotaSettings {