Skip to content

Commit

Permalink
fix(PollSet): #1459 #3628 #3655 #3661
Browse files Browse the repository at this point in the history
  • Loading branch information
aleks-f committed Jul 5, 2022
1 parent 840044f commit 8169648
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 183 deletions.
34 changes: 29 additions & 5 deletions Net/include/Poco/Net/PollSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ class Net_API PollSet
public:
enum Mode
{
POLL_READ = 0x01,
POLL_WRITE = 0x02,
POLL_ERROR = 0x04
POLL_READ = Socket::SELECT_READ,
POLL_WRITE = Socket::SELECT_WRITE,
POLL_ERROR = Socket::SELECT_ERROR
};

using SocketModeMap = std::map<Poco::Net::Socket, int>;
Expand All @@ -56,12 +56,36 @@ class Net_API PollSet
/// Adds the given socket to the set, for polling with
/// the given mode, which can be an OR'd combination of
/// POLL_READ, POLL_WRITE and POLL_ERROR.
/// Subsequent socket additions to the PollSet are mode-cumulative,
/// so the following code:
///
/// StreamSocket ss;
/// PollSet ps;
/// ps.add(ss, PollSet::POLL_READ);
/// ps.add(ss, PollSet::POLL_WRITE);
///
/// shall result in the socket being monitored for read and write,
/// equivalent to this:
///
/// ps.update(ss, PollSet::POLL_READ | PollSet::POLL_WRITE);

void remove(const Poco::Net::Socket& socket);
/// Removes the given socket from the set.

void update(const Poco::Net::Socket& socket, int mode);
/// Updates the mode of the given socket.
/// Updates the mode of the given socket. If socket does
/// not exist in the PollSet, it is silently added. For
/// an existing socket, any prior mode is overwritten.
/// Updating socket is non-mode-cumulative.
///
/// The following code:
///
/// StreamSocket ss;
/// PollSet ps;
/// ps.update(ss, PollSet::POLL_READ);
/// ps.update(ss, PollSet::POLL_WRITE);
///
/// shall result in the socket being monitored for write only.

bool has(const Socket& socket) const;
/// Returns true if socket is registered for polling.
Expand All @@ -79,7 +103,7 @@ class Net_API PollSet
/// their state changed.

int count() const;
/// Returns the numberof sockets monitored.
/// Returns the number of sockets monitored.

void wakeUp();
/// Wakes up a waiting PollSet.
Expand Down
109 changes: 63 additions & 46 deletions Net/src/PollSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ class PollSetImpl
public:
using Mutex = Poco::SpinlockMutex;
using ScopedLock = Mutex::ScopedLock;
using SocketMode = std::pair<Socket, int>;
using SocketMap = std::map<void*, SocketMode>;

PollSetImpl(): _epollfd(epoll_create(1)),
_events(1024),
_eventfd(eventfd(0, 0))
{
int err = addImpl(_eventfd, PollSet::POLL_READ, 0);
int err = addFD(_eventfd, PollSet::POLL_READ, EPOLL_CTL_ADD);
if ((err) || (_epollfd < 0))
{
SocketImpl::error();
Expand All @@ -59,25 +61,24 @@ class PollSetImpl
~PollSetImpl()
{
::close(_eventfd.exchange(0));
ScopedLock l(_mutex);
if (_epollfd >= 0) ::close(_epollfd);
}

void add(const Socket& socket, int mode)
{
SocketImpl* sockImpl = socket.impl();

int err = addImpl(sockImpl->sockfd(), mode, sockImpl);

int newMode = getNewMode(socket.impl(), mode);
int err = addImpl(socket, newMode);
if (err)
{
if (errno == EEXIST) update(socket, mode);
if (errno == EEXIST) update(socket, newMode);
else SocketImpl::error();
}
}

ScopedLock lock(_mutex);
if (_socketMap.find(sockImpl) == _socketMap.end())
_socketMap[sockImpl] = socket;
void update(const Socket& socket, int mode)
{
int err = updateImpl(socket, mode);
if (err) SocketImpl::error();
}

void remove(const Socket& socket)
Expand All @@ -87,9 +88,9 @@ class PollSetImpl
ev.events = 0;
ev.data.ptr = 0;

ScopedLock lock(_mutex);
int err = epoll_ctl(_epollfd, EPOLL_CTL_DEL, fd, &ev);
if (err) SocketImpl::error();
ScopedLock lock(_mutex);
_socketMap.erase(socket.impl());
}

Expand All @@ -107,40 +108,19 @@ class PollSetImpl
return _socketMap.empty();
}

