From 03c8cc6ee54d412866bb8b24e04d128e2c1ed288 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Tue, 4 Nov 2025 17:26:59 +0200 Subject: [PATCH] Add IC queue time metric. EXT-1612 (#28184) Add IC queue time metric. Time spend in the interconnect queue did not report in any existed counter, but it is important for performance analysis. --- .../actors/interconnect/interconnect_channel.cpp | 5 +++++ .../actors/interconnect/interconnect_channel.h | 4 ++-- .../actors/interconnect/interconnect_counters.cpp | 14 ++++++++++++++ .../actors/interconnect/interconnect_counters.h | 1 + .../interconnect/interconnect_tcp_session.cpp | 4 +++- ydb/library/actors/interconnect/packet.cpp | 14 ++++++++++++++ ydb/library/actors/interconnect/packet.h | 11 +++++------ .../interconnect/ut/channel_scheduler_ut.cpp | 2 +- 8 files changed, 45 insertions(+), 10 deletions(-) diff --git a/ydb/library/actors/interconnect/interconnect_channel.cpp b/ydb/library/actors/interconnect/interconnect_channel.cpp index bedf405d1794..54b3cf4db66c 100644 --- a/ydb/library/actors/interconnect/interconnect_channel.cpp +++ b/ydb/library/actors/interconnect/interconnect_channel.cpp @@ -70,6 +70,11 @@ namespace NActors { switch (State) { case EState::INITIAL: event.InitChecksum(); + if (event.EnqueueTime) { + TDuration duration = NActors::TlsActivationContext->Now() - event.EnqueueTime; + Metrics->UpdateIcQueueTimeHistogram(duration.MicroSeconds()); + } + event.Span && event.Span.Event("FeedBuf:INITIAL"); if (event.Buffer) { State = EState::BODY; Iter = event.Buffer->GetBeginIter(); diff --git a/ydb/library/actors/interconnect/interconnect_channel.h b/ydb/library/actors/interconnect/interconnect_channel.h index c6275c7e7385..0b8abbc03ab3 100644 --- a/ydb/library/actors/interconnect/interconnect_channel.h +++ b/ydb/library/actors/interconnect/interconnect_channel.h @@ -76,9 +76,9 @@ namespace NActors { ~TEventOutputChannel() { } - std::pair Push(IEventHandle& ev, TEventHolderPool& pool) { + std::pair Push(IEventHandle& ev, TEventHolderPool& pool, TInstant now) { TEventHolder& event = pool.Allocate(Queue); - const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr2); + const ui32 bytes = event.Fill(ev, now) + sizeof(TEventDescr2); OutputQueueSize += bytes; if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) { event.Span diff --git a/ydb/library/actors/interconnect/interconnect_counters.cpp b/ydb/library/actors/interconnect/interconnect_counters.cpp index 2311a8ac3b64..9fef5cd8171d 100644 --- a/ydb/library/actors/interconnect/interconnect_counters.cpp +++ b/ydb/library/actors/interconnect/interconnect_counters.cpp @@ -236,6 +236,10 @@ namespace { PingTimeHistogram->Collect(value); } + void UpdateIcQueueTimeHistogram(ui64 value) override { + InterconnectQueueTimeHistogram->Collect(value); + } + void UpdateOutputChannelTraffic(ui16 channel, ui64 value) override { auto& ch = GetOutputChannel(channel); if (ch.OutgoingTraffic) { @@ -317,6 +321,8 @@ namespace { PingTimeHistogram = AdaptiveCounters->GetHistogram( "PingTimeUs", NMonitoring::ExponentialHistogram(18, 2, 125)); + InterconnectQueueTimeHistogram = AdaptiveCounters->GetHistogram( + "InterconnectQueueTimeHistogramUs", NMonitoring::ExplicitHistogram({500, 1000, 5000, 10000, 50000, 100000})); } if (updateGlobal) { @@ -379,6 +385,7 @@ namespace { NMonitoring::TDynamicCounters::TCounterPtr UsefulWriteWakeups; NMonitoring::TDynamicCounters::TCounterPtr SpuriousWriteWakeups; NMonitoring::THistogramPtr PingTimeHistogram; + NMonitoring::THistogramPtr InterconnectQueueTimeHistogram; std::unordered_map OutputChannels; TOutputChannel OtherOutputChannel; @@ -580,6 +587,10 @@ namespace { PingTimeHistogram_->Record(value); } + void UpdateIcQueueTimeHistogram(ui64 value) override { + InterconnectQueueTimeHistogram_->Record(value); + } + void UpdateOutputChannelTraffic(ui16 channel, ui64 value) override { auto& ch = GetOutputChannel(channel); if (ch.OutgoingTraffic) { @@ -672,6 +683,8 @@ namespace { InflightDataAmount_ = createRate(AdaptiveMetrics_, "interconnect.inflight_data"); PingTimeHistogram_ = AdaptiveMetrics_->HistogramRate( NMonitoring::MakeLabels({{"sensor", "interconnect.ping_time_us"}}), NMonitoring::ExponentialHistogram(18, 2, 125)); + InterconnectQueueTimeHistogram_ = AdaptiveMetrics_->HistogramRate( + NMonitoring::MakeLabels({{"sensor", "interconnect.ic_queue_time_us"}}), NMonitoring::ExplicitHistogram({500, 1000, 5000, 10000, 50000, 100000})); } if (updateGlobal) { @@ -756,6 +769,7 @@ namespace { NMonitoring::IIntGauge* ClockSkewMicrosec_; NMonitoring::IHistogram* PingTimeHistogram_; + NMonitoring::IHistogram* InterconnectQueueTimeHistogram_; THashMap OutputChannels_; TOutputChannel OtherOutputChannel_; diff --git a/ydb/library/actors/interconnect/interconnect_counters.h b/ydb/library/actors/interconnect/interconnect_counters.h index ed7643975a7d..8dbd855454ba 100644 --- a/ydb/library/actors/interconnect/interconnect_counters.h +++ b/ydb/library/actors/interconnect/interconnect_counters.h @@ -41,6 +41,7 @@ class IInterconnectMetrics { virtual void IncRecvSyscalls(ui64 ns) = 0; virtual void AddTotalBytesRead(ui64 value) = 0; virtual void UpdatePingTimeHistogram(ui64 value) = 0; + virtual void UpdateIcQueueTimeHistogram(ui64 value) = 0; virtual void UpdateOutputChannelTraffic(ui16 channel, ui64 value) = 0; virtual void UpdateOutputChannelEvents(ui16 channel) = 0; virtual void SetUtilization(ui32 total, ui32 starvation) = 0; diff --git a/ydb/library/actors/interconnect/interconnect_tcp_session.cpp b/ydb/library/actors/interconnect/interconnect_tcp_session.cpp index f0397b5cee6c..de142c142a6e 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_session.cpp +++ b/ydb/library/actors/interconnect/interconnect_tcp_session.cpp @@ -151,7 +151,9 @@ namespace NActors { auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel); const bool wasWorking = oChannel.IsWorking(); - const auto [dataSize, event] = oChannel.Push(*ev, *Pool); + TInstant now = TlsActivationContext->Now(); + + const auto [dataSize, event] = oChannel.Push(*ev, *Pool, now); LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize); TotalOutputQueueSize += dataSize; diff --git a/ydb/library/actors/interconnect/packet.cpp b/ydb/library/actors/interconnect/packet.cpp index 16acb1f9bce1..ffc3f8f47c72 100644 --- a/ydb/library/actors/interconnect/packet.cpp +++ b/ydb/library/actors/interconnect/packet.cpp @@ -1,4 +1,5 @@ #include "packet.h" +#include "interconnect_counters.h" #include @@ -6,6 +7,14 @@ LWTRACE_USING(ACTORLIB_PROVIDER); +TTcpPacketOutTask::TTcpPacketOutTask(const TSessionParams& params, NInterconnect::TOutgoingStream& outgoingStream, + NInterconnect::TOutgoingStream& xdcStream) + : Params(params) + , OutgoingStream(outgoingStream) + , XdcStream(xdcStream) + , HeaderBookmark(OutgoingStream.Bookmark(sizeof(TTcpPacketHeader_v2))) +{} + ui32 TEventHolder::Fill(IEventHandle& ev) { Serial = 0; Descr.Type = ev.Type; @@ -31,3 +40,8 @@ ui32 TEventHolder::Fill(IEventHandle& ev) { return EventSerializedSize; } + +ui32 TEventHolder::Fill(IEventHandle& ev, TInstant now) { + EnqueueTime = now; + return Fill(ev); +} \ No newline at end of file diff --git a/ydb/library/actors/interconnect/packet.h b/ydb/library/actors/interconnect/packet.h index ae85be308da2..4f4a5539f9de 100644 --- a/ydb/library/actors/interconnect/packet.h +++ b/ydb/library/actors/interconnect/packet.h @@ -100,8 +100,10 @@ struct TEventHolder : TNonCopyable { mutable NLWTrace::TOrbit Orbit; NWilson::TSpan Span; ui32 ZcTransferId; //id of zero copy transfer. In case of RDMA it is a place where some internal handle can be stored to identify events + TInstant EnqueueTime; ui32 Fill(IEventHandle& ev); + ui32 Fill(IEventHandle& ev, TInstant now); void InitChecksum() { Descr.Checksum = 0; @@ -135,6 +137,7 @@ struct TEventHolder : TNonCopyable { namespace NActors { class TEventOutputChannel; + class IInterconnectMetrics; } struct TTcpPacketOutTask : TNonCopyable { @@ -142,6 +145,7 @@ struct TTcpPacketOutTask : TNonCopyable { NInterconnect::TOutgoingStream& OutgoingStream; NInterconnect::TOutgoingStream& XdcStream; NInterconnect::TOutgoingStream::TBookmark HeaderBookmark; + ui32 InternalSize = 0; ui32 ExternalSize = 0; @@ -153,12 +157,7 @@ struct TTcpPacketOutTask : TNonCopyable { ui32 ExternalChecksum = 0; TTcpPacketOutTask(const TSessionParams& params, NInterconnect::TOutgoingStream& outgoingStream, - NInterconnect::TOutgoingStream& xdcStream) - : Params(params) - , OutgoingStream(outgoingStream) - , XdcStream(xdcStream) - , HeaderBookmark(OutgoingStream.Bookmark(sizeof(TTcpPacketHeader_v2))) - {} + NInterconnect::TOutgoingStream& xdcStream); // Preallocate some space to fill it later. NInterconnect::TOutgoingStream::TBookmark Bookmark(size_t len) { diff --git a/ydb/library/actors/interconnect/ut/channel_scheduler_ut.cpp b/ydb/library/actors/interconnect/ut/channel_scheduler_ut.cpp index 968c8d109bd9..88431b8998c1 100644 --- a/ydb/library/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/ydb/library/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { auto ev = MakeHolder(1, 0, TActorId(), TActorId(), MakeIntrusive(payload, TEventSerializationInfo{}), 0); auto& ch = scheduler.GetOutputChannel(channel); const bool wasWorking = ch.IsWorking(); - ch.Push(*ev, pool); + ch.Push(*ev, pool, TInstant::Zero()/*Do not account time outside AS*/); if (!wasWorking) { scheduler.AddToHeap(ch, 0); }