Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/library/actors/interconnect/interconnect_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ namespace NInterconnect {
return ret;
}

#if defined(__linux__)
ssize_t
TStreamSocket::RecvErrQueue(struct msghdr* msg) const {
const auto ret = ::recvmsg(Descriptor, msg, MSG_ERRQUEUE);
Expand All @@ -133,6 +134,7 @@ namespace NInterconnect {

return ret;
}
#endif

ssize_t
TStreamSocket::Recv(void* buf, size_t len, TString* /*err*/) const {
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/actors/interconnect/interconnect_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ namespace NInterconnect {
virtual ssize_t ReadV(const struct iovec* iov, int iovcnt) const;

ssize_t SendWithFlags(const void* msg, size_t len, int flags) const;
#if defined(__linux__)
ssize_t RecvErrQueue(struct msghdr* msg) const;
#endif

int Connect(const TAddress& addr) const;
int Connect(const NAddr::IRemoteAddr* addr) const;
Expand Down
56 changes: 43 additions & 13 deletions ydb/library/actors/interconnect/interconnect_zc_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ size_t AdjustLen(std::span<const TConstIoVec> wbuf, std::span<const TOutgoingStr
}

void TInterconnectZcProcessor::DoProcessNotification(NInterconnect::TStreamSocket& socket) {
#ifdef YDB_MSG_ZEROCOPY_SUPPORTED
const TProcessErrQueueResult res = DoProcessErrQueue(socket);

std::visit(TOverloaded{
Expand Down Expand Up @@ -172,6 +173,9 @@ void TInterconnectZcProcessor::DoProcessNotification(NInterconnect::TStreamSocke
AddErr("Hidden copy during ZC operation");
ZcState = ZC_DISABLED_HIDDEN_COPY;
}
#else
Y_UNUSED(socket);
#endif
}

void TInterconnectZcProcessor::ApplySocketOption(NInterconnect::TStreamSocket& socket)
Expand All @@ -186,6 +190,8 @@ void TInterconnectZcProcessor::ApplySocketOption(NInterconnect::TStreamSocket& s
ResetState();
}
}
#else
Y_UNUSED(socket);
#endif
}

Expand Down Expand Up @@ -259,10 +265,6 @@ ssize_t TInterconnectZcProcessor::ProcessSend(std::span<TConstIoVec> wbuf, TStre
return r;
}

TInterconnectZcProcessor::TInterconnectZcProcessor(bool enabled)
: ZcState(enabled ? ZC_OK : ZC_DISABLED)
{}

TString TInterconnectZcProcessor::GetCurrentStateName() const {
switch (ZcState) {
case ZC_DISABLED:
Expand All @@ -288,8 +290,23 @@ TString TInterconnectZcProcessor::ExtractErrText() {
}
}

void TInterconnectZcProcessor::AddErr(const TString& err) {
if (LastErr) {
LastErr.reserve(err.size() + 2);
LastErr += ", ";
LastErr += err;
} else {
LastErr = err;
}
}

///////////////////////////////////////////////////////////////////////////////

#ifdef YDB_MSG_ZEROCOPY_SUPPORTED
TInterconnectZcProcessor::TInterconnectZcProcessor(bool enabled)
: ZcState(enabled ? ZC_OK : ZC_DISABLED)
{}

// Guard part.
// We must guarantee liveness of buffers used for zc
// until enqueued zc operation completed by kernel
Expand Down Expand Up @@ -394,20 +411,33 @@ class TGuardRunner : public IZcGuard {
std::list<TEventHolder> Delayed;
};

void TInterconnectZcProcessor::AddErr(const TString& err) {
if (LastErr) {
LastErr.reserve(err.size() + 2);
LastErr += ", ";
LastErr += err;
} else {
LastErr = err;
}
std::unique_ptr<IZcGuard> TInterconnectZcProcessor::GetGuard()
{
return std::make_unique<TGuardRunner>(SendAsZc, Confirmed);
}

#else
TInterconnectZcProcessor::TInterconnectZcProcessor(bool)
: ZcState(ZC_DISABLED)
{}

class TDummyGuardRunner : public IZcGuard {
public:
TDummyGuardRunner(ui64 uncompleted, ui64 confirmed)
{
Y_UNUSED(uncompleted);
Y_UNUSED(confirmed);
}

void ExtractToSafeTermination(std::list<TEventHolder>&) noexcept override {}
void Terminate(std::unique_ptr<NActors::TEventHolderPool>&&, TIntrusivePtr<NInterconnect::TStreamSocket>, const NActors::TActorContext&) override {}
};

std::unique_ptr<IZcGuard> TInterconnectZcProcessor::GetGuard()
{
return std::make_unique<TGuardRunner>(SendAsZc, Confirmed);
return std::make_unique<TDummyGuardRunner>(SendAsZc, Confirmed);
}

#endif

}
4 changes: 4 additions & 0 deletions ydb/library/actors/interconnect/ut_fat/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ Y_UNIT_TEST_SUITE(InterconnectZcLocalOp) {

NanoSleep(5ULL * 1000 * 1000 * 1000);
// Zero copy send via loopback causes hidden copy inside linux kernel
#if defined (__linux__)
UNIT_ASSERT_VALUES_EQUAL("DisabledHiddenCopy", GetZcState(testCluster, 1, 2));
#else
UNIT_ASSERT_VALUES_EQUAL("Disabled", GetZcState(testCluster, 1, 2));
#endif
}
}
Loading