diff --git a/setup.py b/setup.py index 072166f1d..712e61b1c 100755 --- a/setup.py +++ b/setup.py @@ -545,6 +545,11 @@ def define_DSOS(self): src_pvxs = [os.path.join('src', src) for src in src_pvxs] + if OS_CLASS=='WIN32': + src_pvxs += ['src/os/WIN32/osdSockExt.cpp'] + else: + src_pvxs += ['src/os/default/osdSockExt.cpp'] + event_libs = [] if OS_CLASS=='WIN32': event_libs = ['ws2_32','shell32','advapi32','bcrypt','iphlpapi'] diff --git a/src/Makefile b/src/Makefile index a118bf728..a53d14f65 100644 --- a/src/Makefile +++ b/src/Makefile @@ -86,6 +86,8 @@ LIB_SRCS += nt.cpp LIB_SRCS += evhelper.cpp LIB_SRCS += udp_collector.cpp +LIB_SRCS += osdSockExt.cpp + LIB_SRCS += config.cpp LIB_SRCS += conn.cpp diff --git a/src/client.cpp b/src/client.cpp index faeef085a..54949ba96 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -750,14 +750,13 @@ bool ContextImpl::onSearch() { searchMsg.resize(0x10000); SockAddr src; - uint32_t ndrop = 0u; - osiSocklen_t alen = src.size(); - const int nrx = recvfromx(searchTx.sock, (char*)&searchMsg[0], searchMsg.size()-1, &src->sa, &alen, &ndrop); + recvfromx rx{searchTx.sock, (char*)&searchMsg[0], searchMsg.size()-1, &src}; + const int nrx = rx.call(); - if(nrx>=0 && ndrop!=0 && prevndrop!=ndrop) { - log_debug_printf(io, "UDP search reply buffer overflow %u -> %u\n", unsigned(prevndrop), unsigned(ndrop)); - prevndrop = ndrop; + if(nrx>=0 && rx.ndrop!=0 && prevndrop!=rx.ndrop) { + log_debug_printf(io, "UDP search reply buffer overflow %u -> %u\n", unsigned(prevndrop), unsigned(rx.ndrop)); + prevndrop = rx.ndrop; } if(nrx<0) { diff --git a/src/evhelper.cpp b/src/evhelper.cpp index 56889f06b..f0c30e2af 100644 --- a/src/evhelper.cpp +++ b/src/evhelper.cpp @@ -20,7 +20,6 @@ #include #include -#include #include #include #include @@ -44,6 +43,7 @@ namespace pvxs {namespace impl { DEFINE_LOGGER(logerr, "pvxs.loop"); DEFINE_LOGGER(logtimer, "pvxs.timer"); +DEFINE_LOGGER(logiface, "pvxs.iface"); namespace mdetail { VFunctor0::~VFunctor0() {} @@ -367,8 +367,16 @@ bool evbase::assertInRunningLoop() const evsocket::evsocket(evutil_socket_t sock) :sock(sock) { - if(sock==evutil_socket_t(-1)) - throw std::bad_alloc(); + if(sock==evutil_socket_t(-1)) { + int err = SOCKERRNO; +#ifdef _WIN32 + if(err==WSANOTINITIALISED) { + throw std::runtime_error("WSANOTINITIALISED"); + } +#endif + (void)err; + throw std::runtime_error("Unable to allocate socket"); + } evutil_make_socket_closeonexec(sock); @@ -386,6 +394,19 @@ evsocket::evsocket(evutil_socket_t sock) evsocket::evsocket(int af, int type, int proto) :evsocket(socket(af, type | SOCK_CLOEXEC, proto)) { +#ifdef __linux__ +# ifndef IP_MULTICAST_ALL +# define IP_MULTICAST_ALL 49 +# endif + // Disable non-compliant legacy behavior of Linux IP stack + if(af==AF_INET && type==SOCK_DGRAM){ + int val = 0; + if(setsockopt(sock, IPPROTO_IP, IP_MULTICAST_ALL, (char*)&val, sizeof(val))) { + log_warn_printf(logerr, "Unable to clear IP_MULTICAST_ALL (err=%d). This may cause problems on multi-homed hosts.\n", + evutil_socket_geterror(sock)); + } + } +#endif } evsocket::evsocket(evsocket&& o) noexcept @@ -511,6 +532,41 @@ std::vector evsocket::broadcasts(const SockAddr* match) return ret; } +#if EPICS_VERSION_INT 60) { + refresh(); + updated = now; + } else { + log_debug_printf(logiface, "using cache age %.2f sec\n", age); + } + + auto ifit(info.find(ifindex)); + if(ifit!=info.end()) { + const auto& iface = ifit->second; + auto adit(iface.find(addr)); + return adit!=iface.end(); + } + log_warn_printf(logiface, "Encountered unknown interface index %lld\n", (long long)ifindex); + return false; +} + void to_wire(Buffer& buf, const SockAddr& val) { if(!buf.ensure(16)) { diff --git a/src/evhelper.h b/src/evhelper.h index cbb711a9e..d2fbfb181 100644 --- a/src/evhelper.h +++ b/src/evhelper.h @@ -11,6 +11,8 @@ #include #include #include +#include +#include #include #include @@ -20,6 +22,8 @@ #include #include +#include + #include "pvaproto.h" // hooks for std::unique_ptr @@ -230,6 +234,18 @@ struct PVXS_API evsocket std::vector broadcasts(const SockAddr* match=nullptr); }; +struct PVXS_API IfaceMap { + IfaceMap(); + + // return true if ifindex is valid, and addr is one of the addresses currently assigned to it. + bool has_address(int64_t ifindex, const SockAddr& addr); + + void refresh(); + + std::map> info; + epicsTime updated; +}; + } // namespace impl diff --git a/src/os/WIN32/osdSockExt.cpp b/src/os/WIN32/osdSockExt.cpp new file mode 100644 index 000000000..33d9e820e --- /dev/null +++ b/src/os/WIN32/osdSockExt.cpp @@ -0,0 +1,159 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include + +#include "osiSockExt.h" + +#include + +#include + +#include +#include "evhelper.h" + +#include +#include + +namespace pvxs { + +DEFINE_LOGGER(log, "pvxs.util"); +DEFINE_LOGGER(logiface, "pvxs.iface"); + +static +LPFN_WSARECVMSG WSARecvMsg; + +static +epicsThreadOnceId oseOnce = EPICS_THREAD_ONCE_INIT; + +static +void oseDoOnce(void*) +{ + evsocket dummy(AF_INET, SOCK_DGRAM, 0); + GUID guid = WSAID_WSARECVMSG; + DWORD nout; + + if(WSAIoctl(dummy.sock, SIO_GET_EXTENSION_FUNCTION_POINTER, + &guid, sizeof(guid), + &WSARecvMsg, sizeof(WSARecvMsg), + &nout, nullptr, nullptr)) + { + cantProceed("Unable to get &WSARecvMsg: %d", WSAGetLastError()); + } + if(!WSARecvMsg) + cantProceed("Unable to get &WSARecvMsg!!"); +} + +void osiSockAttachExt() +{ + osiSockAttach(); + epicsThreadOnce(&oseOnce, &oseDoOnce, nullptr); +} + +void enable_SO_RXQ_OVFL(SOCKET sock) {} + +void enable_IP_PKTINFO(SOCKET sock) +{ + int val = 1; + if(setsockopt(sock, IPPROTO_IP, IP_PKTINFO, (char*)&val, sizeof(val))) + log_warn_printf(log, "Unable to set IP_PKTINFO: %d\n", SOCKERRNO); +} + +int recvfromx::call() +{ + ndrop = 0u; + dstif = -1; + + WSAMSG msg{}; + + WSABUF iov = {(ULONG)buflen, (char*)buf}; + msg.lpBuffers = &iov; + msg.dwBufferCount = 1u; + + msg.name = &(*src)->sa; + msg.namelen = src->size(); + + alignas (alignof (WSACMSGHDR)) char cbuf[WSA_CMSG_SPACE(sizeof(in_pktinfo))]; + msg.Control = {sizeof(cbuf), cbuf}; + + DWORD nrx=0u; + if(!WSARecvMsg(sock, &msg, &nrx, nullptr, nullptr)) { + if(msg.dwFlags & MSG_CTRUNC) + log_debug_printf(log, "MSG_CTRUNC %zu, %zu\n", msg.Control.len, sizeof(cbuf)); + + for(WSACMSGHDR *hdr = WSA_CMSG_FIRSTHDR(&msg); hdr ; hdr = WSA_CMSG_NXTHDR(&msg, hdr)) { + if(hdr->cmsg_level==IPPROTO_IP && hdr->cmsg_type==IP_PKTINFO && hdr->cmsg_len>=WSA_CMSG_LEN(sizeof(in_pktinfo))) { + if(dst) { + (*dst)->in.sin_family = AF_INET; + memcpy(&(*dst)->in.sin_addr, WSA_CMSG_DATA(hdr) + offsetof(in_pktinfo, ipi_addr), sizeof(IN_ADDR)); + } + + decltype(in_pktinfo::ipi_ifindex) idx; + memcpy(&idx, WSA_CMSG_DATA(hdr) + offsetof(in_pktinfo, ipi_ifindex), sizeof(idx)); + dstif = idx; + } + } + + return nrx; + + } else { + return -1; + } +} + +namespace impl { + +#ifndef GAA_FLAG_INCLUDE_ALL_INTERFACES +# define GAA_FLAG_INCLUDE_ALL_INTERFACES 0 +#endif + +void IfaceMap::refresh() { + std::vector ifaces(1024u); + decltype (info) temp; + + { + constexpr ULONG flags = GAA_FLAG_SKIP_ANYCAST|GAA_FLAG_SKIP_MULTICAST|GAA_FLAG_SKIP_DNS_SERVER|GAA_FLAG_INCLUDE_ALL_INTERFACES; + + ULONG buflen = ifaces.size(); + auto err = GetAdaptersAddresses(AF_INET, flags, 0, reinterpret_cast(ifaces.data()), &buflen); + + if(err == ERROR_BUFFER_OVERFLOW) { + // buflen updated with necessary length, retry + ifaces.resize(buflen); + + err = GetAdaptersAddresses(AF_INET, flags, 0, reinterpret_cast(ifaces.data()), &buflen); + } + + if(err) { + log_warn_printf(logiface, "Unable to GetAdaptersAddresses() error=%lld\n", (unsigned long long)err); + return; + } + } + + for(auto iface = reinterpret_cast(ifaces.data()); iface ; iface = iface->Next) { + auto& info = temp[iface->IfIndex]; + + //TODO: any flags to check? + + for(auto addr = iface->FirstUnicastAddress; addr; addr = addr->Next) { + + if(addr->Address.lpSockaddr->sa_family!=AF_INET) + continue; + + auto pair = info.emplace(addr->Address.lpSockaddr, sizeof(sockaddr_in)); + + log_debug_printf(logiface, "Found interface %lld \"%s\" w/ %s\n", + (long long)iface->IfIndex, iface->AdapterName, pair.first->tostring().c_str()); + } + } + + info.swap(temp); +} + +} // namespace impl + +} // namespace pvxs diff --git a/src/os/default/osdSockExt.cpp b/src/os/default/osdSockExt.cpp new file mode 100644 index 000000000..36620bd87 --- /dev/null +++ b/src/os/default/osdSockExt.cpp @@ -0,0 +1,197 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include "osiSockExt.h" + +#include + +#include +#include +#include + +#ifdef __rtems__ +// missing extern C circa RTEMS 5.1 +extern "C" { +# include +} +#endif + +#include +#include + +namespace pvxs { + +DEFINE_LOGGER(log, "pvxs.util"); +DEFINE_LOGGER(logiface, "pvxs.iface"); + +void osiSockAttachExt() {} + +void enable_SO_RXQ_OVFL(SOCKET sock) +{ +#ifdef SO_RXQ_OVFL + // Linux specific feature exposes OS dropped packet count + int val = 1; + if(setsockopt(sock, SOL_SOCKET, SO_RXQ_OVFL, (char*)&val, sizeof(val))) + log_warn_printf(log, "Unable to set SO_RXQ_OVFL: %d\n", SOCKERRNO); + +#endif +} + +void enable_IP_PKTINFO(SOCKET sock) +{ + /* linux, some *BSD's (OSX), and winsock package both destination address (from ip header) + * and receiving interface index (from host) into one IP_PKTINFO control message. + * Remaining *BSD's can deliver these in separate IP_ORIGDSTADDR and IP_RECVIF messages. + */ +#ifdef IP_PKTINFO + int val = 1; + if(setsockopt(sock, IPPROTO_IP, IP_PKTINFO, (char*)&val, sizeof(val))) + log_warn_printf(log, "Unable to set IP_PKTINFO: %d\n", SOCKERRNO); + +#else +# ifdef IP_ORIGDSTADDR + { + int val = 1; + if(setsockopt(sock, IPPROTO_IP, IP_ORIGDSTADDR, (char*)&val, sizeof(val))) + log_warn_printf(log, "Unable to set IP_ORIGDSTADDR: %d\n", SOCKERRNO); + } + +# endif +# ifdef IP_RECVIF + { + int val = 1; + if(setsockopt(sock, IPPROTO_IP, IP_RECVIF, (char*)&val, sizeof(val))) + log_warn_printf(log, "Unable to set IP_RECVIF: %d\n", SOCKERRNO); + } +# endif +#endif +} + +int recvfromx::call() +{ + msghdr msg{}; + + iovec iov = {buf, buflen}; + msg.msg_iov = &iov; + msg.msg_iovlen = 1u; + + msg.msg_name = &(*src)->sa; + msg.msg_namelen = src ? src->size() : 0u; + + alignas (alignof (cmsghdr)) char cbuf[0u +#ifdef SO_RXQ_OVFL + + CMSG_SPACE(sizeof(ndrop)) +#endif +#ifdef IP_PKTINFO + + CMSG_SPACE(sizeof(in_pktinfo)) +#else +# if defined(IP_ORIGDSTADDR) + + CMSG_SPACE(sizeof(sockaddr_in)) +# endif +# if defined(IP_RECVIF) + + CMSG_SPACE(sizeof(sockaddr_dl)) +# endif +#endif + ]; + msg.msg_control = cbuf; + msg.msg_controllen = sizeof(cbuf); + + if(dst) + *dst = SockAddr(); + dstif = -1; + ndrop = 0u; + + int ret = recvmsg(sock, &msg, 0); + + if(ret>=0) { // on success, check for control messages + if(msg.msg_flags & MSG_CTRUNC) + log_warn_printf(log, "MSG_CTRUNC, expand buffer %zu <- %zu\n", msg.msg_controllen, sizeof(cbuf)); + + for(cmsghdr *hdr = CMSG_FIRSTHDR(&msg); hdr ; hdr = CMSG_NXTHDR(&msg, hdr)) { + log_debug_printf(log, "XXX %d %d %zu\n", hdr->cmsg_level, hdr->cmsg_type, hdr->cmsg_len); + if(0) {} +#ifdef SO_RXQ_OVFL + else if(hdr->cmsg_level==SOL_SOCKET && hdr->cmsg_type==SO_RXQ_OVFL && hdr->cmsg_len>=CMSG_LEN(sizeof(ndrop))) { + memcpy(&ndrop, CMSG_DATA(hdr), sizeof(ndrop)); + } +#endif +#ifdef IP_PKTINFO + else if(hdr->cmsg_level==IPPROTO_IP && hdr->cmsg_type==IP_PKTINFO && hdr->cmsg_len>=CMSG_LEN(sizeof(in_pktinfo))) { + if(dst) { + (*dst)->in.sin_family = AF_INET; + memcpy(&(*dst)->in.sin_addr, CMSG_DATA(hdr) + offsetof(in_pktinfo, ipi_addr), sizeof(in_addr_t)); + } + + decltype(in_pktinfo::ipi_ifindex) idx; + memcpy(&idx, CMSG_DATA(hdr) + offsetof(in_pktinfo, ipi_ifindex), sizeof(idx)); + dstif = idx; + } + +#else +# ifdef IP_ORIGDSTADDR + else if(dst && hdr->cmsg_level==IPPROTO_IP && hdr->cmsg_type==IP_ORIGDSTADDR && hdr->cmsg_len>=CMSG_LEN(sizeof(sockaddr_in))) { + memcpy(&(*dst)->in, CMSG_DATA(hdr), sizeof(sockaddr_in)); + } +# endif +# ifdef IP_RECVIF + else if(dst && hdr->cmsg_level==IPPROTO_IP && hdr->cmsg_type==IP_RECVIF && hdr->cmsg_len>=CMSG_LEN(sizeof(sockaddr_dl))) { + decltype (sockaddr_dl::sdl_index) idx; + memcpy(&idx, CMSG_DATA(hdr) + offsetof(sockaddr_dl, sdl_index), sizeof(idx)); + dstif = idx; + } +# endif +#endif + } + } + + return ret; +} + +namespace impl { + +void IfaceMap::refresh() { + ifaddrs* addrs = nullptr; + + decltype (info) temp; + + if(getifaddrs(&addrs)) { + log_warn_printf(logiface, "Unable to getifaddrs() errno=%d\n", errno); + return; + } + + try { + for(const ifaddrs* ifa = addrs; ifa; ifa = ifa->ifa_next) { + if(ifa->ifa_addr->sa_family!=AF_INET) { + log_debug_printf(logiface, "Ignoring interface '%s' address !ipv4\n", ifa->ifa_name); + continue; + } + + auto idx(if_nametoindex(ifa->ifa_name)); + if(idx<=0) { + log_warn_printf(logiface, "Unable to find index of interface '%s'\n", ifa->ifa_name); + continue; + } + + //TODO: any flags to check? + + auto pair = temp[idx].emplace(ifa->ifa_addr, sizeof(sockaddr_in)); + + log_debug_printf(logiface, "Found interface %lld \"%s\" w/ %s\n", + (long long)idx, ifa->ifa_name, pair.first->tostring().c_str()); + } + + } catch(...){ + freeifaddrs(addrs); + throw; + } + freeifaddrs(addrs); + + info.swap(temp); +} + +} // namespace impl + +} // namespace pvxs diff --git a/src/osiSockExt.h b/src/osiSockExt.h new file mode 100644 index 000000000..22752cc1a --- /dev/null +++ b/src/osiSockExt.h @@ -0,0 +1,116 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#ifndef OSISOCKEXT_H +#define OSISOCKEXT_H + +#include + +#include + +#include + +#include + +namespace pvxs { + +PVXS_API +void osiSockAttachExt(); + +struct SockAttach { + SockAttach() { osiSockAttachExt(); } + ~SockAttach() { osiSockRelease(); } +}; + +//! representation of a network address +struct PVXS_API SockAddr { + union store_t { + sockaddr sa; + sockaddr_in in; +#ifdef AF_INET6 + sockaddr_in6 in6; +#endif + }; +private: + store_t store; +public: + + explicit SockAddr(int af = AF_UNSPEC); + explicit SockAddr(int af, const char *address, unsigned short port=0); + explicit SockAddr(const sockaddr *addr, ev_socklen_t len); + inline explicit SockAddr(int af, const std::string& address) :SockAddr(af, address.c_str()) {} + + size_t size() const; + + inline unsigned short family() const { return store.sa.sa_family; } + unsigned short port() const; + void setPort(unsigned short port); + SockAddr withPort(unsigned short port) { + SockAddr temp(*this); + temp.setPort(port); + return temp; + } + + void setAddress(const char *, unsigned short port=0); + + bool isAny() const; + bool isLO() const; + + store_t* operator->() { return &store; } + const store_t* operator->() const { return &store; } + + std::string tostring() const; + + static SockAddr any(int af, unsigned port=0); + static SockAddr loopback(int af, unsigned port=0); + + inline int compare(const SockAddr& o, bool useport=true) const { + return evutil_sockaddr_cmp(&store.sa, &o.store.sa, useport); + } + + inline bool operator<(const SockAddr& o) const { + return evutil_sockaddr_cmp(&store.sa, &o.store.sa, true)<0; + } + inline bool operator==(const SockAddr& o) const { + return evutil_sockaddr_cmp(&store.sa, &o.store.sa, true)==0; + } + inline bool operator!=(const SockAddr& o) const { + return !(*this==o); + } +}; + +// compare address only, ignore port number +struct SockAddrOnlyLess { + bool operator()(const SockAddr& lhs, const SockAddr& rhs) const { + return lhs.compare(rhs, false)<0; + } +}; + +PVXS_API +std::ostream& operator<<(std::ostream& strm, const SockAddr& addr); + +// Linux specific include OS dropped packet counter as cmsg +void enable_SO_RXQ_OVFL(SOCKET sock); +// Include destination address as cmsg +PVXS_API +void enable_IP_PKTINFO(SOCKET sock); + +struct recvfromx { + evutil_socket_t sock; + void *buf; + size_t buflen; + SockAddr* src; + SockAddr* dst; // if enable_IP_PKTINFO() + int64_t dstif; // if enable_IP_PKTINFO(), destination interface index + uint32_t ndrop; // if enable_SO_RXQ_OVFL() + + PVXS_API + int call(); +}; + +} // namespace pvxs + +#endif // OSISOCKEXT_H diff --git a/src/udp_collector.cpp b/src/udp_collector.cpp index 92a331423..ac6a2d347 100644 --- a/src/udp_collector.cpp +++ b/src/udp_collector.cpp @@ -52,16 +52,16 @@ struct UDPCollector : public UDPManager::Search, bool handle_one() { - osiSocklen_t alen = src.size(); - uint32_t ndrop = 0u; + SockAddr dest; // For Search messages, we use PV name strings in-place by adding nils. // Ensure one extra byte at the end of the buffer for a nil after the last PV name - const int nrx = recvfromx(sock.sock, (char*)&buf[0], buf.size()-1, &src->sa, &alen, &ndrop); + recvfromx rx{sock.sock, (char*)&buf[0], buf.size()-1, &src, &dest}; + const int nrx = rx.call(); - if(nrx>=0 && ndrop!=0u && prevndrop!=ndrop) { - log_debug_printf(logio, "UDP collector socket buffer overflowed %u -> %u\n", unsigned(prevndrop), unsigned(ndrop)); - prevndrop = ndrop; + if(nrx>=0 && rx.ndrop!=0u && prevndrop!=rx.ndrop) { + log_debug_printf(logio, "UDP collector socket buffer overflowed %u -> %u\n", unsigned(prevndrop), unsigned(rx.ndrop)); + prevndrop = rx.ndrop; } if(nrx<0) { @@ -93,7 +93,8 @@ struct UDPCollector : public UDPManager::Search, return true; } - log_hex_printf(logio, Level::Debug, &buf[0], nrx, "UDP Rx %d from %s\n", nrx, src.tostring().c_str()); + log_hex_printf(logio, Level::Debug, &buf[0], nrx, "UDP Rx %d, %s -> %s\n", + nrx, src.tostring().c_str(), dest.tostring().c_str()); names.clear(); @@ -269,6 +270,7 @@ UDPCollector::UDPCollector(UDPManager::Pvt *manager, const SockAddr& bind_addr) epicsSocketEnableAddressUseForDatagramFanout(sock.sock); enable_SO_RXQ_OVFL(sock.sock); + enable_IP_PKTINFO(sock.sock); sock.bind(this->bind_addr); name = "UDP "+this->bind_addr.tostring(); diff --git a/src/util.cpp b/src/util.cpp index 9e25b1051..d2ca8bbbe 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -256,55 +256,6 @@ SigInt::~SigInt() #endif // !defined(__rtems__) && !defined(vxWorks) -void enable_SO_RXQ_OVFL(SOCKET sock) -{ -#ifdef SO_RXQ_OVFL - // Linux specific feature exposes OS dropped packet count - { - int val = 1; - if(setsockopt(sock, SOL_SOCKET, SO_RXQ_OVFL, (char*)&val, sizeof(val))) - log_warn_printf(log, "Unable to set SO_RXQ_OVFL: %d\n", SOCKERRNO); - } -#endif -} - -int recvfromx(SOCKET sock, void *buf, size_t buflen, sockaddr* peer, osiSocklen_t* peerlen, uint32_t *ndrop) -{ -#ifdef SO_RXQ_OVFL - alignas (alignof (cmsghdr)) char cbuf[CMSG_SPACE(4u)]; - iovec iov = {buf, buflen}; - msghdr msg = {}; - msg.msg_iov = &iov; - msg.msg_iovlen = 1u; - msg.msg_name = peer; - msg.msg_namelen = peerlen ? *peerlen : 0; - msg.msg_control = cbuf; - msg.msg_controllen = sizeof(cbuf); - - int ret = recvmsg(sock, &msg, 0); - - if(ret>=0) { - if(peerlen) - *peerlen = msg.msg_namelen; - - if(msg.msg_flags & MSG_CTRUNC) - log_debug_printf(log, "MSG_CTRUNC %zu, %zu\n", msg.msg_controllen, sizeof(cbuf)); - - if(ndrop) { - for(cmsghdr *hdr = CMSG_FIRSTHDR(&msg); hdr ; hdr = CMSG_NXTHDR(&msg, hdr)) { - if(hdr->cmsg_level==SOL_SOCKET && hdr->cmsg_type==SO_RXQ_OVFL && hdr->cmsg_len>=CMSG_LEN(4u)) { - memcpy(ndrop, CMSG_DATA(hdr), 4u); - } - } - } - } - - return ret; - -#else - return recvfrom(sock, (char*)buf, buflen, 0, peer, peerlen); -#endif -} SockAddr::SockAddr(int af) { diff --git a/src/utilpvt.h b/src/utilpvt.h index 51d93e653..bd0fd6f7d 100644 --- a/src/utilpvt.h +++ b/src/utilpvt.h @@ -6,7 +6,7 @@ #ifndef UTILPVT_H #define UTILPVT_H -#include +#include "osiSockExt.h" #ifdef _WIN32 # define WIN32_LEAN_AND_MEAN @@ -195,71 +195,6 @@ using aligned_union = std::aligned_union; } // namespace impl using namespace impl; -struct SockAttach { - SockAttach() { osiSockAttach(); } - ~SockAttach() { osiSockRelease(); } -}; - -// Linux specific SO_RXQ_OVFL exposes OS dropped packet counter -void enable_SO_RXQ_OVFL(SOCKET sock); -int recvfromx(SOCKET sock, void *buf, size_t buflen, sockaddr* peer, osiSocklen_t* peerlen, uint32_t *ndrop); - -//! representation of a network address -struct PVXS_API SockAddr { - union store_t { - sockaddr sa; - sockaddr_in in; -#ifdef AF_INET6 - sockaddr_in6 in6; -#endif - }; -private: - store_t store; -public: - - explicit SockAddr(int af = AF_UNSPEC); - explicit SockAddr(int af, const char *address, unsigned short port=0); - explicit SockAddr(const sockaddr *addr, ev_socklen_t len); - inline explicit SockAddr(int af, const std::string& address) :SockAddr(af, address.c_str()) {} - - size_t size() const; - - inline unsigned short family() const { return store.sa.sa_family; } - unsigned short port() const; - void setPort(unsigned short port); - SockAddr withPort(unsigned short port) { - SockAddr temp(*this); - temp.setPort(port); - return temp; - } - - void setAddress(const char *, unsigned short port=0); - - bool isAny() const; - bool isLO() const; - - store_t* operator->() { return &store; } - const store_t* operator->() const { return &store; } - - std::string tostring() const; - - static SockAddr any(int af, unsigned port=0); - static SockAddr loopback(int af, unsigned port=0); - - inline bool operator<(const SockAddr& o) const { - return evutil_sockaddr_cmp(&store.sa, &o.store.sa, true)<0; - } - inline bool operator==(const SockAddr& o) const { - return evutil_sockaddr_cmp(&store.sa, &o.store.sa, true)==0; - } - inline bool operator!=(const SockAddr& o) const { - return !(*this==o); - } -}; - -PVXS_API -std::ostream& operator<<(std::ostream& strm, const SockAddr& addr); - inline timeval totv(double t) { diff --git a/test/testsock.cpp b/test/testsock.cpp index 8dcbf6cff..853eef965 100644 --- a/test/testsock.cpp +++ b/test/testsock.cpp @@ -4,6 +4,8 @@ * in file LICENSE that is included with this distribution. */ +#include + #include #include @@ -15,9 +17,43 @@ #include #include +#ifdef _WIN32 +# include +# include + +static +bool is_wine() +{ + HMODULE nt = GetModuleHandle("ntdll.dll"); + return nt && GetProcAddress(nt, "wine_get_version"); +} +#endif + namespace { using namespace pvxs; +void test_ifacemap() +{ + testDiag("Enter %s", __func__); + + impl::IfaceMap ifs; + ifs.refresh(); + + testFalse(ifs.info.empty())<<" found "<sa, &slen); + ret = recvfromx{A.sock, (char*)rxbuf, sizeof(rxbuf), &src, &dest}.call(); + // only the destination address is captured, not the port + if(dest.family()==AF_INET) + dest.setPort(bind_addr.port()); testOk(ret==4 && rxbuf[0]==0x12 && rxbuf[1]==0x34 && rxbuf[2]==0x56 && rxbuf[3]==0x78, "Recv'd %d(%d) [%u, %u, %u, %u]", ret, EVUTIL_SOCKET_ERROR(), rxbuf[0], rxbuf[1], rxbuf[2], rxbuf[3]); testEq(src, send_addr); + testEq(dest, bind_addr); } void test_local_mcast() { testDiag("Enter %s", __func__); + IfaceMap ifinfo; + evsocket A(AF_INET, SOCK_DGRAM, 0), B(AF_INET, SOCK_DGRAM, 0); - SockAddr mcast_addr(AF_INET); - mcast_addr.setAddress("224.0.0.128"); + SockAddr mcast_addr(AF_INET, "224.0.0.128"); -#ifdef _WIN32 + // We could bind to mcast_addr on all targets except WIN32 SockAddr bind_addr(SockAddr::any(AF_INET)); -#else - SockAddr bind_addr(mcast_addr); -#endif + enable_IP_PKTINFO(A.sock); A.bind(bind_addr); mcast_addr.setPort(bind_addr.port()); @@ -88,14 +128,108 @@ void test_local_mcast() uint8_t rxbuf[8] = {}; SockAddr src; + SockAddr dest; testDiag("Call recvfrom()"); - socklen_t slen = src.size(); - ret = recvfrom(A.sock, (char*)rxbuf, sizeof(rxbuf), 0, &src->sa, &slen); + recvfromx rx{A.sock, (char*)rxbuf, sizeof(rxbuf), &src, &dest}; + ret = rx.call(); + if(dest.family()==AF_INET) + dest.setPort(mcast_addr.port()); + + testTrue(ret>=0 && rx.dstif>0 && ifinfo.has_address(rx.dstif, sender_addr)) + <<" received on index "<sa, lo.size()); + testEq(ret, int(msglen))<<" sendto("< "<