Skip to content

Commit

Permalink
Merge pull request #1208 from srajko/callback-fire-forget
Browse files Browse the repository at this point in the history
Make waiting for callback result optional
  • Loading branch information
johnhaley81 committed Feb 28, 2017
2 parents bace8b7 + 921bf45 commit d329e2b
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 104 deletions.
31 changes: 21 additions & 10 deletions generate/templates/manual/include/async_baton.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,48 @@
// or field properties of configuration objects whose values are callbacks)
struct AsyncBaton {
uv_sem_t semaphore;

virtual ~AsyncBaton() {}
};

void deleteBaton(AsyncBaton *baton);

template<typename ResultT>
struct AsyncBatonWithResult : public AsyncBaton {
ResultT result;
ResultT defaultResult; // result returned if the callback doesn't return anything valid
void (*onCompletion)(AsyncBaton *);

AsyncBatonWithResult(const ResultT &defaultResult)
: defaultResult(defaultResult) {
uv_sem_init(&semaphore, 0);
}

~AsyncBatonWithResult() {
uv_sem_destroy(&semaphore);
}

void Done() {
// signal completion
uv_sem_post(&semaphore);
if (onCompletion) {
onCompletion(this);
} else {
// signal completion
uv_sem_post(&semaphore);
}
}

ResultT ExecuteAsync(ThreadPool::Callback asyncCallback) {
ResultT ExecuteAsync(ThreadPool::Callback asyncCallback, void (*onCompletion)(AsyncBaton *) = NULL) {
result = 0;
this->onCompletion = onCompletion;
if (!onCompletion) {
uv_sem_init(&semaphore, 0);
}

{
LockMaster::TemporaryUnlock temporaryUnlock;

libgit2ThreadPool.ExecuteReverseCallback(asyncCallback, this);

// wait for completion
uv_sem_wait(&semaphore);
if (!onCompletion) {
// wait for completion
uv_sem_wait(&semaphore);
uv_sem_destroy(&semaphore);
}
}

return result;
Expand Down
12 changes: 11 additions & 1 deletion generate/templates/manual/include/callback_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ class CallbackWrapper {
int throttle; // in milliseconds - if > 0, calls to the JS callback will be throttled
uint64_t lastCallTime;

// false will trigger the callback and not wait for the callback to finish
// in this case, the underlying libgit2 function will immediately be given
// the default result
bool waitForResult;

public:
CallbackWrapper() {
jsCallback = NULL;
Expand All @@ -33,12 +38,17 @@ class CallbackWrapper {
return jsCallback;
}

void SetCallback(Nan::Callback* callback, int throttle = 0) {
void SetCallback(Nan::Callback* callback, int throttle = 0, bool waitForResult = true) {
if(jsCallback) {
delete jsCallback;
}
jsCallback = callback;
this->throttle = throttle;
this->waitForResult = waitForResult;
}

bool ShouldWaitForResult() {
return waitForResult;
}

bool WillBeThrottled() {
Expand Down
31 changes: 13 additions & 18 deletions generate/templates/manual/include/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ class ThreadPool {
}
};

struct ReverseCall {
Callback reverseCallback;
struct LoopCallback {
Callback callback;
void *data;
bool isWork;

ReverseCall(Callback reverseCallback, void *data)
: reverseCallback(reverseCallback), data(data) {
LoopCallback(Callback callback, void *data, bool isWork)
: callback(callback), data(data), isWork(isWork) {
}
};

Expand All @@ -34,22 +35,17 @@ class ThreadPool {
uv_sem_t workSemaphore;
int workInProgressCount;

// completion callbacks to be performed on the loop
std::queue<Work> completionQueue;
uv_mutex_t completionMutex;
uv_async_t completionAsync;

// async callback made from the threadpool, executed in the loop
std::queue<ReverseCall> reverseQueue;
uv_mutex_t reverseMutex;
uv_async_t reverseAsync;
// completion and async callbacks to be performed on the loop
std::queue<LoopCallback> loopQueue;
uv_mutex_t loopMutex;
uv_async_t loopAsync;

static void RunEventQueue(void *threadPool);
void RunEventQueue();
static void RunCompletionCallbacks(uv_async_t* handle);
void RunCompletionCallbacks();
static void RunReverseCallbacks(uv_async_t *handle);
void RunReverseCallbacks();
static void RunLoopCallbacks(uv_async_t* handle);
void RunLoopCallbacks();

void QueueLoopCallback(Callback callback, void *data, bool isWork);

public:
// Initializes thread pool and spins up the requested number of threads
Expand All @@ -61,7 +57,6 @@ class ThreadPool {
// QueueWork should be called on the loop provided in the constructor.
void QueueWork(Callback workCallback, Callback completionCallback, void *data);
// Queues a callback on the loop provided in the constructor
// these block the calling thread's execution until the callback completes
void ExecuteReverseCallback(Callback reverseCallback, void *data);
};

Expand Down
5 changes: 5 additions & 0 deletions generate/templates/manual/src/async_baton.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "../include/async_baton.h"

void deleteBaton(AsyncBaton *baton) {
delete baton;
}
112 changes: 44 additions & 68 deletions generate/templates/manual/src/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,10 @@ ThreadPool::ThreadPool(int numberOfThreads, uv_loop_t *loop) {
uv_mutex_init(&workMutex);
uv_sem_init(&workSemaphore, 0);

uv_async_init(loop, &completionAsync, RunCompletionCallbacks);
completionAsync.data = this;
uv_unref((uv_handle_t *)&completionAsync);
uv_mutex_init(&completionMutex);

uv_async_init(loop, &reverseAsync, RunReverseCallbacks);
reverseAsync.data = this;
uv_unref((uv_handle_t *)&reverseAsync);
uv_mutex_init(&reverseMutex);
uv_async_init(loop, &loopAsync, RunLoopCallbacks);
loopAsync.data = this;
uv_unref((uv_handle_t *)&loopAsync);
uv_mutex_init(&loopMutex);

workInProgressCount = 0;

Expand All @@ -26,25 +21,29 @@ void ThreadPool::QueueWork(Callback workCallback, Callback completionCallback, v
uv_mutex_lock(&workMutex);
// there is work on the thread pool - reference the handle so
// node doesn't terminate
uv_ref((uv_handle_t *)&completionAsync);
uv_ref((uv_handle_t *)&loopAsync);
workQueue.push(Work(workCallback, completionCallback, data));
workInProgressCount++;
uv_mutex_unlock(&workMutex);
uv_sem_post(&workSemaphore);
}

void ThreadPool::ExecuteReverseCallback(Callback reverseCallback, void *data) {
void ThreadPool::QueueLoopCallback(Callback callback, void *data, bool isWork) {
// push the callback into the queue
uv_mutex_lock(&reverseMutex);
ReverseCall reverseCall(reverseCallback, data);
bool queueWasEmpty = reverseQueue.empty();
reverseQueue.push(reverseCall);
// we only trigger RunReverseCallbacks via the reverseAsync handle if the queue
// was empty. Otherwise, we depend on RunReverseCallbacks to re-trigger itself
uv_mutex_lock(&loopMutex);
LoopCallback loopCallback(callback, data, isWork);
bool queueWasEmpty = loopQueue.empty();
loopQueue.push(loopCallback);
// we only trigger RunLoopCallbacks via the loopAsync handle if the queue
// was empty. Otherwise, we depend on RunLoopCallbacks to re-trigger itself
if (queueWasEmpty) {
uv_async_send(&reverseAsync);
uv_async_send(&loopAsync);
}
uv_mutex_unlock(&reverseMutex);
uv_mutex_unlock(&loopMutex);
}

void ThreadPool::ExecuteReverseCallback(Callback reverseCallback, void *data) {
QueueLoopCallback(reverseCallback, data, false);
}

void ThreadPool::RunEventQueue(void *threadPool) {
Expand All @@ -64,63 +63,40 @@ void ThreadPool::RunEventQueue() {
// perform the queued work
(*work.workCallback)(work.data);

// schedule the callback on the loop
uv_mutex_lock(&completionMutex);
completionQueue.push(work);
uv_mutex_unlock(&completionMutex);
uv_async_send(&completionAsync);
// schedule the completion callback on the loop
QueueLoopCallback(work.completionCallback, work.data, true);
}
}

void ThreadPool::RunCompletionCallbacks(uv_async_t* handle) {
static_cast<ThreadPool *>(handle->data)->RunCompletionCallbacks();
void ThreadPool::RunLoopCallbacks(uv_async_t* handle) {
static_cast<ThreadPool *>(handle->data)->RunLoopCallbacks();
}

void ThreadPool::RunCompletionCallbacks() {
// uv_async_send can coalesce calls, so we are not guaranteed one
// RunCompletionCallbacks per uv_async_send call
// so we always process the entire completionQueue
int callbacksCompleted = 0;
uv_mutex_lock(&completionMutex);
while(!completionQueue.empty()) {
Work work = completionQueue.front();
completionQueue.pop();
uv_mutex_unlock(&completionMutex);
// perform the queued loop callback
(*work.completionCallback)(work.data);
callbacksCompleted++;
uv_mutex_lock(&completionMutex);
void ThreadPool::RunLoopCallbacks() {
// get the next callback to run
uv_mutex_lock(&loopMutex);
LoopCallback loopCallback = loopQueue.front();
uv_mutex_unlock(&loopMutex);

// perform the queued loop callback
(*loopCallback.callback)(loopCallback.data);

// pop the queue, and if necessary, re-trigger RunLoopCallbacks
uv_mutex_lock(&loopMutex);
loopQueue.pop();
if (!loopQueue.empty()) {
uv_async_send(&loopAsync);
}
uv_mutex_unlock(&completionMutex);
uv_mutex_unlock(&loopMutex);

uv_mutex_lock(&workMutex);
// if there is no ongoing work / completion processing, node doesn't need
// to be prevented from terminating
workInProgressCount -= callbacksCompleted;
if(!workInProgressCount) {
uv_unref((uv_handle_t *)&completionAsync);
}
uv_mutex_unlock(&workMutex);
}

void ThreadPool::RunReverseCallbacks(uv_async_t* handle) {
static_cast<ThreadPool *>(handle->data)->RunReverseCallbacks();
}

void ThreadPool::RunReverseCallbacks() {
// get the next callback to run
uv_mutex_lock(&reverseMutex);
ReverseCall reverseCall = reverseQueue.front();
uv_mutex_unlock(&reverseMutex);

// execute callback
(*reverseCall.reverseCallback)(reverseCall.data);

// pop the queue, and if necessary, re-trigger RunReverseCallbacks
uv_mutex_lock(&reverseMutex);
reverseQueue.pop();
if (!reverseQueue.empty()) {
uv_async_send(&reverseAsync);
if (loopCallback.isWork) {
uv_mutex_lock(&workMutex);
workInProgressCount --;
if(!workInProgressCount) {
uv_unref((uv_handle_t *)&loopAsync);
}
uv_mutex_unlock(&workMutex);
}
uv_mutex_unlock(&reverseMutex);
}
31 changes: 24 additions & 7 deletions generate/templates/partials/field_accessors.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
{% elsif field.isCallbackFunction %}
Nan::Callback *callback = NULL;
int throttle = {%if field.return.throttle %}{{ field.return.throttle }}{%else%}0{%endif%};
bool waitForResult = true;

if (value->IsFunction()) {
callback = new Nan::Callback(value.As<Function>());
Expand All @@ -59,13 +60,20 @@
v8::Local<Value> objectCallback = maybeObjectCallback.ToLocalChecked();
if (objectCallback->IsFunction()) {
callback = new Nan::Callback(objectCallback.As<Function>());

Nan::MaybeLocal<Value> maybeObjectThrottle = Nan::Get(object, Nan::New("throttle").ToLocalChecked());
if(!maybeObjectThrottle.IsEmpty()) {
v8::Local<Value> objectThrottle = maybeObjectThrottle.ToLocalChecked();
if (objectThrottle->IsNumber()) {
throttle = (int)objectThrottle.As<Number>()->Value();
}
}

Nan::MaybeLocal<Value> maybeObjectWaitForResult = Nan::Get(object, Nan::New("waitForResult").ToLocalChecked());
if(!maybeObjectWaitForResult.IsEmpty()) {
Local<Value> objectWaitForResult = maybeObjectWaitForResult.ToLocalChecked();
waitForResult = (bool)objectWaitForResult->BooleanValue();
}
}
}
}
Expand All @@ -74,7 +82,7 @@
wrapper->raw->{{ field.name }} = ({{ field.cType }}){{ field.name }}_cppCallback;
}

wrapper->{{ field.name }}.SetCallback(callback, throttle);
wrapper->{{ field.name }}.SetCallback(callback, throttle, waitForResult);
}

{% elsif field.payloadFor %}
Expand Down Expand Up @@ -111,19 +119,28 @@
{{ arg.cType }} {{ arg.name}}{% if not arg.lastArg %},{% endif %}
{% endeach %}
) {
{{ field.name|titleCase }}Baton baton({{ field.return.noResults }});
{{ field.name|titleCase }}Baton *baton =
new {{ field.name|titleCase }}Baton({{ field.return.noResults }});

{% each field.args|argsInfo as arg %}
baton.{{ arg.name }} = {{ arg.name }};
baton->{{ arg.name }} = {{ arg.name }};
{% endeach %}

{{ cppClassName }}* instance = {{ field.name }}_getInstanceFromBaton(&baton);
{{ cppClassName }}* instance = {{ field.name }}_getInstanceFromBaton(baton);

{{ field.return.type }} result;

if (instance->{{ field.name }}.WillBeThrottled()) {
return baton.defaultResult;
result = baton->defaultResult;
delete baton;
} else if (instance->{{ field.name }}.ShouldWaitForResult()) {
result = baton->ExecuteAsync({{ field.name }}_async);
delete baton;
} else {
result = baton->defaultResult;
baton->ExecuteAsync({{ field.name }}_async, deleteBaton);
}

return baton.ExecuteAsync({{ field.name }}_async);
return result;
}

void {{ cppClassName }}::{{ field.name }}_async(void *untypedBaton) {
Expand Down
1 change: 1 addition & 0 deletions generate/templates/templates/binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
},

"sources": [
"src/async_baton.cc",
"src/lock_master.cc",
"src/nodegit.cc",
"src/init_ssh2.cc",
Expand Down
Loading

0 comments on commit d329e2b

Please sign in to comment.