Skip to content

Commit

Permalink
fix(TaskManager): waits for all threads in the ThreadPool #3704
Browse files Browse the repository at this point in the history
  • Loading branch information
aleks-f committed Jul 21, 2022
1 parent 999c1c0 commit da2a962
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 23 deletions.
20 changes: 14 additions & 6 deletions Foundation/include/Poco/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ class Foundation_API TaskManager
using TaskPtr = AutoPtr<Task>;
using TaskList = std::list<TaskPtr>;

TaskManager();
/// Creates the TaskManager, using the
/// default ThreadPool.
TaskManager(const std::string& name = "",
int minCapacity = 2,
int maxCapacity = 16,
int idleTime = 60,
int stackSize = POCO_THREAD_STACK_SIZE);
/// Creates the TaskManager.

TaskManager(ThreadPool& pool);
/// Creates the TaskManager, using the
/// given ThreadPool.
/// given ThreadPool (should be used
/// by this TaskManager exclusively).

~TaskManager();
/// Destroys the TaskManager.
Expand Down Expand Up @@ -110,11 +114,15 @@ class Foundation_API TaskManager
void taskFailed(Task* pTask, const Exception& exc);

private:
using MutexT = FastMutex;
using ScopedLockT = MutexT::ScopedLock;

ThreadPool& _threadPool;
bool _ownPool;
TaskList _taskList;
Timestamp _lastProgressNotification;
NotificationCenter _nc;
mutable FastMutex _mutex;
mutable MutexT _mutex;

friend class Task;
};
Expand All @@ -125,7 +133,7 @@ class Foundation_API TaskManager
//
inline int TaskManager::count() const
{
FastMutex::ScopedLock lock(_mutex);
ScopedLockT lock(_mutex);

return (int) _taskList.size();
}
Expand Down
3 changes: 1 addition & 2 deletions Foundation/src/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ void Task::run()
pOwner->taskFailed(this, SystemException("unknown exception"));
}
_state = TASK_FINISHED;
if (pOwner)
pOwner->taskFinished(this);
if (pOwner) pOwner->taskFinished(this);
}


Expand Down
28 changes: 20 additions & 8 deletions Foundation/src/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "Poco/TaskManager.h"
#include "Poco/TaskNotification.h"
#include "Poco/ThreadPool.h"
#include "Poco/Timespan.h"


