Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ydb/library/actors/interconnect/interconnect_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ namespace NActors {
switch (State) {
case EState::INITIAL:
event.InitChecksum();
if (event.EnqueueTime) {
TDuration duration = NActors::TlsActivationContext->Now() - event.EnqueueTime;
Metrics->UpdateIcQueueTimeHistogram(duration.MicroSeconds());
}
event.Span && event.Span.Event("FeedBuf:INITIAL");
if (event.Buffer) {
State = EState::BODY;
Iter = event.Buffer->GetBeginIter();
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/actors/interconnect/interconnect_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ namespace NActors {
~TEventOutputChannel() {
}

std::pair<ui32, TEventHolder*> Push(IEventHandle& ev, TEventHolderPool& pool) {
std::pair<ui32, TEventHolder*> Push(IEventHandle& ev, TEventHolderPool& pool, TInstant now) {
TEventHolder& event = pool.Allocate(Queue);
const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr2);
const ui32 bytes = event.Fill(ev, now) + sizeof(TEventDescr2);
OutputQueueSize += bytes;
if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) {
event.Span
Expand Down
14 changes: 14 additions & 0 deletions ydb/library/actors/interconnect/interconnect_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ namespace {
PingTimeHistogram->Collect(value);
}

void UpdateIcQueueTimeHistogram(ui64 value) override {
InterconnectQueueTimeHistogram->Collect(value);
}

void UpdateOutputChannelTraffic(ui16 channel, ui64 value) override {
auto& ch = GetOutputChannel(channel);
if (ch.OutgoingTraffic) {
Expand Down Expand Up @@ -317,6 +321,8 @@ namespace {

PingTimeHistogram = AdaptiveCounters->GetHistogram(
"PingTimeUs", NMonitoring::ExponentialHistogram(18, 2, 125));
InterconnectQueueTimeHistogram = AdaptiveCounters->GetHistogram(
"InterconnectQueueTimeHistogramUs", NMonitoring::ExplicitHistogram({500, 1000, 5000, 10000, 50000, 100000}));
}

if (updateGlobal) {
Expand Down Expand Up @@ -379,6 +385,7 @@ namespace {
NMonitoring::TDynamicCounters::TCounterPtr UsefulWriteWakeups;
NMonitoring::TDynamicCounters::TCounterPtr SpuriousWriteWakeups;
NMonitoring::THistogramPtr PingTimeHistogram;
NMonitoring::THistogramPtr InterconnectQueueTimeHistogram;

std::unordered_map<ui16, TOutputChannel> OutputChannels;
TOutputChannel OtherOutputChannel;
Expand Down Expand Up @@ -580,6 +587,10 @@ namespace {
PingTimeHistogram_->Record(value);
}

void UpdateIcQueueTimeHistogram(ui64 value) override {
InterconnectQueueTimeHistogram_->Record(value);
}

void UpdateOutputChannelTraffic(ui16 channel, ui64 value) override {
auto& ch = GetOutputChannel(channel);
if (ch.OutgoingTraffic) {
Expand Down Expand Up @@ -672,6 +683,8 @@ namespace {
InflightDataAmount_ = createRate(AdaptiveMetrics_, "interconnect.inflight_data");
PingTimeHistogram_ = AdaptiveMetrics_->HistogramRate(
NMonitoring::MakeLabels({{"sensor", "interconnect.ping_time_us"}}), NMonitoring::ExponentialHistogram(18, 2, 125));
InterconnectQueueTimeHistogram_ = AdaptiveMetrics_->HistogramRate(
NMonitoring::MakeLabels({{"sensor", "interconnect.ic_queue_time_us"}}), NMonitoring::ExplicitHistogram({500, 1000, 5000, 10000, 50000, 100000}));
}

if (updateGlobal) {
Expand Down Expand Up @@ -756,6 +769,7 @@ namespace {
NMonitoring::IIntGauge* ClockSkewMicrosec_;

NMonitoring::IHistogram* PingTimeHistogram_;
NMonitoring::IHistogram* InterconnectQueueTimeHistogram_;

THashMap<ui16, TOutputChannel> OutputChannels_;
TOutputChannel OtherOutputChannel_;
Expand Down
1 change: 1 addition & 0 deletions ydb/library/actors/interconnect/interconnect_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class IInterconnectMetrics {
virtual void IncRecvSyscalls(ui64 ns) = 0;
virtual void AddTotalBytesRead(ui64 value) = 0;
virtual void UpdatePingTimeHistogram(ui64 value) = 0;
virtual void UpdateIcQueueTimeHistogram(ui64 value) = 0;
virtual void UpdateOutputChannelTraffic(ui16 channel, ui64 value) = 0;
virtual void UpdateOutputChannelEvents(ui16 channel) = 0;
virtual void SetUtilization(ui32 total, ui32 starvation) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ namespace NActors {
auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel);
const bool wasWorking = oChannel.IsWorking();

const auto [dataSize, event] = oChannel.Push(*ev, *Pool);
TInstant now = TlsActivationContext->Now();

const auto [dataSize, event] = oChannel.Push(*ev, *Pool, now);
LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize);

TotalOutputQueueSize += dataSize;
Expand Down
14 changes: 14 additions & 0 deletions ydb/library/actors/interconnect/packet.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
#include "packet.h"
#include "interconnect_counters.h"

#include <ydb/library/actors/core/probes.h>

#include <util/system/datetime.h>

LWTRACE_USING(ACTORLIB_PROVIDER);

TTcpPacketOutTask::TTcpPacketOutTask(const TSessionParams& params, NInterconnect::TOutgoingStream& outgoingStream,
NInterconnect::TOutgoingStream& xdcStream)
: Params(params)
, OutgoingStream(outgoingStream)
, XdcStream(xdcStream)
, HeaderBookmark(OutgoingStream.Bookmark(sizeof(TTcpPacketHeader_v2)))
{}

ui32 TEventHolder::Fill(IEventHandle& ev) {
Serial = 0;
Descr.Type = ev.Type;
Expand All @@ -31,3 +40,8 @@ ui32 TEventHolder::Fill(IEventHandle& ev) {

return EventSerializedSize;
}

ui32 TEventHolder::Fill(IEventHandle& ev, TInstant now) {
EnqueueTime = now;
return Fill(ev);
}
11 changes: 5 additions & 6 deletions ydb/library/actors/interconnect/packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,10 @@ struct TEventHolder : TNonCopyable {
mutable NLWTrace::TOrbit Orbit;
NWilson::TSpan Span;
ui32 ZcTransferId; //id of zero copy transfer. In case of RDMA it is a place where some internal handle can be stored to identify events
TInstant EnqueueTime;

ui32 Fill(IEventHandle& ev);
ui32 Fill(IEventHandle& ev, TInstant now);

void InitChecksum() {
Descr.Checksum = 0;
Expand Down Expand Up @@ -135,13 +137,15 @@ struct TEventHolder : TNonCopyable {

namespace NActors {
class TEventOutputChannel;
class IInterconnectMetrics;
}

struct TTcpPacketOutTask : TNonCopyable {
const TSessionParams& Params;
NInterconnect::TOutgoingStream& OutgoingStream;
NInterconnect::TOutgoingStream& XdcStream;
NInterconnect::TOutgoingStream::TBookmark HeaderBookmark;

ui32 InternalSize = 0;
ui32 ExternalSize = 0;

Expand All @@ -153,12 +157,7 @@ struct TTcpPacketOutTask : TNonCopyable {
ui32 ExternalChecksum = 0;

TTcpPacketOutTask(const TSessionParams& params, NInterconnect::TOutgoingStream& outgoingStream,
NInterconnect::TOutgoingStream& xdcStream)
: Params(params)
, OutgoingStream(outgoingStream)
, XdcStream(xdcStream)
, HeaderBookmark(OutgoingStream.Bookmark(sizeof(TTcpPacketHeader_v2)))
{}
NInterconnect::TOutgoingStream& xdcStream);

// Preallocate some space to fill it later.
NInterconnect::TOutgoingStream::TBookmark Bookmark(size_t len) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0);
auto& ch = scheduler.GetOutputChannel(channel);
const bool wasWorking = ch.IsWorking();
ch.Push(*ev, pool);
ch.Push(*ev, pool, TInstant::Zero()/*Do not account time outside AS*/);
if (!wasWorking) {
scheduler.AddToHeap(ch, 0);
}
Expand Down
Loading