Windows support #28

Closed
wants to merge 7 commits into
from
View
@@ -0,0 +1,8 @@
+{
+ 'targets': [
+ {
+ 'target_name': 'threads_a_gogo',
+ 'sources': [ 'src/threads_a_gogo.cc' ]
+ }
+ ]
+}
View
@@ -26,11 +26,8 @@
"type": "git",
"url": "http://github.com/xk/node-threads-a-gogo.git"
},
- "scripts": {
- "install": "node-waf configure install"
- },
- "os": ["macos", "linux", "darwin"],
+ "gypfile": true,
"engines": {
- "node": ">=0.5.1"
+ "node": ">=0.8.0"
}
}
View
@@ -1,11 +1,13 @@
//2011-11 Proyectos Equis Ka, s.l., jorge@jorgechamorro.com
//queues_a_gogo.cc
-#include <unistd.h>
#include <string.h>
#include <stdio.h>
-#include <pthread.h>
+#include <uv.h>
#include <stdlib.h>
+#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) || defined(_AIX)
+#include <unistd.h>
+#endif
enum types {
kItemTypeNONE,
@@ -27,7 +29,7 @@ typedef struct typeQueueItem typeQueueItem;
typedef struct {
typeQueueItem* first;
typeQueueItem* last;
- pthread_mutex_t queueLock;
+ uv_mutex_t queueLock;
long int id;
volatile long int length;
} typeQueue;
@@ -41,7 +43,7 @@ static typeQueue* freeItemsQueue= NULL;
static void queue_push (typeQueueItem* qitem, typeQueue* queue) {
qitem->next= NULL;
- pthread_mutex_lock(&queue->queueLock);
+ uv_mutex_lock(&queue->queueLock);
if (queue->last) {
queue->last->next= qitem;
}
@@ -50,7 +52,7 @@ static void queue_push (typeQueueItem* qitem, typeQueue* queue) {
}
queue->last= qitem;
queue->length++;
- pthread_mutex_unlock(&queue->queueLock);
+ uv_mutex_unlock(&queue->queueLock);
}
@@ -59,7 +61,7 @@ static void queue_push (typeQueueItem* qitem, typeQueue* queue) {
static typeQueueItem* queue_pull (typeQueue* queue) {
typeQueueItem* qitem;
- pthread_mutex_lock(&queue->queueLock);
+ uv_mutex_lock(&queue->queueLock);
if ((qitem= queue->first)) {
if (queue->last == qitem) {
queue->first= queue->last= NULL;
@@ -70,7 +72,7 @@ static typeQueueItem* queue_pull (typeQueue* queue) {
queue->length--;
qitem->next= NULL;
}
- pthread_mutex_unlock(&queue->queueLock);
+ uv_mutex_unlock(&queue->queueLock);
return qitem;
}
@@ -124,7 +126,7 @@ static typeQueue* nuQueue (long int id) {
}
else {
queue= (typeQueue*) calloc(1, sizeof(typeQueue));
- pthread_mutex_init(&queue->queueLock, NULL);
+ uv_mutex_init(&queue->queueLock);
}
queue->id= id;
View
@@ -1,15 +1,30 @@
//2011-11 Proyectos Equis Ka, s.l., jorge@jorgechamorro.com
//threads_a_gogo.cc
+
#include <v8.h>
#include <node.h>
-#include <unistd.h>
+#include <uv.h>
#include <string.h>
#include <stdio.h>
-#include <pthread.h>
#include <stdlib.h>
#include <string>
+#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) || defined(_AIX)
+#define WWT_PTHREAD 1
+#include <pthread.h>
+#include <unistd.h>
+#ifndef uv_cond_t
+#define uv_cond_signal(x) pthread_cond_signal(x)
+#define uv_cond_init(x) pthread_cond_init(x, NULL)
+#define uv_cond_wait(x,y) pthread_cond_wait(x, y)
+typedef pthread_cond_t uv_cond_t;
+#endif
+#else
+#define pthread_setcancelstate(x,y) NULL
+#define pthread_setcanceltype(x,y) NULL
+#endif
+
/*
static int debug_threads= 0;
static int debug_allocs= 0;
@@ -29,19 +44,19 @@ static typeQueue* freeThreadsQueue= NULL;
#define kThreadMagicCookie 0x99c0ffee
typedef struct {
- ev_async async_watcher; //MUST be the first one
+ uv_async_t async_watcher; //MUST be the first one
long int id;
- pthread_t thread;
+ uv_thread_t thread;
volatile int sigkill;
typeQueue inQueue; //Jobs to run
typeQueue outQueue; //Jobs done
volatile int IDLE;
- pthread_cond_t IDLE_cv;
- pthread_mutex_t IDLE_mutex;
-
+ uv_cond_t IDLE_cv;
+ uv_mutex_t IDLE_mutex;
+
Isolate* isolate;
Persistent<Context> context;
Persistent<Object> JSObject;
@@ -138,12 +153,12 @@ static typeThread* isAThread (Handle<Object> receiver) {
static void pushToInQueue (typeQueueItem* qitem, typeThread* thread) {
- pthread_mutex_lock(&thread->IDLE_mutex);
+ uv_mutex_lock(&thread->IDLE_mutex);
queue_push(qitem, &thread->inQueue);
if (thread->IDLE) {
- pthread_cond_signal(&thread->IDLE_cv);
+ uv_cond_signal(&thread->IDLE_cv);
}
- pthread_mutex_unlock(&thread->IDLE_mutex);
+ uv_mutex_unlock(&thread->IDLE_mutex);
}
@@ -167,15 +182,34 @@ static Handle<Value> Puts (const Arguments &args) {
return Undefined();
}
+static Handle<Value> Print (const Arguments &args) {
+ HandleScope scope;
+ int i= 0;
+ while (i < args.Length()) {
+ String::Utf8Value c_str(args[i]);
+ fputs(*c_str, stdout);
+ i++;
+ }
+ static char end = '\n';
+ fputs(&end, stdout);
+ fflush(stdout);
+
+ //fprintf(stdout, "*** Puts END\n");
+ return Undefined();
+}
static void eventLoop (typeThread* thread);
// A background thread
+#ifdef WWT_PTHREAD
static void* aThread (void* arg) {
-
+#else
+static void aThread (void* arg) {
+#endif
+
int dummy;
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &dummy);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &dummy);
@@ -199,9 +233,10 @@ static void* aThread (void* arg) {
thread->isolate->Dispose();
// wake up callback
- if (!ev_async_pending(&thread->async_watcher)) ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher);
-
+ if (!(thread->inQueue.length)) uv_async_send(&thread->async_watcher);
+#ifdef WWT_PTHREAD
return NULL;
+#endif
}
@@ -220,6 +255,7 @@ static void eventLoop (typeThread* thread) {
Local<Object> global= thread->context->Global();
global->Set(String::NewSymbol("puts"), FunctionTemplate::New(Puts)->GetFunction());
+ global->Set(String::NewSymbol("print"), FunctionTemplate::New(Print)->GetFunction());
Local<Object> threadObject= Object::New();
global->Set(String::NewSymbol("thread"), threadObject);
@@ -278,7 +314,7 @@ static void eventLoop (typeThread* thread) {
job->typeEval.resultado= new String::Utf8Value(job->typeEval.error ? onError.Exception() : resultado);
queue_push(qitem, &thread->outQueue);
// wake up callback
- if (!ev_async_pending(&thread->async_watcher)) ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher);
+ if (!(thread->inQueue.length)) uv_async_send(&thread->async_watcher);
}
else {
queue_push(qitem, freeJobsQueue);
@@ -331,14 +367,14 @@ static void eventLoop (typeThread* thread) {
if (nextTickQueueLength || thread->inQueue.length) continue;
if (thread->sigkill) break;
-
- pthread_mutex_lock(&thread->IDLE_mutex);
+
+ uv_mutex_lock(&thread->IDLE_mutex);
if (!thread->inQueue.length) {
thread->IDLE= 1;
- pthread_cond_wait(&thread->IDLE_cv, &thread->IDLE_mutex);
+ uv_cond_wait(&thread->IDLE_cv, &thread->IDLE_mutex);
thread->IDLE= 0;
}
- pthread_mutex_unlock(&thread->IDLE_mutex);
+ uv_mutex_unlock(&thread->IDLE_mutex);
}
}
@@ -359,8 +395,7 @@ static void destroyaThread (typeThread* thread) {
thread->JSObject->SetPointerInInternalField(0, NULL);
thread->JSObject.Dispose();
- ev_async_stop(EV_DEFAULT_UC_ &thread->async_watcher);
- ev_unref(EV_DEFAULT_UC);
+ uv_unref((uv_handle_t*)&thread->async_watcher);
if (freeThreadsQueue) {
queue_push(nuItem(kItemTypePointer, thread), freeThreadsQueue);
@@ -377,7 +412,7 @@ static void destroyaThread (typeThread* thread) {
// C callback that runs in the main nodejs thread. This is the one responsible for
// calling the thread's JS callback.
-static void Callback (EV_P_ ev_async *watcher, int revents) {
+static void Callback (uv_async_t *watcher, int revents) {
typeThread* thread= (typeThread*) watcher;
if (thread->sigkill) {
@@ -420,7 +455,7 @@ static void Callback (EV_P_ ev_async *watcher, int revents) {
if (onError.HasCaught()) {
if (thread->outQueue.first) {
- ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher); // wake up callback again
+ uv_async_send(&thread->async_watcher); // wake up callback again
}
node::FatalException(onError);
return;
@@ -474,11 +509,11 @@ static Handle<Value> Destroy (const Arguments &args) {
if (!thread->sigkill) {
//pthread_cancel(thread->thread);
thread->sigkill= 1;
- pthread_mutex_lock(&thread->IDLE_mutex);
+ uv_mutex_lock(&thread->IDLE_mutex);
if (thread->IDLE) {
- pthread_cond_signal(&thread->IDLE_cv);
+ uv_cond_signal(&thread->IDLE_cv);
}
- pthread_mutex_unlock(&thread->IDLE_mutex);
+ uv_mutex_unlock(&thread->IDLE_mutex);
}
return Undefined();
@@ -643,7 +678,7 @@ static Handle<Value> threadEmit (const Arguments &args) {
} while (++i <= job->typeEvent.length);
queue_push(qitem, &thread->outQueue);
- if (!ev_async_pending(&thread->async_watcher)) ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher); // wake up callback
+ if (!(thread->inQueue.length)) uv_async_send(&thread->async_watcher); // wake up callback
//fprintf(stdout, "*** threadEmit END\n");
@@ -682,19 +717,23 @@ static Handle<Value> Create (const Arguments &args) {
Local<Value> dispatchEvents= Script::Compile(String::New(kEvents_js))->Run()->ToObject()->CallAsFunction(thread->JSObject, 0, NULL);
thread->dispatchEvents= Persistent<Object>::New(dispatchEvents->ToObject());
- ev_async_init(&thread->async_watcher, Callback);
- ev_async_start(EV_DEFAULT_UC_ &thread->async_watcher);
- ev_ref(EV_DEFAULT_UC);
-
- pthread_cond_init(&thread->IDLE_cv, NULL);
- pthread_mutex_init(&thread->IDLE_mutex, NULL);
- pthread_mutex_init(&thread->inQueue.queueLock, NULL);
- pthread_mutex_init(&thread->outQueue.queueLock, NULL);
+ uv_async_init(uv_default_loop(), &thread->async_watcher, Callback);
+ uv_ref((uv_handle_t*)&thread->async_watcher);
+
+ uv_cond_init(&thread->IDLE_cv);
+ uv_mutex_init(&thread->IDLE_mutex);
+ uv_mutex_init(&thread->inQueue.queueLock);
+ uv_mutex_init(&thread->outQueue.queueLock);
+
+#ifdef WWT_PTHREAD
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
int err= pthread_create(&thread->thread, &attr, aThread, thread);
pthread_attr_destroy(&attr);
+#else
+ int err= uv_thread_create(&thread->thread, aThread, thread);
+#endif
if (err) {
//Ha habido un error no se ha arrancado esta thread
destroyaThread(thread);
@@ -739,4 +778,4 @@ NODE_MODULE(threads_a_gogo, Init)
/*
gcc -E -I /Users/jorge/JAVASCRIPT/binarios/include/node -o /o.c /Users/jorge/JAVASCRIPT/threads_a_gogo/src/threads_a_gogo.cc && mate /o.c
-*/
+*/