Skip to content

Commit

Permalink
use libuv if available
Browse files Browse the repository at this point in the history
  • Loading branch information
xk committed Jan 30, 2013
1 parent e9411e5 commit 409c470
Showing 1 changed file with 48 additions and 9 deletions.
57 changes: 48 additions & 9 deletions src/threads_a_gogo.cc
Expand Up @@ -3,17 +3,18 @@

#include <v8.h>
#include <node.h>
#include <node_version.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string>

/*
static int debug_threads= 0;
static int debug_allocs= 0;
*/
#define TAGG_USE_LIBUV
#if (NODE_MAJOR_VERSION == 0) && (NODE_MINOR_VERSION <= 5)
#undef TAGG_USE_LIBUV
#endif

#include "queues_a_gogo.cc"

Expand All @@ -29,7 +30,11 @@ static typeQueue* freeThreadsQueue= NULL;

#define kThreadMagicCookie 0x99c0ffee
typedef struct {
#ifdef TAGG_USE_LIBUV
uv_async_t async_watcher; //MUST be the first one
#else
ev_async async_watcher; //MUST be the first one
#endif

long int id;
pthread_t thread;
Expand Down Expand Up @@ -199,7 +204,11 @@ static void* aThread (void* arg) {
thread->isolate->Dispose();

// wake up callback
if (!ev_async_pending(&thread->async_watcher)) ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher);
#ifdef TAGG_USE_LIBUV
uv_async_send(&thread->async_watcher);
#else
ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher);
#endif

return NULL;
}
Expand Down Expand Up @@ -278,7 +287,11 @@ static void eventLoop (typeThread* thread) {
job->typeEval.resultado= new String::Utf8Value(job->typeEval.error ? onError.Exception() : resultado);
queue_push(qitem, &thread->outQueue);
// wake up callback
if (!ev_async_pending(&thread->async_watcher)) ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher);
#ifdef TAGG_USE_LIBUV
uv_async_send(&thread->async_watcher);
#else
ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher);
#endif
}
else {
queue_push(qitem, freeJobsQueue);
Expand Down Expand Up @@ -359,8 +372,13 @@ static void destroyaThread (typeThread* thread) {
thread->JSObject->SetPointerInInternalField(0, NULL);
thread->JSObject.Dispose();

#ifdef TAGG_USE_LIBUV
uv_close((uv_handle_t*) &thread->async_watcher, NULL);
//uv_unref(&thread->async_watcher);
#else
ev_async_stop(EV_DEFAULT_UC_ &thread->async_watcher);
ev_unref(EV_DEFAULT_UC);
#endif

if (freeThreadsQueue) {
queue_push(nuItem(kItemTypePointer, thread), freeThreadsQueue);
Expand All @@ -377,7 +395,13 @@ static void destroyaThread (typeThread* thread) {

// C callback that runs in the main nodejs thread. This is the one responsible for
// calling the thread's JS callback.
static void Callback (EV_P_ ev_async *watcher, int revents) {
static void Callback (
#ifdef TAGG_USE_LIBUV
uv_async_t *watcher
#else
EV_P_ ev_async *watcher
#endif
, int revents) {
typeThread* thread= (typeThread*) watcher;

if (thread->sigkill) {
Expand Down Expand Up @@ -420,7 +444,11 @@ static void Callback (EV_P_ ev_async *watcher, int revents) {

if (onError.HasCaught()) {
if (thread->outQueue.first) {
#ifdef TAGG_USE_LIBUV
uv_async_send(&thread->async_watcher); // wake up callback again
#else
ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher); // wake up callback again
#endif
}
node::FatalException(onError);
return;
Expand Down Expand Up @@ -643,7 +671,12 @@ static Handle<Value> threadEmit (const Arguments &args) {
} while (++i <= job->typeEvent.length);

queue_push(qitem, &thread->outQueue);
if (!ev_async_pending(&thread->async_watcher)) ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher); // wake up callback

#ifdef TAGG_USE_LIBUV
uv_async_send(&thread->async_watcher); // wake up callback
#else
ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher); // wake up callback
#endif

//fprintf(stdout, "*** threadEmit END\n");

Expand Down Expand Up @@ -682,9 +715,13 @@ static Handle<Value> Create (const Arguments &args) {
Local<Value> dispatchEvents= Script::Compile(String::New(kEvents_js))->Run()->ToObject()->CallAsFunction(thread->JSObject, 0, NULL);
thread->dispatchEvents= Persistent<Object>::New(dispatchEvents->ToObject());

#ifdef TAGG_USE_LIBUV
uv_async_init(uv_default_loop(), &thread->async_watcher, Callback);
#else
ev_async_init(&thread->async_watcher, Callback);
ev_async_start(EV_DEFAULT_UC_ &thread->async_watcher);
ev_ref(EV_DEFAULT_UC);
#endif

pthread_cond_init(&thread->IDLE_cv, NULL);
pthread_mutex_init(&thread->IDLE_mutex, NULL);
Expand All @@ -701,7 +738,10 @@ static Handle<Value> Create (const Arguments &args) {
return ThrowException(Exception::TypeError(String::New("create(): error in pthread_create()")));
}

/*
V8::AdjustAmountOfExternalAllocatedMemory(sizeof(typeThread)); //OJO V8 con V mayúscula.
*/

return scope.Close(thread->JSObject);
}

Expand Down Expand Up @@ -729,7 +769,6 @@ void Init (Handle<Object> target) {
threadTemplate->Set(String::NewSymbol("load"), FunctionTemplate::New(Load));
threadTemplate->Set(String::NewSymbol("emit"), FunctionTemplate::New(processEmit));
threadTemplate->Set(String::NewSymbol("destroy"), FunctionTemplate::New(Destroy));

}


Expand Down

0 comments on commit 409c470

Please sign in to comment.