Skip to content
Permalink
Browse files
Fix more racy conditions
Switch from QtConcurrent::run to QThreadPool::start

QtConcurrent doesn't give us anyway to cancel queued but not
started tasks. QThreadPool does (and also allows us to specify
task priority)
  • Loading branch information
nyalldawson committed Dec 5, 2016
1 parent 01d6afa commit 5216220872576d50aa195c7f2262121ffc3c9fc4
Showing with 134 additions and 92 deletions.
  1. +1 −1 python/core/__init__.py
  2. +4 −4 python/core/qgstaskmanager.sip
  3. +38 −25 src/core/qgstaskmanager.cpp
  4. +8 −9 src/core/qgstaskmanager.h
  5. +0 −1 src/gui/qgstaskmanagerwidget.cpp
  6. +83 −52 tests/src/core/testqgstaskmanager.cpp
@@ -208,7 +208,7 @@ def __init__(self, description, function, on_finished, *args, **kwargs):
self.returned_values = None
self.exception = None

def run(self):
def _run(self):
try:
self.returned_values = self.function(self, *self.args, **self.kwargs)
except Exception as ex:
@@ -14,7 +14,7 @@
*
* \note Added in version 3.0
*/
class QgsTask : QObject
class QgsTask : QObject, QRunnable
{

%TypeHeaderCode
@@ -93,7 +93,7 @@ class QgsTask : QObject
* then this method should not be called directly, instead it is left to the
* task manager to start the task when appropriate.
*/
void start();
void run();

/**
* Notifies the task that it should terminate. Calling this is not guaranteed
@@ -212,7 +212,7 @@ class QgsTask : QObject
* @see completed()
* @see terminated()
*/
virtual TaskResult run() = 0;
virtual TaskResult _run() = 0;

/**
* If the task is managed by a QgsTaskManager, this will be called after the
@@ -295,7 +295,7 @@ class QgsTaskManager : QObject
/**
* Constructor for TaskDefinition. Ownership of the task is transferred to the definition.
*/
TaskDefinition( QgsTask* task, QgsTaskList dependencies = QgsTaskList() );
explicit TaskDefinition( QgsTask* task, QgsTaskList dependencies = QgsTaskList() );

//! Task
QgsTask* task;
@@ -34,25 +34,35 @@ QgsTask::QgsTask( const QString &name, const Flags& flags )
, mTotalProgress( 0.0 )
, mShouldTerminate( false )
, mStartCount( 0 )
{}
{
setAutoDelete( false );
}

QgsTask::~QgsTask()
{
Q_ASSERT_X( mStatus == Queued || mStatus == Terminated || mStatus == Complete, "delete", QString( "status was %1" ).arg( mStatus ).toLatin1() );

QThreadPool::globalInstance()->cancel( this );

Q_FOREACH ( const SubTask& subTask, mSubTasks )
{
delete subTask.task;
}
}

void QgsTask::start()
void QgsTask::run()
{
mStartCount++;
Q_ASSERT( mStartCount == 1 );

if ( mStatus != Queued )
return;

mStatus = Running;
mOverallStatus = Running;
emit statusChanged( Running );
emit begun();
TaskResult result = run();
TaskResult result = _run();
switch ( result )
{
case ResultSuccess:
@@ -74,6 +84,8 @@ void QgsTask::start()
void QgsTask::cancel()
{
mShouldTerminate = true;

QThreadPool::globalInstance()->cancel( this );
if ( mStatus == Queued || mStatus == OnHold )
{
// immediately terminate unstarted jobs
@@ -297,13 +309,14 @@ QgsTaskManager::~QgsTaskManager()
cancelAll();

//then clean them up, including waiting for them to terminate
mParentTaskMutex->lockForRead();
QSet< QgsTask* > parents = mParentTasks;
parents.detach();
mParentTaskMutex->unlock();
Q_FOREACH ( QgsTask* task, parents )
mTaskMutex->lockForRead();
QMap< long, TaskInfo > tasks = mTasks;
mTasks.detach();
mTaskMutex->unlock();
QMap< long, TaskInfo >::const_iterator it = tasks.constBegin();
for ( ; it != tasks.constEnd(); ++it )
{
cleanupAndDeleteTask( task );
cleanupAndDeleteTask( it.value().task );
}

delete mTaskMutex;
@@ -437,10 +450,7 @@ void QgsTaskManager::cancelAll()

Q_FOREACH ( QgsTask* task, parents )
{
if ( task->isActive() )
{
task->cancel();
}
task->cancel();
}
}

@@ -640,6 +650,10 @@ bool QgsTaskManager::cleanupAndDeleteTask( QgsTask *task )
return false;

long id = taskId( task );
if ( id < 0 )
return false;

task->disconnect( this );

mDependenciesMutex->lockForWrite();
if ( mTaskDependencies.contains( id ) )
@@ -668,20 +682,24 @@ bool QgsTaskManager::cleanupAndDeleteTask( QgsTask *task )
mLayerDependencies.remove( id );
mLayerDependenciesMutex->unlock();

if ( task->isActive() )
if ( task->status() != QgsTask::Complete || task->status() != QgsTask::Terminated )
{
task->cancel();
if ( isParent )
{
// delete task when it's terminated
connect( task, &QgsTask::taskCompleted, task, &QgsTask::deleteLater );
connect( task, &QgsTask::taskTerminated, task, &QgsTask::deleteLater );
}
task->cancel();
}
else if ( isParent )
else
{
//task already finished, kill it
task->deleteLater();
QThreadPool::globalInstance()->cancel( task );
if ( isParent )
{
//task already finished, kill it
task->deleteLater();
}
}

// at this stage (hopefully) dependent tasks have been cancelled or queued
@@ -700,20 +718,16 @@ bool QgsTaskManager::cleanupAndDeleteTask( QgsTask *task )

void QgsTaskManager::processQueue()
{
static QMutex processMutex;

processMutex.lock();
int prevActiveCount = countActiveTasks();
mActiveTaskMutex->lockForWrite();
mActiveTasks.clear();
mTaskMutex->lockForWrite();
for ( QMap< long, TaskInfo >::iterator it = mTasks.begin(); it != mTasks.end(); ++it )
{
QgsTask* task = it.value().task;
if ( !it.value().added && task && task->mStatus == QgsTask::Queued && dependenciesSatisified( it.key() ) )
if ( task && task->mStatus == QgsTask::Queued && dependenciesSatisified( it.key() ) && it.value().added.testAndSetRelaxed( 0, 1 ) )
{
it.value().added = true;
it.value().future = QtConcurrent::run( task, &QgsTask::start );
QThreadPool::globalInstance()->start( task );
}

if ( task && ( task->mStatus != QgsTask::Complete && task->mStatus != QgsTask::Terminated ) )
@@ -723,7 +737,6 @@ void QgsTaskManager::processQueue()
}
mTaskMutex->unlock();
mActiveTaskMutex->unlock();
processMutex.unlock();

mParentTaskMutex->lockForRead();
bool allFinished = mActiveTasks.isEmpty();
@@ -35,16 +35,16 @@ typedef QList< QgsTask* > QgsTaskList;
* or added to a QgsTaskManager for automatic management.
*
* Derived classes should implement the process they want to execute in the background
* within the run() method. This method will be called when the
* task commences (ie via calling start() ).
* within the _run() method. This method will be called when the
* task commences (ie via calling run() ).
*
* Long running tasks should periodically check the isCancelled() flag to detect if the task
* has been cancelled via some external event. If this flag is true then the task should
* clean up and terminate at the earliest possible convenience.
*
* \note Added in version 3.0
*/
class CORE_EXPORT QgsTask : public QObject
class CORE_EXPORT QgsTask : public QObject, public QRunnable
{
Q_OBJECT

@@ -123,7 +123,7 @@ class CORE_EXPORT QgsTask : public QObject
* then this method should not be called directly, instead it is left to the
* task manager to start the task when appropriate.
*/
void start();
void run() override;

/**
* Notifies the task that it should terminate. Calling this is not guaranteed
@@ -224,6 +224,7 @@ class CORE_EXPORT QgsTask : public QObject

void subTaskComplete();


protected:

/**
@@ -245,7 +246,7 @@ class CORE_EXPORT QgsTask : public QObject
* @see completed()
* @see terminated()
*/
virtual TaskResult run() = 0;
virtual TaskResult _run() = 0;

/**
* If the task is managed by a QgsTaskManager, this will be called after the
@@ -305,7 +306,6 @@ class CORE_EXPORT QgsTask : public QObject
//! Status of this task and all subtasks
TaskStatus mOverallStatus;


//! Progress of this (parent) task alone
double mProgress;
//! Overall progress of this task and all subtasks
@@ -367,7 +367,7 @@ class CORE_EXPORT QgsTaskManager : public QObject
/**
* Constructor for TaskDefinition.
*/
TaskDefinition( QgsTask* task, QgsTaskList dependencies = QgsTaskList() )
explicit TaskDefinition( QgsTask* task, QgsTaskList dependencies = QgsTaskList() )
: task( task )
, dependencies( dependencies )
{}
@@ -503,8 +503,7 @@ class CORE_EXPORT QgsTaskManager : public QObject
, added( false )
{}
QgsTask* task;
bool added;
QFuture< void > future;
QAtomicInt added;
};

mutable QReadWriteLock* mTaskMutex;
@@ -180,7 +180,6 @@ Qt::ItemFlags QgsTaskManagerModel::flags( const QModelIndex &index ) const
bool QgsTaskManagerModel::setData( const QModelIndex &index, const QVariant &value, int role )
{
Q_UNUSED( role );
return false;

if ( !index.isValid() )
return false;
Loading

0 comments on commit 5216220

Please sign in to comment.