Skip to content

Commit

Permalink
Merge 21f65d2 into 3eb906a
Browse files Browse the repository at this point in the history
  • Loading branch information
srajko committed May 5, 2016
2 parents 3eb906a + 21f65d2 commit 88e6d6e
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 1 deletion.
33 changes: 33 additions & 0 deletions generate/templates/manual/include/async_libgit2_queue_worker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#ifndef ASYNC_LIBGIT2_QUEUE_WORKER_H
#define ASYNC_LIBGIT2_QUEUE_WORKER_H

#include <nan.h>
#include <uv.h>
#include "../include/thread_pool.h"
#include "../include/nodegit.h"


// Runs WorkComplete of the scheduled AsyncWorker,
// and destroys it. This is run in the uv_default_loop event loop.
NAN_INLINE void AsyncLibgit2Complete (void* data) {
Nan::AsyncWorker *worker = static_cast<Nan::AsyncWorker*>(data);
worker->WorkComplete();
worker->Destroy();
}

// Runs Execute of the scheduled AyncWorker on the dedicated libgit2 thread /
// event loop, and schedules the WorkComplete callback to run on the
// uv_default_loop event loop
NAN_INLINE void AsyncLibgit2Execute (void *vworker) {
// execute the worker
Nan::AsyncWorker *worker = static_cast<Nan::AsyncWorker*>(vworker);
worker->Execute();
}

// Schedules the AsyncWorker to run on the dedicated libgit2 thread / event loop,
// and on completion AsyncLibgit2Complete on the default loop
NAN_INLINE void AsyncLibgit2QueueWorker (Nan::AsyncWorker* worker) {
libgit2ThreadPool.QueueWork(AsyncLibgit2Execute, AsyncLibgit2Complete, worker);
}

#endif
8 changes: 8 additions & 0 deletions generate/templates/manual/include/nodegit.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#ifndef NODEGIT_H
#define NODEGIT_H

#include "thread_pool.h"

extern ThreadPool libgit2ThreadPool;

#endif
45 changes: 45 additions & 0 deletions generate/templates/manual/include/thread_pool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <uv.h>
#include <queue>

class ThreadPool {
typedef void (*Callback) (void *);
struct Work {
Callback workCallback;
Callback loopCallback;
void *data;

Work(Callback workCallback, Callback loopCallback, void *data)
: workCallback(workCallback), loopCallback(loopCallback), data(data) {
}
};

// work to be performed on the threadpool
std::queue<Work> workQueue;
uv_mutex_t workMutex;
uv_sem_t workSemaphore;
int workInProgressCount;

// completion callbacks to be performed on the loop
std::queue<Work> loopQueue;
uv_mutex_t loopMutex;
uv_async_t loopAsync;

static void RunEventQueue(void *threadPool);
void RunEventQueue();
static void RunLoopCallbacks(uv_async_t* handle);
void RunLoopCallbacks();
public:
// Initializes thread pool and spins up the requested number of threads
// The provided loop will be used for completion callbacks, whenever
// queued work is completed
ThreadPool(int numberOfThreads, uv_loop_t *loop);
// Queues work on the thread pool, followed by completion call scheduled
// on the loop provided in the constructor.
// QueueWork should be called on the loop provided in the constructor.
void QueueWork(Callback workCallback, Callback loopCallback, void *data);
};

#endif
85 changes: 85 additions & 0 deletions generate/templates/manual/src/thread_pool.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#include "../include/thread_pool.h"

ThreadPool::ThreadPool(int numberOfThreads, uv_loop_t *loop) {
uv_mutex_init(&workMutex);
uv_sem_init(&workSemaphore, 0);

uv_async_init(loop, &loopAsync, RunLoopCallbacks);
loopAsync.data = this;
uv_unref((uv_handle_t *)&loopAsync);
uv_mutex_init(&loopMutex);

workInProgressCount = 0;

for(int i=0; i<numberOfThreads; i++) {
uv_thread_t thread;
uv_thread_create(&thread, RunEventQueue, this);
}
}