void update(const Socket& socket, int mode)
{
poco_socket_t fd = socket.impl()->sockfd();
struct epoll_event ev;
ev.events = 0;
if (mode & PollSet::POLL_READ)
ev.events |= EPOLLIN;
if (mode & PollSet::POLL_WRITE)
ev.events |= EPOLLOUT;
if (mode & PollSet::POLL_ERROR)
ev.events |= EPOLLERR;
ev.data.ptr = socket.impl();

int err = 0;
{
ScopedLock lock(_mutex);
err = epoll_ctl(_epollfd, EPOLL_CTL_MOD, fd, &ev);
}
if (err) SocketImpl::error();
}

void clear()
{
{
ScopedLock lock(_mutex);

::close(_epollfd);
close(_epollfd);
_socketMap.clear();
_epollfd = epoll_create(1);
if (_epollfd < 0) SocketImpl::error();
}
::close(_eventfd.exchange(0));
close(_eventfd.exchange(0));
_eventfd = eventfd(0, 0);
addImpl(_eventfd, PollSet::POLL_READ, 0);
addFD(_eventfd, PollSet::POLL_READ, EPOLL_CTL_ADD);
}

PollSet::SocketModeMap poll(const Poco::Timespan& timeout)
Expand All @@ -154,6 +134,11 @@ class PollSetImpl
Poco::Timestamp start;
rc = epoll_wait(_epollfd, &_events[0], _events.size(), remainingTime.totalMilliseconds());
if (rc == 0) return result;

// if we are hitting the events limit, resize it; even without resizing, the subseqent
// calls would round-robin through the remaining ready sockets, but it's better to give
// the call enough room once we start hitting the boundary
if (rc >= _events.size()) _events.resize(_events.size()*2);
if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
{
Poco::Timestamp end;
Expand All @@ -171,15 +156,15 @@ class PollSetImpl
{
if (_events[i].data.ptr) // skip eventfd
{
std::map<void *, Socket>::iterator it = _socketMap.find(_events[i].data.ptr);
SocketMap::iterator it = _socketMap.find(_events[i].data.ptr);
if (it != _socketMap.end())
{
if (_events[i].events & EPOLLIN)
result[it->second] |= PollSet::POLL_READ;
result[it->second.first] |= PollSet::POLL_READ;
if (_events[i].events & EPOLLOUT)
result[it->second] |= PollSet::POLL_WRITE;
result[it->second.first] |= PollSet::POLL_WRITE;
if (_events[i].events & EPOLLERR)
result[it->second] |= PollSet::POLL_ERROR;
result[it->second.first] |= PollSet::POLL_ERROR;
}
}
}
Expand All @@ -193,7 +178,7 @@ class PollSetImpl
// This is guaranteed to write into a valid fd,
// or 0 (meaning PollSet is being destroyed).
// Errors are ignored.
::write(_eventfd, &val, sizeof(val));
write(_eventfd, &val, sizeof(val));
}

int count() const
Expand All @@ -203,9 +188,42 @@ class PollSetImpl
}

private:
int addImpl(int fd, int mode, void* ptr)
int getNewMode(SocketImpl* sockImpl, int mode)
{
struct epoll_event ev;
ScopedLock lock(_mutex);
auto it = _socketMap.find(sockImpl);
if (it != _socketMap.end())
mode |= it->second.second;
return mode;
}

void socketMapUpdate(const Socket& socket, int mode)
{
SocketImpl* sockImpl = socket.impl();
ScopedLock lock(_mutex);
_socketMap[sockImpl] = {socket, mode};
}

int updateImpl(const Socket& socket, int mode)
{
SocketImpl* sockImpl = socket.impl();
int ret = addFD(sockImpl->sockfd(), mode, EPOLL_CTL_MOD, sockImpl);
if (ret == 0) socketMapUpdate(socket, mode);
return ret;
}

int addImpl(const Socket& socket, int mode)
{
SocketImpl* sockImpl = socket.impl();
int newMode = getNewMode(sockImpl, mode);
int ret = addFD(sockImpl->sockfd(), newMode, EPOLL_CTL_ADD, sockImpl);
if (ret == 0) socketMapUpdate(socket, newMode);
return ret;
}

int addFD(int fd, int mode, int op, void* ptr = 0)
{
struct epoll_event ev{};
ev.events = 0;
if (mode & PollSet::POLL_READ)
ev.events |= EPOLLIN;
Expand All @@ -214,13 +232,12 @@ class PollSetImpl
if (mode & PollSet::POLL_ERROR)
ev.events |= EPOLLERR;
ev.data.ptr = ptr;
ScopedLock lock(_mutex);
return epoll_ctl(_epollfd, EPOLL_CTL_ADD, fd, &ev);
return epoll_ctl(_epollfd, op, fd, &ev);
}

mutable Mutex _mutex;
int _epollfd;
std::map<void*, Socket> _socketMap;
std::atomic<int> _epollfd;
SocketMap _socketMap;
std::vector<struct epoll_event> _events;
std::atomic<int> _eventfd;
};
Expand Down
Loading

0 comments on commit 8169648

Please sign in to comment.