diff --git a/Cargo.lock b/Cargo.lock index e3a56425f0ce..f5874f0e640f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3730,6 +3730,31 @@ dependencies = [ "user-facing-errors", ] +[[package]] +name = "query-engine-common" +version = "0.1.0" +dependencies = [ + "async-trait", + "connection-string", + "napi", + "opentelemetry", + "psl", + "query-connector", + "query-core", + "query-engine-metrics", + "serde", + "serde_json", + "thiserror", + "tracing", + "tracing-futures", + "tracing-opentelemetry", + "tracing-subscriber", + "tsify", + "url", + "user-facing-errors", + "wasm-bindgen", +] + [[package]] name = "query-engine-metrics" version = "0.1.0" @@ -3765,6 +3790,7 @@ dependencies = [ "quaint", "query-connector", "query-core", + "query-engine-common", "query-engine-metrics", "query-structure", "request-handlers", @@ -3822,6 +3848,7 @@ dependencies = [ "quaint", "query-connector", "query-core", + "query-engine-common", "query-structure", "request-handlers", "serde", diff --git a/libs/query-engine-common/Cargo.toml b/libs/query-engine-common/Cargo.toml new file mode 100644 index 000000000000..215897a3aa45 --- /dev/null +++ b/libs/query-engine-common/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "query-engine-common" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +thiserror = "1" +url = "2" +query-connector = { path = "../../query-engine/connectors/query-connector" } +query-core = { path = "../../query-engine/core" } +user-facing-errors = { path = "../user-facing-errors" } +serde_json.workspace = true +serde.workspace = true +connection-string.workspace = true +psl.workspace = true +async-trait = "0.1" +tracing = "0.1" +tracing-subscriber = { version = "0.3" } +tracing-futures = "0.2" +tracing-opentelemetry = "0.17.3" +opentelemetry = { version = "0.17"} + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +napi.workspace = true +query-engine-metrics = { path = "../../query-engine/metrics" } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +wasm-bindgen.workspace = true +tsify.workspace = true diff --git a/libs/query-engine-common/src/engine.rs b/libs/query-engine-common/src/engine.rs new file mode 100644 index 000000000000..2ba69eb72fab --- /dev/null +++ b/libs/query-engine-common/src/engine.rs @@ -0,0 +1,158 @@ +#![allow(unused_imports)] + +use crate::error::ApiError; +use query_core::{protocol::EngineProtocol, schema::QuerySchema, QueryExecutor}; +use serde::Deserialize; +use std::{ + collections::{BTreeMap, HashMap}, + path::PathBuf, + sync::Arc, +}; + +#[cfg(target_arch = "wasm32")] +use tsify::Tsify; + +/// The state of the engine. +pub enum Inner { + /// Not connected, holding all data to form a connection. + Builder(EngineBuilder), + /// A connected engine, holding all data to disconnect and form a new + /// connection. Allows querying when on this state. + Connected(ConnectedEngine), +} + +impl Inner { + /// Returns a builder if the engine is not connected + pub fn as_builder(&self) -> crate::Result<&EngineBuilder> { + match self { + Inner::Builder(ref builder) => Ok(builder), + Inner::Connected(_) => Err(ApiError::AlreadyConnected), + } + } + + /// Returns the engine if connected + pub fn as_engine(&self) -> crate::Result<&ConnectedEngine> { + match self { + Inner::Builder(_) => Err(ApiError::NotConnected), + Inner::Connected(ref engine) => Ok(engine), + } + } +} + +#[cfg(not(target_arch = "wasm32"))] +pub struct EngineBuilderNative { + pub config_dir: PathBuf, + pub env: HashMap, +} + +/// Everything needed to connect to the database and have the core running. +pub struct EngineBuilder { + pub schema: Arc, + pub engine_protocol: EngineProtocol, + + #[cfg(not(target_arch = "wasm32"))] + pub native: EngineBuilderNative, +} + +#[cfg(not(target_arch = "wasm32"))] +pub struct ConnectedEngineNative { + pub config_dir: PathBuf, + pub env: HashMap, + pub metrics: Option, +} + +/// Internal structure for querying and reconnecting with the engine. +pub struct ConnectedEngine { + pub schema: Arc, + pub query_schema: Arc, + pub executor: crate::Executor, + pub engine_protocol: EngineProtocol, + + #[cfg(not(target_arch = "wasm32"))] + pub native: ConnectedEngineNative, +} + +impl ConnectedEngine { + /// The schema AST for Query Engine core. + pub fn query_schema(&self) -> &Arc { + &self.query_schema + } + + /// The query executor. + pub fn executor(&self) -> &(dyn QueryExecutor + Send + Sync) { + self.executor.as_ref() + } + + pub fn engine_protocol(&self) -> EngineProtocol { + self.engine_protocol + } +} + +#[cfg(not(target_arch = "wasm32"))] +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ConstructorOptionsNative { + #[serde(default)] + pub datasource_overrides: BTreeMap, + pub config_dir: PathBuf, + #[serde(default)] + pub env: serde_json::Value, + #[serde(default)] + pub ignore_env_var_errors: bool, +} + +/// Parameters defining the construction of an engine. +#[derive(Debug, Deserialize)] +#[cfg_attr(target_arch = "wasm32", derive(Tsify))] +#[cfg_attr(target_arch = "wasm32", tsify(from_wasm_abi))] +#[serde(rename_all = "camelCase")] +pub struct ConstructorOptions { + pub datamodel: String, + pub log_level: String, + #[serde(default)] + pub log_queries: bool, + #[serde(default)] + pub engine_protocol: Option, + + #[cfg(not(target_arch = "wasm32"))] + #[serde(flatten)] + pub native: ConstructorOptionsNative, +} + +pub fn map_known_error(err: query_core::CoreError) -> crate::Result { + let user_error: user_facing_errors::Error = err.into(); + let value = serde_json::to_string(&user_error)?; + + Ok(value) +} + +pub fn stringify_env_values(origin: serde_json::Value) -> crate::Result> { + use serde_json::Value; + + let msg = match origin { + Value::Object(map) => { + let mut result: HashMap = HashMap::new(); + + for (key, val) in map.into_iter() { + match val { + Value::Null => continue, + Value::String(val) => { + result.insert(key, val); + } + val => { + result.insert(key, val.to_string()); + } + } + } + + return Ok(result); + } + Value::Null => return Ok(Default::default()), + Value::Bool(_) => "Expected an object for the env constructor parameter, got a boolean.", + Value::Number(_) => "Expected an object for the env constructor parameter, got a number.", + Value::String(_) => "Expected an object for the env constructor parameter, got a string.", + Value::Array(_) => "Expected an object for the env constructor parameter, got an array.", + }; + + Err(ApiError::JsonDecode(msg.to_string())) +} diff --git a/libs/query-engine-common/src/error.rs b/libs/query-engine-common/src/error.rs new file mode 100644 index 000000000000..f7c9712af8a7 --- /dev/null +++ b/libs/query-engine-common/src/error.rs @@ -0,0 +1,104 @@ +use psl::diagnostics::Diagnostics; +use query_connector::error::ConnectorError; +use query_core::CoreError; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum ApiError { + #[error("{:?}", _0)] + Conversion(Diagnostics, String), + + #[error("{}", _0)] + Configuration(String), + + #[error("{}", _0)] + Core(CoreError), + + #[error("{}", _0)] + Connector(ConnectorError), + + #[error("Can't modify an already connected engine.")] + AlreadyConnected, + + #[error("Engine is not yet connected.")] + NotConnected, + + #[error("{}", _0)] + JsonDecode(String), +} + +impl From for user_facing_errors::Error { + fn from(err: ApiError) -> Self { + use std::fmt::Write as _; + + match err { + ApiError::Connector(ConnectorError { + user_facing_error: Some(err), + .. + }) => err.into(), + ApiError::Conversion(errors, dml_string) => { + let mut full_error = errors.to_pretty_string("schema.prisma", &dml_string); + write!(full_error, "\nValidation Error Count: {}", errors.errors().len()).unwrap(); + + user_facing_errors::Error::from(user_facing_errors::KnownError::new( + user_facing_errors::common::SchemaParserError { full_error }, + )) + } + ApiError::Core(error) => user_facing_errors::Error::from(error), + other => user_facing_errors::Error::new_non_panic_with_current_backtrace(other.to_string()), + } + } +} + +impl ApiError { + pub fn conversion(diagnostics: Diagnostics, dml: impl ToString) -> Self { + Self::Conversion(diagnostics, dml.to_string()) + } + + pub fn configuration(msg: impl ToString) -> Self { + Self::Configuration(msg.to_string()) + } +} + +impl From for ApiError { + fn from(e: CoreError) -> Self { + match e { + CoreError::ConfigurationError(message) => Self::Configuration(message), + core_error => Self::Core(core_error), + } + } +} + +impl From for ApiError { + fn from(e: ConnectorError) -> Self { + Self::Connector(e) + } +} + +impl From for ApiError { + fn from(e: url::ParseError) -> Self { + Self::configuration(format!("Error parsing connection string: {e}")) + } +} + +impl From for ApiError { + fn from(e: connection_string::Error) -> Self { + Self::configuration(format!("Error parsing connection string: {e}")) + } +} + +impl From for ApiError { + fn from(e: serde_json::Error) -> Self { + Self::JsonDecode(format!("{e}")) + } +} + +#[cfg(not(target_arch = "wasm32"))] +impl From for napi::Error { + fn from(e: ApiError) -> Self { + let user_facing = user_facing_errors::Error::from(e); + let message = serde_json::to_string(&user_facing).unwrap(); + + napi::Error::from_reason(message) + } +} diff --git a/libs/query-engine-common/src/lib.rs b/libs/query-engine-common/src/lib.rs new file mode 100644 index 000000000000..6b8008c0ada9 --- /dev/null +++ b/libs/query-engine-common/src/lib.rs @@ -0,0 +1,9 @@ +//! Common definitions and functions for the Query Engine library. + +pub mod engine; +pub mod error; +pub mod logger; +pub mod tracer; + +pub type Result = std::result::Result; +pub type Executor = Box; diff --git a/libs/query-engine-common/src/logger.rs b/libs/query-engine-common/src/logger.rs new file mode 100644 index 000000000000..744bb6690fa0 --- /dev/null +++ b/libs/query-engine-common/src/logger.rs @@ -0,0 +1,3 @@ +pub trait StringCallback { + fn call(&self, message: String) -> Result<(), String>; +} diff --git a/libs/query-engine-common/src/tracer.rs b/libs/query-engine-common/src/tracer.rs new file mode 100644 index 000000000000..19d17cf13a05 --- /dev/null +++ b/libs/query-engine-common/src/tracer.rs @@ -0,0 +1,86 @@ +use crate::logger::StringCallback; +use async_trait::async_trait; +use opentelemetry::{ + global, sdk, + sdk::{ + export::trace::{ExportResult, SpanData, SpanExporter}, + propagation::TraceContextPropagator, + }, + trace::{TraceError, TracerProvider}, +}; +use query_core::telemetry; +use std::fmt::{self, Debug}; + +/// Pipeline builder +#[derive(Debug)] +pub struct PipelineBuilder { + trace_config: Option, +} + +/// Create a new stdout exporter pipeline builder. +pub fn new_pipeline() -> PipelineBuilder { + PipelineBuilder::default() +} + +impl Default for PipelineBuilder { + /// Return the default pipeline builder. + fn default() -> Self { + Self { trace_config: None } + } +} + +impl PipelineBuilder { + /// Assign the SDK trace configuration. + #[allow(dead_code)] + pub fn with_trace_config(mut self, config: sdk::trace::Config) -> Self { + self.trace_config = Some(config); + self + } +} + +impl PipelineBuilder { + pub fn install_simple(mut self, log_callback: Box) -> sdk::trace::Tracer { + global::set_text_map_propagator(TraceContextPropagator::new()); + let exporter = ClientSpanExporter::new(log_callback); + + let mut provider_builder = sdk::trace::TracerProvider::builder().with_simple_exporter(exporter); + // This doesn't work at the moment because we create the logger outside of an async runtime + // we could later move the creation of logger into the `connect` function + // let mut provider_builder = sdk::trace::TracerProvider::builder().with_batch_exporter(exporter, runtime::Tokio); + // remember to add features = ["rt-tokio"] to the cargo.toml + if let Some(config) = self.trace_config.take() { + provider_builder = provider_builder.with_config(config); + } + let provider = provider_builder.build(); + let tracer = provider.tracer("opentelemetry"); + global::set_tracer_provider(provider); + + tracer + } +} + +/// A [`ClientSpanExporter`] that sends spans to the JS callback. +pub struct ClientSpanExporter { + callback: Box, +} + +impl ClientSpanExporter { + pub fn new(callback: Box) -> Self { + Self { callback } + } +} + +impl Debug for ClientSpanExporter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ClientSpanExporter").finish() + } +} + +#[async_trait] +impl SpanExporter for ClientSpanExporter { + /// Export spans to stdout + async fn export(&mut self, batch: Vec) -> ExportResult { + let result = telemetry::helpers::spans_to_json(batch); + self.callback.call(result).map_err(TraceError::from) + } +} diff --git a/query-engine/query-engine-node-api/Cargo.toml b/query-engine/query-engine-node-api/Cargo.toml index ec5783466f7b..10168eafa25d 100644 --- a/query-engine/query-engine-node-api/Cargo.toml +++ b/query-engine/query-engine-node-api/Cargo.toml @@ -19,6 +19,7 @@ async-trait = "0.1" query-core = { path = "../core", features = ["metrics"] } request-handlers = { path = "../request-handlers" } query-connector = { path = "../connectors/query-connector" } +query-engine-common = { path = "../../libs/query-engine-common" } user-facing-errors = { path = "../../libs/user-facing-errors" } psl.workspace = true sql-connector = { path = "../connectors/sql-query-connector", package = "sql-query-connector" } diff --git a/query-engine/query-engine-node-api/src/engine.rs b/query-engine/query-engine-node-api/src/engine.rs index 761b8bcd8cba..081b41972c50 100644 --- a/query-engine/query-engine-node-api/src/engine.rs +++ b/query-engine/query-engine-node-api/src/engine.rs @@ -5,20 +5,18 @@ use napi_derive::napi; use psl::PreviewFeature; use query_core::{ protocol::EngineProtocol, - schema::{self, QuerySchema}, - telemetry, QueryExecutor, TransactionOptions, TxId, + schema::{self}, + telemetry, TransactionOptions, TxId, }; -use query_engine_metrics::{MetricFormat, MetricRegistry}; +use query_engine_common::engine::{ + map_known_error, stringify_env_values, ConnectedEngine, ConnectedEngineNative, ConstructorOptions, + ConstructorOptionsNative, EngineBuilder, EngineBuilderNative, Inner, +}; +use query_engine_metrics::MetricFormat; use request_handlers::{dmmf, load_executor, render_graphql_schema, ConnectorKind, RequestBody, RequestHandler}; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use serde_json::json; -use std::{ - collections::{BTreeMap, HashMap}, - future::Future, - panic::AssertUnwindSafe, - path::PathBuf, - sync::Arc, -}; +use std::{collections::HashMap, future::Future, panic::AssertUnwindSafe, sync::Arc}; use tokio::sync::RwLock; use tracing::{field, instrument::WithSubscriber, Instrument, Span}; use tracing_subscriber::filter::LevelFilter; @@ -37,43 +35,6 @@ pub struct QueryEngine { logger: Logger, } -/// The state of the engine. -enum Inner { - /// Not connected, holding all data to form a connection. - Builder(EngineBuilder), - /// A connected engine, holding all data to disconnect and form a new - /// connection. Allows querying when on this state. - Connected(ConnectedEngine), -} - -/// Everything needed to connect to the database and have the core running. -struct EngineBuilder { - schema: Arc, - config_dir: PathBuf, - env: HashMap, - engine_protocol: EngineProtocol, -} - -/// Internal structure for querying and reconnecting with the engine. -struct ConnectedEngine { - schema: Arc, - query_schema: Arc, - executor: crate::Executor, - config_dir: PathBuf, - env: HashMap, - metrics: Option, - engine_protocol: EngineProtocol, -} - -/// Returned from the `serverInfo` method in javascript. -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct ServerInfo { - commit: String, - version: String, - primary_connector: Option, -} - #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] struct MetricOptions { @@ -88,59 +49,6 @@ impl MetricOptions { } } -impl ConnectedEngine { - /// The schema AST for Query Engine core. - pub fn query_schema(&self) -> &Arc { - &self.query_schema - } - - /// The query executor. - pub fn executor(&self) -> &(dyn QueryExecutor + Send + Sync) { - self.executor.as_ref() - } - - pub fn engine_protocol(&self) -> EngineProtocol { - self.engine_protocol - } -} - -/// Parameters defining the construction of an engine. -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -struct ConstructorOptions { - datamodel: String, - log_level: String, - #[serde(default)] - log_queries: bool, - #[serde(default)] - datasource_overrides: BTreeMap, - #[serde(default)] - env: serde_json::Value, - config_dir: PathBuf, - #[serde(default)] - ignore_env_var_errors: bool, - #[serde(default)] - engine_protocol: Option, -} - -impl Inner { - /// Returns a builder if the engine is not connected - fn as_builder(&self) -> crate::Result<&EngineBuilder> { - match self { - Inner::Builder(ref builder) => Ok(builder), - Inner::Connected(_) => Err(ApiError::AlreadyConnected), - } - } - - /// Returns the engine if connected - fn as_engine(&self) -> crate::Result<&ConnectedEngine> { - match self { - Inner::Builder(_) => Err(ApiError::NotConnected), - Inner::Connected(ref engine) => Ok(engine), - } - } -} - #[napi] impl QueryEngine { /// Parse a validated datamodel and configuration to allow connecting later on. @@ -163,11 +71,8 @@ impl QueryEngine { datamodel, log_level, log_queries, - datasource_overrides, - env, - config_dir, - ignore_env_var_errors, engine_protocol, + native, } = napi_env.from_js_value(options).expect( r###" Failed to deserialize constructor options. @@ -180,6 +85,13 @@ impl QueryEngine { "###, ); + let ConstructorOptionsNative { + datasource_overrides, + config_dir, + env, + ignore_env_var_errors, + } = native; + let env = stringify_env_values(env)?; // we cannot trust anything JS sends us from process.env let overrides: Vec<(_, _)> = datasource_overrides.into_iter().collect(); @@ -232,9 +144,8 @@ impl QueryEngine { let builder = EngineBuilder { schema: Arc::new(schema), - config_dir, engine_protocol, - env, + native: EngineBuilderNative { config_dir, env }, }; let log_level = log_level.parse::().unwrap(); @@ -290,8 +201,8 @@ impl QueryEngine { let connector_kind = match self.connector_mode { ConnectorMode::Rust => { let url = data_source - .load_url_with_config_dir(&builder.config_dir, |key| { - builder.env.get(key).map(ToString::to_string) + .load_url_with_config_dir(&builder.native.config_dir, |key| { + builder.native.env.get(key).map(ToString::to_string) }) .map_err(|err| { crate::error::ApiError::Conversion(err, builder.schema.db.source().to_owned()) @@ -328,10 +239,12 @@ impl QueryEngine { schema: builder.schema.clone(), query_schema: Arc::new(query_schema.unwrap()), executor: executor?, - config_dir: builder.config_dir.clone(), - env: builder.env.clone(), - metrics: self.logger.metrics(), engine_protocol: builder.engine_protocol, + native: ConnectedEngineNative { + config_dir: builder.native.config_dir.clone(), + env: builder.native.env.clone(), + metrics: self.logger.metrics(), + }, }) as crate::Result } .instrument(span) @@ -364,9 +277,11 @@ impl QueryEngine { let builder = EngineBuilder { schema: engine.schema.clone(), - config_dir: engine.config_dir.clone(), - env: engine.env.clone(), engine_protocol: engine.engine_protocol(), + native: EngineBuilderNative { + config_dir: engine.native.config_dir.clone(), + env: engine.native.env.clone(), + }, }; *inner = Inner::Builder(builder); @@ -529,7 +444,7 @@ impl QueryEngine { let engine = inner.as_engine()?; let options: MetricOptions = serde_json::from_str(&json_options)?; - if let Some(metrics) = &engine.metrics { + if let Some(metrics) = &engine.native.metrics { if options.is_json_format() { let engine_metrics = metrics.to_json(options.global_labels); let res = serde_json::to_string(&engine_metrics)?; @@ -548,44 +463,6 @@ impl QueryEngine { } } -fn map_known_error(err: query_core::CoreError) -> crate::Result { - let user_error: user_facing_errors::Error = err.into(); - let value = serde_json::to_string(&user_error)?; - - Ok(value) -} - -fn stringify_env_values(origin: serde_json::Value) -> crate::Result> { - use serde_json::Value; - - let msg = match origin { - Value::Object(map) => { - let mut result: HashMap = HashMap::new(); - - for (key, val) in map.into_iter() { - match val { - Value::Null => continue, - Value::String(val) => { - result.insert(key, val); - } - val => { - result.insert(key, val.to_string()); - } - } - } - - return Ok(result); - } - Value::Null => return Ok(Default::default()), - Value::Bool(_) => "Expected an object for the env constructor parameter, got a boolean.", - Value::Number(_) => "Expected an object for the env constructor parameter, got a number.", - Value::String(_) => "Expected an object for the env constructor parameter, got a string.", - Value::Array(_) => "Expected an object for the env constructor parameter, got an array.", - }; - - Err(ApiError::JsonDecode(msg.to_string())) -} - async fn async_panic_to_js_error(fut: F) -> napi::Result where F: Future>, diff --git a/query-engine/query-engine-node-api/src/error.rs b/query-engine/query-engine-node-api/src/error.rs index 0dec758a03f5..8c385ddf487a 100644 --- a/query-engine/query-engine-node-api/src/error.rs +++ b/query-engine/query-engine-node-api/src/error.rs @@ -1,103 +1 @@ -use psl::diagnostics::Diagnostics; -use query_connector::error::ConnectorError; -use query_core::CoreError; -use thiserror::Error; - -#[derive(Debug, Error)] -pub enum ApiError { - #[error("{:?}", _0)] - Conversion(Diagnostics, String), - - #[error("{}", _0)] - Configuration(String), - - #[error("{}", _0)] - Core(CoreError), - - #[error("{}", _0)] - Connector(ConnectorError), - - #[error("Can't modify an already connected engine.")] - AlreadyConnected, - - #[error("Engine is not yet connected.")] - NotConnected, - - #[error("{}", _0)] - JsonDecode(String), -} - -impl From for user_facing_errors::Error { - fn from(err: ApiError) -> Self { - use std::fmt::Write as _; - - match err { - ApiError::Connector(ConnectorError { - user_facing_error: Some(err), - .. - }) => err.into(), - ApiError::Conversion(errors, dml_string) => { - let mut full_error = errors.to_pretty_string("schema.prisma", &dml_string); - write!(full_error, "\nValidation Error Count: {}", errors.errors().len()).unwrap(); - - user_facing_errors::Error::from(user_facing_errors::KnownError::new( - user_facing_errors::common::SchemaParserError { full_error }, - )) - } - ApiError::Core(error) => user_facing_errors::Error::from(error), - other => user_facing_errors::Error::new_non_panic_with_current_backtrace(other.to_string()), - } - } -} - -impl ApiError { - pub fn conversion(diagnostics: Diagnostics, dml: impl ToString) -> Self { - Self::Conversion(diagnostics, dml.to_string()) - } - - pub fn configuration(msg: impl ToString) -> Self { - Self::Configuration(msg.to_string()) - } -} - -impl From for ApiError { - fn from(e: CoreError) -> Self { - match e { - CoreError::ConfigurationError(message) => Self::Configuration(message), - core_error => Self::Core(core_error), - } - } -} - -impl From for ApiError { - fn from(e: ConnectorError) -> Self { - Self::Connector(e) - } -} - -impl From for ApiError { - fn from(e: url::ParseError) -> Self { - Self::configuration(format!("Error parsing connection string: {e}")) - } -} - -impl From for ApiError { - fn from(e: connection_string::Error) -> Self { - Self::configuration(format!("Error parsing connection string: {e}")) - } -} - -impl From for ApiError { - fn from(e: serde_json::Error) -> Self { - Self::JsonDecode(format!("{e}")) - } -} - -impl From for napi::Error { - fn from(e: ApiError) -> Self { - let user_facing = user_facing_errors::Error::from(e); - let message = serde_json::to_string(&user_facing).unwrap(); - - napi::Error::from_reason(message) - } -} +pub(crate) use query_engine_common::error::*; diff --git a/query-engine/query-engine-node-api/src/functions.rs b/query-engine/query-engine-node-api/src/functions.rs index 5178d82d6120..298ed19aee5e 100644 --- a/query-engine/query-engine-node-api/src/functions.rs +++ b/query-engine/query-engine-node-api/src/functions.rs @@ -1,8 +1,4 @@ use napi_derive::napi; -use request_handlers::dmmf; -use std::sync::Arc; - -use crate::error::ApiError; #[derive(serde::Serialize, Clone, Copy)] #[napi(object)] @@ -19,21 +15,6 @@ pub fn version() -> Version { } } -#[napi] -pub fn dmmf(datamodel_string: String) -> napi::Result { - let mut schema = psl::validate(datamodel_string.into()); - - schema - .diagnostics - .to_result() - .map_err(|errors| ApiError::conversion(errors, schema.db.source()))?; - - let query_schema = query_core::schema::build(Arc::new(schema), true); - let dmmf = dmmf::render_dmmf(&query_schema); - - Ok(serde_json::to_string(&dmmf)?) -} - #[napi] pub fn debug_panic(panic_message: Option) -> napi::Result<()> { let user_facing = user_facing_errors::Error::from_panic_payload(Box::new( diff --git a/query-engine/query-engine-node-api/src/lib.rs b/query-engine/query-engine-node-api/src/lib.rs index 27583c43ad13..a08ddb0aca33 100644 --- a/query-engine/query-engine-node-api/src/lib.rs +++ b/query-engine/query-engine-node-api/src/lib.rs @@ -6,4 +6,3 @@ pub mod logger; mod tracer; pub(crate) type Result = std::result::Result; -pub(crate) type Executor = Box; diff --git a/query-engine/query-engine-node-api/src/logger.rs b/query-engine/query-engine-node-api/src/logger.rs index da3e725c0218..cfdc3d3db718 100644 --- a/query-engine/query-engine-node-api/src/logger.rs +++ b/query-engine/query-engine-node-api/src/logger.rs @@ -1,6 +1,7 @@ use core::fmt; use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode}; use query_core::telemetry; +use query_engine_common::logger::StringCallback; use query_engine_metrics::MetricRegistry; use serde_json::Value; use std::collections::BTreeMap; @@ -47,8 +48,10 @@ impl Logger { FilterExt::boxed(log_level) }; + let log_callback = CallbackLayer::new(log_callback); + let is_user_trace = filter_fn(telemetry::helpers::user_facing_span_only_filter); - let tracer = crate::tracer::new_pipeline().install_simple(log_callback.clone()); + let tracer = super::tracer::new_pipeline().install_simple(Box::new(log_callback.clone())); let telemetry = if enable_tracing { let telemetry = tracing_opentelemetry::layer() .with_tracer(tracer) @@ -58,7 +61,7 @@ impl Logger { None }; - let layer = CallbackLayer::new(log_callback).with_filter(filters); + let layer = log_callback.with_filter(filters); let metrics = if enable_metrics { query_engine_metrics::setup(); @@ -134,6 +137,7 @@ impl<'a> ToString for JsonVisitor<'a> { } } +#[derive(Clone)] pub(crate) struct CallbackLayer { callback: LogCallback, } @@ -144,14 +148,24 @@ impl CallbackLayer { } } +impl StringCallback for CallbackLayer { + fn call(&self, message: String) -> Result<(), String> { + let status = self.callback.call(message, ThreadsafeFunctionCallMode::Blocking); + + if status != napi::Status::Ok { + Err(format!("Could not call JS callback: {}", status)) + } else { + Ok(()) + } + } +} + // A tracing layer for sending logs to a js callback, layers are composable, subscribers are not. impl Layer for CallbackLayer { fn on_event(&self, event: &tracing::Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) { let mut visitor = JsonVisitor::new(event.metadata().level(), event.metadata().target()); event.record(&mut visitor); - let _ = self - .callback - .call(visitor.to_string(), ThreadsafeFunctionCallMode::Blocking); + let _ = self.call(visitor.to_string()); } } diff --git a/query-engine/query-engine-node-api/src/tracer.rs b/query-engine/query-engine-node-api/src/tracer.rs index 80ea25ded6cd..3bfae7b1e02d 100644 --- a/query-engine/query-engine-node-api/src/tracer.rs +++ b/query-engine/query-engine-node-api/src/tracer.rs @@ -1,94 +1 @@ -use async_trait::async_trait; -use napi::{threadsafe_function::ThreadsafeFunctionCallMode, Status}; -use opentelemetry::{ - global, sdk, - sdk::{ - export::trace::{ExportResult, SpanData, SpanExporter}, - propagation::TraceContextPropagator, - }, - trace::{TraceError, TracerProvider}, -}; -use query_core::telemetry; -use std::fmt::{self, Debug}; - -use crate::logger::LogCallback; - -/// Pipeline builder -#[derive(Debug)] -pub struct PipelineBuilder { - trace_config: Option, -} - -/// Create a new stdout exporter pipeline builder. -pub fn new_pipeline() -> PipelineBuilder { - PipelineBuilder::default() -} - -impl Default for PipelineBuilder { - /// Return the default pipeline builder. - fn default() -> Self { - Self { trace_config: None } - } -} - -impl PipelineBuilder { - /// Assign the SDK trace configuration. - #[allow(dead_code)] - pub fn with_trace_config(mut self, config: sdk::trace::Config) -> Self { - self.trace_config = Some(config); - self - } -} - -impl PipelineBuilder { - pub fn install_simple(mut self, log_callback: LogCallback) -> sdk::trace::Tracer { - global::set_text_map_propagator(TraceContextPropagator::new()); - let exporter = ClientSpanExporter::new(log_callback); - - let mut provider_builder = sdk::trace::TracerProvider::builder().with_simple_exporter(exporter); - // This doesn't work at the moment because we create the logger outside of an async runtime - // we could later move the creation of logger into the `connect` function - // let mut provider_builder = sdk::trace::TracerProvider::builder().with_batch_exporter(exporter, runtime::Tokio); - // remember to add features = ["rt-tokio"] to the cargo.toml - if let Some(config) = self.trace_config.take() { - provider_builder = provider_builder.with_config(config); - } - let provider = provider_builder.build(); - let tracer = provider.tracer("opentelemetry"); - global::set_tracer_provider(provider); - - tracer - } -} - -/// A [`ClientSpanExporter`] that sends spans to the JS callback. -pub struct ClientSpanExporter { - callback: LogCallback, -} - -impl ClientSpanExporter { - pub fn new(callback: LogCallback) -> Self { - Self { callback } - } -} - -impl Debug for ClientSpanExporter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ClientSpanExporter").finish() - } -} - -#[async_trait] -impl SpanExporter for ClientSpanExporter { - /// Export spans to stdout - async fn export(&mut self, batch: Vec) -> ExportResult { - let result = telemetry::helpers::spans_to_json(batch); - let status = self.callback.call(result, ThreadsafeFunctionCallMode::Blocking); - - if status != Status::Ok { - return Err(TraceError::from(format!("Could not call JS callback: {}", status))); - } - - Ok(()) - } -} +pub(crate) use query_engine_common::tracer::*; diff --git a/query-engine/query-engine-wasm/Cargo.toml b/query-engine/query-engine-wasm/Cargo.toml index 1abe891bc69a..dbf922f4e914 100644 --- a/query-engine/query-engine-wasm/Cargo.toml +++ b/query-engine/query-engine-wasm/Cargo.toml @@ -11,7 +11,7 @@ name = "query_engine_wasm" [dependencies] query-connector = { path = "../connectors/query-connector" } - +query-engine-common = { path = "../../libs/query-engine-common" } anyhow = "1" async-trait = "0.1" user-facing-errors = { path = "../../libs/user-facing-errors" } diff --git a/query-engine/query-engine-wasm/src/wasm.rs b/query-engine/query-engine-wasm/src/wasm.rs index 8174dc8738c4..aa8172044608 100644 --- a/query-engine/query-engine-wasm/src/wasm.rs +++ b/query-engine/query-engine-wasm/src/wasm.rs @@ -3,5 +3,3 @@ pub mod error; pub mod functions; pub mod logger; mod tracer; - -pub(crate) type Executor = Box; diff --git a/query-engine/query-engine-wasm/src/wasm/engine.rs b/query-engine/query-engine-wasm/src/wasm/engine.rs index fe087b5f90cd..32722bae6b85 100644 --- a/query-engine/query-engine-wasm/src/wasm/engine.rs +++ b/query-engine/query-engine-wasm/src/wasm/engine.rs @@ -9,19 +9,19 @@ use driver_adapters::JsObject; use js_sys::Function as JsFunction; use query_core::{ protocol::EngineProtocol, - schema::{self, QuerySchema}, - telemetry, QueryExecutor, TransactionOptions, TxId, + schema::{self}, + telemetry, TransactionOptions, TxId, }; +use query_engine_common::engine::{map_known_error, ConnectedEngine, ConstructorOptions, EngineBuilder, Inner}; use request_handlers::ConnectorKind; use request_handlers::{load_executor, RequestBody, RequestHandler}; -use serde::{Deserialize, Serialize}; use serde_json::json; use std::sync::Arc; use tokio::sync::RwLock; use tracing::{field, instrument::WithSubscriber, Instrument, Span}; use tracing_subscriber::filter::LevelFilter; -use tsify::Tsify; use wasm_bindgen::prelude::wasm_bindgen; + /// The main query engine used by JS #[wasm_bindgen] pub struct QueryEngine { @@ -29,85 +29,6 @@ pub struct QueryEngine { logger: Logger, } -/// The state of the engine. -enum Inner { - /// Not connected, holding all data to form a connection. - Builder(EngineBuilder), - /// A connected engine, holding all data to disconnect and form a new - /// connection. Allows querying when on this state. - Connected(ConnectedEngine), -} - -/// Everything needed to connect to the database and have the core running. -struct EngineBuilder { - schema: Arc, - engine_protocol: EngineProtocol, -} - -/// Internal structure for querying and reconnecting with the engine. -struct ConnectedEngine { - schema: Arc, - query_schema: Arc, - executor: crate::Executor, - engine_protocol: EngineProtocol, -} - -/// Returned from the `serverInfo` method in javascript. -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct ServerInfo { - commit: String, - version: String, - primary_connector: Option, -} - -impl ConnectedEngine { - /// The schema AST for Query Engine core. - pub fn query_schema(&self) -> &Arc { - &self.query_schema - } - - /// The query executor. - pub fn executor(&self) -> &(dyn QueryExecutor + Send + Sync) { - self.executor.as_ref() - } - - pub fn engine_protocol(&self) -> EngineProtocol { - self.engine_protocol - } -} - -/// Parameters defining the construction of an engine. -#[derive(Debug, Deserialize, Tsify)] -#[tsify(from_wasm_abi)] -#[serde(rename_all = "camelCase")] -pub struct ConstructorOptions { - datamodel: String, - log_level: String, - #[serde(default)] - log_queries: bool, - #[serde(default)] - engine_protocol: Option, -} - -impl Inner { - /// Returns a builder if the engine is not connected - fn as_builder(&self) -> crate::Result<&EngineBuilder> { - match self { - Inner::Builder(ref builder) => Ok(builder), - Inner::Connected(_) => Err(ApiError::AlreadyConnected), - } - } - - /// Returns the engine if connected - fn as_engine(&self) -> crate::Result<&ConnectedEngine> { - match self { - Inner::Builder(_) => Err(ApiError::NotConnected), - Inner::Connected(ref engine) => Ok(engine), - } - } -} - #[wasm_bindgen] impl QueryEngine { /// Parse a validated datamodel and configuration to allow connecting later on. @@ -136,7 +57,7 @@ impl QueryEngine { sql_connector::activate_driver_adapter(Arc::new(js_queryable)); let provider_name = schema.connector.provider_name(); - tracing::info!("Received driver adapter for {provider_name}."); + tracing::info!("Registered driver adapter for {provider_name}."); schema .diagnostics @@ -355,10 +276,3 @@ impl QueryEngine { Err(ApiError::configuration("Metrics is not enabled in Wasm.").into()) } } - -fn map_known_error(err: query_core::CoreError) -> crate::Result { - let user_error: user_facing_errors::Error = err.into(); - let value = serde_json::to_string(&user_error)?; - - Ok(value) -} diff --git a/query-engine/query-engine-wasm/src/wasm/error.rs b/query-engine/query-engine-wasm/src/wasm/error.rs index cfabc92ea0b0..8c385ddf487a 100644 --- a/query-engine/query-engine-wasm/src/wasm/error.rs +++ b/query-engine/query-engine-wasm/src/wasm/error.rs @@ -1,94 +1 @@ -use psl::diagnostics::Diagnostics; -use query_connector::error::ConnectorError; -use query_core::CoreError; -use thiserror::Error; - -#[derive(Debug, Error)] -pub enum ApiError { - #[error("{:?}", _0)] - Conversion(Diagnostics, String), - - #[error("{}", _0)] - Configuration(String), - - #[error("{}", _0)] - Core(CoreError), - - #[error("{}", _0)] - Connector(ConnectorError), - - #[error("Can't modify an already connected engine.")] - AlreadyConnected, - - #[error("Engine is not yet connected.")] - NotConnected, - - #[error("{}", _0)] - JsonDecode(String), -} - -impl From for user_facing_errors::Error { - fn from(err: ApiError) -> Self { - use std::fmt::Write as _; - - match err { - ApiError::Connector(ConnectorError { - user_facing_error: Some(err), - .. - }) => err.into(), - ApiError::Conversion(errors, dml_string) => { - let mut full_error = errors.to_pretty_string("schema.prisma", &dml_string); - write!(full_error, "\nValidation Error Count: {}", errors.errors().len()).unwrap(); - - user_facing_errors::Error::from(user_facing_errors::KnownError::new( - user_facing_errors::common::SchemaParserError { full_error }, - )) - } - ApiError::Core(error) => user_facing_errors::Error::from(error), - other => user_facing_errors::Error::new_non_panic_with_current_backtrace(other.to_string()), - } - } -} - -impl ApiError { - pub fn conversion(diagnostics: Diagnostics, dml: impl ToString) -> Self { - Self::Conversion(diagnostics, dml.to_string()) - } - - pub fn configuration(msg: impl ToString) -> Self { - Self::Configuration(msg.to_string()) - } -} - -impl From for ApiError { - fn from(e: CoreError) -> Self { - match e { - CoreError::ConfigurationError(message) => Self::Configuration(message), - core_error => Self::Core(core_error), - } - } -} - -impl From for ApiError { - fn from(e: ConnectorError) -> Self { - Self::Connector(e) - } -} - -impl From for ApiError { - fn from(e: url::ParseError) -> Self { - Self::configuration(format!("Error parsing connection string: {e}")) - } -} - -impl From for ApiError { - fn from(e: connection_string::Error) -> Self { - Self::configuration(format!("Error parsing connection string: {e}")) - } -} - -impl From for ApiError { - fn from(e: serde_json::Error) -> Self { - Self::JsonDecode(format!("{e}")) - } -} +pub(crate) use query_engine_common::error::*; diff --git a/query-engine/query-engine-wasm/src/wasm/logger.rs b/query-engine/query-engine-wasm/src/wasm/logger.rs index c0ccbf7f2a3e..cf38a88e3f29 100644 --- a/query-engine/query-engine-wasm/src/wasm/logger.rs +++ b/query-engine/query-engine-wasm/src/wasm/logger.rs @@ -3,6 +3,7 @@ use core::fmt; use js_sys::Function as JsFunction; use query_core::telemetry; +use query_engine_common::logger::StringCallback; use serde_json::Value; use std::collections::BTreeMap; use tracing::{ @@ -20,15 +21,6 @@ use wasm_bindgen::JsValue; #[derive(Clone)] pub struct LogCallback(pub JsFunction); -impl LogCallback { - pub fn call>(&self, arg1: T) -> Result<(), String> { - self.0 - .call1(&JsValue::NULL, &arg1.into()) - .map(|_| ()) - .map_err(|err| err.as_string().unwrap_or_default()) - } -} - unsafe impl Send for LogCallback {} unsafe impl Sync for LogCallback {} @@ -55,8 +47,10 @@ impl Logger { FilterExt::boxed(log_level) }; + let log_callback = CallbackLayer::new(log_callback); + let is_user_trace = filter_fn(telemetry::helpers::user_facing_span_only_filter); - let tracer = super::tracer::new_pipeline().install_simple(log_callback.clone()); + let tracer = super::tracer::new_pipeline().install_simple(Box::new(log_callback.clone())); let telemetry = if enable_tracing { let telemetry = tracing_opentelemetry::layer() .with_tracer(tracer) @@ -66,7 +60,7 @@ impl Logger { None }; - let layer = CallbackLayer::new(log_callback).with_filter(filters); + let layer = log_callback.with_filter(filters); Self { dispatcher: Dispatch::new(Registry::default().with(telemetry).with(layer)), @@ -130,6 +124,7 @@ impl<'a> ToString for JsonVisitor<'a> { } } +#[derive(Clone)] pub(crate) struct CallbackLayer { callback: LogCallback, } @@ -140,12 +135,22 @@ impl CallbackLayer { } } +impl StringCallback for CallbackLayer { + fn call(&self, message: String) -> Result<(), String> { + self.callback + .0 + .call1(&JsValue::NULL, &message.into()) + .map(|_| ()) + .map_err(|err| format!("Could not call JS callback: {}", err.as_string().unwrap_or_default())) + } +} + // A tracing layer for sending logs to a js callback, layers are composable, subscribers are not. impl Layer for CallbackLayer { fn on_event(&self, event: &tracing::Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) { let mut visitor = JsonVisitor::new(event.metadata().level(), event.metadata().target()); event.record(&mut visitor); - let _ = self.callback.call(visitor.to_string()); + let _ = self.call(visitor.to_string()); } } diff --git a/query-engine/query-engine-wasm/src/wasm/tracer.rs b/query-engine/query-engine-wasm/src/wasm/tracer.rs index 7bcd1ab81043..3bfae7b1e02d 100644 --- a/query-engine/query-engine-wasm/src/wasm/tracer.rs +++ b/query-engine/query-engine-wasm/src/wasm/tracer.rs @@ -1,93 +1 @@ -use async_trait::async_trait; -use opentelemetry::{ - global, sdk, - sdk::{ - export::trace::{ExportResult, SpanData, SpanExporter}, - propagation::TraceContextPropagator, - }, - trace::{TraceError, TracerProvider}, -}; -use query_core::telemetry; -use std::fmt::{self, Debug}; - -use crate::logger::LogCallback; - -/// Pipeline builder -#[derive(Debug)] -pub struct PipelineBuilder { - trace_config: Option, -} - -/// Create a new stdout exporter pipeline builder. -pub fn new_pipeline() -> PipelineBuilder { - PipelineBuilder::default() -} - -impl Default for PipelineBuilder { - /// Return the default pipeline builder. - fn default() -> Self { - Self { trace_config: None } - } -} - -impl PipelineBuilder { - /// Assign the SDK trace configuration. - #[allow(dead_code)] - pub fn with_trace_config(mut self, config: sdk::trace::Config) -> Self { - self.trace_config = Some(config); - self - } -} - -impl PipelineBuilder { - pub fn install_simple(mut self, log_callback: LogCallback) -> sdk::trace::Tracer { - global::set_text_map_propagator(TraceContextPropagator::new()); - let exporter = ClientSpanExporter::new(log_callback); - - let mut provider_builder = sdk::trace::TracerProvider::builder().with_simple_exporter(exporter); - // This doesn't work at the moment because we create the logger outside of an async runtime - // we could later move the creation of logger into the `connect` function - // let mut provider_builder = sdk::trace::TracerProvider::builder().with_batch_exporter(exporter, runtime::Tokio); - // remember to add features = ["rt-tokio"] to the cargo.toml - if let Some(config) = self.trace_config.take() { - provider_builder = provider_builder.with_config(config); - } - let provider = provider_builder.build(); - let tracer = provider.tracer("opentelemetry"); - global::set_tracer_provider(provider); - - tracer - } -} - -/// A [`ClientSpanExporter`] that sends spans to the JS callback. -pub struct ClientSpanExporter { - callback: LogCallback, -} - -impl ClientSpanExporter { - pub fn new(callback: LogCallback) -> Self { - Self { callback } - } -} - -impl Debug for ClientSpanExporter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ClientSpanExporter").finish() - } -} - -#[async_trait] -impl SpanExporter for ClientSpanExporter { - /// Export spans to stdout - async fn export(&mut self, batch: Vec) -> ExportResult { - let result = telemetry::helpers::spans_to_json(batch); - let status = self.callback.call(result); - - if let Err(err) = status { - return Err(TraceError::from(format!("Could not call JS callback: {}", err))); - } - - Ok(()) - } -} +pub(crate) use query_engine_common::tracer::*;