void ThreadPool::QueueWork(Callback workCallback, Callback loopCallback, void *data) {
uv_mutex_lock(&workMutex);
// there is work on the thread pool - reference the handle so
// node doesn't terminate
uv_ref((uv_handle_t *)&loopAsync);
workQueue.push(Work(workCallback, loopCallback, data));
workInProgressCount++;
uv_mutex_unlock(&workMutex);
uv_sem_post(&workSemaphore);
}

void ThreadPool::RunEventQueue(void *threadPool) {
static_cast<ThreadPool *>(threadPool)->RunEventQueue();
}

void ThreadPool::RunEventQueue() {
for ( ; ; ) {
// wait until there is work to do
uv_sem_wait(&workSemaphore);
uv_mutex_lock(&workMutex);
// the semaphore should guarantee that queue is not empty
Work work = workQueue.front();
workQueue.pop();
uv_mutex_unlock(&workMutex);

// perform the queued work
(*work.workCallback)(work.data);

// schedule the callback on the loop
uv_mutex_lock(&loopMutex);
loopQueue.push(work);
uv_mutex_unlock(&loopMutex);
uv_async_send(&loopAsync);
}
}

void ThreadPool::RunLoopCallbacks(uv_async_t* handle) {
static_cast<ThreadPool *>(handle->data)->RunLoopCallbacks();
}

void ThreadPool::RunLoopCallbacks() {
// uv_async_send can coalesce calls, so we are not guaranteed one
// RunLoopCallbacks per uv_async_send call
// so we always process the entire loopQueue
int callbacksCompleted = 0;
uv_mutex_lock(&loopMutex);
while(!loopQueue.empty()) {
Work work = loopQueue.front();
loopQueue.pop();
uv_mutex_unlock(&loopMutex);
// perform the queued loop callback
(*work.loopCallback)(work.data);
callbacksCompleted++;
uv_mutex_lock(&loopMutex);
}
uv_mutex_unlock(&loopMutex);

uv_mutex_lock(&workMutex);
// if there is no ongoing work / completion processing, node doesn't need
// to be prevented from terminating
workInProgressCount -= callbacksCompleted;
if(!workInProgressCount) {
uv_unref((uv_handle_t *)&loopAsync);
}
uv_mutex_unlock(&workMutex);
}
2 changes: 1 addition & 1 deletion generate/templates/partials/async_function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ NAN_METHOD({{ cppClassName }}::{{ cppFunctionName }}) {
{%endif%}
{%endeach%}

Nan::AsyncQueueWorker(worker);
AsyncLibgit2QueueWorker(worker);
return;
}

Expand Down
1 change: 1 addition & 0 deletions generate/templates/templates/binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"src/convenient_patch.cc",
"src/convenient_hunk.cc",
"src/str_array_converter.cc",
"src/thread_pool.cc",
{% each %}
{% if type != "enum" %}
"src/{{ name }}.cc",
Expand Down
1 change: 1 addition & 0 deletions generate/templates/templates/class_content.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ extern "C" {
#include "../include/functions/copy.h"
#include "../include/{{ filename }}.h"
#include "nodegit_wrapper.cc"
#include "../include/async_libgit2_queue_worker.h"

{% each dependencies as dependency %}
#include "{{ dependency }}"
Expand Down
3 changes: 3 additions & 0 deletions generate/templates/templates/nodegit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "../include/init_ssh2.h"
#include "../include/lock_master.h"
#include "../include/nodegit.h"
#include "../include/wrapper.h"
#include "../include/promise_completion.h"
#include "../include/functions/copy.h"
Expand Down Expand Up @@ -80,6 +81,8 @@ void OpenSSL_ThreadSetup() {
CRYPTO_set_id_callback(OpenSSL_IDCallback);
}

ThreadPool libgit2ThreadPool(10, uv_default_loop());

extern "C" void init(Local<v8::Object> target) {
// Initialize thread safety in openssl and libssh2
OpenSSL_ThreadSetup();
Expand Down

0 comments on commit 88e6d6e

Please sign in to comment.