Skip to content

Commit

Permalink
CJob: Add a way to do stuff on the main thread
Browse files Browse the repository at this point in the history
This just moves the pipe from the socket code to the thread pool. However, now
all CJobs can use this and there is a single place for them to get deleted.

Signed-off-by: Uli Schlachter <psychon@znc.in>
  • Loading branch information
psychon committed Mar 15, 2013
1 parent 75f2e3f commit 53c579b
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 60 deletions.
20 changes: 9 additions & 11 deletions include/znc/Socket.h
Expand Up @@ -102,11 +102,9 @@ class CSockManager : public TSocketManager<CZNCSock> {
private: private:
void FinishConnect(const CString& sHostname, u_short iPort, const CString& sSockName, int iTimeout, bool bSSL, const CString& sBindHost, CZNCSock *pcSock); void FinishConnect(const CString& sHostname, u_short iPort, const CString& sSockName, int iTimeout, bool bSSL, const CString& sBindHost, CZNCSock *pcSock);


#ifdef HAVE_THREADED_DNS
int m_iTDNSpipe[2];

class CTDNSMonitorFD; class CTDNSMonitorFD;
friend class CTDNSMonitorFD; friend class CTDNSMonitorFD;
#ifdef HAVE_THREADED_DNS
struct TDNSTask { struct TDNSTask {
CString sHostname; CString sHostname;
u_short iPort; u_short iPort;
Expand All @@ -123,19 +121,19 @@ class CSockManager : public TSocketManager<CZNCSock> {
}; };
class CDNSJob : public CJob { class CDNSJob : public CJob {
public: public:
CString sHostname; CString sHostname;
TDNSTask* task; TDNSTask* task;
int fd; CSockManager* pManager;
bool bBind; bool bBind;


int iRes; int iRes;
addrinfo* aiResult; addrinfo* aiResult;


void run(); void runThread();
void runMain();
}; };
void StartTDNSThread(TDNSTask* task, bool bBind); void StartTDNSThread(TDNSTask* task, bool bBind);
void SetTDNSThreadFinished(TDNSTask* task, bool bBind, addrinfo* aiResult); void SetTDNSThreadFinished(TDNSTask* task, bool bBind, addrinfo* aiResult);
void RetrieveTDNSResult();
static void* TDNSThread(void* argument); static void* TDNSThread(void* argument);
#endif #endif
protected: protected:
Expand Down
19 changes: 15 additions & 4 deletions include/znc/Threads.h
Expand Up @@ -182,22 +182,29 @@ class CThread {
class CJob { class CJob {
public: public:
virtual ~CJob() {} virtual ~CJob() {}
virtual void run() = 0; virtual void runThread() = 0;
virtual void runMain() = 0;
}; };


class CThreadPool { class CThreadPool {
private: private:
CThreadPool() : m_done(false), m_num_threads(0), m_num_idle(0) { CThreadPool();
}

~CThreadPool(); ~CThreadPool();


public: public:
static CThreadPool& Get(); static CThreadPool& Get();


void addJob(CJob *job); void addJob(CJob *job);


int getReadFD() const {
return m_iJobPipe[0];
}

void handlePipeReadable() const;

private: private:
void jobDone(const CJob* pJob) const;

// Check if the calling thread is still needed, must be called with m_mutex held // Check if the calling thread is still needed, must be called with m_mutex held
bool threadNeeded() const; bool threadNeeded() const;


Expand All @@ -223,6 +230,10 @@ class CThreadPool {
// number of idle threads waiting on the condition variable // number of idle threads waiting on the condition variable
size_t m_num_idle; size_t m_num_idle;


// pipe for waking up the main thread
int m_iJobPipe[2];

// list of pending jobs
std::list<CJob *> m_jobs; std::list<CJob *> m_jobs;
}; };


Expand Down
64 changes: 20 additions & 44 deletions src/Socket.cpp
Expand Up @@ -35,24 +35,24 @@ int CZNCSock::ConvertAddress(const struct sockaddr_storage * pAddr, socklen_t iA
return ret; return ret;
} }


#ifdef HAVE_THREADED_DNS #ifdef HAVE_PTHREAD
class CSockManager::CTDNSMonitorFD : public CSMonitorFD { class CSockManager::CTDNSMonitorFD : public CSMonitorFD {
CSockManager* m_Manager;
public: public:
CTDNSMonitorFD(CSockManager* mgr) { CTDNSMonitorFD() {
m_Manager = mgr; Add(CThreadPool::Get().getReadFD(), ECT_Read);
Add(mgr->m_iTDNSpipe[0], ECT_Read);
} }


virtual bool FDsThatTriggered(const std::map<int, short>& miiReadyFds) { virtual bool FDsThatTriggered(const std::map<int, short>& miiReadyFds) {
if (miiReadyFds.find(m_Manager->m_iTDNSpipe[0])->second) { if (miiReadyFds.find(CThreadPool::Get().getReadFD())->second) {
m_Manager->RetrieveTDNSResult(); CThreadPool::Get().handlePipeReadable();
} }
return true; return true;
} }
}; };
#endif


void CSockManager::CDNSJob::run() { #ifdef HAVE_THREADED_DNS
void CSockManager::CDNSJob::runThread() {
int iCount = 0; int iCount = 0;
while (true) { while (true) {
addrinfo hints; addrinfo hints;
Expand All @@ -73,27 +73,28 @@ void CSockManager::CDNSJob::run() {
} }
sleep(5); // wait 5 seconds before next try sleep(5); // wait 5 seconds before next try
} }
}


// This write() must succeed because POSIX guarantees that writes of void CSockManager::CDNSJob::runMain() {
// less than PIPE_BUF are atomic (and PIPE_BUF is at least 512). if (0 != this->iRes) {
// (Yes, this really wants to write a pointer(!) to the pipe. DEBUG("Error in threaded DNS: " << gai_strerror(this->iRes));
CDNSJob *job = this; if (this->aiResult) {
size_t w = write(fd, &job, sizeof(job)); DEBUG("And aiResult is not NULL...");
if (w != sizeof(job)) { }
DEBUG("Something bad happened during write() to a pipe from TDNSThread, wrote " << w << " bytes: " << strerror(errno)); this->aiResult = NULL; // just for case. Maybe to call freeaddrinfo()?
exit(1);
} }
pManager->SetTDNSThreadFinished(this->task, this->bBind, this->aiResult);
} }


