Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: znc/znc
base: 2db7307ac357
...
head fork: znc/znc
compare: 53c579b296eb
  • 2 commits
  • 6 files changed
  • 0 commit comments
  • 1 contributor
Commits on Mar 15, 2013
@psychon psychon Add a generic threads abstraction
This should make it easier to work with threads. It provides classes for mutexes
and condition variables. Additionally, there is a special CMutexGuard that
automatically unlocks the mutex on destruction and a CThreadPool class.

This thread pool is used to replace the thread pool in the sockets code.

Signed-off-by: Uli Schlachter <psychon@znc.in>
75f2e3f
@psychon psychon CJob: Add a way to do stuff on the main thread
This just moves the pipe from the socket code to the thread pool. However, now
all CJobs can use this and there is a single place for them to get deleted.

Signed-off-by: Uli Schlachter <psychon@znc.in>
53c579b
View
2  Makefile.in
@@ -34,7 +34,7 @@ SED := @SED@
LIB_SRCS := ZNCString.cpp Csocket.cpp znc.cpp IRCNetwork.cpp User.cpp IRCSock.cpp \
Client.cpp Chan.cpp Nick.cpp Server.cpp Modules.cpp MD5.cpp Buffer.cpp Utils.cpp \
FileUtils.cpp HTTPSock.cpp Template.cpp ClientCommand.cpp Socket.cpp SHA256.cpp \
- WebModules.cpp Listener.cpp Config.cpp ZNCDebug.cpp version.cpp
+ WebModules.cpp Listener.cpp Config.cpp ZNCDebug.cpp Threads.cpp version.cpp
LIB_SRCS := $(addprefix src/,$(LIB_SRCS))
BIN_SRCS := src/main.cpp
LIB_OBJS := $(patsubst %cpp,%o,$(LIB_SRCS))
View
1  configure.ac
@@ -220,6 +220,7 @@ DNS_TEXT=blocking
if test "x$TDNS" != "xno"; then
old_TDNS=$TDNS
AX_PTHREAD([
+ AC_DEFINE([HAVE_PTHREAD], [1], [Define if you have POSIX threads libraries and header files.])
AC_MSG_CHECKING([whether getaddrinfo() supports AI_ADDRCONFIG])
AC_COMPILE_IFELSE([
AC_LANG_PROGRAM([[
View
48 include/znc/Socket.h
@@ -11,6 +11,7 @@
#include <znc/zncconfig.h>
#include <znc/Csocket.h>
+#include <znc/Threads.h>
class CModule;
@@ -101,11 +102,9 @@ class CSockManager : public TSocketManager<CZNCSock> {
private:
void FinishConnect(const CString& sHostname, u_short iPort, const CString& sSockName, int iTimeout, bool bSSL, const CString& sBindHost, CZNCSock *pcSock);
-#ifdef HAVE_THREADED_DNS
- int m_iTDNSpipe[2];
-
class CTDNSMonitorFD;
friend class CTDNSMonitorFD;
+#ifdef HAVE_THREADED_DNS
struct TDNSTask {
CString sHostname;
u_short iPort;
@@ -120,41 +119,22 @@ class CSockManager : public TSocketManager<CZNCSock> {
addrinfo* aiTarget;
addrinfo* aiBind;
};
- struct TDNSArg {
- CString sHostname;
- TDNSTask* task;
- int fd;
- bool bBind;
-
- int iRes;
- addrinfo* aiResult;
- };
- struct TDNSStatus {
- /* mutex which protects this whole struct */
- pthread_mutex_t mutex;
- /* condition variable for idle threads */
- pthread_cond_t cond;
- /* When this is true, all threads should exit */
- bool done;
- /* Total number of running DNS threads */
- size_t num_threads;
- /* Number of DNS threads which don't have any work */
- size_t num_idle;
- /* List of pending DNS jobs */
- std::list<TDNSArg *> jobs;
+ class CDNSJob : public CJob {
+ public:
+ CString sHostname;
+ TDNSTask* task;
+ CSockManager* pManager;
+ bool bBind;
+
+ int iRes;
+ addrinfo* aiResult;
+
+ void runThread();
+ void runMain();
};
void StartTDNSThread(TDNSTask* task, bool bBind);
void SetTDNSThreadFinished(TDNSTask* task, bool bBind, addrinfo* aiResult);
- void RetrieveTDNSResult();
static void* TDNSThread(void* argument);
- static void DoDNS(TDNSArg *arg);
-
- /** Must be called with threadStatus->mutex held.
- * @returns false when the calling DNS thread should exit.
- */
- static bool ThreadNeeded(struct TDNSStatus* status);
-
- TDNSStatus m_threadStatus;
#endif
protected:
};
View
241 include/znc/Threads.h
@@ -0,0 +1,241 @@
+/*
+ * Copyright (C) 2004-2012 See the AUTHORS file for details.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation.
+ */
+
+#ifndef _THREADS_H
+#define _THREADS_H
+
+#include <znc/zncconfig.h>
+
+#ifdef HAVE_PTHREAD
+
+#include <znc/Utils.h>
+
+#include <cerrno>
+#include <csignal>
+#include <cstdlib>
+#include <cstring>
+#include <list>
+#include <pthread.h>
+
+/**
+ * This class represents a non-recursive mutex. Only a single thread may own the
+ * mutex at any point in time.
+ */
+class CMutex {
+public:
+ friend class CConditionVariable;
+
+ CMutex() {
+ int i = pthread_mutex_init(&m_mutex, NULL);
+ if (i) {
+ CUtils::PrintError("Can't initialize mutex: " + CString(strerror(errno)));
+ exit(1);
+ }
+ }
+
+ ~CMutex() {
+ int i = pthread_mutex_destroy(&m_mutex);
+ if (i) {
+ CUtils::PrintError("Can't destroy mutex: " + CString(strerror(errno)));
+ exit(1);
+ }
+ }
+
+ void lock() {
+ int i = pthread_mutex_lock(&m_mutex);
+ if (i) {
+ CUtils::PrintError("Can't lock mutex: " + CString(strerror(errno)));
+ exit(1);
+ }
+ }
+
+ void unlock() {
+ int i = pthread_mutex_unlock(&m_mutex);
+ if (i) {
+ CUtils::PrintError("Can't unlock mutex: " + CString(strerror(errno)));
+ exit(1);
+ }
+ }
+
+private:
+ pthread_mutex_t m_mutex;
+};
+
+/**
+ * A mutex locker should always be used as an automatic variable. This
+ * class makes sure that the mutex is unlocked when this class is destructed.
+ * For example, this makes it easier to make code exception-safe.
+ */
+class CMutexLocker {
+public:
+ CMutexLocker(CMutex& mutex, bool initiallyLocked = true)
+ : m_mutex(mutex), m_locked(false) {
+ if (initiallyLocked)
+ lock();
+ }
+
+ ~CMutexLocker() {
+ if (m_locked)
+ unlock();
+ }
+
+ void lock() {
+ assert(!m_locked);
+ m_mutex.lock();
+ m_locked = true;
+ }
+
+ void unlock() {
+ assert(m_locked);
+ m_locked = false;
+ m_mutex.unlock();
+ }
+
+private:
+ CMutex &m_mutex;
+ bool m_locked;
+};
+
+/**
+ * A condition variable makes it possible for threads to wait until some
+ * condition is reached at which point the thread can wake up again.
+ */
+class CConditionVariable {
+public:
+ CConditionVariable() {
+ int i = pthread_cond_init(&m_cond, NULL);
+ if (i) {
+ CUtils::PrintError("Can't initialize condition variable: "
+ + CString(strerror(errno)));
+ exit(1);
+ }
+ }
+
+ ~CConditionVariable() {
+ int i = pthread_cond_destroy(&m_cond);
+ if (i) {
+ CUtils::PrintError("Can't destroy condition variable: "
+ + CString(strerror(errno)));
+ exit(1);
+ }
+ }
+
+ void wait(CMutex& mutex) {
+ int i = pthread_cond_wait(&m_cond, &mutex.m_mutex);
+ if (i) {
+ CUtils::PrintError("Can't wait on condition variable: "
+ + CString(strerror(errno)));
+ exit(1);
+ }
+ }
+
+ void signal() {
+ int i = pthread_cond_signal(&m_cond);
+ if (i) {
+ CUtils::PrintError("Can't signal condition variable: "
+ + CString(strerror(errno)));
+ exit(1);
+ }
+ }
+
+ void broadcast() {
+ int i = pthread_cond_broadcast(&m_cond);
+ if (i) {
+ CUtils::PrintError("Can't broadcast condition variable: "
+ + CString(strerror(errno)));
+ exit(1);
+ }
+ }
+
+private:
+ pthread_cond_t m_cond;
+};
+
+class CThread {
+public:
+ typedef void *threadRoutine(void *);
+ static void startThread(threadRoutine *func, void *arg) {
+ pthread_t thr;
+ sigset_t old_sigmask, sigmask;
+
+ /* Block all signals. The thread will inherit our signal mask
+ * and thus won't ever try to handle signals.
+ */
+ int i = sigfillset(&sigmask);
+ i |= pthread_sigmask(SIG_SETMASK, &sigmask, &old_sigmask);
+ i |= pthread_create(&thr, NULL, func, arg);
+ i |= pthread_sigmask(SIG_SETMASK, &old_sigmask, NULL);
+ i |= pthread_detach(thr);
+ if (i) {
+ CUtils::PrintError("Can't start new thread: "
+ + CString(strerror(errno)));
+ exit(1);
+ }
+ }
+};
+
+class CJob {
+public:
+ virtual ~CJob() {}
+ virtual void runThread() = 0;
+ virtual void runMain() = 0;
+};
+
+class CThreadPool {
+private:
+ CThreadPool();
+ ~CThreadPool();
+
+public:
+ static CThreadPool& Get();
+
+ void addJob(CJob *job);
+
+ int getReadFD() const {
+ return m_iJobPipe[0];
+ }
+
+ void handlePipeReadable() const;
+
+private:
+ void jobDone(const CJob* pJob) const;
+
+ // Check if the calling thread is still needed, must be called with m_mutex held
+ bool threadNeeded() const;
+
+ void threadFunc();
+ static void *threadPoolFunc(void *arg) {
+ CThreadPool &pool = *reinterpret_cast<CThreadPool *>(arg);
+ pool.threadFunc();
+ return NULL;
+ }
+
+ // mutex protecting all of these members
+ CMutex m_mutex;
+
+ // condition variable for waiting idle threads
+ CConditionVariable m_cond;
+
+ // when this is true, all threads should exit
+ bool m_done;
+
+ // total number of running threads
+ size_t m_num_threads;
+
+ // number of idle threads waiting on the condition variable
+ size_t m_num_idle;
+
+ // pipe for waking up the main thread
+ int m_iJobPipe[2];
+
+ // list of pending jobs
+ std::list<CJob *> m_jobs;
+};
+
+#endif // HAVE_PTHREAD
+#endif // !_THREADS_H
View
215 src/Socket.cpp
@@ -10,9 +10,6 @@
#include <znc/IRCNetwork.h>
#include <signal.h>
-/* We should need 2 DNS threads (host, bindhost) per IRC connection */
-static const size_t MAX_IDLE_THREADS = 2;
-
unsigned int CSockManager::GetAnonConnectionCount(const CString &sIP) const {
const_iterator it;
unsigned int ret = 0;
@@ -38,72 +35,24 @@ int CZNCSock::ConvertAddress(const struct sockaddr_storage * pAddr, socklen_t iA
return ret;
}
-#ifdef HAVE_THREADED_DNS
+#ifdef HAVE_PTHREAD
class CSockManager::CTDNSMonitorFD : public CSMonitorFD {
- CSockManager* m_Manager;
public:
- CTDNSMonitorFD(CSockManager* mgr) {
- m_Manager = mgr;
- Add(mgr->m_iTDNSpipe[0], ECT_Read);
+ CTDNSMonitorFD() {
+ Add(CThreadPool::Get().getReadFD(), ECT_Read);
}
virtual bool FDsThatTriggered(const std::map<int, short>& miiReadyFds) {
- if (miiReadyFds.find(m_Manager->m_iTDNSpipe[0])->second) {
- m_Manager->RetrieveTDNSResult();
+ if (miiReadyFds.find(CThreadPool::Get().getReadFD())->second) {
+ CThreadPool::Get().handlePipeReadable();
}
return true;
}
};
+#endif
-bool CSockManager::ThreadNeeded(struct TDNSStatus* threadStatus)
-{
- // We should keep a number of idle threads alive
- if (threadStatus->num_idle > MAX_IDLE_THREADS)
- return false;
- // If ZNC is shutting down, all threads should exit
- return !threadStatus->done;
-}
-
-void* CSockManager::TDNSThread(void* argument) {
- TDNSStatus *threadStatus = (TDNSStatus *) argument;
-
- pthread_mutex_lock(&threadStatus->mutex);
- threadStatus->num_threads++;
- threadStatus->num_idle++;
- while (true) {
- /* Wait for a DNS job for us to do. This is a while()-loop
- * because POSIX allows spurious wakeups from pthread_cond_wait.
- */
- while (threadStatus->jobs.empty()) {
- if (!ThreadNeeded(threadStatus))
- break;
- pthread_cond_wait(&threadStatus->cond, &threadStatus->mutex);
- }
-
- if (!ThreadNeeded(threadStatus))
- break;
-
- /* Figure out a DNS job to do */
- assert(threadStatus->num_idle > 0);
- TDNSArg *job = threadStatus->jobs.front();
- threadStatus->jobs.pop_front();
- threadStatus->num_idle--;
- pthread_mutex_unlock(&threadStatus->mutex);
-
- /* Now do the actual work */
- DoDNS(job);
-
- pthread_mutex_lock(&threadStatus->mutex);
- threadStatus->num_idle++;
- }
- assert(threadStatus->num_threads > 0 && threadStatus->num_idle > 0);
- threadStatus->num_threads--;
- threadStatus->num_idle--;
- pthread_mutex_unlock(&threadStatus->mutex);
- return NULL;
-}
-
-void CSockManager::DoDNS(TDNSArg *arg) {
+#ifdef HAVE_THREADED_DNS
+void CSockManager::CDNSJob::runThread() {
int iCount = 0;
while (true) {
addrinfo hints;
@@ -112,96 +61,42 @@ void CSockManager::DoDNS(TDNSArg *arg) {
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_ADDRCONFIG;
- arg->iRes = getaddrinfo(arg->sHostname.c_str(), NULL, &hints, &arg->aiResult);
- if (EAGAIN != arg->iRes) {
+ iRes = getaddrinfo(sHostname.c_str(), NULL, &hints, &aiResult);
+ if (EAGAIN != iRes) {
break;
}
iCount++;
if (iCount > 5) {
- arg->iRes = ETIMEDOUT;
+ iRes = ETIMEDOUT;
break;
}
sleep(5); // wait 5 seconds before next try
}
+}
- size_t need = sizeof(TDNSArg*);
- char* x = (char*)&arg;
- // This write() must succeed because POSIX guarantees that writes of
- // less than PIPE_BUF are atomic (and PIPE_BUF is at least 512).
- size_t w = write(arg->fd, x, need);
- if (w != need) {
- DEBUG("Something bad happened during write() to a pipe from TDNSThread, wrote " << w << " bytes: " << strerror(errno));
- exit(1);
+void CSockManager::CDNSJob::runMain() {
+ if (0 != this->iRes) {
+ DEBUG("Error in threaded DNS: " << gai_strerror(this->iRes));
+ if (this->aiResult) {
+ DEBUG("And aiResult is not NULL...");
+ }
+ this->aiResult = NULL; // just for case. Maybe to call freeaddrinfo()?
}
+ pManager->SetTDNSThreadFinished(this->task, this->bBind, this->aiResult);
}
void CSockManager::StartTDNSThread(TDNSTask* task, bool bBind) {
CString sHostname = bBind ? task->sBindhost : task->sHostname;
- TDNSArg* arg = new TDNSArg;
+ CDNSJob* arg = new CDNSJob;
arg->sHostname = sHostname;
arg->task = task;
- arg->fd = m_iTDNSpipe[1];
arg->bBind = bBind;
arg->iRes = 0;
arg->aiResult = NULL;
+ arg->pManager = this;
- pthread_mutex_lock(&m_threadStatus.mutex);
- m_threadStatus.jobs.push_back(arg);
- /* Do we need a new DNS thread? */
- if (m_threadStatus.num_idle > 0) {
- /* Nope, there is one waiting for a job */
- pthread_cond_signal(&m_threadStatus.cond);
- pthread_mutex_unlock(&m_threadStatus.mutex);
- return;
- }
- pthread_mutex_unlock(&m_threadStatus.mutex);
-
- pthread_attr_t attr;
- if (pthread_attr_init(&attr)) {
- CString sError = "Couldn't init pthread_attr for " + sHostname;
- DEBUG(sError);
- CZNC::Get().Broadcast(sError, /* bAdminOnly = */ true);
- SetTDNSThreadFinished(task, bBind, NULL);
- return;
- }
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-
- pthread_t thr;
- sigset_t old_sigmask;
- sigset_t sigmask;
- sigfillset(&sigmask);
- /* Block all signals. The thread will inherit our signal mask and thus
- * won't ever try to handle any signals.
- */
- if (pthread_sigmask(SIG_SETMASK, &sigmask, &old_sigmask)) {
- CString sError = "Couldn't block signals";
- DEBUG(sError);
- CZNC::Get().Broadcast(sError, /* bAdminOnly = */ true);
- delete arg;
- pthread_attr_destroy(&attr);
- SetTDNSThreadFinished(task, bBind, NULL);
- return;
- }
- if (pthread_create(&thr, &attr, TDNSThread, &m_threadStatus)) {
- CString sError = "Couldn't create thread for " + sHostname;
- DEBUG(sError);
- CZNC::Get().Broadcast(sError, /* bAdminOnly = */ true);
- delete arg;
- pthread_attr_destroy(&attr);
- SetTDNSThreadFinished(task, bBind, NULL);
- return;
- }
- if (pthread_sigmask(SIG_SETMASK, &old_sigmask, NULL)) {
- CString sError = "Couldn't unblock signals";
- DEBUG(sError);
- CZNC::Get().Broadcast(sError, /* bAdminOnly = */ true);
- delete arg;
- pthread_attr_destroy(&attr);
- SetTDNSThreadFinished(task, bBind, NULL);
- return;
- }
- pthread_attr_destroy(&attr);
+ CThreadPool::Get().addJob(arg);
}
void CSockManager::SetTDNSThreadFinished(TDNSTask* task, bool bBind, addrinfo* aiResult) {
@@ -283,75 +178,15 @@ void CSockManager::SetTDNSThreadFinished(TDNSTask* task, bool bBind, addrinfo* a
delete task;
}
-
-void CSockManager::RetrieveTDNSResult() {
- TDNSArg* a = NULL;
- size_t readed = 0;
- size_t need = sizeof(TDNSArg*);
- char* x = (char*)&a;
- while (readed < need) {
- ssize_t r = read(m_iTDNSpipe[0], x, need - readed);
- if (-1 == r) {
- DEBUG("Something bad happened during read() from a pipe when getting result of TDNSThread: " << strerror(errno));
- exit(1);
- }
- readed += r;
- x += r;
- }
- TDNSTask* task = a->task;
- if (0 != a->iRes) {
- DEBUG("Error in threaded DNS: " << gai_strerror(a->iRes));
- if (a->aiResult) {
- DEBUG("And aiResult is not NULL...");
- }
- a->aiResult = NULL; // just for case. Maybe to call freeaddrinfo()?
- }
- SetTDNSThreadFinished(task, a->bBind, a->aiResult);
- delete a;
-}
#endif /* HAVE_THREADED_DNS */
CSockManager::CSockManager() {
-#ifdef HAVE_THREADED_DNS
- int m = pthread_mutex_init(&m_threadStatus.mutex, NULL);
- if (m) {
- CUtils::PrintError("Can't initialize mutex: " + CString(strerror(errno)));
- exit(1);
- }
- m = pthread_cond_init(&m_threadStatus.cond, NULL);
- if (m) {
- CUtils::PrintError("Can't initialize condition variable: " + CString(strerror(errno)));
- exit(1);
- }
-
- m_threadStatus.num_threads = 0;
- m_threadStatus.num_idle = 0;
- m_threadStatus.done = false;
-
- if (pipe(m_iTDNSpipe)) {
- DEBUG("Ouch, can't open pipe for threaded DNS resolving: " << strerror(errno));
- exit(1);
- }
-
- MonitorFD(new CTDNSMonitorFD(this));
+#ifdef HAVE_PTHREAD
+ MonitorFD(new CTDNSMonitorFD());
#endif
}
CSockManager::~CSockManager() {
-#ifdef HAVE_THREADED_DNS
- /* Anyone has an idea how this can be done less ugly? */
- pthread_mutex_lock(&m_threadStatus.mutex);
- m_threadStatus.done = true;
- while (m_threadStatus.num_threads > 0) {
- pthread_cond_broadcast(&m_threadStatus.cond);
- pthread_mutex_unlock(&m_threadStatus.mutex);
- usleep(100);
- pthread_mutex_lock(&m_threadStatus.mutex);
- }
- pthread_mutex_unlock(&m_threadStatus.mutex);
- pthread_cond_destroy(&m_threadStatus.cond);
- pthread_mutex_destroy(&m_threadStatus.mutex);
-#endif
}
void CSockManager::Connect(const CString& sHostname, u_short iPort, const CString& sSockName, int iTimeout, bool bSSL, const CString& sBindHost, CZNCSock *pcSock) {
View
128 src/Threads.cpp
@@ -0,0 +1,128 @@
+/*
+ * Copyright (C) 2004-2012 See the AUTHORS file for details.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation.
+ */
+
+#include <znc/Threads.h>
+#include <znc/ZNCDebug.h>
+
+#ifdef HAVE_PTHREAD
+
+/* Just an arbitrary limit for the number of idle threads */
+static const size_t MAX_IDLE_THREADS = 3;
+
+/* Just an arbitrary limit for the number of running threads */
+static const size_t MAX_TOTAL_THREADS = 20;
+
+CThreadPool& CThreadPool::Get() {
+ // Beware! The following is not thread-safe! This function must
+ // be called once any thread is started.
+ static CThreadPool pool;
+ return pool;
+}
+
+CThreadPool::CThreadPool() : m_done(false), m_num_threads(0), m_num_idle(0) {
+ if (pipe(m_iJobPipe)) {
+ DEBUG("Ouch, can't open pipe for thread pool: " << strerror(errno));
+ exit(1);
+ }
+}
+
+void CThreadPool::jobDone(const CJob* job) const {
+ // This write() must succeed because POSIX guarantees that writes of
+ // less than PIPE_BUF are atomic (and PIPE_BUF is at least 512).
+ // (Yes, this really wants to write a pointer(!) to the pipe.
+ size_t w = write(m_iJobPipe[1], &job, sizeof(job));
+ if (w != sizeof(job)) {
+ DEBUG("Something bad happened during write() to a pipe for thread pool, wrote " << w << " bytes: " << strerror(errno));
+ exit(1);
+ }
+}
+
+void CThreadPool::handlePipeReadable() const {
+ CJob* a = NULL;
+ ssize_t need = sizeof(a);
+ ssize_t r = read(m_iJobPipe[0], &a, need);
+ if (r != need) {
+ DEBUG("Something bad happened during read() from a pipe for thread pool: " << strerror(errno));
+ exit(1);
+ }
+ a->runMain();
+ delete a;
+}
+
+CThreadPool::~CThreadPool() {
+ /* Anyone has an idea how this can be done less ugly? */
+ CMutexLocker guard(m_mutex);
+ m_done = true;
+
+ while (m_num_threads > 0) {
+ m_cond.broadcast();
+ guard.unlock();
+ usleep(100);
+ guard.lock();
+ }
+}
+
+bool CThreadPool::threadNeeded() const {
+ if (m_num_idle > MAX_IDLE_THREADS)
+ return false;
+ return !m_done;
+}
+
+void CThreadPool::threadFunc() {
+ CMutexLocker guard(m_mutex);
+ m_num_threads++;
+ m_num_idle++;
+
+ while (true) {
+ while (m_jobs.empty()) {
+ if (!threadNeeded())
+ break;
+ m_cond.wait(m_mutex);
+ }
+ if (!threadNeeded())
+ break;
+
+ // Figure out a job to do
+ CJob *job = m_jobs.front();
+ m_jobs.pop_front();
+
+ // Now do the actual job
+ m_num_idle--;
+ guard.unlock();
+
+ job->runThread();
+ jobDone(job);
+
+ guard.lock();
+ m_num_idle++;
+ }
+ assert(m_num_threads > 0 && m_num_idle > 0);
+ m_num_threads--;
+ m_num_idle--;
+}
+
+void CThreadPool::addJob(CJob *job) {
+ CMutexLocker guard(m_mutex);
+ m_jobs.push_back(job);
+
+ // Do we already have a thread which can handle this job?
+ if (m_num_idle > 0) {
+ m_cond.signal();
+ return;
+ }
+
+ if (m_num_threads >= MAX_TOTAL_THREADS)
+ // We can't start a new thread. The job will be handled once
+ // some thread finishes its current job.
+ return;
+
+ // Start a new thread for our pool
+ CThread::startThread(threadPoolFunc, this);
+}
+
+#endif // HAVE_PTHREAD

No commit comments for this range

Something went wrong with that request. Please try again.