Skip to content

Commit

Permalink
feat: polyfill instants, use wasm spawn, merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
skyzh authored and jkomyno committed Aug 14, 2023
1 parent 2ce94ed commit 74af6f1
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 7 deletions.
81 changes: 80 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions query-engine/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ cuid = { version = "1.2", optional = true }
schema = { path = "../schema" }
lru = "0.7.7"
enumflags2 = "0.7"
instant = { version = "0.1", features = ["stdweb", "wasm-bindgen", "inaccurate"] } # Vercel Edge Function does not have performance API
pin-project = "1"

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"] }
wasm-bindgen-futures = "0.4"
5 changes: 3 additions & 2 deletions query-engine/core/src/executor/execute_operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use query_engine_metrics::{
histogram, increment_counter, metrics, PRISMA_CLIENT_QUERIES_HISTOGRAM_MS, PRISMA_CLIENT_QUERIES_TOTAL,
};

use instant::Instant;
use schema::{QuerySchema, QuerySchemaRef};
use std::time::{Duration, Instant};
use std::time::Duration;
use tracing::Instrument;
use tracing_futures::WithSubscriber;

Expand Down Expand Up @@ -117,7 +118,7 @@ pub async fn execute_many_self_contained<C: Connector + Send + Sync>(
);
let conn = connector.get_connection().instrument(conn_span).await?;

futures.push(tokio::spawn(
futures.push(super::spawn(
request_context::with_request_context(
engine_protocol,
execute_self_contained(
Expand Down
47 changes: 47 additions & 0 deletions query-engine/core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod request_context;

pub use self::{execute_operation::*, interpreting_executor::InterpretingExecutor};

use futures::Future;
pub(crate) use request_context::*;

use crate::{
Expand Down Expand Up @@ -131,3 +132,49 @@ pub trait TransactionManager {
pub fn get_current_dispatcher() -> Dispatch {
tracing::dispatcher::get_default(|current| current.clone())
}

#[cfg(not(target_arch = "wasm32"))]
pub type JoinHandle<T> = tokio::task::JoinHandle<T>;

#[cfg(not(target_arch = "wasm32"))]
pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
tokio::spawn(future)
}

#[cfg(target_arch = "wasm32")]
#[pin_project::pin_project]
pub struct JoinHandle<T>(#[pin] tokio::sync::oneshot::Receiver<T>);

#[cfg(target_arch = "wasm32")]
impl<T> Future for JoinHandle<T> {
type Output = Result<T, tokio::sync::oneshot::error::RecvError>;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
let this = self.project();
this.0.poll(cx)
}
}

#[cfg(target_arch = "wasm32")]
impl<T> JoinHandle<T> {
pub fn abort(&mut self) {
// abort is noop for WASM builds
}
}

#[cfg(target_arch = "wasm32")]
pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();
wasm_bindgen_futures::spawn_local(async move {
let result = future.await;
tx.send(result).ok();
});
JoinHandle(rx)
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::executor::JoinHandle;
use crate::{protocol::EngineProtocol, ClosedTx, Operation, ResponseData};
use connector::Connection;
use lru::LruCache;
Expand All @@ -9,7 +10,6 @@ use tokio::{
mpsc::{channel, Sender},
RwLock,
},
task::JoinHandle,
time::Duration,
};

Expand Down
6 changes: 3 additions & 3 deletions query-engine/core/src/interactive_transactions/actors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{CachedTx, TransactionError, TxOpRequest, TxOpRequestMsg, TxOpResponse};
use crate::executor::{spawn, JoinHandle};
use crate::{
execute_many_operations, execute_single_operation, protocol::EngineProtocol, ClosedTx, Operation, ResponseData,
TxId,
Expand All @@ -11,7 +12,6 @@ use tokio::{
mpsc::{channel, Receiver, Sender},
oneshot, RwLock,
},
task::JoinHandle,
time::{self, Duration, Instant},
};
use tracing::Span;
Expand Down Expand Up @@ -272,7 +272,7 @@ pub(crate) async fn spawn_itx_actor(
};
let (open_transaction_send, open_transaction_rcv) = oneshot::channel();

tokio::task::spawn(
spawn(
crate::executor::with_request_context(engine_protocol, async move {
// We match on the result in order to send the error to the parent task and abort this
// task, on error. This is a separate task (actor), not a function where we can just bubble up the
Expand Down Expand Up @@ -385,7 +385,7 @@ pub(crate) fn spawn_client_list_clear_actor(
closed_txs: Arc<RwLock<lru::LruCache<TxId, Option<ClosedTx>>>>,
mut rx: Receiver<(TxId, Option<ClosedTx>)>,
) -> JoinHandle<()> {
tokio::task::spawn(async move {
spawn(async move {
loop {
if let Some((id, closed_tx)) = rx.recv().await {
trace!("removing {} from client list", id);
Expand Down

0 comments on commit 74af6f1

Please sign in to comment.