From da2a9626b6a4f87081a6b79badc0ac6628f7ab80 Mon Sep 17 00:00:00 2001 From: Alex Fabijanic Date: Thu, 21 Jul 2022 08:25:05 +0200 Subject: [PATCH] fix(TaskManager): waits for all threads in the ThreadPool #3704 --- Foundation/include/Poco/TaskManager.h | 20 ++++++++---- Foundation/src/Task.cpp | 3 +- Foundation/src/TaskManager.cpp | 28 ++++++++++++----- Foundation/testsuite/src/TaskManagerTest.cpp | 32 +++++++++++++++----- 4 files changed, 60 insertions(+), 23 deletions(-) diff --git a/Foundation/include/Poco/TaskManager.h b/Foundation/include/Poco/TaskManager.h index 6b7cd20900..4499ceceff 100644 --- a/Foundation/include/Poco/TaskManager.h +++ b/Foundation/include/Poco/TaskManager.h @@ -50,13 +50,17 @@ class Foundation_API TaskManager using TaskPtr = AutoPtr; using TaskList = std::list; - TaskManager(); - /// Creates the TaskManager, using the - /// default ThreadPool. + TaskManager(const std::string& name = "", + int minCapacity = 2, + int maxCapacity = 16, + int idleTime = 60, + int stackSize = POCO_THREAD_STACK_SIZE); + /// Creates the TaskManager. TaskManager(ThreadPool& pool); /// Creates the TaskManager, using the - /// given ThreadPool. + /// given ThreadPool (should be used + /// by this TaskManager exclusively). ~TaskManager(); /// Destroys the TaskManager. @@ -110,11 +114,15 @@ class Foundation_API TaskManager void taskFailed(Task* pTask, const Exception& exc); private: + using MutexT = FastMutex; + using ScopedLockT = MutexT::ScopedLock; + ThreadPool& _threadPool; + bool _ownPool; TaskList _taskList; Timestamp _lastProgressNotification; NotificationCenter _nc; - mutable FastMutex _mutex; + mutable MutexT _mutex; friend class Task; }; @@ -125,7 +133,7 @@ class Foundation_API TaskManager // inline int TaskManager::count() const { - FastMutex::ScopedLock lock(_mutex); + ScopedLockT lock(_mutex); return (int) _taskList.size(); } diff --git a/Foundation/src/Task.cpp b/Foundation/src/Task.cpp index 8367d571a3..4b1e1fa1b3 100644 --- a/Foundation/src/Task.cpp +++ b/Foundation/src/Task.cpp @@ -79,8 +79,7 @@ void Task::run() pOwner->taskFailed(this, SystemException("unknown exception")); } _state = TASK_FINISHED; - if (pOwner) - pOwner->taskFinished(this); + if (pOwner) pOwner->taskFinished(this); } diff --git a/Foundation/src/TaskManager.cpp b/Foundation/src/TaskManager.cpp index 615541ed95..cddf027b69 100644 --- a/Foundation/src/TaskManager.cpp +++ b/Foundation/src/TaskManager.cpp @@ -15,6 +15,7 @@ #include "Poco/TaskManager.h" #include "Poco/TaskNotification.h" #include "Poco/ThreadPool.h" +#include "Poco/Timespan.h" namespace Poco { @@ -23,20 +24,31 @@ namespace Poco { const int TaskManager::MIN_PROGRESS_NOTIFICATION_INTERVAL = 100000; // 100 milliseconds -TaskManager::TaskManager(): - _threadPool(ThreadPool::defaultPool()) +TaskManager::TaskManager(const std::string& name, + int minCapacity, + int maxCapacity, + int idleTime, + int stackSize): + _threadPool(*new ThreadPool(name, minCapacity, maxCapacity, idleTime, stackSize)), + _ownPool(true) { + // prevent skipping the first progress update + _lastProgressNotification -= Timespan(MIN_PROGRESS_NOTIFICATION_INTERVAL*2); } TaskManager::TaskManager(ThreadPool& pool): - _threadPool(pool) + _threadPool(pool), + _ownPool(false) { + // prevent skipping the first progress update + _lastProgressNotification -= Timespan(MIN_PROGRESS_NOTIFICATION_INTERVAL*2); } TaskManager::~TaskManager() { + if (_ownPool) delete &_threadPool; } @@ -46,7 +58,7 @@ void TaskManager::start(Task* pTask) pAutoTask->setOwner(this); pAutoTask->setState(Task::TASK_STARTING); - FastMutex::ScopedLock lock(_mutex); + ScopedLockT lock(_mutex); _taskList.push_back(pAutoTask); try { @@ -65,7 +77,7 @@ void TaskManager::start(Task* pTask) void TaskManager::cancelAll() { - FastMutex::ScopedLock lock(_mutex); + ScopedLockT lock(_mutex); for (auto& pTask: _taskList) { @@ -82,7 +94,7 @@ void TaskManager::joinAll() TaskManager::TaskList TaskManager::taskList() const { - FastMutex::ScopedLock lock(_mutex); + ScopedLockT lock(_mutex); return _taskList; } @@ -114,7 +126,7 @@ void TaskManager::taskStarted(Task* pTask) void TaskManager::taskProgress(Task* pTask, float progress) { - ScopedLockWithUnlock lock(_mutex); + ScopedLockWithUnlock lock(_mutex); if (_lastProgressNotification.isElapsed(MIN_PROGRESS_NOTIFICATION_INTERVAL)) { @@ -135,7 +147,7 @@ void TaskManager::taskFinished(Task* pTask) { _nc.postNotification(new TaskFinishedNotification(pTask)); - FastMutex::ScopedLock lock(_mutex); + ScopedLockT lock(_mutex); for (TaskList::iterator it = _taskList.begin(); it != _taskList.end(); ++it) { if (*it == pTask) diff --git a/Foundation/testsuite/src/TaskManagerTest.cpp b/Foundation/testsuite/src/TaskManagerTest.cpp index 4660259134..070b0ed407 100644 --- a/Foundation/testsuite/src/TaskManagerTest.cpp +++ b/Foundation/testsuite/src/TaskManagerTest.cpp @@ -169,11 +169,11 @@ namespace } private: - std::atomic _started; - std::atomic _cancelled; - std::atomic _finished; - Exception* _pException; - float _progress; + std::atomic _started; + std::atomic _cancelled; + std::atomic _finished; + std::atomic _pException; + std::atomic _progress; }; @@ -253,10 +253,11 @@ void TaskManagerTest::testFinish() tm.addObserver(Observer(to, &TaskObserver::taskProgress)); AutoPtr pTT = new TestTask; tm.start(pTT.duplicate()); + while (pTT->state() < Task::TASK_RUNNING) Thread::sleep(50); assertTrue (pTT->progress() == 0); Thread::sleep(200); pTT->cont(); - while (pTT->progress() != 0.5) Thread::sleep(50); + while (to.progress() == 0) Thread::sleep(50); assertTrue (to.progress() == 0.5); assertTrue (to.started()); assertTrue (pTT->state() == Task::TASK_RUNNING); @@ -274,6 +275,8 @@ void TaskManagerTest::testFinish() list = tm.taskList(); assertTrue (list.empty()); assertTrue (!to.error()); + tm.cancelAll(); + tm.joinAll(); } @@ -288,6 +291,7 @@ void TaskManagerTest::testCancel() tm.addObserver(Observer(to, &TaskObserver::taskProgress)); AutoPtr pTT = new TestTask; tm.start(pTT.duplicate()); + while (pTT->state() < Task::TASK_RUNNING) Thread::sleep(50); assertTrue (pTT->progress() == 0); Thread::sleep(200); pTT->cont(); @@ -299,15 +303,20 @@ void TaskManagerTest::testCancel() assertTrue (list.size() == 1); assertTrue (tm.count() == 1); tm.cancelAll(); + while (pTT->state() != Task::TASK_CANCELLING) Thread::sleep(50); + pTT->cont(); assertTrue (to.cancelled()); pTT->cont(); while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50); assertTrue (pTT->state() == Task::TASK_FINISHED); + while (!to.finished()) Thread::sleep(50); assertTrue (to.finished()); while (tm.count() == 1) Thread::sleep(50); list = tm.taskList(); assertTrue (list.empty()); assertTrue (!to.error()); + tm.cancelAll(); + tm.joinAll(); } @@ -322,6 +331,7 @@ void TaskManagerTest::testError() tm.addObserver(Observer(to, &TaskObserver::taskProgress)); AutoPtr pTT = new TestTask; tm.start(pTT.duplicate()); + while (pTT->state() < Task::TASK_RUNNING) Thread::sleep(50); assertTrue (pTT->progress() == 0); Thread::sleep(200); pTT->cont(); @@ -335,12 +345,17 @@ void TaskManagerTest::testError() pTT->fail(); pTT->cont(); while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50); + pTT->cont(); + while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50); assertTrue (pTT->state() == Task::TASK_FINISHED); + while (!to.finished()) Thread::sleep(50); assertTrue (to.finished()); assertTrue (to.error() != 0); while (tm.count() == 1) Thread::sleep(50); list = tm.taskList(); assertTrue (list.empty()); + tm.cancelAll(); + tm.joinAll(); } @@ -424,6 +439,7 @@ void TaskManagerTest::testCustom() tm.cancelAll(); while (tm.count() > 0) Thread::sleep(50); assertTrue (tm.count() == 0); + tm.joinAll(); } @@ -440,6 +456,7 @@ void TaskManagerTest::testMultiTasks() tm.cancelAll(); while (tm.count() > 0) Thread::sleep(100); assertTrue (tm.count() == 0); + tm.joinAll(); } @@ -472,7 +489,8 @@ void TaskManagerTest::testCustomThreadPool() assertTrue (tm.count() == tp.allocated()); - tp.joinAll(); + tm.cancelAll(); + tm.joinAll(); } void TaskManagerTest::setUp()