Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/core/kqp/counters/kqp_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,8 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
DataShardIteratorFails = KqpGroup->GetCounter("IteratorReads/DatashardFails", true);
DataShardIteratorMessages = KqpGroup->GetCounter("IteratorReads/DatashardMessages", true);
IteratorDeliveryProblems = KqpGroup->GetCounter("IteratorReads/DeliveryProblems", true);
StreamLookupIteratorTotalQuotaBytesInFlight = KqpGroup->GetCounter("IteratorReads/StreamLookupIteratorTotalQuotaBytesInFlight", false);
StreamLookupIteratorTotalQuotaBytesExceeded = KqpGroup->GetCounter("IteratorReads/StreamLookupIteratorTotalQuotaBytesExceeded", true);

/* sink writes */
WriteActorsShardResolve = KqpGroup->GetCounter("SinkWrites/WriteActorShardResolve", true);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/counters/kqp_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
::NMonitoring::TDynamicCounters::TCounterPtr ReadActorRetries;
::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorFails;
::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages;
::NMonitoring::TDynamicCounters::TCounterPtr StreamLookupIteratorTotalQuotaBytesInFlight;
::NMonitoring::TDynamicCounters::TCounterPtr StreamLookupIteratorTotalQuotaBytesExceeded;
::NMonitoring::TDynamicCounters::TCounterPtr IteratorDeliveryProblems;

// Sink write counters
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/node_service/kqp_node_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
ptr->ReadResponseTimeout = TDuration::MilliSeconds(settings.GetIteratorResponseTimeoutMs());
}
ptr->MaxRetryDelay = TDuration::MilliSeconds(settings.GetMaxDelayMs());
ptr->MaxRowsProcessingStreamLookup = settings.GetMaxRowsProcessingStreamLookup();
ptr->MaxTotalBytesQuotaStreamLookup = settings.GetMaxTotalBytesQuotaStreamLookup();
SetReadIteratorBackoffSettings(ptr);
}

Expand Down
10 changes: 9 additions & 1 deletion ydb/core/kqp/runtime/kqp_read_iterator_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ size_t MaxShardResolves() {
return Singleton<TBackoffStorage>()->SettingsPtr.AtomicLoad()->MaxShardResolves;
}

size_t MaxShardRetries() {
size_t MaxShardRetries() {
return Singleton<TBackoffStorage>()->SettingsPtr.AtomicLoad()->MaxShardAttempts;
}

Expand All @@ -103,5 +103,13 @@ TMaybe<TDuration> ShardTimeout() {
return Singleton<TBackoffStorage>()->SettingsPtr.AtomicLoad()->ReadResponseTimeout;
}

size_t MaxRowsProcessingStreamLookup() {
return Singleton<TBackoffStorage>()->SettingsPtr.AtomicLoad()->MaxRowsProcessingStreamLookup;
}

ui64 MaxTotalBytesQuotaStreamLookup() {
return Singleton<TBackoffStorage>()->SettingsPtr.AtomicLoad()->MaxTotalBytesQuotaStreamLookup;
}

} // namespace NKqp
} // namespace NKikimr
4 changes: 4 additions & 0 deletions ydb/core/kqp/runtime/kqp_read_iterator_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ struct TIteratorReadBackoffSettings : TAtomicRefCount<TIteratorReadBackoffSettin

TMaybe<size_t> MaxTotalRetries;
TMaybe<TDuration> ReadResponseTimeout;
size_t MaxRowsProcessingStreamLookup = 65536;
ui64 MaxTotalBytesQuotaStreamLookup = 5_MB * 512;
};

struct TEvReadSettings : public TAtomicRefCount<TEvReadSettings> {
Expand All @@ -38,6 +40,8 @@ size_t MaxShardResolves();
size_t MaxShardRetries();
TMaybe<size_t> MaxTotalRetries();
TMaybe<TDuration> ShardTimeout();
size_t MaxRowsProcessingStreamLookup();
ui64 MaxTotalBytesQuotaStreamLookup();

void SetDefaultIteratorQuotaSettings(ui32 rows, ui32 bytes);
THolder<NKikimr::TEvDataShard::TEvRead> GetDefaultReadSettings();
Expand Down
40 changes: 35 additions & 5 deletions ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
, LookupStrategy(settings.GetLookupStrategy())
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TaskId, args.TypeEnv, args.HolderFactory, args.InputDesc))
, MaxTotalBytesQuota(MaxTotalBytesQuotaStreamLookup())
, MaxRowsProcessing(MaxRowsProcessingStreamLookup())
, Counters(counters)
, LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), "LookupActor")
{
Expand Down Expand Up @@ -305,7 +307,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
ReadRowsCount += replyResultStats.ReadRowsCount;
ReadBytesCount += replyResultStats.ReadBytesCount;

auto overloaded = StreamLookupWorker->IsOverloaded();
auto overloaded = StreamLookupWorker->IsOverloaded(MaxRowsProcessing);
if (!overloaded.has_value()) {
FetchInputRows();
} else {
Expand Down Expand Up @@ -445,6 +447,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
}
}

TotalBytesQuota -= MaxBytesDefaultQuota;
Counters->StreamLookupIteratorTotalQuotaBytesInFlight->Sub(MaxBytesDefaultQuota);

if (!Snapshot.IsValid()) {
Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId());
}
Expand Down Expand Up @@ -502,6 +507,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
}
}


YQL_ENSURE(read.LastSeqNo < record.GetSeqNo());
read.LastSeqNo = record.GetSeqNo();

