Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(driver-adapters): enable Wasm on request-handlers #4455

Merged
merged 51 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 50 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
61ef25b
feat(quaint): allow wasm32-unknown-unknown compilation; currently fai…
jkomyno Nov 10, 2023
055e696
feat(quaint): split postgres connector into native and wasm submodules
jkomyno Nov 10, 2023
12c6ebb
feat(quaint): split mysql connector into native and wasm submodules
jkomyno Nov 10, 2023
060486d
feat(quaint): recover wasm error for mysql
jkomyno Nov 10, 2023
5de1dc0
feat(quaint): split mssql connector into native and wasm submodules
jkomyno Nov 10, 2023
8ecbc5c
feat(quaint): split sqlite connector into native and wasm submodules
jkomyno Nov 13, 2023
45df24f
chore(quaint): fix clippy when compiling natively
jkomyno Nov 13, 2023
6a1f733
chore(quaint): fix clippy when compiling to wasm32-unknown-unknown
jkomyno Nov 13, 2023
e61bf75
chore(quaint): update README
jkomyno Nov 13, 2023
257c4c8
chore(quaint): rename "*-connector" feature flag to "*-native"
jkomyno Nov 14, 2023
5ab6d96
feat(quaint): enable pure Wasm SqliteError
jkomyno Nov 14, 2023
ab65c95
feat(query-connect): allow wasm32-unknown-unknown compilation
jkomyno Nov 14, 2023
485f6dc
Merge branch 'main' into feat/quaint-on-wasm32-unknown-unknown
jkomyno Nov 14, 2023
cfb5507
feat(sql-query-connector): allow wasm32-unknown-unknown compilation
jkomyno Nov 14, 2023
e7df5a3
chore(query-engine-wasm): add currently unused local crates to test w…
jkomyno Nov 14, 2023
8c5d3dc
chore: update Cargo.lock
jkomyno Nov 14, 2023
6648a88
chore: remove leftover comments
jkomyno Nov 14, 2023
754746e
feat(query-core): allow wasm32-unknown-unknown compilation
jkomyno Nov 14, 2023
fe2fb8b
chore(sql-query-connector): fix clipppy on wasm32
jkomyno Nov 14, 2023
2ffe394
Merge branch 'feat/sql-query-connector-on-wasm32-unknown-unknown' of …
jkomyno Nov 14, 2023
e66fb65
Merge branch 'feat/sql-query-connector-on-wasm32-unknown-unknown' int…
jkomyno Nov 14, 2023
9c41dc1
chore: remove leftover comment
jkomyno Nov 14, 2023
b69bb84
feat(driver-adapters): enable Wasm on request-handlers
jkomyno Nov 15, 2023
c987dce
WIP: refactor mysql module to flatten its structure
miguelff Nov 15, 2023
626bc1e
feat(quaint): flatten mssql connector module
jkomyno Nov 15, 2023
a9f8ba8
feat(quaint): flatten postgres connector module
jkomyno Nov 15, 2023
3c1a100
feat(quaint): flatten sqlite connector module
jkomyno Nov 15, 2023
7f4c8f9
chore(quaint): export all public definitions in connector "url" modules
jkomyno Nov 15, 2023
95a4e28
chore(quaint): refactor tests for connectors, addressing feedback
jkomyno Nov 15, 2023
bacb635
chore: add comment on MysqlAsyncError
jkomyno Nov 15, 2023
263bab0
chore: add comment on ffi.rs for sqlite
jkomyno Nov 15, 2023
414ae2b
Merge branch 'feat/quaint-on-wasm32-unknown-unknown' into feat/sql-qu…
jkomyno Nov 15, 2023
d0f783d
Merge branch 'feat/sql-query-connector-on-wasm32-unknown-unknown' int…
jkomyno Nov 15, 2023
76816fd
chore: replace awkward "super::super::" with "crate::..."
jkomyno Nov 15, 2023
7359a08
Merge branch 'feat/sql-query-connector-on-wasm32-unknown-unknown' of …
jkomyno Nov 15, 2023
f61665a
Merge branch 'feat/sql-query-connector-on-wasm32-unknown-unknown' int…
jkomyno Nov 15, 2023
5126a75
chore: add comments around "query_core::executor::task"
jkomyno Nov 15, 2023
de39d9e
chore: add "request-handlers" to "query-engine-wasm"
jkomyno Nov 15, 2023
4ed297e
Merge branch 'feat/query-core-on-wasm32-unknown-unknown' into feat/re…
jkomyno Nov 15, 2023
33cdf77
Merge branch 'main' into feat/sql-query-connector-on-wasm32-unknown-u…
jkomyno Nov 17, 2023
2dd3f02
Merge branch 'main' into feat/query-core-on-wasm32-unknown-unknown
jkomyno Nov 17, 2023
2afeddc
Merge branch 'feat/sql-query-connector-on-wasm32-unknown-unknown' int…
jkomyno Nov 17, 2023
2339b31
chore: move "task" module into its own file
jkomyno Nov 17, 2023
edf0f4e
Merge branch 'main' into feat/query-core-on-wasm32-unknown-unknown
jkomyno Nov 17, 2023
5976ab1
Merge branch 'feat/query-core-on-wasm32-unknown-unknown' into feat/re…
jkomyno Nov 17, 2023
96cd8ca
fix(driver-adapters): ci for "request-handlers"
jkomyno Nov 17, 2023
3541054
fix(driver-adapters): ci for "request-handlers"
jkomyno Nov 17, 2023
e79cb1f
Merge branch 'main' into feat/sql-query-connector-on-wasm32-unknown-u…
jkomyno Nov 17, 2023
e52500f
Merge branch 'feat/sql-query-connector-on-wasm32-unknown-unknown' int…
jkomyno Nov 17, 2023
2c70d12
Merge branch 'feat/query-core-on-wasm32-unknown-unknown' into feat/re…
jkomyno Nov 17, 2023
a82584a
chore: merge main, fix conflicts
jkomyno Nov 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ once_cell = "1"
qe-setup = { path = "../qe-setup" }
request-handlers = { path = "../../request-handlers" }
tokio.workspace = true
query-core = { path = "../../core" }
query-core = { path = "../../core", features = ["metrics"] }
sql-query-connector = { path = "../../connectors/sql-query-connector" }
query-engine = { path = "../../query-engine"}
psl.workspace = true
Expand Down
11 changes: 9 additions & 2 deletions query-engine/connectors/sql-query-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ version = "0.1.0"

