Skip to content

Commit

Permalink
feat(SocketReactor): improvements #3713
Browse files Browse the repository at this point in the history
  • Loading branch information
aleks-f committed Jul 25, 2022
1 parent 578efad commit 7a5a61c
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 76 deletions.
109 changes: 82 additions & 27 deletions Net/include/Poco/Net/SocketReactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "Poco/Timespan.h"
#include "Poco/Observer.h"
#include "Poco/AutoPtr.h"
#include "Poco/Event.h"
#include <map>
#include <atomic>

Expand Down Expand Up @@ -71,54 +72,103 @@ class Net_API SocketReactor: public Poco::Runnable
/// as argument.
///
/// Once started, the SocketReactor waits for events
/// on the registered sockets, using Socket::select().
/// on the registered sockets, using PollSet:poll().
/// If an event is detected, the corresponding event handler
/// is invoked. There are five event types (and corresponding
/// notification classes) defined: ReadableNotification, WritableNotification,
/// ErrorNotification, TimeoutNotification, IdleNotification and
/// ShutdownNotification.
/// ErrorNotification, TimeoutNotification and ShutdownNotification.
///
/// The ReadableNotification will be dispatched if a socket becomes
/// readable. The WritableNotification will be dispatched if a socket
/// becomes writable. The ErrorNotification will be dispatched if
/// there is an error condition on a socket.
///
/// If the timeout expires and no event has occurred, a
/// Timeout/sleep strategy operates as follows:
///
/// If the poll timeout expires and no event has occurred, a
/// TimeoutNotification will be dispatched to all event handlers
/// registered for it. This is done in the onTimeout() method
/// which can be overridden by subclasses to perform custom
/// timeout processing.
///
/// If there are no sockets for the SocketReactor to pass to
/// Socket::select(), an IdleNotification will be dispatched to
/// all event handlers registered for it. This is done in the
/// onIdle() method which can be overridden by subclasses
/// to perform custom idle processing. Since onIdle() will be
/// called repeatedly in a loop, it is recommended to do a
/// short sleep or yield in the event handler.
/// By default, the SocketReactor is configured to start sleeping
/// when the poll timeout is zero and there are no socket events for
/// a certain amount of time; sleep duration is progressive, up to
/// the configured limit. This behavior can be disabled through
/// configuration parameters.
///
/// When there are no registered handlers, the SocketRactor sleeps
/// an incremental amount of milliseconds, up to the sleep limit.
/// Increment step value and sleep limit are configurable.
///
/// Finally, when the SocketReactor is about to shut down (as a result
/// of stop() being called), it dispatches a ShutdownNotification
/// to all event handlers. This is done in the onShutdown() method
/// which can be overridded by subclasses to perform custom
/// shutdown processing.
///
/// The SocketReactor is implemented so that it can
/// run in its own thread. It is also possible to run
/// multiple SocketReactors in parallel, as long as
/// they work on different sockets.
/// The SocketReactor is implemented so that it can run in its own thread.
/// Moreover, the thread affinity to a CPU core can optionally be set for the
/// thread on platforms where that functionality is supported and implemented.
/// It is also possible to run multiple SocketReactors in parallel, as long
/// as they work on different sockets.
///
/// It is safe to call addEventHandler() and removeEventHandler()
/// from another thread while the SocketReactor is running. Also,
/// it is safe to call addEventHandler() and removeEventHandler()
/// from event handlers.
/// It is safe to call addEventHandler() and removeEventHandler() from another
/// thread while the SocketReactor is running. Also, it is safe to call
/// addEventHandler() and removeEventHandler() from event handlers.
{
public:
struct Params
/// Reactor parameters.
/// Default values should work well for most scenarios.
///
/// Note: the default behavior on zero poll timeout is to start
/// incrementally sleeping after `idleThreshold` and no socket events.
/// This prevents high CPU usage during periods without network
/// activity. To disable it, set `throttle` to false.
{
Poco::Timespan pollTimeout = DEFAULT_TIMEOUT;
/// Timeout for PolllSet::poll()

long sleep = 0;
/// Amount of milliseconds to sleep, progressively incremented,
/// at `increment` step, up to the `sleepLimit`.

long sleepLimit = DEFAULT_SLEEP_LIMIT;
/// Max sleep duration in milliseconds
/// This is the ceiling value in milliseconds for the sleep algorithm,
/// which kicks in in two cases:
///
/// - when there are no subscribers and the reactor is just idle-spinning
/// - when there are subscribers, but there was no socket events signalled
/// for `sleepLimit` milliseconds and `throttle` is true

int increment = 1;
/// Increment value for the sleep backoff algorithm.

long idleThreshold = DEFAULT_SLEEP_LIMIT;
/// Indicates when to start sleeping (throttling) on zero poll timeout

bool throttle = true;
/// Indicates whether to start sleeping when poll timeout is zero and
/// there's no socket events for a period longer than `idleThreshold`
};

SocketReactor();
/// Creates the SocketReactor.

explicit SocketReactor(const Poco::Timespan& timeout);
/// Creates the SocketReactor, using the given timeout.
explicit SocketReactor(const Poco::Timespan& pollTimeout, int threadAffinity = -1);
/// Creates the SocketReactor, using the given poll timeout.
///
/// The threadAffinity argument, when non-negative, indicates on which CPU core should
/// the run() method run. Nonexisting core or situation when this feature is not implemented
/// are silently ignored and this argument has no effect in such scenarios.

SocketReactor(const Params& params, int threadAffinity = -1);
/// Creates the SocketReactor, using the given parameters.
/// The threadAffinity argument, when non-negative, indicates on which CPU core should
/// the run() method run. Nonexisting core or situation when this feature is not implemented
/// are silently ignored and this argument has no effect in such scenarios.

virtual ~SocketReactor();
/// Destroys the SocketReactor.
Expand All @@ -140,13 +190,12 @@ class Net_API SocketReactor: public Poco::Runnable
void setTimeout(const Poco::Timespan& timeout);
/// Sets the timeout.
///
/// If no other event occurs for the given timeout
/// If no socket event occurs for the given timeout
/// interval, a timeout event is sent to all event listeners.
///
/// The default timeout is 250 milliseconds;
///
/// The timeout is passed to the Socket::select()
/// method.
/// The timeout is passed to the PollSet::poll() method.

const Poco::Timespan& getTimeout() const;
/// Returns the timeout.
Expand Down Expand Up @@ -211,13 +260,19 @@ class Net_API SocketReactor: public Poco::Runnable
void dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification);
NotifierPtr getNotifier(const Socket& socket, bool makeNew = false);