namespace Poco {
Expand All @@ -23,20 +24,31 @@ namespace Poco {
const int TaskManager::MIN_PROGRESS_NOTIFICATION_INTERVAL = 100000; // 100 milliseconds


TaskManager::TaskManager():
_threadPool(ThreadPool::defaultPool())
TaskManager::TaskManager(const std::string& name,
int minCapacity,
int maxCapacity,
int idleTime,
int stackSize):
_threadPool(*new ThreadPool(name, minCapacity, maxCapacity, idleTime, stackSize)),
_ownPool(true)
{
// prevent skipping the first progress update
_lastProgressNotification -= Timespan(MIN_PROGRESS_NOTIFICATION_INTERVAL*2);
}


TaskManager::TaskManager(ThreadPool& pool):
_threadPool(pool)
_threadPool(pool),
_ownPool(false)
{
// prevent skipping the first progress update
_lastProgressNotification -= Timespan(MIN_PROGRESS_NOTIFICATION_INTERVAL*2);
}


TaskManager::~TaskManager()
{
if (_ownPool) delete &_threadPool;
}


Expand All @@ -46,7 +58,7 @@ void TaskManager::start(Task* pTask)
pAutoTask->setOwner(this);
pAutoTask->setState(Task::TASK_STARTING);

FastMutex::ScopedLock lock(_mutex);
ScopedLockT lock(_mutex);
_taskList.push_back(pAutoTask);
try
{
Expand All @@ -65,7 +77,7 @@ void TaskManager::start(Task* pTask)

void TaskManager::cancelAll()
{
FastMutex::ScopedLock lock(_mutex);
ScopedLockT lock(_mutex);

for (auto& pTask: _taskList)
{
Expand All @@ -82,7 +94,7 @@ void TaskManager::joinAll()

TaskManager::TaskList TaskManager::taskList() const
{
FastMutex::ScopedLock lock(_mutex);
ScopedLockT lock(_mutex);

return _taskList;
}
Expand Down Expand Up @@ -114,7 +126,7 @@ void TaskManager::taskStarted(Task* pTask)

void TaskManager::taskProgress(Task* pTask, float progress)
{
ScopedLockWithUnlock<FastMutex> lock(_mutex);
ScopedLockWithUnlock<MutexT> lock(_mutex);

if (_lastProgressNotification.isElapsed(MIN_PROGRESS_NOTIFICATION_INTERVAL))
{
Expand All @@ -135,7 +147,7 @@ void TaskManager::taskFinished(Task* pTask)
{
_nc.postNotification(new TaskFinishedNotification(pTask));

FastMutex::ScopedLock lock(_mutex);
ScopedLockT lock(_mutex);
for (TaskList::iterator it = _taskList.begin(); it != _taskList.end(); ++it)
{
if (*it == pTask)
Expand Down
32 changes: 25 additions & 7 deletions Foundation/testsuite/src/TaskManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,11 @@ namespace
}

private:
std::atomic<bool> _started;
std::atomic<bool> _cancelled;
std::atomic<bool> _finished;
Exception* _pException;
float _progress;
std::atomic<bool> _started;
std::atomic<bool> _cancelled;
std::atomic<bool> _finished;
std::atomic<Exception*> _pException;
std::atomic<float> _progress;
};


Expand Down Expand Up @@ -253,10 +253,11 @@ void TaskManagerTest::testFinish()
tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
AutoPtr<TestTask> pTT = new TestTask;
tm.start(pTT.duplicate());
while (pTT->state() < Task::TASK_RUNNING) Thread::sleep(50);
assertTrue (pTT->progress() == 0);
Thread::sleep(200);
pTT->cont();
while (pTT->progress() != 0.5) Thread::sleep(50);
while (to.progress() == 0) Thread::sleep(50);
assertTrue (to.progress() == 0.5);
assertTrue (to.started());
assertTrue (pTT->state() == Task::TASK_RUNNING);
Expand All @@ -274,6 +275,8 @@ void TaskManagerTest::testFinish()
list = tm.taskList();
assertTrue (list.empty());
assertTrue (!to.error());
tm.cancelAll();
tm.joinAll();
}


Expand All @@ -288,6 +291,7 @@ void TaskManagerTest::testCancel()
tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
AutoPtr<TestTask> pTT = new TestTask;
tm.start(pTT.duplicate());
while (pTT->state() < Task::TASK_RUNNING) Thread::sleep(50);
assertTrue (pTT->progress() == 0);
Thread::sleep(200);
pTT->cont();
Expand All @@ -299,15 +303,20 @@ void TaskManagerTest::testCancel()
assertTrue (list.size() == 1);
assertTrue (tm.count() == 1);
tm.cancelAll();
while (pTT->state() != Task::TASK_CANCELLING) Thread::sleep(50);
pTT->cont();
assertTrue (to.cancelled());
pTT->cont();
while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50);
assertTrue (pTT->state() == Task::TASK_FINISHED);
while (!to.finished()) Thread::sleep(50);
assertTrue (to.finished());
while (tm.count() == 1) Thread::sleep(50);
list = tm.taskList();
assertTrue (list.empty());
assertTrue (!to.error());
tm.cancelAll();
tm.joinAll();
}


Expand All @@ -322,6 +331,7 @@ void TaskManagerTest::testError()
tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
AutoPtr<TestTask> pTT = new TestTask;
tm.start(pTT.duplicate());
while (pTT->state() < Task::TASK_RUNNING) Thread::sleep(50);
assertTrue (pTT->progress() == 0);
Thread::sleep(200);
pTT->cont();
Expand All @@ -335,12 +345,17 @@ void TaskManagerTest::testError()
pTT->fail();
pTT->cont();
while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50);
pTT->cont();
while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50);
assertTrue (pTT->state() == Task::TASK_FINISHED);
while (!to.finished()) Thread::sleep(50);
assertTrue (to.finished());
assertTrue (to.error() != 0);
while (tm.count() == 1) Thread::sleep(50);
list = tm.taskList();
assertTrue (list.empty());
tm.cancelAll();
tm.joinAll();
}


Expand Down Expand Up @@ -424,6 +439,7 @@ void TaskManagerTest::testCustom()
tm.cancelAll();
while (tm.count() > 0) Thread::sleep(50);
assertTrue (tm.count() == 0);
tm.joinAll();
}


Expand All @@ -440,6 +456,7 @@ void TaskManagerTest::testMultiTasks()
tm.cancelAll();
while (tm.count() > 0) Thread::sleep(100);
assertTrue (tm.count() == 0);
tm.joinAll();
}


Expand Down Expand Up @@ -472,7 +489,8 @@ void TaskManagerTest::testCustomThreadPool()

assertTrue (tm.count() == tp.allocated());

tp.joinAll();
tm.cancelAll();
tm.joinAll();
}

void TaskManagerTest::setUp()
Expand Down

0 comments on commit da2a962

Please sign in to comment.