[features]
vendored-openssl = ["quaint/vendored-openssl"]

# Enable Driver Adapters
driver-adapters = []

[dependencies]
Expand All @@ -18,15 +20,20 @@ once_cell = "1.3"
rand = "0.7"
serde_json = {version = "1.0", features = ["float_roundtrip"]}
thiserror = "1.0"
tokio.workspace = true
tokio = { version = "1.0", features = ["macros", "time"] }
tracing = "0.1"
tracing-futures = "0.2"
uuid.workspace = true
opentelemetry = { version = "0.17", features = ["tokio"] }
tracing-opentelemetry = "0.17.3"
quaint.workspace = true
cuid = { git = "https://github.com/prisma/cuid-rust", branch = "wasm32-support" }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
quaint.workspace = true

[target.'cfg(target_arch = "wasm32")'.dependencies]
quaint = { path = "../../../quaint" }

[dependencies.connector-interface]
package = "query-connector"
path = "../query-connector"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg_attr(target_arch = "wasm32", allow(dead_code))]

use super::{catch, transaction::SqlConnectorTransaction};
use crate::{database::operations::*, Context, SqlError};
use async_trait::async_trait;
Expand Down
19 changes: 11 additions & 8 deletions query-engine/connectors/sql-query-connector/src/database/mod.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
mod connection;
#[cfg(feature = "driver-adapters")]
mod js;
mod mssql;
mod mysql;
mod postgresql;
mod sqlite;
mod transaction;

