From 409c47070f6c7e58ee07fad870dcb4b1ecdfbc25 Mon Sep 17 00:00:00 2001 From: "Jorge Chamorro Bieling / @jorgechamorro" Date: Tue, 29 Jan 2013 16:07:01 +0100 Subject: [PATCH] use libuv if available --- src/threads_a_gogo.cc | 57 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/src/threads_a_gogo.cc b/src/threads_a_gogo.cc index 98f474e..e5874f8 100644 --- a/src/threads_a_gogo.cc +++ b/src/threads_a_gogo.cc @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -10,10 +11,10 @@ #include #include -/* -static int debug_threads= 0; -static int debug_allocs= 0; -*/ +#define TAGG_USE_LIBUV +#if (NODE_MAJOR_VERSION == 0) && (NODE_MINOR_VERSION <= 5) +#undef TAGG_USE_LIBUV +#endif #include "queues_a_gogo.cc" @@ -29,7 +30,11 @@ static typeQueue* freeThreadsQueue= NULL; #define kThreadMagicCookie 0x99c0ffee typedef struct { +#ifdef TAGG_USE_LIBUV + uv_async_t async_watcher; //MUST be the first one +#else ev_async async_watcher; //MUST be the first one +#endif long int id; pthread_t thread; @@ -199,7 +204,11 @@ static void* aThread (void* arg) { thread->isolate->Dispose(); // wake up callback - if (!ev_async_pending(&thread->async_watcher)) ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher); +#ifdef TAGG_USE_LIBUV + uv_async_send(&thread->async_watcher); +#else + ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher); +#endif return NULL; } @@ -278,7 +287,11 @@ static void eventLoop (typeThread* thread) { job->typeEval.resultado= new String::Utf8Value(job->typeEval.error ? onError.Exception() : resultado); queue_push(qitem, &thread->outQueue); // wake up callback - if (!ev_async_pending(&thread->async_watcher)) ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher); +#ifdef TAGG_USE_LIBUV + uv_async_send(&thread->async_watcher); +#else + ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher); +#endif } else { queue_push(qitem, freeJobsQueue); @@ -359,8 +372,13 @@ static void destroyaThread (typeThread* thread) { thread->JSObject->SetPointerInInternalField(0, NULL); thread->JSObject.Dispose(); +#ifdef TAGG_USE_LIBUV + uv_close((uv_handle_t*) &thread->async_watcher, NULL); + //uv_unref(&thread->async_watcher); +#else ev_async_stop(EV_DEFAULT_UC_ &thread->async_watcher); ev_unref(EV_DEFAULT_UC); +#endif if (freeThreadsQueue) { queue_push(nuItem(kItemTypePointer, thread), freeThreadsQueue); @@ -377,7 +395,13 @@ static void destroyaThread (typeThread* thread) { // C callback that runs in the main nodejs thread. This is the one responsible for // calling the thread's JS callback. -static void Callback (EV_P_ ev_async *watcher, int revents) { +static void Callback ( +#ifdef TAGG_USE_LIBUV + uv_async_t *watcher +#else + EV_P_ ev_async *watcher +#endif +, int revents) { typeThread* thread= (typeThread*) watcher; if (thread->sigkill) { @@ -420,7 +444,11 @@ static void Callback (EV_P_ ev_async *watcher, int revents) { if (onError.HasCaught()) { if (thread->outQueue.first) { +#ifdef TAGG_USE_LIBUV + uv_async_send(&thread->async_watcher); // wake up callback again +#else ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher); // wake up callback again +#endif } node::FatalException(onError); return; @@ -643,7 +671,12 @@ static Handle threadEmit (const Arguments &args) { } while (++i <= job->typeEvent.length); queue_push(qitem, &thread->outQueue); - if (!ev_async_pending(&thread->async_watcher)) ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher); // wake up callback + +#ifdef TAGG_USE_LIBUV + uv_async_send(&thread->async_watcher); // wake up callback +#else + ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher); // wake up callback +#endif //fprintf(stdout, "*** threadEmit END\n"); @@ -682,9 +715,13 @@ static Handle Create (const Arguments &args) { Local dispatchEvents= Script::Compile(String::New(kEvents_js))->Run()->ToObject()->CallAsFunction(thread->JSObject, 0, NULL); thread->dispatchEvents= Persistent::New(dispatchEvents->ToObject()); +#ifdef TAGG_USE_LIBUV + uv_async_init(uv_default_loop(), &thread->async_watcher, Callback); +#else ev_async_init(&thread->async_watcher, Callback); ev_async_start(EV_DEFAULT_UC_ &thread->async_watcher); ev_ref(EV_DEFAULT_UC); +#endif pthread_cond_init(&thread->IDLE_cv, NULL); pthread_mutex_init(&thread->IDLE_mutex, NULL); @@ -701,7 +738,10 @@ static Handle Create (const Arguments &args) { return ThrowException(Exception::TypeError(String::New("create(): error in pthread_create()"))); } + /* V8::AdjustAmountOfExternalAllocatedMemory(sizeof(typeThread)); //OJO V8 con V mayúscula. + */ + return scope.Close(thread->JSObject); } @@ -729,7 +769,6 @@ void Init (Handle target) { threadTemplate->Set(String::NewSymbol("load"), FunctionTemplate::New(Load)); threadTemplate->Set(String::NewSymbol("emit"), FunctionTemplate::New(processEmit)); threadTemplate->Set(String::NewSymbol("destroy"), FunctionTemplate::New(Destroy)); - }