@@ -50,6 +50,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
5050 , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TaskId, args.TypeEnv, args.HolderFactory, args.InputDesc))
5151 , IsolationLevel(settings.GetIsolationLevel())
5252 , Database(settings.GetDatabase())
53+ , MaxTotalBytesQuota(MaxTotalBytesQuotaStreamLookup())
54+ , MaxRowsProcessing(MaxRowsProcessingStreamLookup())
5355 , Counters(counters)
5456 , LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), " LookupActor" )
5557 {
@@ -308,7 +310,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
308310 ReadRowsCount += replyResultStats.ReadRowsCount ;
309311 ReadBytesCount += replyResultStats.ReadBytesCount ;
310312
311- auto overloaded = StreamLookupWorker->IsOverloaded ();
313+ auto overloaded = StreamLookupWorker->IsOverloaded (MaxRowsProcessing );
312314 if (!overloaded.has_value ()) {
313315 FetchInputRows ();
314316 } else {
@@ -451,6 +453,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
451453 }
452454 }
453455
456+ TotalBytesQuota -= MaxBytesDefaultQuota;
457+ Counters->StreamLookupIteratorTotalQuotaBytesInFlight ->Sub (MaxBytesDefaultQuota);
458+
454459 if (!Snapshot.IsValid ()) {
455460 Snapshot = IKqpGateway::TKqpSnapshot (record.GetSnapshot ().GetStep (), record.GetSnapshot ().GetTxId ());
456461 }
@@ -508,6 +513,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
508513 }
509514 }
510515
516+
511517 YQL_ENSURE (read.LastSeqNo < record.GetSeqNo ());
512518 read.LastSeqNo = record.GetSeqNo ();
513519
@@ -520,8 +526,14 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
520526 request->Record .SetSeqNo (record.GetSeqNo ());
521527
522528 auto defaultSettings = GetDefaultReadAckSettings ()->Record ;
523- request->Record .SetMaxRows (defaultSettings.GetMaxRows ());
524- request->Record .SetMaxBytes (defaultSettings.GetMaxBytes ());
529+ request->Record .SetMaxRows (MaxRowsDefaultQuota);
530+ request->Record .SetMaxBytes (MaxBytesDefaultQuota);
531+
532+ TotalBytesQuota += MaxBytesDefaultQuota;
533+ Counters->StreamLookupIteratorTotalQuotaBytesInFlight ->Add (MaxBytesDefaultQuota);
534+ if (TotalBytesQuota > MaxTotalBytesQuota) {
535+ Counters->StreamLookupIteratorTotalQuotaBytesExceeded ->Inc ();
536+ }
525537
526538 const bool needToCreatePipe = Reads.NeedToCreatePipe (read.ShardId );
527539
@@ -663,10 +675,22 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
663675 }
664676
665677 auto defaultSettings = GetDefaultReadSettings ()->Record ;
666- record.SetMaxRows (defaultSettings.GetMaxRows ());
667- record.SetMaxBytes (defaultSettings.GetMaxBytes ());
678+ if (!MaxRowsDefaultQuota || !MaxBytesDefaultQuota) {
679+ MaxRowsDefaultQuota = defaultSettings.GetMaxRows ();
680+ MaxBytesDefaultQuota = defaultSettings.GetMaxBytes ();
681+ }
682+
683+ record.SetMaxRows (MaxRowsDefaultQuota);
684+ record.SetMaxBytes (MaxBytesDefaultQuota);
668685 record.SetResultFormat (NKikimrDataEvents::FORMAT_CELLVEC);
669686
687+ TotalBytesQuota += MaxBytesDefaultQuota;
688+ Counters->StreamLookupIteratorTotalQuotaBytesInFlight ->Add (MaxBytesDefaultQuota);
689+
690+ if (TotalBytesQuota > MaxTotalBytesQuota) {
691+ Counters->StreamLookupIteratorTotalQuotaBytesExceeded ->Inc ();
692+ }
693+
670694 CA_LOG_D (TStringBuilder () << " Send EvRead (stream lookup) to shardId=" << shardId
671695 << " , readId = " << record.GetReadId ()
672696 << " , tablePath: " << StreamLookupWorker->GetTablePath ()
@@ -838,6 +862,12 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
838862 ui64 ReadRowsCount = 0 ;
839863 ui64 ReadBytesCount = 0 ;
840864
865+ size_t TotalBytesQuota = 0 ;
866+ ui64 MaxTotalBytesQuota = 0 ;
867+ size_t MaxRowsProcessing = 0 ;
868+ size_t MaxBytesDefaultQuota = 0 ;
869+ size_t MaxRowsDefaultQuota = 0 ;
870+
841871 TIntrusivePtr<TKqpCounters> Counters;
842872 NWilson::TSpan LookupActorSpan;
843873 NWilson::TSpan LookupActorStateSpan;
0 commit comments