From 004e2675aab3ae702ab32d96153a0d68607407db Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Tue, 29 Apr 2025 16:47:29 +0200 Subject: [PATCH 1/2] Fix windows compatibility. MSG_ZEROCOPY related code works only on linux. --- .../interconnect_zc_processor.cpp | 56 ++++++++++++++----- .../actors/interconnect/ut_fat/main.cpp | 4 ++ 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/ydb/library/actors/interconnect/interconnect_zc_processor.cpp b/ydb/library/actors/interconnect/interconnect_zc_processor.cpp index 767f9d533d93..b9dc26e1025d 100644 --- a/ydb/library/actors/interconnect/interconnect_zc_processor.cpp +++ b/ydb/library/actors/interconnect/interconnect_zc_processor.cpp @@ -145,6 +145,7 @@ size_t AdjustLen(std::span wbuf, std::span wbuf, TStre return r; } -TInterconnectZcProcessor::TInterconnectZcProcessor(bool enabled) - : ZcState(enabled ? ZC_OK : ZC_DISABLED) -{} - TString TInterconnectZcProcessor::GetCurrentStateName() const { switch (ZcState) { case ZC_DISABLED: @@ -288,8 +290,23 @@ TString TInterconnectZcProcessor::ExtractErrText() { } } +void TInterconnectZcProcessor::AddErr(const TString& err) { + if (LastErr) { + LastErr.reserve(err.size() + 2); + LastErr += ", "; + LastErr += err; + } else { + LastErr = err; + } +} + /////////////////////////////////////////////////////////////////////////////// +#ifdef YDB_MSG_ZEROCOPY_SUPPORTED +TInterconnectZcProcessor::TInterconnectZcProcessor(bool enabled) + : ZcState(enabled ? ZC_OK : ZC_DISABLED) +{} + // Guard part. // We must guarantee liveness of buffers used for zc // until enqueued zc operation completed by kernel @@ -394,20 +411,33 @@ class TGuardRunner : public IZcGuard { std::list Delayed; }; -void TInterconnectZcProcessor::AddErr(const TString& err) { - if (LastErr) { - LastErr.reserve(err.size() + 2); - LastErr += ", "; - LastErr += err; - } else { - LastErr = err; - } +std::unique_ptr TInterconnectZcProcessor::GetGuard() +{ + return std::make_unique(SendAsZc, Confirmed); } +#else +TInterconnectZcProcessor::TInterconnectZcProcessor(bool) + : ZcState(ZC_DISABLED) +{} + +class TDummyGuardRunner : public IZcGuard { +public: + TDummyGuardRunner(ui64 uncompleted, ui64 confirmed) + { + Y_UNUSED(uncompleted); + Y_UNUSED(confirmed); + } + + void ExtractToSafeTermination(std::list&) noexcept override {} + void Terminate(std::unique_ptr&&, TIntrusivePtr, const NActors::TActorContext&) override {} +}; std::unique_ptr TInterconnectZcProcessor::GetGuard() { - return std::make_unique(SendAsZc, Confirmed); + return std::make_unique(SendAsZc, Confirmed); } +#endif + } diff --git a/ydb/library/actors/interconnect/ut_fat/main.cpp b/ydb/library/actors/interconnect/ut_fat/main.cpp index 36dcc9afd5a1..a765a8724e20 100644 --- a/ydb/library/actors/interconnect/ut_fat/main.cpp +++ b/ydb/library/actors/interconnect/ut_fat/main.cpp @@ -212,6 +212,10 @@ Y_UNIT_TEST_SUITE(InterconnectZcLocalOp) { NanoSleep(5ULL * 1000 * 1000 * 1000); // Zero copy send via loopback causes hidden copy inside linux kernel +#if defined (__linux__) UNIT_ASSERT_VALUES_EQUAL("DisabledHiddenCopy", GetZcState(testCluster, 1, 2)); +#else + UNIT_ASSERT_VALUES_EQUAL("Disabled", GetZcState(testCluster, 1, 2)); +#endif } } From 432715d6c581cd5e1ebf7fc7598722e7b11a668c Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Tue, 29 Apr 2025 19:29:43 +0200 Subject: [PATCH 2/2] Only linux has ERRQUEUE --- ydb/library/actors/interconnect/interconnect_stream.cpp | 2 ++ ydb/library/actors/interconnect/interconnect_stream.h | 2 ++ 2 files changed, 4 insertions(+) diff --git a/ydb/library/actors/interconnect/interconnect_stream.cpp b/ydb/library/actors/interconnect/interconnect_stream.cpp index 6cc128613f96..9a5575bd77fb 100644 --- a/ydb/library/actors/interconnect/interconnect_stream.cpp +++ b/ydb/library/actors/interconnect/interconnect_stream.cpp @@ -125,6 +125,7 @@ namespace NInterconnect { return ret; } +#if defined(__linux__) ssize_t TStreamSocket::RecvErrQueue(struct msghdr* msg) const { const auto ret = ::recvmsg(Descriptor, msg, MSG_ERRQUEUE); @@ -133,6 +134,7 @@ namespace NInterconnect { return ret; } +#endif ssize_t TStreamSocket::Recv(void* buf, size_t len, TString* /*err*/) const { diff --git a/ydb/library/actors/interconnect/interconnect_stream.h b/ydb/library/actors/interconnect/interconnect_stream.h index f5bbb1c184b9..0189896b73e0 100644 --- a/ydb/library/actors/interconnect/interconnect_stream.h +++ b/ydb/library/actors/interconnect/interconnect_stream.h @@ -60,7 +60,9 @@ namespace NInterconnect { virtual ssize_t ReadV(const struct iovec* iov, int iovcnt) const; ssize_t SendWithFlags(const void* msg, size_t len, int flags) const; +#if defined(__linux__) ssize_t RecvErrQueue(struct msghdr* msg) const; +#endif int Connect(const TAddress& addr) const; int Connect(const NAddr::IRemoteAddr* addr) const;