diff --git a/.changes/async-runtime-refactor.md b/.changes/async-runtime-refactor.md new file mode 100644 index 00000000000..4367fe552fa --- /dev/null +++ b/.changes/async-runtime-refactor.md @@ -0,0 +1,5 @@ +--- +"tauri": patch +--- + +**Breaking change:** Refactored the types returned from the `async_runtime` module. diff --git a/.changes/async-runtime-set.md b/.changes/async-runtime-set.md new file mode 100644 index 00000000000..7b00e0127e8 --- /dev/null +++ b/.changes/async-runtime-set.md @@ -0,0 +1,5 @@ +--- +"tauri": patch +--- + +Added `tauri::async_runtime::set` method, allowing to share your tokio runtime with Tauri. diff --git a/.changes/async-runtime-spawn-blocking.md b/.changes/async-runtime-spawn-blocking.md new file mode 100644 index 00000000000..b39939c1e7a --- /dev/null +++ b/.changes/async-runtime-spawn-blocking.md @@ -0,0 +1,5 @@ +--- +"tauri": patch +--- + +Added `tauri::async_runtime::spawn_blocking` API. diff --git a/.changes/fix-block-on-runtime.md b/.changes/fix-block-on-runtime.md new file mode 100644 index 00000000000..44ef3ba2d7a --- /dev/null +++ b/.changes/fix-block-on-runtime.md @@ -0,0 +1,5 @@ +--- +"tauri": patch +--- + +Avoid `async_runtime::block_on` panics when used along another tokio runtime. diff --git a/core/tauri/src/api/process/command.rs b/core/tauri/src/api/process/command.rs index 443bfc384cb..4b822491be0 100644 --- a/core/tauri/src/api/process/command.rs +++ b/core/tauri/src/api/process/command.rs @@ -312,7 +312,7 @@ impl Command { /// Stdin, stdout and stderr are ignored. pub fn status(self) -> crate::api::Result { let (mut rx, _child) = self.spawn()?; - let code = crate::async_runtime::block_on(async move { + let code = crate::async_runtime::safe_block_on(async move { let mut code = None; #[allow(clippy::collapsible_match)] while let Some(event) = rx.recv().await { @@ -330,7 +330,7 @@ impl Command { pub fn output(self) -> crate::api::Result { let (mut rx, _child) = self.spawn()?; - let output = crate::async_runtime::block_on(async move { + let output = crate::async_runtime::safe_block_on(async move { let mut code = None; let mut stdout = String::new(); let mut stderr = String::new(); diff --git a/core/tauri/src/app.rs b/core/tauri/src/app.rs index 52620d85812..2e0913b83ad 100644 --- a/core/tauri/src/app.rs +++ b/core/tauri/src/app.rs @@ -1057,9 +1057,7 @@ impl Builder { } }; let listener = listener.clone(); - crate::async_runtime::spawn(async move { - listener.lock().unwrap()(&app_handle, event); - }); + listener.lock().unwrap()(&app_handle, event); }); } } diff --git a/core/tauri/src/async_runtime.rs b/core/tauri/src/async_runtime.rs index 8d1148eea7e..351064ebb9f 100644 --- a/core/tauri/src/async_runtime.rs +++ b/core/tauri/src/async_runtime.rs @@ -12,9 +12,8 @@ use futures_lite::future::FutureExt; use once_cell::sync::OnceCell; -use tokio::runtime::Runtime; pub use tokio::{ - runtime::Handle, + runtime::{Handle as TokioHandle, Runtime as TokioRuntime}, sync::{ mpsc::{channel, Receiver, Sender}, Mutex, RwLock, @@ -23,74 +22,240 @@ pub use tokio::{ }; use std::{ - fmt, future::Future, pin::Pin, task::{Context, Poll}, }; -static RUNTIME: OnceCell = OnceCell::new(); +static RUNTIME: OnceCell = OnceCell::new(); + +struct GlobalRuntime { + runtime: Option, + handle: RuntimeHandle, +} + +impl GlobalRuntime { + fn handle(&self) -> RuntimeHandle { + if let Some(r) = &self.runtime { + r.handle() + } else { + self.handle.clone() + } + } + + fn spawn(&self, task: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + if let Some(r) = &self.runtime { + r.spawn(task) + } else { + self.handle.spawn(task) + } + } + + pub fn spawn_blocking(&self, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + if let Some(r) = &self.runtime { + r.spawn_blocking(func) + } else { + self.handle.spawn_blocking(func) + } + } + + fn block_on(&self, task: F) -> F::Output { + if let Some(r) = &self.runtime { + r.block_on(task) + } else { + self.handle.block_on(task) + } + } +} + +/// A runtime used to execute asynchronous tasks. +pub enum Runtime { + /// The tokio runtime. + Tokio(TokioRuntime), +} + +impl Runtime { + /// Gets a reference to the [`TokioRuntime`]. + pub fn inner(&self) -> &TokioRuntime { + let Self::Tokio(r) = self; + r + } + + /// Returns a handle of the async runtime. + pub fn handle(&self) -> RuntimeHandle { + match self { + Self::Tokio(r) => RuntimeHandle::Tokio(r.handle().clone()), + } + } + + /// Spawns a future onto the runtime. + pub fn spawn(&self, task: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + match self { + Self::Tokio(r) => JoinHandle::Tokio(r.spawn(task)), + } + } + + /// Runs the provided function on an executor dedicated to blocking operations. + pub fn spawn_blocking(&self, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + match self { + Self::Tokio(r) => JoinHandle::Tokio(r.spawn_blocking(func)), + } + } + + /// Runs a future to completion on runtime. + pub fn block_on(&self, task: F) -> F::Output { + match self { + Self::Tokio(r) => r.block_on(task), + } + } +} /// An owned permission to join on a task (await its termination). #[derive(Debug)] -pub struct JoinHandle(TokioJoinHandle); +pub enum JoinHandle { + /// The tokio JoinHandle. + Tokio(TokioJoinHandle), +} impl JoinHandle { + /// Gets a reference to the [`TokioJoinHandle`]. + pub fn inner(&self) -> &TokioJoinHandle { + let Self::Tokio(t) = self; + t + } + /// Abort the task associated with the handle. /// /// Awaiting a cancelled task might complete as usual if the task was /// already completed at the time it was cancelled, but most likely it /// will fail with a cancelled `JoinError`. pub fn abort(&self) { - self.0.abort(); + match self { + Self::Tokio(t) => t.abort(), + } } } impl Future for JoinHandle { type Output = crate::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self - .0 - .poll(cx) - .map_err(|e| crate::Error::JoinError(Box::new(e))) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.get_mut() { + Self::Tokio(t) => t.poll(cx).map_err(|e| crate::Error::JoinError(Box::new(e))), + } } } -/// Runtime handle definition. -pub trait RuntimeHandle: fmt::Debug + Clone + Sync + Sync { - /// Spawns a future onto the runtime. - fn spawn(&self, task: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static; - - /// Runs a future to completion on runtime. - fn block_on(&self, task: F) -> F::Output; +/// A handle to the async runtime +#[derive(Clone)] +pub enum RuntimeHandle { + /// The tokio handle. + Tokio(TokioHandle), } -impl RuntimeHandle for Handle { - fn spawn(&self, task: F) -> JoinHandle +impl RuntimeHandle { + /// Gets a reference to the [`TokioHandle`]. + pub fn inner(&self) -> &TokioHandle { + let Self::Tokio(h) = self; + h + } + + /// Runs the provided function on an executor dedicated to blocking operations. + pub fn spawn_blocking(&self, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + match self { + Self::Tokio(h) => JoinHandle::Tokio(h.spawn_blocking(func)), + } + } + + /// Spawns a future onto the runtime. + pub fn spawn(&self, task: F) -> JoinHandle where F: Future + Send + 'static, F::Output: Send + 'static, { - JoinHandle(self.spawn(task)) + match self { + Self::Tokio(h) => JoinHandle::Tokio(h.spawn(task)), + } } - fn block_on(&self, task: F) -> F::Output { - self.block_on(task) + /// Runs a future to completion on runtime. + pub fn block_on(&self, task: F) -> F::Output { + match self { + Self::Tokio(h) => h.block_on(task), + } + } +} + +fn default_runtime() -> GlobalRuntime { + let runtime = Runtime::Tokio(TokioRuntime::new().unwrap()); + let handle = runtime.handle(); + GlobalRuntime { + runtime: Some(runtime), + handle, } } +/// Sets the runtime to use to execute asynchronous tasks. +/// For convinience, this method takes a [`TokioHandle`]. +/// Note that you cannot drop the underlying [`TokioRuntime`]. +/// +/// # Example +/// +/// ```rust +/// #[tokio::main] +/// async fn main() { +/// // perform some async task before initializing the app +/// do_something().await; +/// // share the current runtime with Tauri +/// tauri::async_runtime::set(tokio::runtime::Handle::current()); +/// +/// // bootstrap the tauri app... +/// // tauri::Builder::default().run().unwrap(); +/// } +/// +/// async fn do_something() {} +/// ``` +/// +/// # Panics +/// +/// Panics if the runtime is already set. +pub fn set(handle: TokioHandle) { + RUNTIME + .set(GlobalRuntime { + runtime: None, + handle: RuntimeHandle::Tokio(handle), + }) + .unwrap_or_else(|_| panic!("runtime already initialized")) +} + /// Returns a handle of the async runtime. -pub fn handle() -> impl RuntimeHandle { - let runtime = RUNTIME.get_or_init(|| Runtime::new().unwrap()); - runtime.handle().clone() +pub fn handle() -> RuntimeHandle { + let runtime = RUNTIME.get_or_init(default_runtime); + runtime.handle() } /// Runs a future to completion on runtime. pub fn block_on(task: F) -> F::Output { - let runtime = RUNTIME.get_or_init(|| Runtime::new().unwrap()); + let runtime = RUNTIME.get_or_init(default_runtime); runtime.block_on(task) } @@ -100,13 +265,51 @@ where F: Future + Send + 'static, F::Output: Send + 'static, { - let runtime = RUNTIME.get_or_init(|| Runtime::new().unwrap()); - JoinHandle(runtime.spawn(task)) + let runtime = RUNTIME.get_or_init(default_runtime); + runtime.spawn(task) +} + +/// Runs the provided function on an executor dedicated to blocking operations. +pub fn spawn_blocking(func: F) -> JoinHandle +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + let runtime = RUNTIME.get_or_init(default_runtime); + runtime.spawn_blocking(func) +} + +pub(crate) fn safe_block_on(task: F) -> F::Output +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + if tokio::runtime::Handle::try_current().is_ok() { + let (tx, rx) = std::sync::mpsc::sync_channel(1); + spawn(async move { + tx.send(task.await).unwrap(); + }); + rx.recv().unwrap() + } else { + block_on(task) + } } #[cfg(test)] mod tests { use super::*; + + #[tokio::test] + async fn runtime_spawn() { + let join = spawn(async { 5 }); + assert_eq!(join.await.unwrap(), 5); + } + + #[test] + fn runtime_block_on() { + assert_eq!(block_on(async { 0 }), 0); + } + #[tokio::test] async fn handle_spawn() { let handle = handle(); diff --git a/core/tauri/src/manager.rs b/core/tauri/src/manager.rs index e9ac09c0e3a..8b7e77569de 100644 --- a/core/tauri/src/manager.rs +++ b/core/tauri/src/manager.rs @@ -314,11 +314,12 @@ impl WindowManager { let path_for_data = path.clone(); // handle 206 (partial range) http request - if let Some(range) = request.headers().get("range") { + if let Some(range) = request.headers().get("range").cloned() { let mut status_code = 200; let path_for_data = path_for_data.clone(); let mut response = HttpResponseBuilder::new(); - let (response, status_code, data) = crate::async_runtime::block_on(async move { + let (headers, status_code, data) = crate::async_runtime::safe_block_on(async move { + let mut headers = HashMap::new(); let mut buf = Vec::new(); let mut file = tokio::fs::File::open(path_for_data.clone()).await.unwrap(); // Get the file size @@ -345,22 +346,25 @@ impl WindowManager { // partial content status_code = 206; - response = response - .header("Connection", "Keep-Alive") - .header("Accept-Ranges", "bytes") - .header("Content-Length", real_length) - .header( - "Content-Range", - format!("bytes {}-{}/{}", range.start, last_byte, file_size), - ); + headers.insert("Connection", "Keep-Alive".into()); + headers.insert("Accept-Ranges", "bytes".into()); + headers.insert("Content-Length", real_length.to_string()); + headers.insert( + "Content-Range", + format!("bytes {}-{}/{}", range.start, last_byte, file_size), + ); file.seek(SeekFrom::Start(range.start)).await.unwrap(); file.take(real_length).read_to_end(&mut buf).await.unwrap(); } - (response, status_code, buf) + (headers, status_code, buf) }); + for (k, v) in headers { + response = response.header(k, v); + } + if !data.is_empty() { let mime_type = MimeType::parse(&data, &path); return response.mimetype(&mime_type).status(status_code).body(data); @@ -368,7 +372,7 @@ impl WindowManager { } let data = - crate::async_runtime::block_on(async move { tokio::fs::read(path_for_data).await })?; + crate::async_runtime::safe_block_on(async move { tokio::fs::read(path_for_data).await })?; let mime_type = MimeType::parse(&data, &path); HttpResponseBuilder::new().mimetype(&mime_type).body(data) }); @@ -488,19 +492,13 @@ impl WindowManager { fn prepare_file_drop(&self, app_handle: AppHandle) -> FileDropHandler { let manager = self.clone(); Box::new(move |event, window| { - let manager = manager.clone(); - let app_handle = app_handle.clone(); - crate::async_runtime::block_on(async move { - let window = Window::new(manager.clone(), window, app_handle); - let _ = match event { - FileDropEvent::Hovered(paths) => { - window.emit_and_trigger("tauri://file-drop-hover", paths) - } - FileDropEvent::Dropped(paths) => window.emit_and_trigger("tauri://file-drop", paths), - FileDropEvent::Cancelled => window.emit_and_trigger("tauri://file-drop-cancelled", ()), - _ => unimplemented!(), - }; - }); + let window = Window::new(manager.clone(), window, app_handle.clone()); + let _ = match event { + FileDropEvent::Hovered(paths) => window.emit_and_trigger("tauri://file-drop-hover", paths), + FileDropEvent::Dropped(paths) => window.emit_and_trigger("tauri://file-drop", paths), + FileDropEvent::Cancelled => window.emit_and_trigger("tauri://file-drop-cancelled", ()), + _ => unimplemented!(), + }; true }) } diff --git a/examples/api/src-tauri/Cargo.lock b/examples/api/src-tauri/Cargo.lock index c658f17230d..82722d33115 100644 --- a/examples/api/src-tauri/Cargo.lock +++ b/examples/api/src-tauri/Cargo.lock @@ -46,6 +46,7 @@ dependencies = [ "serde_json", "tauri", "tauri-build", + "tokio", ] [[package]] @@ -1631,6 +1632,28 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mio" +version = "0.7.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "winapi", +] + +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi", +] + [[package]] name = "native-tls" version = "0.2.8" @@ -1744,6 +1767,15 @@ dependencies = [ "zvariant_derive", ] +[[package]] +name = "ntapi" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" +dependencies = [ + "winapi", +] + [[package]] name = "num-integer" version = "0.1.44" @@ -2665,6 +2697,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + [[package]] name = "siphasher" version = "0.3.7" @@ -3177,9 +3218,27 @@ checksum = "70e992e41e0d2fb9f755b37446f20900f64446ef54874f40a60c78f021ac6144" dependencies = [ "autocfg", "bytes", + "libc", "memchr", + "mio", "num_cpus", + "once_cell", + "parking_lot", "pin-project-lite", + "signal-hook-registry", + "tokio-macros", + "winapi", +] + +[[package]] +name = "tokio-macros" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9efc1aba077437943f7515666aa2b882dfabfbfdf89c819ea75a8d6e9eaba5e" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] diff --git a/examples/api/src-tauri/Cargo.toml b/examples/api/src-tauri/Cargo.toml index 5329a8b4184..5b603b8c013 100644 --- a/examples/api/src-tauri/Cargo.toml +++ b/examples/api/src-tauri/Cargo.toml @@ -13,6 +13,7 @@ tauri-build = { path = "../../../core/tauri-build" } serde_json = "1.0" serde = { version = "1.0", features = [ "derive" ] } tauri = { path = "../../../core/tauri", features = ["api-all", "cli", "system-tray", "updater"] } +tokio = { version = "1", features = ["full"] } [features] default = [ "custom-protocol" ] diff --git a/examples/api/src-tauri/src/main.rs b/examples/api/src-tauri/src/main.rs index aa16f89f0d0..2b0573679e3 100644 --- a/examples/api/src-tauri/src/main.rs +++ b/examples/api/src-tauri/src/main.rs @@ -42,7 +42,10 @@ async fn menu_toggle(window: tauri::Window) { window.menu_handle().toggle().unwrap(); } -fn main() { +#[tokio::main] +async fn main() { + //fn main() { + tauri::async_runtime::set(tokio::runtime::Handle::current()); let tray_menu1 = SystemTrayMenu::new() .add_item(CustomMenuItem::new("toggle", "Toggle")) .add_item(CustomMenuItem::new("new", "New window"))