diff --git a/crates/napi/src/env.rs b/crates/napi/src/env.rs index b5ec8f7582..6f0587d8a0 100644 --- a/crates/napi/src/env.rs +++ b/crates/napi/src/env.rs @@ -1116,6 +1116,23 @@ impl Env { Ok(unsafe { JsObject::from_raw_unchecked(self.0, promise) }) } + #[cfg(all(feature = "tokio_rt", feature = "napi4"))] + pub fn spawn_future< + T: 'static + Send + ToNapiValue, + F: 'static + Send + Future>, + >( + &self, + fut: F, + ) -> Result { + use crate::tokio_runtime; + + let promise = tokio_runtime::execute_tokio_future(self.0, fut, |env, val| unsafe { + ToNapiValue::to_napi_value(env, val) + })?; + + Ok(unsafe { JsObject::from_raw_unchecked(self.0, promise) }) + } + /// Creates a deferred promise, which can be resolved or rejected from a background thread. #[cfg(feature = "napi4")] pub fn create_deferred Result>( diff --git a/crates/napi/src/threadsafe_function.rs b/crates/napi/src/threadsafe_function.rs index 4912d6e6c2..369b69a0b8 100644 --- a/crates/napi/src/threadsafe_function.rs +++ b/crates/napi/src/threadsafe_function.rs @@ -5,11 +5,11 @@ use std::ffi::CString; use std::marker::PhantomData; use std::os::raw::c_void; use std::ptr; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; -use crate::bindgen_runtime::ToNapiValue; -use crate::{check_status, sys, Env, Error, JsError, Result, Status}; +use crate::bindgen_runtime::{FromNapiValue, ToNapiValue}; +use crate::{check_status, sys, Env, JsError, JsUnknown, Result, Status}; /// ThreadSafeFunction Context object /// the `value` is the value passed to `call` method @@ -96,6 +96,46 @@ type_level_enum! { } } +struct ThreadsafeFunctionHandle { + raw: sys::napi_threadsafe_function, + aborted: RwLock, + referred: AtomicBool, +} + +unsafe impl Send for ThreadsafeFunctionHandle {} +unsafe impl Sync for ThreadsafeFunctionHandle {} + +impl Drop for ThreadsafeFunctionHandle { + fn drop(&mut self) { + let aborted_guard = self + .aborted + .read() + .expect("Threadsafe Function aborted lock failed"); + if !*aborted_guard && self.referred.load(Ordering::Acquire) { + let release_status = unsafe { + sys::napi_release_threadsafe_function(self.raw, sys::ThreadsafeFunctionReleaseMode::release) + }; + assert!( + release_status == sys::Status::napi_ok, + "Threadsafe Function release failed {}", + Status::from(release_status) + ); + } + } +} + +#[repr(u8)] +enum ThreadsafeFunctionCallVariant { + Direct, + WithCallback, +} + +struct ThreadsafeFunctionCallJsBackData { + data: T, + call_variant: ThreadsafeFunctionCallVariant, + callback: Box Result<()>>, +} + /// Communicate with the addon's main thread by invoking a JavaScript function from other threads. /// /// ## Example @@ -146,41 +186,28 @@ type_level_enum! { /// } /// ``` pub struct ThreadsafeFunction { - raw_tsfn: sys::napi_threadsafe_function, - aborted: Arc>, - ref_count: Arc, + handle: Arc, _phantom: PhantomData<(T, ES)>, } impl Clone for ThreadsafeFunction { fn clone(&self) -> Self { - let is_aborted = self.aborted.lock().unwrap(); - if !*is_aborted { - let acquire_status = unsafe { sys::napi_acquire_threadsafe_function(self.raw_tsfn) }; - debug_assert!( - acquire_status == sys::Status::napi_ok, - "Acquire threadsafe function failed in clone" - ); - } else { + let aborted_guard = self + .handle + .aborted + .read() + .expect("Threadsafe Function aborted lock failed"); + if *aborted_guard { panic!("ThreadsafeFunction was aborted, can not clone it"); } - self.ref_count.fetch_add(1, Ordering::AcqRel); - - drop(is_aborted); - Self { - raw_tsfn: self.raw_tsfn, - aborted: Arc::clone(&self.aborted), - ref_count: Arc::clone(&self.ref_count), + handle: self.handle.clone(), _phantom: PhantomData, } } } -unsafe impl Send for ThreadsafeFunction {} -unsafe impl Sync for ThreadsafeFunction {} - impl ThreadsafeFunction { /// See [napi_create_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_create_threadsafe_function) /// for more information. @@ -201,11 +228,8 @@ impl ThreadsafeFunction { sys::napi_create_string_utf8(env, s.as_ptr(), len, &mut async_resource_name) })?; - let initial_thread_count = 1usize; let mut raw_tsfn = ptr::null_mut(); - let ptr = Box::into_raw(Box::new(callback)) as *mut c_void; - let aborted = Arc::new(Mutex::new(false)); - let aborted_ptr = Arc::into_raw(aborted.clone()); + let callback_ptr = Box::into_raw(Box::new(callback)); check_status!(unsafe { sys::napi_create_threadsafe_function( env, @@ -213,19 +237,23 @@ impl ThreadsafeFunction { ptr::null_mut(), async_resource_name, max_queue_size, - initial_thread_count, - aborted_ptr as *mut c_void, + 1, + ptr::null_mut(), Some(thread_finalize_cb::), - ptr, + callback_ptr.cast(), Some(call_js_cb::), &mut raw_tsfn, ) })?; + check_status!(unsafe { sys::napi_acquire_threadsafe_function(raw_tsfn) })?; + Ok(ThreadsafeFunction { - raw_tsfn, - aborted, - ref_count: Arc::new(AtomicUsize::new(initial_thread_count)), + handle: Arc::new(ThreadsafeFunctionHandle { + raw: raw_tsfn, + aborted: RwLock::new(false), + referred: AtomicBool::new(true), + }), _phantom: PhantomData, }) } @@ -235,54 +263,63 @@ impl ThreadsafeFunction { /// /// "ref" is a keyword so that we use "refer" here. pub fn refer(&mut self, env: &Env) -> Result<()> { - let is_aborted = self.aborted.lock().unwrap(); - if *is_aborted { - return Err(Error::new( - Status::Closing, - "Can not ref, Thread safe function already aborted".to_string(), - )); + let aborted_guard = self + .handle + .aborted + .read() + .expect("Threadsafe Function aborted lock failed"); + if !*aborted_guard && !self.handle.referred.load(Ordering::Acquire) { + check_status!(unsafe { sys::napi_ref_threadsafe_function(env.0, self.handle.raw) })?; + self.handle.referred.store(true, Ordering::Release); } - drop(is_aborted); - self.ref_count.fetch_add(1, Ordering::AcqRel); - check_status!(unsafe { sys::napi_ref_threadsafe_function(env.0, self.raw_tsfn) }) + Ok(()) } /// See [napi_unref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_unref_threadsafe_function) /// for more information. pub fn unref(&mut self, env: &Env) -> Result<()> { - let is_aborted = self.aborted.lock().unwrap(); - if *is_aborted { - return Err(Error::new( - Status::Closing, - "Can not unref, Thread safe function already aborted".to_string(), - )); + let aborted_guard = self + .handle + .aborted + .read() + .expect("Threadsafe Function aborted lock failed"); + if !*aborted_guard && self.handle.referred.load(Ordering::Acquire) { + check_status!(unsafe { sys::napi_unref_threadsafe_function(env.0, self.handle.raw) })?; + self.handle.referred.store(false, Ordering::Release); } - self.ref_count.fetch_sub(1, Ordering::AcqRel); - check_status!(unsafe { sys::napi_unref_threadsafe_function(env.0, self.raw_tsfn) }) + Ok(()) } pub fn aborted(&self) -> bool { - let is_aborted = self.aborted.lock().unwrap(); - *is_aborted + let aborted_guard = self + .handle + .aborted + .read() + .expect("Threadsafe Function aborted lock failed"); + *aborted_guard } pub fn abort(self) -> Result<()> { - let mut is_aborted = self.aborted.lock().unwrap(); - if !*is_aborted { + let mut aborted_guard = self + .handle + .aborted + .write() + .expect("Threadsafe Function aborted lock failed"); + if !*aborted_guard { check_status!(unsafe { sys::napi_release_threadsafe_function( - self.raw_tsfn, + self.handle.raw, sys::ThreadsafeFunctionReleaseMode::abort, ) })?; + *aborted_guard = true; } - *is_aborted = true; Ok(()) } /// Get the raw `ThreadSafeFunction` pointer pub fn raw(&self) -> sys::napi_threadsafe_function { - self.raw_tsfn + self.handle.raw } } @@ -290,65 +327,149 @@ impl ThreadsafeFunction { /// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function) /// for more information. pub fn call(&self, value: Result, mode: ThreadsafeFunctionCallMode) -> Status { - let is_aborted = self.aborted.lock().unwrap(); - if *is_aborted { - return Status::Closing; + unsafe { + sys::napi_call_threadsafe_function( + self.handle.raw, + Box::into_raw(Box::new(value.map(|data| { + ThreadsafeFunctionCallJsBackData { + data, + call_variant: ThreadsafeFunctionCallVariant::Direct, + callback: Box::new(|_d: JsUnknown| Ok(())), + } + }))) + .cast(), + mode.into(), + ) } + .into() + } + + pub fn call_with_return_value Result<()>>( + &self, + value: Result, + mode: ThreadsafeFunctionCallMode, + cb: F, + ) -> Status { unsafe { sys::napi_call_threadsafe_function( - self.raw_tsfn, - Box::into_raw(Box::new(value)) as *mut c_void, + self.handle.raw, + Box::into_raw(Box::new(value.map(|data| { + ThreadsafeFunctionCallJsBackData { + data, + call_variant: ThreadsafeFunctionCallVariant::WithCallback, + callback: Box::new(move |d: JsUnknown| { + D::from_napi_value(d.0.env, d.0.value).and_then(cb) + }), + } + }))) + .cast(), mode.into(), ) } .into() } + + #[cfg(feature = "tokio_rt")] + pub async fn call_async(&self, value: Result) -> Result { + let (sender, receiver) = tokio::sync::oneshot::channel::(); + check_status!(unsafe { + sys::napi_call_threadsafe_function( + self.handle.raw, + Box::into_raw(Box::new(value.map(|data| { + ThreadsafeFunctionCallJsBackData { + data, + call_variant: ThreadsafeFunctionCallVariant::WithCallback, + callback: Box::new(move |d: JsUnknown| { + D::from_napi_value(d.0.env, d.0.value).and_then(move |d| { + sender.send(d).map_err(|_| { + crate::Error::from_reason("Failed to send return value to tokio sender") + }) + }) + }), + } + }))) + .cast(), + ThreadsafeFunctionCallMode::NonBlocking.into(), + ) + })?; + receiver + .await + .map_err(|err| crate::Error::new(Status::GenericFailure, format!("{}", err))) + } } impl ThreadsafeFunction { /// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function) /// for more information. pub fn call(&self, value: T, mode: ThreadsafeFunctionCallMode) -> Status { - let is_aborted = self.aborted.lock().unwrap(); - if *is_aborted { - return Status::Closing; - } unsafe { sys::napi_call_threadsafe_function( - self.raw_tsfn, - Box::into_raw(Box::new(value)) as *mut c_void, + self.handle.raw, + Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData { + data: value, + call_variant: ThreadsafeFunctionCallVariant::Direct, + callback: Box::new(|_d: JsUnknown| Ok(())), + })) + .cast(), mode.into(), ) } .into() } -} -impl Drop for ThreadsafeFunction { - fn drop(&mut self) { - let mut is_aborted = self.aborted.lock().unwrap(); - if !*is_aborted && self.ref_count.load(Ordering::Acquire) <= 1 { - let release_status = unsafe { - sys::napi_release_threadsafe_function( - self.raw_tsfn, - sys::ThreadsafeFunctionReleaseMode::release, - ) - }; - assert!( - release_status == sys::Status::napi_ok, - "Threadsafe Function release failed {:?}", - Status::from(release_status) - ); - *is_aborted = true; - } else { - self.ref_count.fetch_sub(1, Ordering::Release); + pub fn call_with_return_value Result<()>>( + &self, + value: T, + mode: ThreadsafeFunctionCallMode, + cb: F, + ) -> Status { + unsafe { + sys::napi_call_threadsafe_function( + self.handle.raw, + Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData { + data: value, + call_variant: ThreadsafeFunctionCallVariant::WithCallback, + callback: Box::new(move |d: JsUnknown| { + D::from_napi_value(d.0.env, d.0.value).and_then(cb) + }), + })) + .cast(), + mode.into(), + ) } - drop(is_aborted); + .into() + } + + #[cfg(feature = "tokio_rt")] + pub async fn call_async(&self, value: T) -> Result { + let (sender, receiver) = tokio::sync::oneshot::channel::(); + check_status!(unsafe { + sys::napi_call_threadsafe_function( + self.handle.raw, + Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData { + data: value, + call_variant: ThreadsafeFunctionCallVariant::WithCallback, + callback: Box::new(move |d: JsUnknown| { + D::from_napi_value(d.0.env, d.0.value).and_then(move |d| { + sender.send(d).map_err(|_| { + crate::Error::from_reason("Failed to send return value to tokio sender") + }) + }) + }), + })) + .cast(), + ThreadsafeFunctionCallMode::NonBlocking.into(), + ) + })?; + receiver + .await + .map_err(|err| crate::Error::new(Status::GenericFailure, format!("{}", err))) } } +#[allow(unused_variables)] unsafe extern "C" fn thread_finalize_cb( - _raw_env: sys::napi_env, + env: sys::napi_env, finalize_data: *mut c_void, finalize_hint: *mut c_void, ) where @@ -356,9 +477,6 @@ unsafe extern "C" fn thread_finalize_cb( { // cleanup drop(unsafe { Box::::from_raw(finalize_hint.cast()) }); - let aborted = unsafe { Arc::>::from_raw(finalize_data.cast()) }; - let mut is_aborted = aborted.lock().unwrap(); - *is_aborted = true; } unsafe extern "C" fn call_js_cb( @@ -375,11 +493,15 @@ unsafe extern "C" fn call_js_cb( return; } - let ctx: &mut R = unsafe { &mut *context.cast::() }; - let val: Result = unsafe { + let ctx: &mut R = unsafe { Box::leak(Box::from_raw(context.cast())) }; + let val = unsafe { match ES::VALUE { - ErrorStrategy::CalleeHandled::VALUE => *Box::>::from_raw(data.cast()), - ErrorStrategy::Fatal::VALUE => Ok(*Box::::from_raw(data.cast())), + ErrorStrategy::CalleeHandled::VALUE => { + *Box::>>::from_raw(data.cast()) + } + ErrorStrategy::Fatal::VALUE => Ok(*Box::>::from_raw( + data.cast(), + )), } }; @@ -389,15 +511,16 @@ unsafe extern "C" fn call_js_cb( let ret = val.and_then(|v| { (ctx)(ThreadSafeCallContext { env: unsafe { Env::from_raw(raw_env) }, - value: v, + value: v.data, }) + .map(|ret| (ret, v.call_variant, v.callback)) }); // Follow async callback conventions: https://nodejs.org/en/knowledge/errors/what-are-the-error-conventions/ // Check if the Result is okay, if so, pass a null as the first (error) argument automatically. // If the Result is an error, pass that as the first argument. let status = match ret { - Ok(values) => { + Ok((values, call_variant, callback)) => { let values = values .into_iter() .map(|v| unsafe { ToNapiValue::to_napi_value(raw_env, v) }); @@ -408,7 +531,8 @@ unsafe extern "C" fn call_js_cb( } else { values.collect() }; - match args { + let mut return_value = ptr::null_mut(); + let status = match args { Ok(args) => unsafe { sys::napi_call_function( raw_env, @@ -416,7 +540,7 @@ unsafe extern "C" fn call_js_cb( js_callback, args.len(), args.as_ptr(), - ptr::null_mut(), + &mut return_value, ) }, Err(e) => match ES::VALUE { @@ -430,11 +554,21 @@ unsafe extern "C" fn call_js_cb( js_callback, 1, [JsError::from(e).into_value(raw_env)].as_mut_ptr(), - ptr::null_mut(), + &mut return_value, ) }, }, + }; + if let ThreadsafeFunctionCallVariant::WithCallback = call_variant { + if let Err(err) = callback(JsUnknown(crate::Value { + env: raw_env, + value: return_value, + value_type: crate::ValueType::Unknown, + })) { + unsafe { sys::napi_throw(raw_env, JsError::from(err).into_value(raw_env)) }; + } } + status } Err(e) if ES::VALUE == ErrorStrategy::Fatal::VALUE => unsafe { sys::napi_fatal_exception(raw_env, JsError::from(e).into_value(raw_env)) diff --git a/examples/napi-compat-mode/__test__/napi4/tsfn-dua-instance.js b/examples/napi-compat-mode/__test__/napi4/tsfn-dua-instance.js index 8950b0199b..abd7283fe2 100644 --- a/examples/napi-compat-mode/__test__/napi4/tsfn-dua-instance.js +++ b/examples/napi-compat-mode/__test__/napi4/tsfn-dua-instance.js @@ -2,8 +2,16 @@ const bindings = require('../../index.node') async function main() { await Promise.resolve() - new bindings.A((s) => console.info(s)) - new bindings.A((s) => console.info(s)) + const a1 = new bindings.A((err, s) => { + console.info(s) + a1.unref() + }) + const a2 = new bindings.A((err, s) => { + console.info(s) + a2.unref() + }) + a1.call() + a2.call() } main().catch((e) => { diff --git a/examples/napi-compat-mode/src/napi4/mod.rs b/examples/napi-compat-mode/src/napi4/mod.rs index 844ef940b5..27b375d170 100644 --- a/examples/napi-compat-mode/src/napi4/mod.rs +++ b/examples/napi-compat-mode/src/napi4/mod.rs @@ -1,11 +1,11 @@ -use napi::{Env, JsObject, Result}; +use napi::{Env, JsObject, Property, Result}; mod deferred; mod tsfn; mod tsfn_dua_instance; use tsfn::*; -use tsfn_dua_instance::constructor; +use tsfn_dua_instance::*; pub fn register_js(exports: &mut JsObject, env: &Env) -> Result<()> { exports.create_named_method("testThreadsafeFunction", test_threadsafe_function)?; @@ -26,7 +26,14 @@ pub fn register_js(exports: &mut JsObject, env: &Env) -> Result<()> { exports.create_named_method("testTsfnWithRef", test_tsfn_with_ref)?; exports.create_named_method("testDeferred", deferred::test_deferred)?; - let obj = env.define_class("A", constructor, &[])?; + let obj = env.define_class( + "A", + constructor, + &[ + Property::new("call")?.with_method(call), + Property::new("unref")?.with_method(unref), + ], + )?; exports.set_named_property("A", obj)?; Ok(()) diff --git a/examples/napi-compat-mode/src/napi4/tsfn.rs b/examples/napi-compat-mode/src/napi4/tsfn.rs index dd464e3803..5b8455612a 100644 --- a/examples/napi-compat-mode/src/napi4/tsfn.rs +++ b/examples/napi-compat-mode/src/napi4/tsfn.rs @@ -95,7 +95,7 @@ pub fn test_call_aborted_threadsafe_function(ctx: CallContext) -> Result napi::Result { let callback = ctx.get::(0)?; - let mut cb = + let cb = ctx .env .create_threadsafe_function(&callback, 0, |ctx: ThreadSafeCallContext| { @@ -23,11 +23,28 @@ pub fn constructor(ctx: CallContext) -> napi::Result { .map(|js_string| vec![js_string]) })?; - cb.unref(ctx.env)?; - let mut this: JsObject = ctx.this_unchecked(); let obj = A { cb }; ctx.env.wrap(&mut this, obj)?; ctx.env.get_undefined() } + +#[js_function] +pub fn call(ctx: CallContext) -> napi::Result { + let this = ctx.this_unchecked(); + let obj = ctx.env.unwrap::(&this)?; + obj.cb.call( + Ok("ThreadsafeFunction NonBlocking Call".to_owned()), + napi::threadsafe_function::ThreadsafeFunctionCallMode::NonBlocking, + ); + ctx.env.get_undefined() +} + +#[js_function] +pub fn unref(ctx: CallContext) -> napi::Result { + let this = ctx.this_unchecked(); + let obj = ctx.env.unwrap::(&this)?; + obj.cb.unref(ctx.env)?; + ctx.env.get_undefined() +} diff --git a/examples/napi/__test__/typegen.spec.ts.md b/examples/napi/__test__/typegen.spec.ts.md index 1e758c2ba7..2c99d8f393 100644 --- a/examples/napi/__test__/typegen.spec.ts.md +++ b/examples/napi/__test__/typegen.spec.ts.md @@ -195,6 +195,8 @@ Generated by [AVA](https://avajs.dev). export function threadsafeFunctionFatalMode(cb: (...args: any[]) => any): void␊ export function threadsafeFunctionFatalModeError(cb: (...args: any[]) => any): void␊ export function threadsafeFunctionClosureCapture(func: (...args: any[]) => any): void␊ + export function tsfnCallWithCallback(func: (...args: any[]) => any): void␊ + export function tsfnAsyncCall(func: (...args: any[]) => any): Promise␊ export function getBuffer(): Buffer␊ export function appendBuffer(buf: Buffer): Buffer␊ export function getEmptyBuffer(): Buffer␊ diff --git a/examples/napi/__test__/typegen.spec.ts.snap b/examples/napi/__test__/typegen.spec.ts.snap index d6b3df0b07..031a546009 100644 Binary files a/examples/napi/__test__/typegen.spec.ts.snap and b/examples/napi/__test__/typegen.spec.ts.snap differ diff --git a/examples/napi/__test__/values.spec.ts b/examples/napi/__test__/values.spec.ts index af98615a87..b6f38072fd 100644 --- a/examples/napi/__test__/values.spec.ts +++ b/examples/napi/__test__/values.spec.ts @@ -51,6 +51,8 @@ import { callThreadsafeFunction, threadsafeFunctionThrowError, threadsafeFunctionClosureCapture, + tsfnCallWithCallback, + tsfnAsyncCall, asyncPlus100, getGlobal, getUndefined, @@ -759,6 +761,30 @@ Napi4Test('Promise should reject raw error in rust', async (t) => { t.is(err, fxError) }) +Napi4Test('call ThreadsafeFunction with callback', async (t) => { + await t.notThrowsAsync( + () => + new Promise((resolve) => { + tsfnCallWithCallback(() => { + resolve() + return 'ReturnFromJavaScriptRawCallback' + }) + }), + ) +}) + +Napi4Test('async call ThreadsafeFunction', async (t) => { + await t.notThrowsAsync(() => + tsfnAsyncCall((err, arg1, arg2, arg3) => { + t.is(err, null) + t.is(arg1, 0) + t.is(arg2, 1) + t.is(arg3, 2) + return 'ReturnFromJavaScriptRawCallback' + }), + ) +}) + const Napi5Test = Number(process.versions.napi) >= 5 ? test : test.skip Napi5Test('Date test', (t) => { diff --git a/examples/napi/index.d.ts b/examples/napi/index.d.ts index 9cadb8a71f..a9c5ca4a46 100644 --- a/examples/napi/index.d.ts +++ b/examples/napi/index.d.ts @@ -185,6 +185,8 @@ export function threadsafeFunctionThrowError(cb: (...args: any[]) => any): void export function threadsafeFunctionFatalMode(cb: (...args: any[]) => any): void export function threadsafeFunctionFatalModeError(cb: (...args: any[]) => any): void export function threadsafeFunctionClosureCapture(func: (...args: any[]) => any): void +export function tsfnCallWithCallback(func: (...args: any[]) => any): void +export function tsfnAsyncCall(func: (...args: any[]) => any): Promise export function getBuffer(): Buffer export function appendBuffer(buf: Buffer): Buffer export function getEmptyBuffer(): Buffer diff --git a/examples/napi/src/threadsafe_function.rs b/examples/napi/src/threadsafe_function.rs index 089fa85862..546e870ecb 100644 --- a/examples/napi/src/threadsafe_function.rs +++ b/examples/napi/src/threadsafe_function.rs @@ -76,3 +76,31 @@ fn threadsafe_function_closure_capture(func: JsFunction) -> napi::Result<()> { Ok(()) } + +#[napi] +pub fn tsfn_call_with_callback(func: JsFunction) -> napi::Result<()> { + let tsfn: ThreadsafeFunction<()> = + func.create_threadsafe_function(0, move |_| Ok(Vec::::new()))?; + tsfn.call_with_return_value( + Ok(()), + ThreadsafeFunctionCallMode::NonBlocking, + |value: String| { + println!("{}", value); + assert_eq!(value, "ReturnFromJavaScriptRawCallback".to_owned()); + Ok(()) + }, + ); + Ok(()) +} + +#[napi(ts_return_type = "Promise")] +pub fn tsfn_async_call(env: Env, func: JsFunction) -> napi::Result { + let tsfn: ThreadsafeFunction<()> = + func.create_threadsafe_function(0, move |_| Ok(vec![0u32, 1u32, 2u32]))?; + + env.spawn_future(async move { + let msg: String = tsfn.call_async(Ok(())).await?; + assert_eq!(msg, "ReturnFromJavaScriptRawCallback".to_owned()); + Ok(()) + }) +}