Expand All @@ -514,8 +520,14 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
request->Record.SetSeqNo(record.GetSeqNo());

auto defaultSettings = GetDefaultReadAckSettings()->Record;
request->Record.SetMaxRows(defaultSettings.GetMaxRows());
request->Record.SetMaxBytes(defaultSettings.GetMaxBytes());
request->Record.SetMaxRows(MaxRowsDefaultQuota);
request->Record.SetMaxBytes(MaxBytesDefaultQuota);

TotalBytesQuota += MaxBytesDefaultQuota;
Counters->StreamLookupIteratorTotalQuotaBytesInFlight->Add(MaxBytesDefaultQuota);
if (TotalBytesQuota > MaxTotalBytesQuota) {
Counters->StreamLookupIteratorTotalQuotaBytesExceeded->Inc();
}

const bool needToCreatePipe = Reads.NeedToCreatePipe(read.ShardId);

Expand Down Expand Up @@ -657,10 +669,22 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
}

auto defaultSettings = GetDefaultReadSettings()->Record;
record.SetMaxRows(defaultSettings.GetMaxRows());
record.SetMaxBytes(defaultSettings.GetMaxBytes());
if (!MaxRowsDefaultQuota || !MaxBytesDefaultQuota) {
MaxRowsDefaultQuota = defaultSettings.GetMaxRows();
MaxBytesDefaultQuota = defaultSettings.GetMaxBytes();
}

record.SetMaxRows(MaxRowsDefaultQuota);
record.SetMaxBytes(MaxBytesDefaultQuota);
record.SetResultFormat(NKikimrDataEvents::FORMAT_CELLVEC);

TotalBytesQuota += MaxBytesDefaultQuota;
Counters->StreamLookupIteratorTotalQuotaBytesInFlight->Add(MaxBytesDefaultQuota);

if (TotalBytesQuota > MaxTotalBytesQuota) {
Counters->StreamLookupIteratorTotalQuotaBytesExceeded->Inc();
}

CA_LOG_D(TStringBuilder() << "Send EvRead (stream lookup) to shardId=" << shardId
<< ", readId = " << record.GetReadId()
<< ", tablePath: " << StreamLookupWorker->GetTablePath()
Expand Down Expand Up @@ -828,6 +852,12 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
ui64 ReadRowsCount = 0;
ui64 ReadBytesCount = 0;

size_t TotalBytesQuota = 0;
ui64 MaxTotalBytesQuota = 0;
size_t MaxRowsProcessing = 0;
size_t MaxBytesDefaultQuota = 0;
size_t MaxRowsDefaultQuota = 0;

TIntrusivePtr<TKqpCounters> Counters;
NWilson::TSpan LookupActorSpan;
NWilson::TSpan LookupActorStateSpan;
Expand Down
11 changes: 5 additions & 6 deletions ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
namespace NKikimr {
namespace NKqp {

constexpr ui64 MAX_IN_FLIGHT_LIMIT = 500;
constexpr ui64 SEQNO_SPACE = 40;
constexpr ui64 MaxTaskId = (1ULL << (64 - SEQNO_SPACE));

Expand Down Expand Up @@ -323,7 +322,7 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
ReadResults.emplace_back(std::move(result));
}

std::optional<TString> IsOverloaded() final {
std::optional<TString> IsOverloaded(size_t) final {
return std::nullopt;
}

Expand Down Expand Up @@ -596,10 +595,10 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
YQL_ENSURE(false);
}

std::optional<TString> IsOverloaded() final {
if (UnprocessedRows.size() >= MAX_IN_FLIGHT_LIMIT ||
PendingLeftRowsByKey.size() >= MAX_IN_FLIGHT_LIMIT ||
ResultRowsBySeqNo.size() >= MAX_IN_FLIGHT_LIMIT)
std::optional<TString> IsOverloaded(size_t maxRowsProcessing) final {
if (UnprocessedRows.size() >= maxRowsProcessing ||
PendingLeftRowsByKey.size() >= maxRowsProcessing ||
ResultRowsBySeqNo.size() >= maxRowsProcessing)
{
TStringBuilder overloadDescriptor;
overloadDescriptor << "unprocessed rows: " << UnprocessedRows.size()
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class TKqpStreamLookupWorker {
virtual bool AllRowsProcessed() = 0;
virtual bool HasPendingResults() = 0;
virtual void ResetRowsProcessing(ui64 readId) = 0;
virtual std::optional<TString> IsOverloaded() = 0;
virtual std::optional<TString> IsOverloaded(size_t maxRowsProcessing) = 0;

protected:
const NMiniKQL::TTypeEnvironment& TypeEnv;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2579,7 +2579,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetUnsertaintyRatio(0.5);
appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMultiplier(2.0);
appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMaxTotalRetries(100);

appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMaxRowsProcessingStreamLookup(500);
appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMaxTotalBytesQuotaStreamLookup(100);

TPortManager tp;
ui16 mbusport = tp.GetPort(2134);
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/protos/table_service_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ message TTableServiceConfig {
optional double Multiplier = 5;
optional uint32 IteratorResponseTimeoutMs = 6;
optional uint32 MaxTotalRetries = 7;

optional uint64 MaxRowsProcessingStreamLookup = 9 [default = 65536];
optional uint64 MaxTotalBytesQuotaStreamLookup = 10 [default = 2684354560]; // 5_MB * 512
}

message TIteratorReadQuotaSettings {
Expand Down
Loading