Skip to content

Commit

Permalink
thread fix
Browse files Browse the repository at this point in the history
  • Loading branch information
stefanocasazza committed Sep 3, 2015
1 parent 4fcfb40 commit c97f456
Show file tree
Hide file tree
Showing 16 changed files with 507 additions and 252 deletions.
2 changes: 1 addition & 1 deletion include/ulib/base/win32/system.h
Expand Up @@ -382,7 +382,6 @@ U_EXPORT int inet_aton(const char *cp, struct in_addr *addr);
U_EXPORT int setrlimit(int resource, const struct rlimit* rlim);
U_EXPORT int socketpair(int d, int type, int protocol, int sv[2]);
U_EXPORT ssize_t writev(int fd, const struct iovec* vector, int count);
U_EXPORT const char* inet_ntop(int af, const void* src, char* dst, size_t size);
U_EXPORT int sigprocmask(int how, const sigset_t* set, sigset_t* oldset);
U_EXPORT void* mmap(void* start, size_t length, int prot, int flags, int fd, off_t offset);
U_EXPORT int sigaction(int signum, const struct sigaction* act, struct sigaction* oldact);
Expand All @@ -392,6 +391,7 @@ U_EXPORT int setitimer(int which, const struct itimerval* value, stru
*
* U_EXPORT int gettimeofday(struct timeval* tv, void* tz);
* U_EXPORT int truncate(const char* fname, off_t distance);
* U_EXPORT const char* inet_ntop(int af, const void* src, char* dst, size_t size);
*/

U_EXPORT int raise_w32(int nsig);
Expand Down
5 changes: 5 additions & 0 deletions include/ulib/container/vector.h
Expand Up @@ -25,6 +25,7 @@
*/

class UHTTP;
class UThreadPool;
class UHttpPlugIn;
class UFileConfig;
class UNoCatPlugIn;
Expand Down Expand Up @@ -392,6 +393,8 @@ template <> class U_EXPORT UVector<void*> {
UVector<void*>& operator=(const UVector<void*>&) { return *this; }
#endif

friend class UThreadPool;

template <class T> friend class UOrmTypeHandler;
template <class T> friend class UJsonTypeHandler;
};
Expand Down Expand Up @@ -904,6 +907,8 @@ template <class T> class U_EXPORT UVector<T*> : public UVector<void*> {
#endif

private:
friend class UThreadPool;

#ifdef U_COMPILER_DELETE_MEMBERS
UVector<T*>& operator=(const UVector<T*>&) = delete;
#else
Expand Down
2 changes: 1 addition & 1 deletion include/ulib/internal/platform.h
Expand Up @@ -210,7 +210,7 @@
# endif
/* Require for compiling with critical sections */
# ifndef _WIN32_WINNT
# define _WIN32_WINNT 0x0501
# define _WIN32_WINNT 0x0600
# endif
/* Make sure we're consistent with _WIN32_WINNT */
# ifndef WINVER
Expand Down
181 changes: 148 additions & 33 deletions include/ulib/thread.h
Expand Up @@ -18,15 +18,20 @@
#include <ulib/container/vector.h>

#ifdef _MSWINDOWS_
# include <synchapi.h>
# undef sleep
# undef signal
# define PTHREAD_CREATE_DETACHED 1
#else
# ifdef HAVE_SYS_SYSCALL_H
# include <sys/syscall.h>
# endif
# define U_SIGSTOP (SIGRTMIN+5)
# define U_SIGCONT (SIGRTMIN+6)
#endif

class UNotifier;
class UThreadPool;
class UServer_Base;

class U_EXPORT UThread {
Expand All @@ -40,8 +45,35 @@ class U_EXPORT UThread {

// COSTRUTTORI

UThread(int detachstate);
virtual ~UThread();
UThread(int _detachstate)
{
U_TRACE_REGISTER_OBJECT(0, UThread, "%d", _detachstate)

next = 0;
detachstate = _detachstate;
cancel = 0;
sid = 0;
tid = 0;
# ifdef _MSWINDOWS_
cancellation = 0;
# else
suspendCount = 0;
# endif
}

virtual ~UThread()
{
U_TRACE_UNREGISTER_OBJECT(0, UThread)

if (tid)
{
# ifndef _MSWINDOWS_
if (isDetached()) suspend();
# endif

close();
}
}

// SERVICES

Expand All @@ -61,6 +93,27 @@ class U_EXPORT UThread {

LeaveCriticalSection(pmutex);
}

static void wait(CRITICAL_SECTION* pmutex, CONDITION_VARIABLE* pcond)
{
U_TRACE(0, "UThread::wait(%p,%p)", pmutex, pcond)

SleepConditionVariableCS(pcond, pmutex, INFINITE); // block until we are signalled from other...
}

static void signal(CONDITION_VARIABLE* pcond)
{
U_TRACE(0, "UThread::signal(%p)", pcond)

WakeConditionVariable(pcond); // signal to waiting thread...
}

static void signalAll(CONDITION_VARIABLE* pcond)
{
U_TRACE(0, "UThread::signalAll(%p)", pcond)

WakeAllConditionVariable(pcond); // signal to waiting thread...
}
#else
static pid_t getTID();

Expand All @@ -78,6 +131,27 @@ class U_EXPORT UThread {
(void) U_SYSCALL(pthread_mutex_unlock, "%p", pmutex);
}

static void wait(pthread_mutex_t* pmutex, pthread_cond_t* pcond)
{
U_TRACE(0, "UThread::wait(%p,%p)", pmutex, pcond)

(void) U_SYSCALL(pthread_cond_wait, "%p,%p", pcond, pmutex); // block until we are signalled from other...
}

static void signal(pthread_cond_t* pcond)
{
U_TRACE(0, "UThread::signal(%p)", pcond)

(void) U_SYSCALL(pthread_cond_signal, "%p", pcond); // signal to waiting thread...
}

static void signalAll(pthread_cond_t* pcond)
{
U_TRACE(0, "UThread::signalAll(%p)", pcond)

(void) U_SYSCALL(pthread_cond_broadcast, "%p", pcond); // signal to waiting thread...
}

static bool initRwLock(pthread_rwlock_t* prwlock)
{
U_TRACE(1, "UThread::initRwLock(%p)", prwlock)
Expand Down Expand Up @@ -151,7 +225,7 @@ class U_EXPORT UThread {
return;
}

UTimeVal(timeoutMS / 1000L, (timeoutMS % 1000L) * 1000L).nanosleep();
UTimeVal::nanosleep(timeoutMS);
}

/**
Expand Down Expand Up @@ -281,40 +355,47 @@ class U_EXPORT UThread {
UThread* next;
int detachstate, cancel;
pid_t sid;

#ifdef _MSWINDOWS_
DWORD tid;
HANDLE cancellation;
#else
pthread_t tid;
pthread_attr_t attr;
int suspendCount;
#endif

static UThread* first;

void close();

static void threadStart(UThread* th)
void threadStart()
{
U_TRACE(0, "UThread::threadStart(%p)", th)
U_TRACE(0, "UThread::threadStart()")

U_INTERNAL_ASSERT_POINTER(th)
U_INTERNAL_DUMP("tid = %p sid = %u", tid, sid)

U_INTERNAL_DUMP("th->tid = %p th->sid = %u", th->tid, th->sid)
setCancel(cancelDeferred);

th->setCancel(cancelDeferred);
run();

th->run();

U_INTERNAL_DUMP("th->tid = %p th->sid = %u", th->tid, th->sid)
U_INTERNAL_DUMP("tid = %p sid = %u", tid, sid)

if (th->tid) th->close();
if (tid) close();
}

#ifdef _MSWINDOWS_
static unsigned __stdcall execHandler(void* th);
static unsigned __stdcall execHandler(void* th)
{
U_TRACE(0, "UThread::::execHandler(%p)", th)

U_INTERNAL_ASSERT_POINTER(th)
// U_INTERNAL_ASSERT_EQUALS(GetCurrentThreadId(), th->tid)

((UThread*)th)->threadStart();

U_RETURN(0);
}
#else
void maskSignal();
void sigInstall(int signo);
void manageSignal(int signo);

Expand All @@ -327,7 +408,22 @@ class U_EXPORT UThread {
if (th) th->manageSignal(signo);
}

static void execHandler(UThread* th);
static void execHandler(UThread* th)
{
U_TRACE(0, "UThread::execHandler(%p)", th)

U_INTERNAL_ASSERT_POINTER(th)

# ifdef HAVE_SYS_SYSCALL_H
th->sid = syscall(SYS_gettid);
# endif

// U_INTERNAL_ASSERT_EQUALS(pthread_self(), th->tid)

th->maskSignal();

th->threadStart();
}

static void threadCleanup(UThread* th)
{
Expand Down Expand Up @@ -361,17 +457,19 @@ class U_EXPORT UThread {

private:
friend class UNotifier;
friend class UThreadPool;
friend class UServer_Base;

#ifdef U_COMPILER_DELETE_MEMBERS
UThread(const UThread&) = delete;
UThread& operator=(const UThread&) = delete;
#else
UThread(const UThread&) {}
UThread& operator=(const UThread&) { return *this; }
#endif
};

// UThreadPool class manages all the UThreadPool related activities. This includes keeping track of idle threads and snchronizations between all threads.
// Using UThreadPool is advantageous only when the work to be done is really time consuming. (at least 1 or 2 seconds)

class U_EXPORT UThreadPool : public UThread {
public:
// Check for memory error
Expand All @@ -386,23 +484,40 @@ class U_EXPORT UThreadPool : public UThread {
UThreadPool(uint32_t size);
~UThreadPool();

// define method VIRTUAL of class UThread
// SERVICES

virtual void run() U_DECL_FINAL
void addTask(UThread* task)
{
U_TRACE(0, "UThreadPool::run()")
U_TRACE(0, "UThreadPool::addTask(%p)", task)

/*
for (task in queue)
{
if (task == STOP_WORKING) break;
U_INTERNAL_ASSERT(active)

do work;
}
*/
lock(&tasks_mutex);

queue.push(task);

unlock(&tasks_mutex);

signal(&condition); // Waking up the threads so they will know there is a job to do
}

// SERVICES
// This function gives the user the ability to send 10 tasks to the thread pool then to wait till
// all the tasks completed, and give the next 10 which are dependand on the result of the previous ones

void waitForWorkToBeFinished()
{
U_TRACE(0, "UThreadPool::waitForWorkToBeFinished()")

lock(&tasks_mutex);

while (queue._length != 0) wait(&tasks_mutex, &condition_task_finished);

unlock(&tasks_mutex);
}

// define method VIRTUAL of class UThread

virtual void run() U_DECL_OVERRIDE;

// DEBUG

Expand All @@ -416,11 +531,11 @@ class U_EXPORT UThreadPool : public UThread {
bool active;

#ifdef _MSWINDOWS_
CRITICAL_SECTION tasksMutex; // Task queue mutex
CONDITION_VARIABLE condition; // Condition variable
CRITICAL_SECTION tasks_mutex; // Task queue mutex
CONDITION_VARIABLE condition, condition_task_finished; // Condition variable
#else
pthread_mutex_t tasksMutex; // Task queue mutex
pthread_cond_t condition; // Condition variable
pthread_mutex_t tasks_mutex; // Task queue mutex
pthread_cond_t condition, condition_task_finished; // Condition variable
#endif

private:
Expand Down
3 changes: 2 additions & 1 deletion include/ulib/timeval.h
Expand Up @@ -347,7 +347,8 @@ class U_EXPORT UTimeVal : public timeval {

// SERVICES

void nanosleep();
void nanosleep();
static void nanosleep(time_t timeoutMS) { UTimeVal(timeoutMS / 1000L, (timeoutMS % 1000L) * 1000L).nanosleep(); }

// CHRONOMETER

Expand Down
4 changes: 2 additions & 2 deletions src/ulib/base/base_trace.c
Expand Up @@ -86,10 +86,10 @@ void u_trace_lock(void)

#ifdef ENABLE_THREAD
# ifdef _MSWINDOWS_
if (old_tid == 0) InitializeCriticalSection(&mutex);

DWORD tid = GetCurrentThreadId();

if (old_tid == 0) InitializeCriticalSection(&mutex);

EnterCriticalSection(&mutex);
# else
pthread_t tid;
Expand Down
2 changes: 2 additions & 0 deletions src/ulib/base/win32/mingw32.c
Expand Up @@ -86,6 +86,7 @@ int inet_aton(const char* src, struct in_addr* addr)
return 1;
}

/*
const char* inet_ntop(int af, const void* src, char* dst, size_t size)
{
U_INTERNAL_TRACE("inet_ntop(%d,%p,%s,%d)", af, src, dst, size)
Expand Down Expand Up @@ -122,6 +123,7 @@ const char* inet_ntop(int af, const void* src, char* dst, size_t size)
return 0;
}
*/

#define isWindow9x() (version.dwPlatformId == VER_PLATFORM_WIN32_WINDOWS)
#define isWindowNT() (version.dwPlatformId == VER_PLATFORM_WIN32_NT)
Expand Down
2 changes: 1 addition & 1 deletion src/ulib/command.cpp
Expand Up @@ -378,7 +378,7 @@ U_NO_EXPORT bool UCommand::postCommand(UString* input, UString* output)

UProcess::kill(pid, SIGTERM);

UTimeVal(1L).nanosleep();
UTimeVal::nanosleep(1L);

UProcess::kill(pid, SIGKILL);
}
Expand Down

0 comments on commit c97f456

Please sign in to comment.