Skip to content

Commit

Permalink
Implement ThreadSafeFunction class
Browse files Browse the repository at this point in the history
This PR is implementing ThreadSafeFunction class wraps
napi_threadsafe_function features.

FYI, the test files that included in this PR have come from Node.js
repo[1]. They've been rewritten based on C++ and node-addon-api.

Fixes nodejs#312.

[1] https://github.com/nodejs/node/tree/master/test/node-api/test_threadsafe_function
  • Loading branch information
romandev committed Mar 18, 2019
1 parent fcfc612 commit f04ba10
Show file tree
Hide file tree
Showing 7 changed files with 757 additions and 0 deletions.
269 changes: 269 additions & 0 deletions napi-inl.h
Expand Up @@ -3618,6 +3618,275 @@ inline void AsyncWorker::OnWorkComplete(
delete self;
}

////////////////////////////////////////////////////////////////////////////////
// ThreadSafeFunction class
////////////////////////////////////////////////////////////////////////////////

// static
template <typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount) {
return New(env, callback, Object(), resourceName, maxQueueSize,
initialThreadCount, nullptr);
}

// static
template <typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount) {
return New(env, callback, resource, resourceName, maxQueueSize,
initialThreadCount, nullptr);
}

// static
template <typename DataType, typename Finalizer, typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
DataType* data,
Finalizer finalizeCallback) {
return New(env, callback, Object(), resourceName, maxQueueSize,
initialThreadCount, data, finalizeCallback);
}

// static
template <typename Context, typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
Context* context) {
return New(env, callback, Object(), resourceName, maxQueueSize,
initialThreadCount, context);
}

// static
template <typename DataType, typename Finalizer,
typename Context, typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
DataType* data,
Finalizer finalizeCallback,
Context* context) {
return New(env, callback, Object(), resourceName, maxQueueSize,
initialThreadCount, data, finalizeCallback, context);
}

// static
template <typename DataType, typename Finalizer, typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
DataType* data,
Finalizer finalizeCallback) {
static_assert(details::can_make_string<ResourceString>::value
|| std::is_convertible<ResourceString, napi_value>::value,
"Resource name should be string convertible type");

napi_threadsafe_function tsFunctionValue;
auto* finalizeData = new details::FinalizeData<DataType, Finalizer>({
finalizeCallback, nullptr });
napi_status status = napi_create_threadsafe_function(env, callback, resource,
Value::From(env, resourceName), maxQueueSize, initialThreadCount, data,
details::FinalizeData<DataType, Finalizer>::Wrapper,
finalizeData, CallJS, &tsFunctionValue);
if (status != napi_ok) {
delete finalizeData;
NAPI_THROW_IF_FAILED(env, status, ThreadSafeFunction());
}

return ThreadSafeFunction(env, tsFunctionValue);
}

// static
template <typename Context, typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
Context* context) {
static_assert(details::can_make_string<ResourceString>::value
|| std::is_convertible<ResourceString, napi_value>::value,
"Resource name should be string convertible type");

napi_threadsafe_function tsFunctionValue;
napi_status status = napi_create_threadsafe_function(env, callback, resource,
Value::From(env, resourceName), maxQueueSize, initialThreadCount, nullptr,
nullptr, context, CallJS, &tsFunctionValue);
if (status != napi_ok) {
NAPI_THROW_IF_FAILED(env, status, ThreadSafeFunction());
}

return ThreadSafeFunction(env, tsFunctionValue);
}

// static
template <typename DataType, typename Finalizer,
typename Context, typename ResourceString>
inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
DataType* data,
Finalizer finalizeCallback,
Context* context) {
static_assert(details::can_make_string<ResourceString>::value
|| std::is_convertible<ResourceString, napi_value>::value,
"Resource name should be string convertible type");

napi_threadsafe_function tsFunctionValue;
auto* finalizeData = new details::FinalizeData<DataType, Finalizer>({
finalizeCallback, context });
napi_status status = napi_create_threadsafe_function(env, callback, resource,
Value::From(env, resourceName), maxQueueSize, initialThreadCount, data,
details::FinalizeData<DataType, Finalizer, Context>::WrapperWithHint,
finalizeData, CallJS, &tsFunctionValue);
if (status != napi_ok) {
delete finalizeData;
NAPI_THROW_IF_FAILED(env, status, ThreadSafeFunction());
}

return ThreadSafeFunction(env, tsFunctionValue);
}

