Skip to content

Commit

Permalink
Fix/tcp dispatcher (#1965)
Browse files Browse the repository at this point in the history
* TCPServerDispatcher::run() issue #1884; make integral members atomic and minimize locking

* Update TCPServerDispatcher.cpp

(cherry picked from commit a91f9a0)
  • Loading branch information
aleks-f authored and akuzm committed Feb 7, 2020
1 parent f0b9bb6 commit b8af168
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 60 deletions.
38 changes: 31 additions & 7 deletions Net/include/Poco/Net/TCPServerDispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,38 @@ class Net_API TCPServerDispatcher: public Poco::Runnable
TCPServerDispatcher(const TCPServerDispatcher&);
TCPServerDispatcher& operator = (const TCPServerDispatcher&);

int _rc;
class ThreadCountWatcher
{
public:
ThreadCountWatcher(TCPServerDispatcher* pDisp) : _pDisp(pDisp)
{
}

~ThreadCountWatcher()
{
FastMutex::ScopedLock lock(_pDisp->_mutex);
if (_pDisp->_currentThreads > 1 && _pDisp->_queue.empty())
{
--_pDisp->_currentThreads;
}
}

private:
ThreadCountWatcher();
ThreadCountWatcher(const ThreadCountWatcher&);
ThreadCountWatcher& operator=(const ThreadCountWatcher&);

TCPServerDispatcher* _pDisp;
};

std::atomic<int> _rc;
TCPServerParams::Ptr _pParams;
int _currentThreads;
int _totalConnections;
int _currentConnections;
int _maxConcurrentConnections;
int _refusedConnections;
bool _stopped;
std::atomic<int> _currentThreads;
std::atomic<int> _totalConnections;
std::atomic<int> _currentConnections;
std::atomic<int> _maxConcurrentConnections;
std::atomic<int> _refusedConnections;
std::atomic<bool> _stopped;
Poco::NotificationQueue _queue;
TCPServerConnectionFactory::Ptr _pConnectionFactory;
Poco::ThreadPool& _threadPool;
Expand Down
74 changes: 21 additions & 53 deletions Net/src/TCPServerDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,13 @@ TCPServerDispatcher::~TCPServerDispatcher()

void TCPServerDispatcher::duplicate()
{
_mutex.lock();
++_rc;
_mutex.unlock();
}


void TCPServerDispatcher::release()
{
_mutex.lock();
int rc = --_rc;
_mutex.unlock();
if (rc == 0) delete this;
if (--_rc == 0) delete this;
}


Expand All @@ -103,44 +98,29 @@ void TCPServerDispatcher::run()

for (;;)
{
try {
AutoPtr<Notification> pNf = _queue.waitDequeueNotification(idleTime);
if (pNf)
{
ThreadCountWatcher tcw(this);
try
{
TCPConnectionNotification* pCNf = dynamic_cast<TCPConnectionNotification*>(pNf.get());
if (pCNf)
AutoPtr<Notification> pNf = _queue.waitDequeueNotification(idleTime);
if (pNf)
{
#if __cplusplus < 201103L
std::auto_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
#else
std::unique_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
#endif
poco_check_ptr(pConnection.get());
beginConnection();
pConnection->start();
endConnection();
TCPConnectionNotification* pCNf = dynamic_cast<TCPConnectionNotification*>(pNf.get());
if (pCNf)
{
std::unique_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
poco_check_ptr(pConnection.get());
beginConnection();
pConnection->start();
endConnection();
}
}
}
catch (Poco::Exception &exc) { ErrorHandler::handle(exc); }
catch (std::exception &exc) { ErrorHandler::handle(exc); }
catch (...) { ErrorHandler::handle(); }
}
catch (Poco::Exception &exc)
{
ErrorHandler::handle(exc);
}
catch (std::exception &exc)
{
ErrorHandler::handle(exc);
}
catch (...)
{
ErrorHandler::handle();
}

FastMutex::ScopedLock lock(_mutex);
if (_stopped || (_currentThreads > 1 && _queue.empty()))
{
--_currentThreads;
break;
}
if (_stopped || (_currentThreads > 1 && _queue.empty())) break;
}
}

Expand Down Expand Up @@ -189,8 +169,6 @@ void TCPServerDispatcher::stop()

int TCPServerDispatcher::currentThreads() const
{
FastMutex::ScopedLock lock(_mutex);

return _currentThreads;
}

Expand All @@ -204,24 +182,18 @@ int TCPServerDispatcher::maxThreads() const

int TCPServerDispatcher::totalConnections() const
{
FastMutex::ScopedLock lock(_mutex);

return _totalConnections;
}


int TCPServerDispatcher::currentConnections() const
{
FastMutex::ScopedLock lock(_mutex);

return _currentConnections;
}


int TCPServerDispatcher::maxConcurrentConnections() const
{
FastMutex::ScopedLock lock(_mutex);

return _maxConcurrentConnections;
}

Expand All @@ -234,27 +206,23 @@ int TCPServerDispatcher::queuedConnections() const

int TCPServerDispatcher::refusedConnections() const
{
FastMutex::ScopedLock lock(_mutex);

return _refusedConnections;
}


void TCPServerDispatcher::beginConnection()
{
FastMutex::ScopedLock lock(_mutex);

++_totalConnections;
++_currentConnections;
if (_currentConnections > _maxConcurrentConnections)
_maxConcurrentConnections = _currentConnections;
_maxConcurrentConnections.store(_currentConnections);
}


void TCPServerDispatcher::endConnection()
{
FastMutex::ScopedLock lock(_mutex);

--_currentConnections;
}

Expand Down

0 comments on commit b8af168

Please sign in to comment.