Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,17 @@ static TInterconnectSettings GetInterconnectSettings(const NKikimrConfig::TInter
result.EventDelay = TDuration::MicroSeconds(config.GetEventDelayMicrosec());
}

if (config.HasSocketSendOptimization()) {
switch (config.GetSocketSendOptimization()) {
case NKikimrConfig::TInterconnectConfig::IC_SO_DISABLED:
result.SocketSendOptimization = ESocketSendOptimization::DISABLED;
break;
case NKikimrConfig::TInterconnectConfig::IC_SO_MSG_ZEROCOPY:
result.SocketSendOptimization = ESocketSendOptimization::IC_MSG_ZEROCOPY;
break;
}
}

return result;
}

Expand Down
8 changes: 7 additions & 1 deletion ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,11 @@ message TInterconnectConfig {
REQUIRED = 2;
};

enum ESocketSendOptimization {
IC_SO_DISABLED = 0;
IC_SO_MSG_ZEROCOPY = 1;
};

repeated TChannel Channel = 1;
optional bool FirstTryBeforePoll = 2; // DEPRECATED
optional bool StartTcp = 3 [default = false];
Expand Down Expand Up @@ -433,9 +438,10 @@ message TInterconnectConfig {
optional bool SuppressConnectivityCheck = 39 [default = false];
optional uint32 PreallocatedBufferSize = 40;
optional uint32 NumPreallocatedBuffers = 41;
optional bool EnableExternalDataChannel = 42;
optional bool EnableExternalDataChannel = 42 [default = false];
optional bool ValidateIncomingPeerViaDirectLookup = 44;
optional uint32 SocketBacklogSize = 45; // SOMAXCONN if not set or zero
optional ESocketSendOptimization SocketSendOptimization = 51 [default = IC_SO_DISABLED];

// ballast is added to IC handshake frames to ensure correctness of jumbo frames transmission over network
optional uint32 HandshakeBallastSize = 14;
Expand Down
8 changes: 3 additions & 5 deletions ydb/library/actors/interconnect/channel_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ namespace NActors {
std::array<std::optional<TEventOutputChannel>, 16> ChannelArray;
THashMap<ui16, TEventOutputChannel> ChannelMap;
std::shared_ptr<IInterconnectMetrics> Metrics;
TEventHolderPool& Pool;
const ui32 MaxSerializedEventSize;
const TSessionParams Params;

Expand All @@ -29,11 +28,10 @@ namespace NActors {

public:
TChannelScheduler(ui32 peerNodeId, const TChannelsConfig& predefinedChannels,
std::shared_ptr<IInterconnectMetrics> metrics, TEventHolderPool& pool, ui32 maxSerializedEventSize,
std::shared_ptr<IInterconnectMetrics> metrics, ui32 maxSerializedEventSize,
TSessionParams params)
: PeerNodeId(peerNodeId)
, Metrics(std::move(metrics))
, Pool(pool)
, MaxSerializedEventSize(maxSerializedEventSize)
, Params(std::move(params))
{
Expand Down Expand Up @@ -72,15 +70,15 @@ namespace NActors {
if (channel < ChannelArray.size()) {
auto& res = ChannelArray[channel];
if (Y_UNLIKELY(!res)) {
res.emplace(Pool, channel, PeerNodeId, MaxSerializedEventSize, Metrics,
res.emplace(channel, PeerNodeId, MaxSerializedEventSize, Metrics,
Params);
}
return *res;
} else {
auto it = ChannelMap.find(channel);
if (Y_UNLIKELY(it == ChannelMap.end())) {
it = ChannelMap.emplace(std::piecewise_construct, std::forward_as_tuple(channel),
std::forward_as_tuple(Pool, channel, PeerNodeId, MaxSerializedEventSize,
std::forward_as_tuple(channel, PeerNodeId, MaxSerializedEventSize,
Metrics, Params)).first;
}
return it->second;
Expand Down
27 changes: 18 additions & 9 deletions ydb/library/actors/interconnect/interconnect_channel.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "interconnect_channel.h"
#include "interconnect_zc_processor.h"

#include <ydb/library/actors/core/events.h>
#include <ydb/library/actors/core/executor_thread.h>
Expand Down Expand Up @@ -56,10 +57,10 @@ namespace NActors {
return true;
}

void TEventOutputChannel::DropConfirmed(ui64 confirm) {
void TEventOutputChannel::DropConfirmed(ui64 confirm, TEventHolderPool& pool) {
LOG_DEBUG_IC_SESSION("ICOCH98", "Dropping confirmed messages");
for (auto it = NotYetConfirmed.begin(); it != NotYetConfirmed.end() && it->Serial <= confirm; ) {
Pool.Release(NotYetConfirmed, it++);
pool.Release(NotYetConfirmed, it++);
}
}

Expand Down Expand Up @@ -185,7 +186,7 @@ namespace NActors {
if (allowCopy && (reinterpret_cast<uintptr_t>(data) & 63) + len <= 64) {
task.Write<External>(data, len);
} else {
task.Append<External>(data, len);
task.Append<External>(data, len, &event.ZcTransferId);
}
*bytesSerialized += len;
Y_DEBUG_ABORT_UNLESS(len <= PartLenRemain);
Expand Down Expand Up @@ -314,17 +315,19 @@ namespace NActors {
};
char *ptr = reinterpret_cast<char*>(part + 1);
*ptr++ = static_cast<ui8>(EXdcCommand::PUSH_DATA);
*reinterpret_cast<ui16*>(ptr) = bytesSerialized;

WriteUnaligned<ui16>(ptr, bytesSerialized);
ptr += sizeof(ui16);
if (task.ChecksummingXxhash()) {
XXH3_state_t state;
XXH3_64bits_reset(&state);
task.XdcStream.ScanLastBytes(bytesSerialized, [&state](TContiguousSpan span) {
XXH3_64bits_update(&state, span.data(), span.size());
});
*reinterpret_cast<ui32*>(ptr) = XXH3_64bits_digest(&state);
const ui32 cs = XXH3_64bits_digest(&state);
WriteUnaligned<ui32>(ptr, cs);
} else if (task.ChecksummingCrc32c()) {
*reinterpret_cast<ui32*>(ptr) = task.ExternalChecksum;
WriteUnaligned<ui32>(ptr, task.ExternalChecksum);
}

task.WriteBookmark(std::move(partBookmark), buffer, partSize);
Expand All @@ -335,7 +338,7 @@ namespace NActors {
return complete;
}

void TEventOutputChannel::NotifyUndelivered() {
void TEventOutputChannel::ProcessUndelivered(TEventHolderPool& pool, NInterconnect::IZcGuard* zg) {
LOG_DEBUG_IC_SESSION("ICOCH89", "Notyfying about Undelivered messages! NotYetConfirmed size: %zu, Queue size: %zu", NotYetConfirmed.size(), Queue.size());
if (State == EState::BODY && Queue.front().Event) {
Y_ABORT_UNLESS(!Chunker.IsComplete()); // chunk must have an event being serialized
Expand All @@ -350,11 +353,17 @@ namespace NActors {
item.ForwardOnNondelivery(true);
}
}
Pool.Release(NotYetConfirmed);

// Events in the NotYetConfirmed may be actualy not sended by kernel.
// In case of enabled ZC we need to wait kernel send task to be completed before reusing buffers
if (zg) {
zg->ExtractToSafeTermination(NotYetConfirmed);
}
pool.Release(NotYetConfirmed);
for (auto& item : Queue) {
item.ForwardOnNondelivery(false);
}
Pool.Release(Queue);
pool.Release(Queue);
}

}
16 changes: 9 additions & 7 deletions ydb/library/actors/interconnect/interconnect_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
#include "packet.h"
#include "event_holder_pool.h"

namespace NInterconnect {
class IZcGuard;
}

namespace NActors {
#pragma pack(push, 1)

Expand Down Expand Up @@ -59,10 +63,9 @@ namespace NActors {

class TEventOutputChannel : public TInterconnectLoggingBase {
public:
TEventOutputChannel(TEventHolderPool& pool, ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize,
TEventOutputChannel(ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize,
std::shared_ptr<IInterconnectMetrics> metrics, TSessionParams params)
: TInterconnectLoggingBase(Sprintf("OutputChannel %" PRIu16 " [node %" PRIu32 "]", id, peerNodeId))
, Pool(pool)
, PeerNodeId(peerNodeId)
, ChannelId(id)
, Metrics(std::move(metrics))
Expand All @@ -73,8 +76,8 @@ namespace NActors {
~TEventOutputChannel() {
}

std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) {
TEventHolder& event = Pool.Allocate(Queue);
std::pair<ui32, TEventHolder*> Push(IEventHandle& ev, TEventHolderPool& pool) {
TEventHolder& event = pool.Allocate(Queue);
const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr2);
OutputQueueSize += bytes;
if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) {
Expand All @@ -85,7 +88,7 @@ namespace NActors {
return std::make_pair(bytes, &event);
}

void DropConfirmed(ui64 confirm);
void DropConfirmed(ui64 confirm, TEventHolderPool& pool);

bool FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed);

Expand All @@ -105,9 +108,8 @@ namespace NActors {
return OutputQueueSize;
}

void NotifyUndelivered();
void ProcessUndelivered(TEventHolderPool& pool, NInterconnect::IZcGuard* zg);

TEventHolderPool& Pool;
const ui32 PeerNodeId;
const ui16 ChannelId;
std::shared_ptr<IInterconnectMetrics> Metrics;
Expand Down
8 changes: 7 additions & 1 deletion ydb/library/actors/interconnect/interconnect_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ namespace NActors {
REQUIRED, // encryption is mandatory
};

enum class ESocketSendOptimization {
DISABLED,
IC_MSG_ZEROCOPY,
};

struct TInterconnectSettings {
TDuration Handshake;
TDuration DeadPeer;
Expand All @@ -48,13 +53,14 @@ namespace NActors {
ui32 MaxSerializedEventSize = NActors::EventMaxByteSize;
ui32 PreallocatedBufferSize = 8 << 10; // 8 KB
ui32 NumPreallocatedBuffers = 16;
bool EnableExternalDataChannel = false;
bool EnableExternalDataChannel = true;
bool ValidateIncomingPeerViaDirectLookup = false;
ui32 SocketBacklogSize = 0; // SOMAXCONN if zero
TDuration FirstErrorSleep = TDuration::MilliSeconds(10);
TDuration MaxErrorSleep = TDuration::Seconds(1);
double ErrorSleepRetryMultiplier = 4.0;
TDuration EventDelay = TDuration::Zero();
ESocketSendOptimization SocketSendOptimization = ESocketSendOptimization::DISABLED;
};

struct TWhiteboardSessionStatus {
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/actors/interconnect/interconnect_mon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ namespace NInterconnect {
}
TABLED() { str << kv.second.TotalOutputQueueSize; }
TABLED() { str << (kv.second.Connected ? "yes" : "<strong>no</strong>"); }
TABLED() { str << (kv.second.ExternalDataChannel ? "yes" : "no"); }
TABLED() { str << (kv.second.ExternalDataChannel ? "yes" : "no")
<< " (" << (kv.second.XDCFlags & TInterconnectProxyTCP::TProxyStats::XDCFlags::MSG_ZERO_COPY_SEND ? "MSG_ZC_SEND" : "_") << ")"; }
TABLED() { str << kv.second.Host; }
TABLED() { str << kv.second.Port; }
TABLED() {
Expand Down
18 changes: 17 additions & 1 deletion ydb/library/actors/interconnect/interconnect_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,29 @@ namespace NInterconnect {

ssize_t
TStreamSocket::Send(const void* msg, size_t len, TString* /*err*/) const {
const auto ret = ::send(Descriptor, static_cast<const char*>(msg), int(len), 0);
return SendWithFlags(msg, len, 0);
}

ssize_t
TStreamSocket::SendWithFlags(const void* msg, size_t len, int flags) const {
const auto ret = ::send(Descriptor, static_cast<const char*>(msg), int(len), flags);
if (ret < 0)
return -LastSocketError();

return ret;
}

#if defined(__linux__)
ssize_t
TStreamSocket::RecvErrQueue(struct msghdr* msg) const {
const auto ret = ::recvmsg(Descriptor, msg, MSG_ERRQUEUE);
if (ret < 0)
return -LastSocketError();

return ret;
}
#endif

ssize_t
TStreamSocket::Recv(void* buf, size_t len, TString* /*err*/) const {
const auto ret = ::recv(Descriptor, static_cast<char*>(buf), int(len), 0);
Expand Down
5 changes: 5 additions & 0 deletions ydb/library/actors/interconnect/interconnect_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ namespace NInterconnect {
virtual ssize_t WriteV(const struct iovec* iov, int iovcnt) const;
virtual ssize_t ReadV(const struct iovec* iov, int iovcnt) const;

ssize_t SendWithFlags(const void* msg, size_t len, int flags) const;
#if defined(__linux__)
ssize_t RecvErrQueue(struct msghdr* msg) const;
#endif

int Connect(const TAddress& addr) const;
int Connect(const NAddr::IRemoteAddr* addr) const;
int Listen(int backlog) const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,14 +575,14 @@ namespace NActors {
throw TExDestroySession{TDisconnectReason::FormatError()};
}

auto size = *reinterpret_cast<const ui16*>(ptr);
const ui16 size = ReadUnaligned<ui16>(ptr);
if (!size) {
LOG_CRIT_IC_SESSION("ICIS03", "XDC empty payload");
throw TExDestroySession{TDisconnectReason::FormatError()};
}

if (!Params.Encryption) {
const ui32 checksumExpected = *reinterpret_cast<const ui32*>(ptr + sizeof(ui16));
const ui32 checksumExpected = ReadUnaligned<ui32>(ptr + sizeof(ui16));
XdcChecksumQ.emplace_back(size, checksumExpected);
}

Expand Down
10 changes: 9 additions & 1 deletion ydb/library/actors/interconnect/interconnect_tcp_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,15 @@ namespace NActors {
stats.LastSessionDieTime = LastSessionDieTime;
stats.TotalOutputQueueSize = Session ? Session->TotalOutputQueueSize : 0;
stats.Connected = Session ? (bool)Session->Socket : false;
stats.ExternalDataChannel = Session && Session->XdcSocket;
if (Session) {
if (const auto xdcFlags = Session->GetXDCFlags()) {
stats.ExternalDataChannel = true;
stats.XDCFlags = *xdcFlags;
} else {
stats.ExternalDataChannel = false;
stats.XDCFlags = 0;
}
}
stats.Host = TechnicalPeerHostName;
stats.Port = 0;
ui32 rep = 0;
Expand Down
5 changes: 5 additions & 0 deletions ydb/library/actors/interconnect/interconnect_tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ namespace NActors {
TDuration Ping;
i64 ClockSkew;
TString Encryption;
enum XDCFlags {
NONE = 0,
MSG_ZERO_COPY_SEND = 1,
};
ui8 XDCFlags;
};

struct TEvStats : TEventLocal<TEvStats, EvStats> {
Expand Down
Loading
Loading