void sleep();

enum
{
DEFAULT_TIMEOUT = 250000
DEFAULT_TIMEOUT = 250000,
/// Default timeout for PollSet::poll()
DEFAULT_SLEEP_LIMIT = 250
/// Default limit for event-based sleeping
};

Params _params;
int _threadAffinity = -1;
std::atomic<bool> _stop;
Poco::Timespan _timeout;
EventHandlerMap _handlers;
PollSet _pollSet;
NotificationPtr _pReadableNotification;
Expand All @@ -226,7 +281,7 @@ class Net_API SocketReactor: public Poco::Runnable
NotificationPtr _pTimeoutNotification;
NotificationPtr _pShutdownNotification;
MutexType _mutex;
Poco::Thread* _pThread;
Poco::Event _event;

friend class SocketNotifier;
};
Expand Down
122 changes: 75 additions & 47 deletions Net/src/SocketReactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "Poco/Net/SocketNotifier.h"
#include "Poco/ErrorHandler.h"
#include "Poco/Thread.h"
#include "Poco/Stopwatch.h"
#include "Poco/Exception.h"


Expand All @@ -30,27 +31,38 @@ namespace Net {

SocketReactor::SocketReactor():
_stop(false),
_timeout(DEFAULT_TIMEOUT),
_pReadableNotification(new ReadableNotification(this)),
_pWritableNotification(new WritableNotification(this)),
_pErrorNotification(new ErrorNotification(this)),
_pTimeoutNotification(new TimeoutNotification(this)),
_pShutdownNotification(new ShutdownNotification(this)),
_pThread(0)
_pShutdownNotification(new ShutdownNotification(this))
{
}


SocketReactor::SocketReactor(const Poco::Timespan& timeout):
SocketReactor::SocketReactor(const Poco::Timespan& pollTimeout, int threadAffinity):
_threadAffinity(threadAffinity),
_stop(false),
_timeout(timeout),
_pReadableNotification(new ReadableNotification(this)),
_pWritableNotification(new WritableNotification(this)),
_pErrorNotification(new ErrorNotification(this)),
_pTimeoutNotification(new TimeoutNotification(this)),
_pShutdownNotification(new ShutdownNotification(this)),
_pThread(0)
_pShutdownNotification(new ShutdownNotification(this))
{
_params.pollTimeout = pollTimeout.totalMilliseconds();
}

SocketReactor::SocketReactor(const Params& params, int threadAffinity):
_params(params),
_threadAffinity(threadAffinity),
_stop(false),
_pReadableNotification(new ReadableNotification(this)),
_pWritableNotification(new WritableNotification(this)),
_pErrorNotification(new ErrorNotification(this)),
_pTimeoutNotification(new TimeoutNotification(this)),
_pShutdownNotification(new ShutdownNotification(this))
{

}


Expand All @@ -61,36 +73,47 @@ SocketReactor::~SocketReactor()

void SocketReactor::run()
{
_pThread = Thread::current();
if (_threadAffinity >= 0)
{
Poco::Thread* pThread = Thread::current();
if (pThread) pThread->setAffinity(_threadAffinity);
}
Poco::Stopwatch sw;
if (_params.throttle) sw.start();
PollSet::SocketModeMap sm;
while (!_stop)
{
try
{
if (!hasSocketHandlers())
if (hasSocketHandlers())
{
Thread::trySleep(static_cast<long>(_timeout.totalMilliseconds()));
}
else
{
bool readable = false;
PollSet::SocketModeMap sm = _pollSet.poll(_timeout);
if (sm.size() > 0)
sm = _pollSet.poll(_params.pollTimeout);
for (const auto& s : sm)
{
PollSet::SocketModeMap::iterator it = sm.begin();
PollSet::SocketModeMap::iterator end = sm.end();
for (; it != end; ++it)
if (s.second & PollSet::POLL_READ)
{
if (it->second & PollSet::POLL_READ)
{
dispatch(it->first, _pReadableNotification);
readable = true;
}
if (it->second & PollSet::POLL_WRITE) dispatch(it->first, _pWritableNotification);
if (it->second & PollSet::POLL_ERROR) dispatch(it->first, _pErrorNotification);
dispatch(s.first, _pReadableNotification);
}
if (s.second & PollSet::POLL_WRITE)
{
dispatch(s.first, _pWritableNotification);
}
if (s.second & PollSet::POLL_ERROR)
{
dispatch(s.first, _pErrorNotification);
}
}
if (0 == sm.size())
{
onTimeout();
if (_params.throttle && _params.pollTimeout == 0)
{
if ((sw.elapsed()/1000) > _params.sleepLimit) sleep();
}
}
if (!readable) onTimeout();
else if (_params.throttle) sw.restart();
}
else sleep();
}
catch (Exception& exc)
{
Expand All @@ -112,48 +135,53 @@ void SocketReactor::run()
}


bool SocketReactor::hasSocketHandlers()
void SocketReactor::sleep()
{
if (!_pollSet.empty())
{
ScopedLock lock(_mutex);
for (auto& p: _handlers)
{
if (p.second->accepts(_pReadableNotification) ||
p.second->accepts(_pWritableNotification) ||
p.second->accepts(_pErrorNotification)) return true;
}
}

return false;
if (_params.sleep < _params.sleepLimit) ++_params.sleep;
_event.tryWait(_params.sleep);
}


void SocketReactor::stop()
{
_stop = true;
wakeUp();
}


void SocketReactor::wakeUp()
{
if (_pThread && _pThread != Thread::current())
{
_pThread->wakeUp();
_pollSet.wakeUp();
}
_pollSet.wakeUp();
_event.set();
}


void SocketReactor::setTimeout(const Poco::Timespan& timeout)
{
_timeout = timeout;
_params.pollTimeout = timeout;
}


const Poco::Timespan& SocketReactor::getTimeout() const
{
return _timeout;
return _params.pollTimeout;
}


bool SocketReactor::hasSocketHandlers()
{
if (!_pollSet.empty())
{
ScopedLock lock(_mutex);
for (auto& p: _handlers)
{
if (p.second->accepts(_pReadableNotification) ||
p.second->accepts(_pWritableNotification) ||
p.second->accepts(_pErrorNotification)) return true;
}
}

return false;
}


Expand Down
Loading

0 comments on commit 7a5a61c

Please sign in to comment.