Skip to content

Commit a353707

Browse files
authored
feat(core): allow running along another tokio runtime, closes #2838 (#2973)
1 parent 50c6390 commit a353707

8 files changed

Lines changed: 281 additions & 62 deletions

File tree

.changes/async-runtime-refactor.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"tauri": patch
3+
---
4+
5+
**Breaking change:** Refactored the types returned from the `async_runtime` module.

.changes/async-runtime-set.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"tauri": patch
3+
---
4+
5+
Added `tauri::async_runtime::set` method, allowing to share your tokio runtime with Tauri.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"tauri": patch
3+
---
4+
5+
Added `tauri::async_runtime::spawn_blocking` API.

.changes/fix-block-on-runtime.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"tauri": patch
3+
---
4+
5+
Avoid `async_runtime::block_on` panics when used along another tokio runtime.

core/tauri/src/api/process/command.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ impl Command {
312312
/// Stdin, stdout and stderr are ignored.
313313
pub fn status(self) -> crate::api::Result<ExitStatus> {
314314
let (mut rx, _child) = self.spawn()?;
315-
let code = crate::async_runtime::block_on(async move {
315+
let code = crate::async_runtime::safe_block_on(async move {
316316
let mut code = None;
317317
#[allow(clippy::collapsible_match)]
318318
while let Some(event) = rx.recv().await {
@@ -330,7 +330,7 @@ impl Command {
330330
pub fn output(self) -> crate::api::Result<Output> {
331331
let (mut rx, _child) = self.spawn()?;
332332

333-
let output = crate::async_runtime::block_on(async move {
333+
let output = crate::async_runtime::safe_block_on(async move {
334334
let mut code = None;
335335
let mut stdout = String::new();
336336
let mut stderr = String::new();

core/tauri/src/app.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1057,9 +1057,7 @@ impl<R: Runtime> Builder<R> {
10571057
}
10581058
};
10591059
let listener = listener.clone();
1060-
crate::async_runtime::spawn(async move {
1061-
listener.lock().unwrap()(&app_handle, event);
1062-
});
1060+
listener.lock().unwrap()(&app_handle, event);
10631061
});
10641062
}
10651063
}

core/tauri/src/async_runtime.rs

Lines changed: 235 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@
1212
1313
use futures_lite::future::FutureExt;
1414
use once_cell::sync::OnceCell;
15-
use tokio::runtime::Runtime;
1615
pub use tokio::{
17-
runtime::Handle,
16+
runtime::{Handle as TokioHandle, Runtime as TokioRuntime},
1817
sync::{
1918
mpsc::{channel, Receiver, Sender},
2019
Mutex, RwLock,
@@ -23,74 +22,240 @@ pub use tokio::{
2322
};
2423

2524
use std::{
26-
fmt,
2725
future::Future,
2826
pin::Pin,
2927
task::{Context, Poll},
3028
};
3129

32-
static RUNTIME: OnceCell<Runtime> = OnceCell::new();
30+
static RUNTIME: OnceCell<GlobalRuntime> = OnceCell::new();
31+
32+
struct GlobalRuntime {
33+
runtime: Option<Runtime>,
34+
handle: RuntimeHandle,
35+
}
36+
37+
impl GlobalRuntime {
38+
fn handle(&self) -> RuntimeHandle {
39+
if let Some(r) = &self.runtime {
40+
r.handle()
41+
} else {
42+
self.handle.clone()
43+
}
44+
}
45+
46+
fn spawn<F: Future>(&self, task: F) -> JoinHandle<F::Output>
47+
where
48+
F: Future + Send + 'static,
49+
F::Output: Send + 'static,
50+
{
51+
if let Some(r) = &self.runtime {
52+
r.spawn(task)
53+
} else {
54+
self.handle.spawn(task)
55+
}
56+
}
57+
58+
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
59+
where
60+
F: FnOnce() -> R + Send + 'static,
61+
R: Send + 'static,
62+
{
63+
if let Some(r) = &self.runtime {
64+
r.spawn_blocking(func)
65+
} else {
66+
self.handle.spawn_blocking(func)
67+
}
68+
}
69+
70+
fn block_on<F: Future>(&self, task: F) -> F::Output {
71+
if let Some(r) = &self.runtime {
72+
r.block_on(task)
73+
} else {
74+
self.handle.block_on(task)
75+
}
76+
}
77+
}
78+
79+
/// A runtime used to execute asynchronous tasks.
80+
pub enum Runtime {
81+
/// The tokio runtime.
82+
Tokio(TokioRuntime),
83+
}
84+
85+
impl Runtime {
86+
/// Gets a reference to the [`TokioRuntime`].
87+
pub fn inner(&self) -> &TokioRuntime {
88+
let Self::Tokio(r) = self;
89+
r
90+
}
91+
92+
/// Returns a handle of the async runtime.
93+
pub fn handle(&self) -> RuntimeHandle {
94+
match self {
95+
Self::Tokio(r) => RuntimeHandle::Tokio(r.handle().clone()),
96+
}
97+
}
98+
99+
/// Spawns a future onto the runtime.
100+
pub fn spawn<F: Future>(&self, task: F) -> JoinHandle<F::Output>
101+
where
102+
F: Future + Send + 'static,
103+
F::Output: Send + 'static,
104+
{
105+
match self {
106+
Self::Tokio(r) => JoinHandle::Tokio(r.spawn(task)),
107+
}
108+
}
109+
110+
/// Runs the provided function on an executor dedicated to blocking operations.
111+
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
112+
where
113+
F: FnOnce() -> R + Send + 'static,
114+
R: Send + 'static,
115+
{
116+
match self {
117+
Self::Tokio(r) => JoinHandle::Tokio(r.spawn_blocking(func)),
118+
}
119+
}
120+
121+
/// Runs a future to completion on runtime.
122+
pub fn block_on<F: Future>(&self, task: F) -> F::Output {
123+
match self {
124+
Self::Tokio(r) => r.block_on(task),
125+
}
126+
}
127+
}
33128

34129
/// An owned permission to join on a task (await its termination).
35130
#[derive(Debug)]
36-
pub struct JoinHandle<T>(TokioJoinHandle<T>);
131+
pub enum JoinHandle<T> {
132+
/// The tokio JoinHandle.
133+
Tokio(TokioJoinHandle<T>),
134+
}
37135

38136
impl<T> JoinHandle<T> {
137+
/// Gets a reference to the [`TokioJoinHandle`].
138+
pub fn inner(&self) -> &TokioJoinHandle<T> {
139+
let Self::Tokio(t) = self;
140+
t
141+
}
142+
39143
/// Abort the task associated with the handle.
40144
///
41145
/// Awaiting a cancelled task might complete as usual if the task was
42146
/// already completed at the time it was cancelled, but most likely it
43147
/// will fail with a cancelled `JoinError`.
44148
pub fn abort(&self) {
45-
self.0.abort();
149+
match self {
150+
Self::Tokio(t) => t.abort(),
151+
}
46152
}
47153
}
48154

49155
impl<T> Future for JoinHandle<T> {
50156
type Output = crate::Result<T>;
51-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
52-
self
53-
.0
54-
.poll(cx)
55-
.map_err(|e| crate::Error::JoinError(Box::new(e)))
157+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
158+
match self.get_mut() {
159+
Self::Tokio(t) => t.poll(cx).map_err(|e| crate::Error::JoinError(Box::new(e))),
160+
}
56161
}
57162
}
58163

59-
/// Runtime handle definition.
60-
pub trait RuntimeHandle: fmt::Debug + Clone + Sync + Sync {
61-
/// Spawns a future onto the runtime.
62-
fn spawn<F: Future>(&self, task: F) -> JoinHandle<F::Output>
63-
where
64-
F: Future + Send + 'static,
65-
F::Output: Send + 'static;
66-
67-
/// Runs a future to completion on runtime.
68-
fn block_on<F: Future>(&self, task: F) -> F::Output;
164+
/// A handle to the async runtime
165+
#[derive(Clone)]
166+
pub enum RuntimeHandle {
167+
/// The tokio handle.
168+
Tokio(TokioHandle),
69169
}
70170

71-
impl RuntimeHandle for Handle {
72-
fn spawn<F: Future>(&self, task: F) -> JoinHandle<F::Output>
171+
impl RuntimeHandle {
172+
/// Gets a reference to the [`TokioHandle`].
173+
pub fn inner(&self) -> &TokioHandle {
174+
let Self::Tokio(h) = self;
175+
h
176+
}
177+
178+
/// Runs the provided function on an executor dedicated to blocking operations.
179+
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
180+
where
181+
F: FnOnce() -> R + Send + 'static,
182+
R: Send + 'static,
183+
{
184+
match self {
185+
Self::Tokio(h) => JoinHandle::Tokio(h.spawn_blocking(func)),
186+
}
187+
}
188+
189+
/// Spawns a future onto the runtime.
190+
pub fn spawn<F: Future>(&self, task: F) -> JoinHandle<F::Output>
73191
where
74192
F: Future + Send + 'static,
75193
F::Output: Send + 'static,
76194
{
77-
JoinHandle(self.spawn(task))
195+
match self {
196+
Self::Tokio(h) => JoinHandle::Tokio(h.spawn(task)),
197+
}
78198
}
79199

80-
fn block_on<F: Future>(&self, task: F) -> F::Output {
81-
self.block_on(task)
200+
/// Runs a future to completion on runtime.
201+
pub fn block_on<F: Future>(&self, task: F) -> F::Output {
202+
match self {
203+
Self::Tokio(h) => h.block_on(task),
204+
}
205+
}
206+
}
207+
208+
fn default_runtime() -> GlobalRuntime {
209+
let runtime = Runtime::Tokio(TokioRuntime::new().unwrap());
210+
let handle = runtime.handle();
211+
GlobalRuntime {
212+
runtime: Some(runtime),
213+
handle,
82214
}
83215
}
84216

217+
/// Sets the runtime to use to execute asynchronous tasks.
218+
/// For convinience, this method takes a [`TokioHandle`].
219+
/// Note that you cannot drop the underlying [`TokioRuntime`].
220+
///
221+
/// # Example
222+
///
223+
/// ```rust
224+
/// #[tokio::main]
225+
/// async fn main() {
226+
/// // perform some async task before initializing the app
227+
/// do_something().await;
228+
/// // share the current runtime with Tauri
229+
/// tauri::async_runtime::set(tokio::runtime::Handle::current());
230+
///
231+
/// // bootstrap the tauri app...
232+
/// // tauri::Builder::default().run().unwrap();
233+
/// }
234+
///
235+
/// async fn do_something() {}
236+
/// ```
237+
///
238+
/// # Panics
239+
///
240+
/// Panics if the runtime is already set.
241+
pub fn set(handle: TokioHandle) {
242+
RUNTIME
243+
.set(GlobalRuntime {
244+
runtime: None,
245+
handle: RuntimeHandle::Tokio(handle),
246+
})
247+
.unwrap_or_else(|_| panic!("runtime already initialized"))
248+
}
249+
85250
/// Returns a handle of the async runtime.
86-
pub fn handle() -> impl RuntimeHandle {
87-
let runtime = RUNTIME.get_or_init(|| Runtime::new().unwrap());
88-
runtime.handle().clone()
251+
pub fn handle() -> RuntimeHandle {
252+
let runtime = RUNTIME.get_or_init(default_runtime);
253+
runtime.handle()
89254
}
90255

91256
/// Runs a future to completion on runtime.
92257
pub fn block_on<F: Future>(task: F) -> F::Output {
93-
let runtime = RUNTIME.get_or_init(|| Runtime::new().unwrap());
258+
let runtime = RUNTIME.get_or_init(default_runtime);
94259
runtime.block_on(task)
95260
}
96261

@@ -100,13 +265,51 @@ where
100265
F: Future + Send + 'static,
101266
F::Output: Send + 'static,
102267
{
103-
let runtime = RUNTIME.get_or_init(|| Runtime::new().unwrap());
104-
JoinHandle(runtime.spawn(task))
268+
let runtime = RUNTIME.get_or_init(default_runtime);
269+
runtime.spawn(task)
270+
}
271+
272+
/// Runs the provided function on an executor dedicated to blocking operations.
273+
pub fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
274+
where
275+
F: FnOnce() -> R + Send + 'static,
276+
R: Send + 'static,
277+
{
278+
let runtime = RUNTIME.get_or_init(default_runtime);
279+
runtime.spawn_blocking(func)
280+
}
281+
282+
pub(crate) fn safe_block_on<F>(task: F) -> F::Output
283+
where
284+
F: Future + Send + 'static,
285+
F::Output: Send + 'static,
286+
{
287+
if tokio::runtime::Handle::try_current().is_ok() {
288+
let (tx, rx) = std::sync::mpsc::sync_channel(1);
289+
spawn(async move {
290+
tx.send(task.await).unwrap();
291+
});
292+
rx.recv().unwrap()
293+
} else {
294+
block_on(task)
295+
}
105296
}
106297

107298
#[cfg(test)]
108299
mod tests {
109300
use super::*;
301+
302+
#[tokio::test]
303+
async fn runtime_spawn() {
304+
let join = spawn(async { 5 });
305+
assert_eq!(join.await.unwrap(), 5);
306+
}
307+
308+
#[test]
309+
fn runtime_block_on() {
310+
assert_eq!(block_on(async { 0 }), 0);
311+
}
312+
110313
#[tokio::test]
111314
async fn handle_spawn() {
112315
let handle = handle();

0 commit comments

Comments
 (0)