void CSockManager::StartTDNSThread(TDNSTask* task, bool bBind) { void CSockManager::StartTDNSThread(TDNSTask* task, bool bBind) {
CString sHostname = bBind ? task->sBindhost : task->sHostname; CString sHostname = bBind ? task->sBindhost : task->sHostname;
CDNSJob* arg = new CDNSJob; CDNSJob* arg = new CDNSJob;
arg->sHostname = sHostname; arg->sHostname = sHostname;
arg->task = task; arg->task = task;
arg->fd = m_iTDNSpipe[1];
arg->bBind = bBind; arg->bBind = bBind;
arg->iRes = 0; arg->iRes = 0;
arg->aiResult = NULL; arg->aiResult = NULL;
arg->pManager = this;


CThreadPool::Get().addJob(arg); CThreadPool::Get().addJob(arg);
} }
Expand Down Expand Up @@ -177,36 +178,11 @@ void CSockManager::SetTDNSThreadFinished(TDNSTask* task, bool bBind, addrinfo* a


delete task; delete task;
} }

void CSockManager::RetrieveTDNSResult() {
CDNSJob* a = NULL;
ssize_t need = sizeof(a);
ssize_t r = read(m_iTDNSpipe[0], &a, need);
if (r != need) {
DEBUG("Something bad happened during read() from a pipe when getting result of TDNSThread: " << strerror(errno));
exit(1);
}
TDNSTask* task = a->task;
if (0 != a->iRes) {
DEBUG("Error in threaded DNS: " << gai_strerror(a->iRes));
if (a->aiResult) {
DEBUG("And aiResult is not NULL...");
}
a->aiResult = NULL; // just for case. Maybe to call freeaddrinfo()?
}
SetTDNSThreadFinished(task, a->bBind, a->aiResult);
delete a;
}
#endif /* HAVE_THREADED_DNS */ #endif /* HAVE_THREADED_DNS */


CSockManager::CSockManager() { CSockManager::CSockManager() {
#ifdef HAVE_THREADED_DNS #ifdef HAVE_PTHREAD
if (pipe(m_iTDNSpipe)) { MonitorFD(new CTDNSMonitorFD());
DEBUG("Ouch, can't open pipe for threaded DNS resolving: " << strerror(errno));
exit(1);
}

MonitorFD(new CTDNSMonitorFD(this));
#endif #endif
} }


Expand Down
34 changes: 33 additions & 1 deletion src/Threads.cpp
Expand Up @@ -7,6 +7,7 @@
*/ */


#include <znc/Threads.h> #include <znc/Threads.h>
#include <znc/ZNCDebug.h>


#ifdef HAVE_PTHREAD #ifdef HAVE_PTHREAD


Expand All @@ -23,6 +24,36 @@ CThreadPool& CThreadPool::Get() {
return pool; return pool;
} }


CThreadPool::CThreadPool() : m_done(false), m_num_threads(0), m_num_idle(0) {
if (pipe(m_iJobPipe)) {
DEBUG("Ouch, can't open pipe for thread pool: " << strerror(errno));
exit(1);
}
}

void CThreadPool::jobDone(const CJob* job) const {
// This write() must succeed because POSIX guarantees that writes of
// less than PIPE_BUF are atomic (and PIPE_BUF is at least 512).
// (Yes, this really wants to write a pointer(!) to the pipe.
size_t w = write(m_iJobPipe[1], &job, sizeof(job));
if (w != sizeof(job)) {
DEBUG("Something bad happened during write() to a pipe for thread pool, wrote " << w << " bytes: " << strerror(errno));
exit(1);
}
}

void CThreadPool::handlePipeReadable() const {
CJob* a = NULL;
ssize_t need = sizeof(a);
ssize_t r = read(m_iJobPipe[0], &a, need);
if (r != need) {
DEBUG("Something bad happened during read() from a pipe for thread pool: " << strerror(errno));
exit(1);
}
a->runMain();
delete a;
}

CThreadPool::~CThreadPool() { CThreadPool::~CThreadPool() {
/* Anyone has an idea how this can be done less ugly? */ /* Anyone has an idea how this can be done less ugly? */
CMutexLocker guard(m_mutex); CMutexLocker guard(m_mutex);
Expand Down Expand Up @@ -64,7 +95,8 @@ void CThreadPool::threadFunc() {
m_num_idle--; m_num_idle--;
guard.unlock(); guard.unlock();


job->run(); job->runThread();
jobDone(job);


guard.lock(); guard.lock();
m_num_idle++; m_num_idle++;
Expand Down

0 comments on commit 53c579b

Please sign in to comment.