Skip to content

Commit

Permalink
chore(review): move wasm/napi-specific task JoinHandle stuff to cross…
Browse files Browse the repository at this point in the history
…target-utils
  • Loading branch information
jkomyno committed Dec 5, 2023
1 parent 7f92ada commit 4100e5b
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 138 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions libs/crosstarget-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures = "0.3"

[target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys.workspace = true
wasm-bindgen.workspace = true
wasm-bindgen-futures.workspace = true
tokio = { version = "1.25", features = ["macros"] }

tokio = { version = "1.25", features = ["macros", "sync"] }
pin-project = "1"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio.workspace = true
1 change: 1 addition & 0 deletions libs/crosstarget-utils/src/native/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod spawn;
pub mod task;
pub mod time;
46 changes: 46 additions & 0 deletions libs/crosstarget-utils/src/native/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use futures::Future;
use tokio::sync::broadcast::{self};

pub struct JoinHandle<T> {
handle: tokio::task::JoinHandle<T>,

sx_exit: Option<broadcast::Sender<()>>,
}

impl<T> JoinHandle<T> {
pub fn abort(&mut self) {
if let Some(sx_exit) = self.sx_exit.as_ref() {
sx_exit.send(()).ok();
}

self.handle.abort();
}
}

pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
spawn_with_sx_exit::<T>(future, None)
}

pub fn spawn_controlled<T>(future_fn: Box<dyn FnOnce(broadcast::Receiver<()>) -> T>) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (sx_exit, rx_exit) = tokio::sync::broadcast::channel::<()>(1);
let future = future_fn(rx_exit);

spawn_with_sx_exit::<T>(future, Some(sx_exit))
}

fn spawn_with_sx_exit<T>(future: T, sx_exit: Option<broadcast::Sender<()>>) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let handle = tokio::spawn(future);
JoinHandle { handle, sx_exit }
}
1 change: 1 addition & 0 deletions libs/crosstarget-utils/src/wasm/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod spawn;
pub mod task;
pub mod time;
64 changes: 64 additions & 0 deletions libs/crosstarget-utils/src/wasm/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use futures::Future;
use tokio::sync::{
broadcast::{self},
oneshot::{self},
};

// Wasm-compatible alternative to `tokio::task::JoinHandle<T>`.
// `pin_project` enables pin-projection and a `Pin`-compatible implementation of the `Future` trait.
#[pin_project::pin_project]
pub struct JoinHandle<T> {
#[pin]
receiver: oneshot::Receiver<T>,

sx_exit: Option<broadcast::Sender<()>>,
}

impl<T> Future for JoinHandle<T> {
type Output = Result<T, oneshot::error::RecvError>;

fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
// the `self.project()` method is provided by the `pin_project` macro
core::pin::Pin::new(&mut self.receiver).poll(cx)
}
}

impl<T> JoinHandle<T> {
pub fn abort(&mut self) {
if let Some(sx_exit) = self.sx_exit.as_ref() {
sx_exit.send(()).ok();
}
}
}

pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
T: Future + 'static,
T::Output: Send + 'static,
{
spawn_with_sx_exit::<T>(future, None)
}

pub fn spawn_controlled<T>(future_fn: Box<dyn FnOnce(broadcast::Receiver<()>) -> T>) -> JoinHandle<T::Output>
where
T: Future + 'static,
T::Output: Send + 'static,
{
let (sx_exit, rx_exit) = tokio::sync::broadcast::channel::<()>(1);
let future = future_fn(rx_exit);
spawn_with_sx_exit::<T>(future, Some(sx_exit))
}

fn spawn_with_sx_exit<T>(future: T, sx_exit: Option<broadcast::Sender<()>>) -> JoinHandle<T::Output>
where
T: Future + 'static,
T::Output: Send + 'static,
{
let (sender, receiver) = oneshot::channel();
wasm_bindgen_futures::spawn_local(async move {
let result = future.await;
sender.send(result).ok();
});

JoinHandle { receiver, sx_exit }
}
7 changes: 0 additions & 7 deletions query-engine/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,3 @@ schema = { path = "../schema" }
crosstarget-utils = { path = "../../libs/crosstarget-utils" }
lru = "0.7.7"
enumflags2 = "0.7"

pin-project = "1"
wasm-bindgen-futures = "0.4"

[target.'cfg(target_arch = "wasm32")'.dependencies]
pin-project = "1"
wasm-bindgen-futures.workspace = true
1 change: 0 additions & 1 deletion query-engine/core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ mod execute_operation;
mod interpreting_executor;
mod pipeline;
mod request_context;
pub(crate) mod task;

pub use self::{execute_operation::*, interpreting_executor::InterpretingExecutor};

Expand Down
124 changes: 0 additions & 124 deletions query-engine/core/src/executor/task.rs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::executor::task::JoinHandle;
use crate::{protocol::EngineProtocol, ClosedTx, Operation, ResponseData};
use connector::Connection;
use crosstarget_utils::task::JoinHandle;
use lru::LruCache;
use once_cell::sync::Lazy;
use schema::QuerySchemaRef;
Expand Down
2 changes: 1 addition & 1 deletion query-engine/core/src/interactive_transactions/actors.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use super::{CachedTx, TransactionError, TxOpRequest, TxOpRequestMsg, TxOpResponse};
use crate::executor::task::{spawn, spawn_controlled, JoinHandle};
use crate::{
execute_many_operations, execute_single_operation, protocol::EngineProtocol, ClosedTx, Operation, ResponseData,
TxId,
};
use connector::Connection;
use crosstarget_utils::task::{spawn, spawn_controlled, JoinHandle};
use crosstarget_utils::time::ElapsedTimeCounter;
use schema::QuerySchemaRef;
use std::{collections::HashMap, sync::Arc};
Expand Down

0 comments on commit 4100e5b

Please sign in to comment.