inline ThreadSafeFunction::Status ThreadSafeFunction::BlockingCall() const {
return CallInternal(nullptr, napi_tsfn_blocking);
}

template <typename Callback>
inline ThreadSafeFunction::Status ThreadSafeFunction::BlockingCall(
Callback callback) const {
return CallInternal(new CallbackWrapper(callback), napi_tsfn_blocking);
}

template <typename DataType, typename Callback>
inline ThreadSafeFunction::Status ThreadSafeFunction::BlockingCall(
DataType* data, Callback callback) const {
auto wrapper = [data, callback](Env env, Function jsCallback) {
callback(env, jsCallback, data);
};
return CallInternal(new CallbackWrapper(wrapper), napi_tsfn_blocking);
}

inline ThreadSafeFunction::Status ThreadSafeFunction::NonBlockingCall() const {
return CallInternal(nullptr, napi_tsfn_nonblocking);
}

template <typename Callback>
inline ThreadSafeFunction::Status ThreadSafeFunction::NonBlockingCall(
Callback callback) const {
return CallInternal(new CallbackWrapper(callback), napi_tsfn_nonblocking);
}

template <typename DataType, typename Callback>
inline ThreadSafeFunction::Status ThreadSafeFunction::NonBlockingCall(
DataType* data, Callback callback) const {
auto wrapper = [data, callback](Env env, Function jsCallback) {
callback(env, jsCallback, data);
};
return CallInternal(new CallbackWrapper(wrapper), napi_tsfn_nonblocking);
}

inline bool ThreadSafeFunction::Acquire() const {
return !IsAborted() && napi_acquire_threadsafe_function(
_tsFunctionValue) == napi_ok;
}

inline bool ThreadSafeFunction::Release() {
return !IsAborted() && napi_release_threadsafe_function(
_tsFunctionValue, napi_tsfn_release) == napi_ok;
}

inline bool ThreadSafeFunction::Abort() {
if (IsAborted()) {
return false;
}

napi_status status = napi_release_threadsafe_function(
_tsFunctionValue, napi_tsfn_abort);

_tsFunctionValue = nullptr;
_env = nullptr;

return status == napi_ok;
}

inline bool ThreadSafeFunction::IsAborted() const {
return _env == nullptr || _tsFunctionValue == nullptr;
}

inline ThreadSafeFunction::ThreadSafeFunction()
: _env(nullptr),
_tsFunctionValue(nullptr) {
}

inline ThreadSafeFunction::ThreadSafeFunction(
napi_env env, napi_threadsafe_function tsFunctionValue)
: _env(env),
_tsFunctionValue(tsFunctionValue) {
}

inline ThreadSafeFunction::Status ThreadSafeFunction::CallInternal(
CallbackWrapper* callbackWrapper,
napi_threadsafe_function_call_mode mode) const {
if (IsAborted()) {
return CLOSE;
}
napi_status status = napi_call_threadsafe_function(
_tsFunctionValue, callbackWrapper, mode);
if (status != napi_ok && callbackWrapper != nullptr) {
delete callbackWrapper;
}

switch (status) {
case napi_ok:
return OK;
case napi_closing:
return CLOSE;
case napi_queue_full:
return FULL;
default:
return ERROR;
}
}

// static
inline void ThreadSafeFunction::CallJS(napi_env env,
napi_value jsCallback,
void* /* context */,
void* data) {
if (env == nullptr && jsCallback == nullptr)
return;

if (data != nullptr) {
auto* callbackWrapper = static_cast<CallbackWrapper*>(data);
(*callbackWrapper)(env, Function(env, jsCallback));
delete callbackWrapper;
} else {
Function(env, jsCallback).Call({});
}
}

////////////////////////////////////////////////////////////////////////////////
// Memory Management class
////////////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit f04ba10

Please sign in to comment.