Skip to content

Commit

Permalink
Migrate to napi threadsafe functions (#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
devongovett committed Jul 30, 2023
1 parent c0101fa commit 9b7c657
Show file tree
Hide file tree
Showing 12 changed files with 262 additions and 196 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@parcel/watcher",
"version": "2.2.0",
"version": "2.2.1-alpha.0",
"main": "index.js",
"types": "index.d.ts",
"repository": {
Expand Down
181 changes: 91 additions & 90 deletions src/Watcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ void removeShared(Watcher *watcher) {
Watcher::Watcher(std::string dir, std::unordered_set<std::string> ignorePaths, std::unordered_set<Glob> ignoreGlobs)
: mDir(dir),
mIgnorePaths(ignorePaths),
mIgnoreGlobs(ignoreGlobs),
mWatched(false),
mAsync(NULL),
mCallingCallbacks(false) {
mIgnoreGlobs(ignoreGlobs) {
mDebounce = Debounce::getShared();
mDebounce->add(this, [this] () {
triggerCallbacks();
Expand All @@ -68,112 +65,109 @@ void Watcher::notify() {
}
}

void Watcher::notifyError(std::exception &err) {
std::unique_lock<std::mutex> lk(mMutex);
if (mCallingCallbacks) {
mCallbackSignal.wait();
mCallbackSignal.reset();
}

mError = err.what();
triggerCallbacks();
}

void Watcher::triggerCallbacks() {
std::lock_guard<std::mutex> l(mCallbackEventsMutex);
if (mCallbacks.size() > 0 && (mEvents.size() > 0 || mError.size() > 0)) {
if (mCallingCallbacks) {
mCallbackSignal.wait();
mCallbackSignal.reset();
}

mCallbackEvents = mEvents.getEvents();
mEvents.clear();

uv_async_send(mAsync);
}
}
struct CallbackData {
std::string error;
std::vector<Event> events;
CallbackData(std::string error, std::vector<Event> events) : error(error), events(events) {}
};

Value Watcher::callbackEventsToJS(const Env& env) {
std::lock_guard<std::mutex> l(mCallbackEventsMutex);
Value callbackEventsToJS(const Env &env, std::vector<Event> &events) {
EscapableHandleScope scope(env);
Array arr = Array::New(env, mCallbackEvents.size());
Array arr = Array::New(env, events.size());
size_t currentEventIndex = 0;
for (auto eventIterator = mCallbackEvents.begin(); eventIterator != mCallbackEvents.end(); eventIterator++) {
for (auto eventIterator = events.begin(); eventIterator != events.end(); eventIterator++) {
arr.Set(currentEventIndex++, eventIterator->toJS(env));
}
return scope.Escape(arr);
}

// TODO: Doesn't this need some kind of locking?
void Watcher::clearCallbacks() {
mCallbacks.clear();
void callJSFunction(Napi::Env env, Function jsCallback, CallbackData *data) {
HandleScope scope(env);
auto err = data->error.size() > 0 ? Error::New(env, data->error).Value() : env.Null();
auto events = callbackEventsToJS(env, data->events);
jsCallback.Call({err, events});
delete data;

// Throw errors from the callback as fatal exceptions
// If we don't handle these node segfaults...
if (env.IsExceptionPending()) {
Napi::Error err = env.GetAndClearPendingException();
napi_fatal_exception(env, err.Value());
}
}

void Watcher::fireCallbacks(uv_async_t *handle) {
Watcher *watcher = (Watcher *)handle->data;
watcher->mCallingCallbacks = true;
void Watcher::notifyError(std::exception &err) {
std::unique_lock<std::mutex> lk(mMutex);
for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) {
CallbackData *data = new CallbackData(err.what(), {});
it->tsfn.BlockingCall(data, callJSFunction);
}

watcher->mCallbacksIterator = watcher->mCallbacks.begin();
while (watcher->mCallbacksIterator != watcher->mCallbacks.end()) {
auto it = watcher->mCallbacksIterator;
HandleScope scope(it->Env());
auto err = watcher->mError.size() > 0 ? Error::New(it->Env(), watcher->mError).Value() : it->Env().Null();
auto events = watcher->callbackEventsToJS(it->Env());
clearCallbacks();
}

it->MakeCallback(it->Env().Global(), std::initializer_list<napi_value>{err, events});
// Throw errors from the callback as fatal exceptions
// If we don't handle these node segfaults...
if (it->Env().IsExceptionPending()) {
Napi::Error err = it->Env().GetAndClearPendingException();
napi_fatal_exception(it->Env(), err.Value());
}
// This function is called from the debounce thread.
void Watcher::triggerCallbacks() {
std::unique_lock<std::mutex> lk(mMutex);
if (mCallbacks.size() > 0 && mEvents.size() > 0) {
auto events = mEvents.getEvents();
mEvents.clear();

// If the iterator was changed, then the callback trigged an unwatch.
// The iterator will have been set to the next valid callback.
// If it is the same as before, increment it.
if (watcher->mCallbacksIterator == it) {
watcher->mCallbacksIterator++;
for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) {
it->tsfn.BlockingCall(new CallbackData("", events), callJSFunction);
}
}
}

watcher->mCallingCallbacks = false;
// This should be called from the JavaScript thread.
bool Watcher::watch(Function callback) {
std::unique_lock<std::mutex> lk(mMutex);

if (watcher->mError.size() > 0) {
watcher->clearCallbacks();
auto it = findCallback(callback);
if (it != mCallbacks.end()) {
return false;
}

if (watcher->mCallbacks.size() == 0) {
watcher->unref();
} else {
watcher->mCallbackSignal.notify();
}
auto tsfn = ThreadSafeFunction::New(
callback.Env(),
callback,
"Watcher callback",
0, // Unlimited queue
1 // Initial thread count
);

mCallbacks.push_back(Callback {
tsfn,
Napi::Persistent(callback),
std::this_thread::get_id()
});

return true;
}

bool Watcher::watch(FunctionReference callback) {
std::unique_lock<std::mutex> lk(mMutex);
auto res = mCallbacks.insert(std::move(callback));
if (res.second && !mWatched) {
mAsync = new uv_async_t;
mAsync->data = (void *)this;
uv_async_init(uv_default_loop(), mAsync, Watcher::fireCallbacks);
mWatched = true;
return true;
// This should be called from the JavaScript thread.
std::vector<Callback>::iterator Watcher::findCallback(Function callback) {
for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) {
// Only consider callbacks created by the same thread, or V8 will panic.
if (it->threadId == std::this_thread::get_id() && it->ref.Value() == callback) {
return it;
}
}

return false;
return mCallbacks.end();
}

// This should be called from the JavaScript thread.
bool Watcher::unwatch(Function callback) {
std::unique_lock<std::mutex> lk(mMutex);

bool removed = false;
for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) {
if (it->Value() == callback) {
mCallbacksIterator = mCallbacks.erase(it);
removed = true;
break;
}
auto it = findCallback(callback);
if (it != mCallbacks.end()) {
it->tsfn.Release();
it->ref.Unref();
mCallbacks.erase(it);
removed = true;
}

if (removed && mCallbacks.size() == 0) {
Expand All @@ -185,18 +179,25 @@ bool Watcher::unwatch(Function callback) {
}

void Watcher::unref() {
if (mCallbacks.size() == 0 && !mCallingCallbacks) {
if (mWatched) {
mWatched = false;
uv_close((uv_handle_t *)mAsync, Watcher::onClose);
}

if (mCallbacks.size() == 0) {
removeShared(this);
}
}

void Watcher::onClose(uv_handle_t *handle) {
delete (uv_async_t *)handle;
void Watcher::destroy() {
std::unique_lock<std::mutex> lk(mMutex);
clearCallbacks();
}

// Private because it doesn't lock.
void Watcher::clearCallbacks() {
for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) {
it->tsfn.Release();
it->ref.Unref();
}

mCallbacks.clear();
unref();
}

bool Watcher::isIgnored(std::string path) {
Expand All @@ -208,7 +209,7 @@ bool Watcher::isIgnored(std::string path) {
}

auto basePath = mDir + DIR_SEP;

if (path.rfind(basePath, 0) != 0) {
return false;
}
Expand Down
24 changes: 10 additions & 14 deletions src/Watcher.hh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <condition_variable>
#include <unordered_set>
#include <set>
#include <uv.h>
#include <node_api.h>
#include "Glob.hh"
#include "Event.hh"
Expand All @@ -14,13 +13,18 @@

using namespace Napi;

struct Callback {
Napi::ThreadSafeFunction tsfn;
Napi::FunctionReference ref;
std::thread::id threadId;
};

struct Watcher {
std::string mDir;
std::unordered_set<std::string> mIgnorePaths;
std::unordered_set<Glob> mIgnoreGlobs;
EventList mEvents;
void *state;
bool mWatched;

Watcher(std::string dir, std::unordered_set<std::string> ignorePaths, std::unordered_set<Glob> ignoreGlobs);
~Watcher();
Expand All @@ -32,31 +36,23 @@ struct Watcher {
void wait();
void notify();
void notifyError(std::exception &err);
bool watch(FunctionReference callback);
bool watch(Function callback);
bool unwatch(Function callback);
void unref();
bool isIgnored(std::string path);
void destroy();

static std::shared_ptr<Watcher> getShared(std::string dir, std::unordered_set<std::string> ignorePaths, std::unordered_set<Glob> ignoreGlobs);

private:
std::mutex mMutex;
std::mutex mCallbackEventsMutex;
std::condition_variable mCond;
uv_async_t *mAsync;
std::set<FunctionReference> mCallbacks;
std::set<FunctionReference>::iterator mCallbacksIterator;
bool mCallingCallbacks;
std::vector<Event> mCallbackEvents;
std::vector<Callback> mCallbacks;
std::shared_ptr<Debounce> mDebounce;
Signal mCallbackSignal;
std::string mError;

Value callbackEventsToJS(const Env& env);
std::vector<Callback>::iterator findCallback(Function callback);
void clearCallbacks();
void triggerCallbacks();
static void fireCallbacks(uv_async_t *handle);
static void onClose(uv_handle_t *handle);
};

class WatcherError : public std::runtime_error {
Expand Down
12 changes: 8 additions & 4 deletions src/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ std::unordered_set<std::string> getIgnorePaths(Env env, Value opts) {

std::unordered_set<Glob> getIgnoreGlobs(Env env, Value opts) {
std::unordered_set<Glob> result;

if (opts.IsObject()) {
Value v = opts.As<Object>().Get(String::New(env, "ignoreGlobs"));
if (v.IsArray()) {
Expand Down Expand Up @@ -165,7 +165,7 @@ class SubscribeRunner : public PromiseRunner {
);

backend = getBackend(env, opts);
callback = Persistent(fn.As<Function>());
watcher->watch(fn.As<Function>());
}

private:
Expand All @@ -174,8 +174,12 @@ class SubscribeRunner : public PromiseRunner {
FunctionReference callback;

void execute() override {
backend->watch(*watcher);
watcher->watch(std::move(callback));
try {
backend->watch(*watcher);
} catch (std::exception &err) {
watcher->destroy();
throw;
}
}
};

Expand Down
Loading

0 comments on commit 9b7c657

Please sign in to comment.