Skip to content

Commit

Permalink
Merge 70c89b9 into 98edf61
Browse files Browse the repository at this point in the history
  • Loading branch information
mdavidsaver committed Aug 6, 2021
2 parents 98edf61 + 70c89b9 commit 475a345
Show file tree
Hide file tree
Showing 12 changed files with 717 additions and 142 deletions.
5 changes: 5 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,11 @@ def define_DSOS(self):

src_pvxs = [os.path.join('src', src) for src in src_pvxs]

if OS_CLASS=='WIN32':
src_pvxs += ['src/os/WIN32/osdSockExt.cpp']
else:
src_pvxs += ['src/os/default/osdSockExt.cpp']

event_libs = []
if OS_CLASS=='WIN32':
event_libs = ['ws2_32','shell32','advapi32','bcrypt','iphlpapi']
Expand Down
2 changes: 2 additions & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ LIB_SRCS += nt.cpp
LIB_SRCS += evhelper.cpp
LIB_SRCS += udp_collector.cpp

LIB_SRCS += osdSockExt.cpp

LIB_SRCS += config.cpp
LIB_SRCS += conn.cpp

Expand Down
11 changes: 5 additions & 6 deletions src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -750,14 +750,13 @@ bool ContextImpl::onSearch()
{
searchMsg.resize(0x10000);
SockAddr src;
uint32_t ndrop = 0u;

osiSocklen_t alen = src.size();
const int nrx = recvfromx(searchTx.sock, (char*)&searchMsg[0], searchMsg.size()-1, &src->sa, &alen, &ndrop);
recvfromx rx{searchTx.sock, (char*)&searchMsg[0], searchMsg.size()-1, &src};
const int nrx = rx.call();

if(nrx>=0 && ndrop!=0 && prevndrop!=ndrop) {
log_debug_printf(io, "UDP search reply buffer overflow %u -> %u\n", unsigned(prevndrop), unsigned(ndrop));
prevndrop = ndrop;
if(nrx>=0 && rx.ndrop!=0 && prevndrop!=rx.ndrop) {
log_debug_printf(io, "UDP search reply buffer overflow %u -> %u\n", unsigned(prevndrop), unsigned(rx.ndrop));
prevndrop = rx.ndrop;
}

if(nrx<0) {
Expand Down
62 changes: 59 additions & 3 deletions src/evhelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <event2/thread.h>

#include <errlog.h>
#include <osiSock.h>
#include <epicsEvent.h>
#include <epicsThread.h>
#include <epicsExit.h>
Expand All @@ -44,6 +43,7 @@ namespace pvxs {namespace impl {

DEFINE_LOGGER(logerr, "pvxs.loop");
DEFINE_LOGGER(logtimer, "pvxs.timer");
DEFINE_LOGGER(logiface, "pvxs.iface");

namespace mdetail {
VFunctor0::~VFunctor0() {}
Expand Down Expand Up @@ -367,8 +367,16 @@ bool evbase::assertInRunningLoop() const
evsocket::evsocket(evutil_socket_t sock)
:sock(sock)
{
if(sock==evutil_socket_t(-1))
throw std::bad_alloc();
if(sock==evutil_socket_t(-1)) {
int err = SOCKERRNO;
#ifdef _WIN32
if(err==WSANOTINITIALISED) {
throw std::runtime_error("WSANOTINITIALISED");
}
#endif
(void)err;
throw std::runtime_error("Unable to allocate socket");
}

evutil_make_socket_closeonexec(sock);

Expand All @@ -386,6 +394,19 @@ evsocket::evsocket(evutil_socket_t sock)
evsocket::evsocket(int af, int type, int proto)
:evsocket(socket(af, type | SOCK_CLOEXEC, proto))
{
#ifdef __linux__
# ifndef IP_MULTICAST_ALL
# define IP_MULTICAST_ALL 49
# endif
// Disable non-compliant legacy behavior of Linux IP stack
if(af==AF_INET && type==SOCK_DGRAM){
int val = 0;
if(setsockopt(sock, IPPROTO_IP, IP_MULTICAST_ALL, (char*)&val, sizeof(val))) {
log_warn_printf(logerr, "Unable to clear IP_MULTICAST_ALL (err=%d). This may cause problems on multi-homed hosts.\n",
evutil_socket_geterror(sock));
}
}
#endif
}

evsocket::evsocket(evsocket&& o) noexcept
Expand Down Expand Up @@ -511,6 +532,41 @@ std::vector<SockAddr> evsocket::broadcasts(const SockAddr* match)
return ret;
}

#if EPICS_VERSION_INT<VERSION_INT(7,0,3,1)
# define getMonotonic getCurrent
#endif

IfaceMap::IfaceMap()
{
refresh();
updated = epicsTime::getMonotonic();
}

bool IfaceMap::has_address(int64_t ifindex, const SockAddr &addr)
{
if(addr.isAny())
return true;

auto now(epicsTime::getMonotonic());
auto age = now-updated;

if(age > 60) {
refresh();
updated = now;
} else {
log_debug_printf(logiface, "using cache age %.2f sec\n", age);
}

auto ifit(info.find(ifindex));
if(ifit!=info.end()) {
const auto& iface = ifit->second;
auto adit(iface.find(addr));
return adit!=iface.end();
}
log_warn_printf(logiface, "Encountered unknown interface index %lld\n", (long long)ifindex);
return false;
}

void to_wire(Buffer& buf, const SockAddr& val)
{
if(!buf.ensure(16)) {
Expand Down
16 changes: 16 additions & 0 deletions src/evhelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include <functional>
#include <memory>
#include <string>
#include <map>
#include <set>

#include <event2/event.h>
#include <event2/buffer.h>
Expand All @@ -20,6 +22,8 @@
#include <pvxs/version.h>
#include <utilpvt.h>

#include <epicsTime.h>

#include "pvaproto.h"

// hooks for std::unique_ptr
Expand Down Expand Up @@ -230,6 +234,18 @@ struct PVXS_API evsocket
std::vector<SockAddr> broadcasts(const SockAddr* match=nullptr);
};

struct PVXS_API IfaceMap {
IfaceMap();

// return true if ifindex is valid, and addr is one of the addresses currently assigned to it.
bool has_address(int64_t ifindex, const SockAddr& addr);

void refresh();

std::map<int64_t, std::set<SockAddr, SockAddrOnlyLess>> info;
epicsTime updated;
};

} // namespace impl


Expand Down
159 changes: 159 additions & 0 deletions src/os/WIN32/osdSockExt.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvxs is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/

#include <winsock2.h>
#include <iphlpapi.h>

#include "osiSockExt.h"

#include <mswsock.h>

#include <vector>

#include <pvxs/log.h>
#include "evhelper.h"

#include <epicsThread.h>
#include <cantProceed.h>

namespace pvxs {

DEFINE_LOGGER(log, "pvxs.util");
DEFINE_LOGGER(logiface, "pvxs.iface");

static
LPFN_WSARECVMSG WSARecvMsg;

static
epicsThreadOnceId oseOnce = EPICS_THREAD_ONCE_INIT;

static
void oseDoOnce(void*)
{
evsocket dummy(AF_INET, SOCK_DGRAM, 0);
GUID guid = WSAID_WSARECVMSG;
DWORD nout;

if(WSAIoctl(dummy.sock, SIO_GET_EXTENSION_FUNCTION_POINTER,
&guid, sizeof(guid),
&WSARecvMsg, sizeof(WSARecvMsg),
&nout, nullptr, nullptr))
{
cantProceed("Unable to get &WSARecvMsg: %d", WSAGetLastError());
}
if(!WSARecvMsg)
cantProceed("Unable to get &WSARecvMsg!!");
}

void osiSockAttachExt()
{
osiSockAttach();
epicsThreadOnce(&oseOnce, &oseDoOnce, nullptr);
}

void enable_SO_RXQ_OVFL(SOCKET sock) {}

void enable_IP_PKTINFO(SOCKET sock)
{
int val = 1;
if(setsockopt(sock, IPPROTO_IP, IP_PKTINFO, (char*)&val, sizeof(val)))
log_warn_printf(log, "Unable to set IP_PKTINFO: %d\n", SOCKERRNO);
}

int recvfromx::call()
{
ndrop = 0u;
dstif = -1;

WSAMSG msg{};

WSABUF iov = {(ULONG)buflen, (char*)buf};
msg.lpBuffers = &iov;
msg.dwBufferCount = 1u;

msg.name = &(*src)->sa;
msg.namelen = src->size();

alignas (alignof (WSACMSGHDR)) char cbuf[WSA_CMSG_SPACE(sizeof(in_pktinfo))];
msg.Control = {sizeof(cbuf), cbuf};

DWORD nrx=0u;
if(!WSARecvMsg(sock, &msg, &nrx, nullptr, nullptr)) {
if(msg.dwFlags & MSG_CTRUNC)
log_debug_printf(log, "MSG_CTRUNC %zu, %zu\n", msg.Control.len, sizeof(cbuf));

for(WSACMSGHDR *hdr = WSA_CMSG_FIRSTHDR(&msg); hdr ; hdr = WSA_CMSG_NXTHDR(&msg, hdr)) {
if(hdr->cmsg_level==IPPROTO_IP && hdr->cmsg_type==IP_PKTINFO && hdr->cmsg_len>=WSA_CMSG_LEN(sizeof(in_pktinfo))) {
if(dst) {
(*dst)->in.sin_family = AF_INET;
memcpy(&(*dst)->in.sin_addr, WSA_CMSG_DATA(hdr) + offsetof(in_pktinfo, ipi_addr), sizeof(IN_ADDR));
}

decltype(in_pktinfo::ipi_ifindex) idx;
memcpy(&idx, WSA_CMSG_DATA(hdr) + offsetof(in_pktinfo, ipi_ifindex), sizeof(idx));
dstif = idx;
}
}

return nrx;

} else {
return -1;
}
}

namespace impl {

#ifndef GAA_FLAG_INCLUDE_ALL_INTERFACES
# define GAA_FLAG_INCLUDE_ALL_INTERFACES 0
#endif

void IfaceMap::refresh() {
std::vector<char> ifaces(1024u);
decltype (info) temp;

{
constexpr ULONG flags = GAA_FLAG_SKIP_ANYCAST|GAA_FLAG_SKIP_MULTICAST|GAA_FLAG_SKIP_DNS_SERVER|GAA_FLAG_INCLUDE_ALL_INTERFACES;

ULONG buflen = ifaces.size();
auto err = GetAdaptersAddresses(AF_INET, flags, 0, reinterpret_cast<IP_ADAPTER_ADDRESSES*>(ifaces.data()), &buflen);

if(err == ERROR_BUFFER_OVERFLOW) {
// buflen updated with necessary length, retry
ifaces.resize(buflen);

err = GetAdaptersAddresses(AF_INET, flags, 0, reinterpret_cast<IP_ADAPTER_ADDRESSES*>(ifaces.data()), &buflen);
}

if(err) {
log_warn_printf(logiface, "Unable to GetAdaptersAddresses() error=%lld\n", (unsigned long long)err);
return;
}
}

for(auto iface = reinterpret_cast<const IP_ADAPTER_ADDRESSES*>(ifaces.data()); iface ; iface = iface->Next) {
auto& info = temp[iface->IfIndex];

//TODO: any flags to check?

for(auto addr = iface->FirstUnicastAddress; addr; addr = addr->Next) {

if(addr->Address.lpSockaddr->sa_family!=AF_INET)
continue;

auto pair = info.emplace(addr->Address.lpSockaddr, sizeof(sockaddr_in));

log_debug_printf(logiface, "Found interface %lld \"%s\" w/ %s\n",
(long long)iface->IfIndex, iface->AdapterName, pair.first->tostring().c_str());
}
}

info.swap(temp);
}

} // namespace impl

} // namespace pvxs
Loading

0 comments on commit 475a345

Please sign in to comment.