Permalink
Browse files

CThreadPool: Add cancellation support

This adds CThreadPool::cancelJob() and cancelJobs() which can cancel a set of
jobs synchronously. These functions only return when the job was successfully
cancelled.

It tries to cancel the jobs as quickly as possible, skipping any callbacks on
CJob that were not yet called. A job that is already running can use
CJob::wasCancelled() to check if it should quit.

Signed-off-by: Uli Schlachter <psychon@znc.in>
  • Loading branch information...
1 parent 6118ad5 commit 1d67e87d905f730ffaf48764aced9d1ece8035f3 @psychon psychon committed Aug 6, 2014
Showing with 262 additions and 7 deletions.
  1. +38 −2 include/znc/Threads.h
  2. +113 −5 src/Threads.cpp
  3. +111 −0 test/ThreadTest.cpp
View
@@ -216,6 +216,17 @@ class CThread {
*/
class CJob {
public:
+ friend class CThreadPool;
+
+ enum EJobState {
+ READY,
+ RUNNING,
+ DONE,
+ CANCELLED
+ };
+
+ CJob() : m_eState(READY) {}
+
/// Destructor, always called from the main thread.
virtual ~CJob() {}
@@ -224,17 +235,26 @@ class CJob {
/// This function is called from the main thread after runThread()
/// finishes. It can be used to handle the results from runThread()
- /// without needing synchronization primitives.
+ /// without needing synchronization primitives.
virtual void runMain() = 0;
+ /// This can be used to check if the job was cancelled. For example,
+ /// runThread() can return early if this returns true.
+ bool wasCancelled() const;
+
private:
// Undefined copy constructor and assignment operator
CJob(const CJob&);
CJob& operator=(const CJob&);
+
+ // Synchronized via the thread pool's mutex! Do not access without that mutex!
+ EJobState m_eState;
};
class CThreadPool {
private:
+ friend class CJob;
+
CThreadPool();
~CThreadPool();
@@ -244,18 +264,31 @@ class CThreadPool {
/// Add a job to the thread pool and run it. The job will be deleted when done.
void addJob(CJob *job);
+ /// Cancel a job that was previously passed to addJob(). This *might*
+ /// mean that runThread() and/or runMain() will not be called on the job.
+ /// This function BLOCKS until the job finishes!
+ void cancelJob(CJob *job);
+
+ /// Cancel some jobs that were previously passed to addJob(). This *might*
+ /// mean that runThread() and/or runMain() will not be called on some of
+ /// the jobs. This function BLOCKS until all jobs finish!
+ void cancelJobs(const std::set<CJob *> &jobs);
+
int getReadFD() const {
return m_iJobPipe[0];
}
void handlePipeReadable() const;
private:
- void jobDone(const CJob* pJob) const;
+ void jobDone(CJob* pJob);
// Check if the calling thread is still needed, must be called with m_mutex held
bool threadNeeded() const;
+ CJob *getJobFromPipe() const;
+ void finishJob(CJob *) const;
+
void threadFunc();
static void *threadPoolFunc(void *arg) {
CThreadPool &pool = *reinterpret_cast<CThreadPool *>(arg);
@@ -269,6 +302,9 @@ class CThreadPool {
// condition variable for waiting idle threads
CConditionVariable m_cond;
+ // condition variable for reporting finished cancellation
+ CConditionVariable m_cancellationCond;
+
// when this is true, all threads should exit
bool m_done;
View
@@ -15,10 +15,12 @@
*/
#include <znc/Threads.h>
-#include <znc/ZNCDebug.h>
#ifdef HAVE_PTHREAD
+#include <znc/ZNCDebug.h>
+#include <algorithm>
+
/* Just an arbitrary limit for the number of idle threads */
static const size_t MAX_IDLE_THREADS = 3;
@@ -39,7 +41,18 @@ CThreadPool::CThreadPool() : m_done(false), m_num_threads(0), m_num_idle(0) {
}
}
-void CThreadPool::jobDone(const CJob* job) const {
+void CThreadPool::jobDone(CJob* job) {
+ // This must be called with the mutex locked!
+
+ enum CJob::EJobState oldState = job->m_eState;
+ job->m_eState = CJob::DONE;
+
+ if (oldState == CJob::CANCELLED) {
+ // Signal the main thread that cancellation is done
+ m_cancellationCond.signal();
+ return;
+ }
+
// This write() must succeed because POSIX guarantees that writes of
// less than PIPE_BUF are atomic (and PIPE_BUF is at least 512).
// (Yes, this really wants to write a pointer(!) to the pipe.
@@ -51,15 +64,23 @@ void CThreadPool::jobDone(const CJob* job) const {
}
void CThreadPool::handlePipeReadable() const {
+ finishJob(getJobFromPipe());
+}
+
+CJob *CThreadPool::getJobFromPipe() const {
CJob* a = NULL;
ssize_t need = sizeof(a);
ssize_t r = read(m_iJobPipe[0], &a, need);
if (r != need) {
DEBUG("Something bad happened during read() from a pipe for thread pool: " << strerror(errno));
exit(1);
}
- a->runMain();
- delete a;
+ return a;
+}
+
+void CThreadPool::finishJob(CJob *job) const {
+ job->runMain();
+ delete job;
}
CThreadPool::~CThreadPool() {
@@ -101,12 +122,13 @@ void CThreadPool::threadFunc() {
// Now do the actual job
m_num_idle--;
+ job->m_eState = CJob::RUNNING;
guard.unlock();
job->runThread();
- jobDone(job);
guard.lock();
+ jobDone(job);
m_num_idle++;
}
assert(m_num_threads > 0 && m_num_idle > 0);
@@ -133,4 +155,90 @@ void CThreadPool::addJob(CJob *job) {
CThread::startThread(threadPoolFunc, this);
}
+void CThreadPool::cancelJob(CJob *job) {
+ std::set<CJob *> jobs;
+ jobs.insert(job);
+ cancelJobs(jobs);
+}
+
+void CThreadPool::cancelJobs(const std::set<CJob *> &jobs) {
+ CMutexLocker guard(m_mutex);
+ std::set<CJob *> wait, finished, deleteLater;
+ std::set<CJob *>::const_iterator it;
+
+ // Start cancelling all jobs
+ for (it = jobs.begin(); it != jobs.end(); ++it) {
+ switch ((*it)->m_eState) {
+ case CJob::READY: {
+ (*it)->m_eState = CJob::CANCELLED;
+
+ // Job wasn't started yet, must be in the queue
+ std::list<CJob *>::iterator it2 = std::find(m_jobs.begin(), m_jobs.end(), *it);
+ assert(it2 != m_jobs.end());
+ m_jobs.erase(it2);
+ deleteLater.insert(*it);
+ continue;
+ }
+
+ case CJob::RUNNING:
+ (*it)->m_eState = CJob::CANCELLED;
+ wait.insert(*it);
+ continue;
+
+ case CJob::DONE:
+ finished.insert(*it);
+ continue;
+
+ case CJob::CANCELLED:
+ default:
+ assert(0);
+ }
+ }
+
+ // Now wait for cancellation to be done
+
+ // Collect jobs that really were cancelled. Finished cancellation is
+ // signaled by changing their state to DONE.
+ while (!wait.empty()) {
+ it = wait.begin();
+ while (it != wait.end()) {
+ if ((*it)->m_eState != CJob::CANCELLED) {
+ assert((*it)->m_eState == CJob::DONE);
+ // Re-set state for the destructor
+ (*it)->m_eState = CJob::CANCELLED;;
+ deleteLater.insert(*it);
+ wait.erase(it++);
+ } else
+ it++;
+ }
+
+ if (wait.empty())
+ break;
+
+ // Then wait for more to be done
+ m_cancellationCond.wait(m_mutex);
+ }
+
+ // We must call destructors with m_mutex unlocked so that they can call wasCancelled()
+ guard.unlock();
+
+ // Handle finished jobs. They must already be in the pipe.
+ while (!finished.empty()) {
+ CJob *job = getJobFromPipe();
+ finishJob(job);
+ finished.erase(job);
+ }
+
+ // Delete things that still need to be deleted
+ while (!deleteLater.empty()) {
+ delete *deleteLater.begin();
+ deleteLater.erase(deleteLater.begin());
+ }
+}
+
+bool CJob::wasCancelled() const {
+ CMutexLocker guard(CThreadPool::Get().m_mutex);
+ return m_eState == CANCELLED;
+}
+
#endif // HAVE_PTHREAD
View
@@ -27,6 +27,7 @@ class CWaitingJob : public CJob {
~CWaitingJob() {
EXPECT_TRUE(m_bThreadReady);
EXPECT_TRUE(m_bThreadDone);
+ EXPECT_FALSE(wasCancelled());
m_bDestroyed = true;
}
@@ -72,3 +73,113 @@ TEST(Thread, RunJob) {
while (!destroyed)
CThreadPool::Get().handlePipeReadable();
}
+
+class CCancelJob : public CJob {
+public:
+ CCancelJob(bool& destroyed)
+ : m_bDestroyed(destroyed), m_Mutex(), m_CVThreadReady(), m_bThreadReady(false) {
+ }
+
+ ~CCancelJob() {
+ EXPECT_TRUE(wasCancelled());
+ m_bDestroyed = true;
+ }
+
+ void wait() {
+ CMutexLocker locker(m_Mutex);
+ // Wait for the thread to run
+ while (!m_bThreadReady)
+ m_CVThreadReady.wait(m_Mutex);
+ }
+
+ virtual void runThread() {
+ m_Mutex.lock();
+ // We are running, tell the main thread
+ m_bThreadReady = true;
+ m_CVThreadReady.broadcast();
+ // Have to unlock here so that wait() can get the mutex
+ m_Mutex.unlock();
+
+ while (!wasCancelled()) {
+ // We can't do much besides busy-looping here. If the
+ // job really gets cancelled while it is already
+ // running, the main thread is stuck in cancelJob(), so
+ // it cannot signal us in any way. And signaling us
+ // before calling cancelJob() effictively is the same
+ // thing as busy looping anyway. So busy looping it is.
+ // (Yes, CJob shouldn't be used for anything that
+ // requires synchronisation between threads!)
+ }
+ }
+
+ virtual void runMain() { }
+
+private:
+ bool& m_bDestroyed;
+ CMutex m_Mutex;
+ CConditionVariable m_CVThreadReady;
+ bool m_bThreadReady;
+};
+
+TEST(Thread, CancelJobEarly) {
+ bool destroyed = false;
+ CCancelJob *pJob = new CCancelJob(destroyed);
+
+ CThreadPool::Get().addJob(pJob);
+ // Don't wait for the job to run. The idea here is that we are calling
+ // cancelJob() before pJob->runThread() runs, but this is a race.
+ CThreadPool::Get().cancelJob(pJob);
+
+ // cancelJob() should only return after successful cancellation
+ EXPECT_TRUE(destroyed);
+}
+
+TEST(Thread, CancelJobWhileRunning) {
+ bool destroyed = false;
+ CCancelJob *pJob = new CCancelJob(destroyed);
+
+ CThreadPool::Get().addJob(pJob);
+ // Wait for the job to run
+ pJob->wait();
+ CThreadPool::Get().cancelJob(pJob);
+
+ // cancelJob() should only return after successful cancellation
+ EXPECT_TRUE(destroyed);
+}
+
+class CEmptyJob : public CJob {
+public:
+ CEmptyJob(bool& destroyed)
+ : m_bDestroyed(destroyed) {
+ }
+
+ ~CEmptyJob() {
+ EXPECT_FALSE(wasCancelled());
+ m_bDestroyed = true;
+ }
+
+ virtual void runThread() { }
+ virtual void runMain() { }
+
+private:
+ bool& m_bDestroyed;
+};
+
+TEST(Thread, CancelJobWhenDone) {
+ bool destroyed = false;
+ CEmptyJob *pJob = new CEmptyJob(destroyed);
+
+ CThreadPool::Get().addJob(pJob);
+
+ // Wait for the job to finish
+ fd_set fds;
+ FD_ZERO(&fds);
+ FD_SET(CThreadPool::Get().getReadFD(), &fds);
+ EXPECT_EQ(1, select(1 + CThreadPool::Get().getReadFD(), &fds, NULL, NULL, NULL));
+
+ // And only cancel it afterwards
+ CThreadPool::Get().cancelJob(pJob);
+
+ // cancelJob() should only return after successful cancellation
+ EXPECT_TRUE(destroyed);
+}

0 comments on commit 1d67e87

Please sign in to comment.