#[cfg(not(target_arch = "wasm32"))]
pub(crate) mod native {
pub(crate) mod mssql;
pub(crate) mod mysql;
pub(crate) mod postgresql;
pub(crate) mod sqlite;
}

pub(crate) mod operations;

use async_trait::async_trait;
use connector_interface::{error::ConnectorError, Connector};

#[cfg(feature = "driver-adapters")]
pub use js::*;
pub use mssql::*;
pub use mysql::*;
pub use postgresql::*;
pub use sqlite::*;

#[cfg(not(target_arch = "wasm32"))]
pub use native::{mssql::*, mysql::*, postgresql::*, sqlite::*};

#[async_trait]
pub trait FromSource {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::connection::SqlConnection;
use crate::database::{catch, connection::SqlConnection};
use crate::{FromSource, SqlError};
use async_trait::async_trait;
use connector_interface::{
Expand Down Expand Up @@ -60,7 +60,7 @@ impl FromSource for Mssql {
#[async_trait]
impl Connector for Mssql {
async fn get_connection<'a>(&'a self) -> connector::Result<Box<dyn Connection + Send + Sync + 'static>> {
super::catch(self.connection_info.clone(), async move {
catch(self.connection_info.clone(), async move {
let conn = self.pool.check_out().await.map_err(SqlError::from)?;
let conn = SqlConnection::new(conn, &self.connection_info, self.features);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::connection::SqlConnection;
use crate::database::{catch, connection::SqlConnection};
use crate::{FromSource, SqlError};
use async_trait::async_trait;
use connector_interface::{
Expand Down Expand Up @@ -65,7 +65,7 @@ impl FromSource for Mysql {
#[async_trait]
impl Connector for Mysql {
async fn get_connection<'a>(&'a self) -> connector::Result<Box<dyn Connection + Send + Sync + 'static>> {
super::catch(self.connection_info.clone(), async move {
catch(self.connection_info.clone(), async move {
let runtime_conn = self.pool.check_out().await?;

// Note: `runtime_conn` must be `Sized`, as that's required by `TransactionCapable`
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::connection::SqlConnection;
use crate::database::{catch, connection::SqlConnection};
use crate::{FromSource, SqlError};
use async_trait::async_trait;
use connector_interface::{
Expand Down Expand Up @@ -67,7 +67,7 @@ impl FromSource for PostgreSql {
#[async_trait]
impl Connector for PostgreSql {
async fn get_connection<'a>(&'a self) -> connector_interface::Result<Box<dyn Connection + Send + Sync + 'static>> {
super::catch(self.connection_info.clone(), async move {
catch(self.connection_info.clone(), async move {
let conn = self.pool.check_out().await.map_err(SqlError::from)?;
let conn = SqlConnection::new(conn, &self.connection_info, self.features);
Ok(Box::new(conn) as Box<dyn Connection + Send + Sync + 'static>)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::connection::SqlConnection;
use crate::database::{catch, connection::SqlConnection};
use crate::{FromSource, SqlError};
use async_trait::async_trait;
use connector_interface::{
Expand Down Expand Up @@ -80,7 +80,7 @@ fn invalid_file_path_error(file_path: &str, connection_info: &ConnectionInfo) ->
#[async_trait]
impl Connector for Sqlite {
async fn get_connection<'a>(&'a self) -> connector::Result<Box<dyn Connection + Send + Sync + 'static>> {
super::catch(self.connection_info().clone(), async move {
catch(self.connection_info().clone(), async move {
let conn = self.pool.check_out().await.map_err(SqlError::from)?;
let conn = SqlConnection::new(conn, self.connection_info(), self.features);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,28 @@ use std::{
ops::Deref,
usize,
};
use tracing::log::trace;
use user_facing_errors::query_engine::DatabaseConstraint;

#[cfg(target_arch = "wasm32")]
macro_rules! trace {
(target: $target:expr, $($arg:tt)+) => {{
// No-op in WebAssembly
}};
($($arg:tt)+) => {{
// No-op in WebAssembly
}};
}

#[cfg(not(target_arch = "wasm32"))]
macro_rules! trace {
(target: $target:expr, $($arg:tt)+) => {
tracing::log::trace!(target: $target, $($arg)+);
};
($($arg:tt)+) => {
tracing::log::trace!($($arg)+);
};
}

async fn generate_id(
conn: &dyn Queryable,
id_field: &FieldSelection,
Expand Down
5 changes: 4 additions & 1 deletion query-engine/connectors/sql-query-connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ mod value_ext;
use self::{column_metadata::*, context::Context, query_ext::QueryExt, row::*};
use quaint::prelude::Queryable;

pub use database::FromSource;
#[cfg(feature = "driver-adapters")]
pub use database::{activate_driver_adapter, Js};
pub use database::{FromSource, Mssql, Mysql, PostgreSql, Sqlite};
pub use error::SqlError;

#[cfg(not(target_arch = "wasm32"))]
pub use database::{Mssql, Mysql, PostgreSql, Sqlite};

type Result<T> = std::result::Result<T, error::SqlError>;
2 changes: 1 addition & 1 deletion query-engine/core-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ edition = "2021"
dissimilar = "1.0.4"
user-facing-errors = { path = "../../libs/user-facing-errors" }
request-handlers = { path = "../request-handlers" }
query-core = { path = "../core" }
query-core = { path = "../core", features = ["metrics"] }
schema = { path = "../schema" }
psl.workspace = true
serde_json.workspace = true
13 changes: 11 additions & 2 deletions query-engine/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ edition = "2021"
name = "query-core"
version = "0.1.0"

[features]
metrics = ["query-engine-metrics"]

[dependencies]
async-trait = "0.1"
bigdecimal = "0.3"
Expand All @@ -18,11 +21,11 @@ once_cell = "1"
petgraph = "0.4"
prisma-models = { path = "../prisma-models", features = ["default_generators"] }
opentelemetry = { version = "0.17.0", features = ["rt-tokio", "serialize"] }
query-engine-metrics = {path = "../metrics"}
query-engine-metrics = { path = "../metrics", optional = true }
serde.workspace = true
serde_json.workspace = true
thiserror = "1.0"
tokio.workspace = true
tokio = { version = "1.0", features = ["macros", "time"] }
tracing = { version = "0.1", features = ["attributes"] }
tracing-futures = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand All @@ -34,3 +37,9 @@ schema = { path = "../schema" }
lru = "0.7.7"
enumflags2 = "0.7"

pin-project = "1"
wasm-bindgen-futures = "0.4"

[target.'cfg(target_arch = "wasm32")'.dependencies]
pin-project = "1"
wasm-bindgen-futures = "0.4"
11 changes: 11 additions & 0 deletions query-engine/core/src/executor/execute_operation.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
#![cfg_attr(target_arch = "wasm32", allow(unused_variables))]

use super::pipeline::QueryPipeline;
use crate::{
executor::request_context, protocol::EngineProtocol, CoreError, IrSerializer, Operation, QueryGraph,
QueryGraphBuilder, QueryInterpreter, ResponseData,
};
use connector::{Connection, ConnectionLike, Connector};
use futures::future;

#[cfg(feature = "metrics")]
use query_engine_metrics::{
histogram, increment_counter, metrics, PRISMA_CLIENT_QUERIES_DURATION_HISTOGRAM_MS, PRISMA_CLIENT_QUERIES_TOTAL,
};

use schema::{QuerySchema, QuerySchemaRef};
use std::time::{Duration, Instant};
use tracing::Instrument;
Expand All @@ -24,6 +29,7 @@ pub async fn execute_single_operation(
let (graph, serializer) = build_graph(&query_schema, operation.clone())?;
let result = execute_on(conn, graph, serializer, query_schema.as_ref(), trace_id).await;

#[cfg(feature = "metrics")]
histogram!(PRISMA_CLIENT_QUERIES_DURATION_HISTOGRAM_MS, operation_timer.elapsed());

result
Expand All @@ -45,6 +51,8 @@ pub async fn execute_many_operations(
for (i, (graph, serializer)) in queries.into_iter().enumerate() {
let operation_timer = Instant::now();
let result = execute_on(conn, graph, serializer, query_schema.as_ref(), trace_id.clone()).await;

#[cfg(feature = "metrics")]
histogram!(PRISMA_CLIENT_QUERIES_DURATION_HISTOGRAM_MS, operation_timer.elapsed());

match result {
Expand Down Expand Up @@ -98,6 +106,7 @@ pub async fn execute_many_self_contained<C: Connector + Send + Sync>(

let dispatcher = crate::get_current_dispatcher();
for op in operations {
#[cfg(feature = "metrics")]
increment_counter!(PRISMA_CLIENT_QUERIES_TOTAL);

let conn_span = info_span!(
Expand Down Expand Up @@ -158,6 +167,7 @@ async fn execute_self_contained(
execute_self_contained_without_retry(conn, graph, serializer, force_transactions, &query_schema, trace_id).await
};

#[cfg(feature = "metrics")]
histogram!(PRISMA_CLIENT_QUERIES_DURATION_HISTOGRAM_MS, operation_timer.elapsed());

result
Expand Down Expand Up @@ -259,6 +269,7 @@ async fn execute_on<'a>(
query_schema: &'a QuerySchema,
trace_id: Option<String>,
) -> crate::Result<ResponseData> {
#[cfg(feature = "metrics")]
increment_counter!(PRISMA_CLIENT_QUERIES_TOTAL);

let interpreter = QueryInterpreter::new(conn);
Expand Down
1 change: 1 addition & 0 deletions query-engine/core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod execute_operation;
mod interpreting_executor;
mod pipeline;
mod request_context;
pub(crate) mod task;

pub use self::{execute_operation::*, interpreting_executor::InterpretingExecutor};

Expand Down
59 changes: 59 additions & 0 deletions query-engine/core/src/executor/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//! This module provides a unified interface for spawning asynchronous tasks, regardless of the target platform.

pub use arch::{spawn, JoinHandle};
use futures::Future;

// On native targets, `tokio::spawn` spawns a new asynchronous task.
#[cfg(not(target_arch = "wasm32"))]
mod arch {
use super::*;

pub type JoinHandle<T> = tokio::task::JoinHandle<T>;

pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
tokio::spawn(future)
}
}

// On Wasm targets, `wasm_bindgen_futures::spawn_local` spawns a new asynchronous task.
#[cfg(target_arch = "wasm32")]
mod arch {
use super::*;
use tokio::sync::oneshot::{self};

// Wasm-compatible alternative to `tokio::task::JoinHandle<T>`.
// `pin_project` enables pin-projection and a `Pin`-compatible implementation of the `Future` trait.
pub struct JoinHandle<T>(oneshot::Receiver<T>);

impl<T> Future for JoinHandle<T> {
type Output = Result<T, oneshot::error::RecvError>;

fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
// the `self.project()` method is provided by the `pin_project` macro
core::pin::Pin::new(&mut self.0).poll(cx)
}
}

impl<T> JoinHandle<T> {
pub fn abort(&mut self) {
// abort is noop on Wasm targets
}
}

pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (sender, receiver) = oneshot::channel();
wasm_bindgen_futures::spawn_local(async move {
let result = future.await;
sender.send(result).ok();
});
JoinHandle(receiver)
}
}
Loading
Loading