Skip to content

Commit

Permalink
feat(js-connectors): Proper JS error propagation (#4186)
Browse files Browse the repository at this point in the history
* feat(js-connectors): Proper JS error propogation

The way it is implemented:
1. On TS side, we catch all errors from connector methods. In case we
   encounterd an error, we store it on per-connector error registry and
   return it's numeric id to the engine.
2. Engine propogates that numeric id all the way throuhg quaint up to
   query-engine-node-api, where it becomes new kind of error,
   `ExternalError` with a code of P2036 and error id in the meta field.
3. Client is then expected to pick the error from the registry and
   re-throw it. This PR implements this for smoke-tests, similar
   implementation needs to be done in the client.

Implementation is done this way, rather than propogating `napi::Error`
instance, to avoid dependencies on napi types in all intermediate crates
other than `js-connectors` and `query-engine-node-api`.

To better accomodate this pattern, `Result` type was introduced to the
connectors. It allows to explicitly indicate if particular call
succeeded or failed via return value. On Rust side, those values get
parsed and converted into `std::result::Result` values.

On Rust side, `AsyncJsFunction` wrapper over `ThreadsafeFunction` is
introduced. It handles promises and JS result type conversions
automatically and simplifies `Proxy` code.

This also lays the foundation for supporting "Known" errors - JsError
type could be extended in a future with new error types, that could be
converted into standard prisma errors with normal error codes.

Fix #prisma/team-orm#260

* Address review feedback

---------

Co-authored-by: jkomyno <skiabo97@gmail.com>
  • Loading branch information
SevInf and jkomyno committed Aug 31, 2023
1 parent c67600c commit bb89459
Show file tree
Hide file tree
Showing 18 changed files with 410 additions and 192 deletions.
7 changes: 7 additions & 0 deletions libs/user-facing-errors/src/query_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,10 @@ pub struct DatabaseAssertionViolation {
/// Database error returned by the underlying connector driver.
pub database_error: String,
}

#[derive(Debug, UserFacingError, Serialize)]
#[user_facing(code = "P2036", message = "Error in external connector (id {id})")]
pub struct ExternalError {
/// id of the error in external system, which would allow to retrieve it later
pub id: i32,
}
8 changes: 8 additions & 0 deletions quaint/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ impl Error {
pub fn raw_connector_error(status: String, reason: String) -> Error {
Error::builder(ErrorKind::RawConnectorError { status, reason }).build()
}

// Builds an error from an externally stored error
pub fn external_error(error_id: i32) -> Error {
Error::builder(ErrorKind::ExternalError(error_id)).build()
}
}

impl fmt::Display for Error {
Expand Down Expand Up @@ -272,6 +277,9 @@ pub enum ErrorKind {

#[error("Column type '{}' could not be deserialized from the database.", column_type)]
UnsupportedColumnType { column_type: String },

#[error("External error id#{}", _0)]
ExternalError(i32),
}

impl ErrorKind {
Expand Down
6 changes: 6 additions & 0 deletions query-engine/connectors/query-connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ impl ConnectorError {
message: message.clone(),
},
)),
ErrorKind::ExternalError(id) => Some(user_facing_errors::KnownError::new(
user_facing_errors::query_engine::ExternalError { id: id.to_owned() },
)),
_ => None,
};

Expand Down Expand Up @@ -266,6 +269,9 @@ pub enum ErrorKind {

#[error("Unsupported connector: {0}")]
UnsupportedConnector(String),

#[error("External connector error")]
ExternalError(i32),
}

impl From<DomainError> for ConnectorError {
Expand Down
5 changes: 5 additions & 0 deletions query-engine/connectors/sql-query-connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ pub enum SqlError {

#[error("Cannot find a fulltext index to use for the search")]
MissingFullTextSearchIndex,

#[error("External connector error")]
ExternalError(i32),
}

impl SqlError {
Expand Down Expand Up @@ -254,6 +257,7 @@ impl SqlError {
}
SqlError::MissingFullTextSearchIndex => ConnectorError::from_kind(ErrorKind::MissingFullTextSearchIndex),
SqlError::InvalidIsolationLevel(msg) => ConnectorError::from_kind(ErrorKind::InternalConversionError(msg)),
SqlError::ExternalError(error_id) => ConnectorError::from_kind(ErrorKind::ExternalError(error_id)),
}
}
}
Expand Down Expand Up @@ -295,6 +299,7 @@ impl From<quaint::error::Error> for SqlError {
QuaintKind::InvalidIsolationLevel(msg) => Self::InvalidIsolationLevel(msg),
QuaintKind::TransactionWriteConflict => Self::TransactionWriteConflict,
QuaintKind::RollbackWithoutBegin => Self::RollbackWithoutBegin,
QuaintKind::ExternalError(error_id) => Self::ExternalError(error_id),
e @ QuaintKind::UnsupportedColumnType { .. } => SqlError::ConversionError(e.into()),
e @ QuaintKind::TransactionAlreadyClosed(_) => SqlError::TransactionAlreadyClosed(format!("{e}")),
e @ QuaintKind::IncorrectNumberOfParameters { .. } => SqlError::QueryError(e.into()),
Expand Down
67 changes: 54 additions & 13 deletions query-engine/js-connectors/js/js-connector-utils/src/binder.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,64 @@
import type { Connector, Transaction } from './types';
import type { ErrorCapturingConnector, Connector, Transaction, ErrorRegistry, ErrorRecord, Result } from './types';


class ErrorRegistryInternal implements ErrorRegistry {
private registeredErrors: ErrorRecord[] = []

consumeError(id: number): ErrorRecord | undefined {
return this.registeredErrors[id]
}

registerNewError(error: unknown) {
let i=0;
while (this.registeredErrors[i] !== undefined) {
i++
}
this.registeredErrors[i] = { error }
return i
}

}

// *.bind(connector) is required to preserve the `this` context of functions whose
// execution is delegated to napi.rs.
export const bindConnector = (connector: Connector): Connector => ({
queryRaw: connector.queryRaw.bind(connector),
executeRaw: connector.executeRaw.bind(connector),
flavour: connector.flavour,
startTransaction: connector.startTransaction.bind(connector),
close: connector.close.bind(connector)
})
export const bindConnector = (connector: Connector): ErrorCapturingConnector => {
const errorRegistry = new ErrorRegistryInternal()

return {
errorRegistry,
queryRaw: wrapAsync(errorRegistry, connector.queryRaw.bind(connector)),
executeRaw: wrapAsync(errorRegistry, connector.executeRaw.bind(connector)),
flavour: connector.flavour,
startTransaction: async (...args) => {
const result = await connector.startTransaction(...args);
if (result.ok) {
return { ok: true, value: bindTransaction(errorRegistry, result.value)}
}
return result
},
close: wrapAsync(errorRegistry, connector.close.bind(connector))
}
}

// *.bind(transaction) is required to preserve the `this` context of functions whose
// execution is delegated to napi.rs.
export const bindTransaction = (transaction: Transaction): Transaction => {
const bindTransaction = (errorRegistry: ErrorRegistryInternal, transaction: Transaction): Transaction => {
return ({
flavour: transaction.flavour,
queryRaw: transaction.queryRaw.bind(transaction),
executeRaw: transaction.executeRaw.bind(transaction),
commit: transaction.commit.bind(transaction),
rollback: transaction.rollback.bind(transaction)
queryRaw: wrapAsync(errorRegistry, transaction.queryRaw.bind(transaction)),
executeRaw: wrapAsync(errorRegistry, transaction.executeRaw.bind(transaction)),
commit: wrapAsync(errorRegistry, transaction.commit.bind(transaction)),
rollback: wrapAsync(errorRegistry, transaction.rollback.bind(transaction))
});
}

function wrapAsync<A extends unknown[], R>(registry: ErrorRegistryInternal, fn: (...args: A) => Promise<Result<R>>): (...args: A) => Promise<Result<R>> {
return async (...args) => {
try {
return await fn(...args)
} catch (error) {
const id = registry.registerNewError(error)
return { ok: false, error: { kind: 'GenericJsError', id } }
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export { bindConnector, bindTransaction } from './binder'
export { bindConnector } from './binder'
export { ColumnTypeEnum } from './const'
export { Debug } from './debug'
export type * from './types'
35 changes: 29 additions & 6 deletions query-engine/js-connectors/js/js-connector-utils/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ export type Query = {
args: Array<unknown>
}

export type Error = {
kind: 'GenericJsError',
id: number
}

export type Result<T> = {
ok: true,
value: T
} | {
ok: false,
error: Error
}

export interface Queryable {
readonly flavour: 'mysql' | 'postgres'

Expand All @@ -41,7 +54,7 @@ export interface Queryable {
*
* This is the preferred way of executing `SELECT` queries.
*/
queryRaw(params: Query): Promise<ResultSet>
queryRaw(params: Query): Promise<Result<ResultSet>>

/**
* Execute a query given as SQL, interpolating the given parameters,
Expand All @@ -50,31 +63,35 @@ export interface Queryable {
* This is the preferred way of executing `INSERT`, `UPDATE`, `DELETE` queries,
* as well as transactional queries.
*/
executeRaw(params: Query): Promise<number>
executeRaw(params: Query): Promise<Result<number>>
}

export interface Connector extends Queryable {
/**
* Starts new transation with the specified isolation level
* @param isolationLevel
*/
startTransaction(isolationLevel?: string): Promise<Transaction>
startTransaction(isolationLevel?: string): Promise<Result<Transaction>>

/**
* Closes the connection to the database, if any.
*/
close: () => Promise<void>
close: () => Promise<Result<void>>
}

export interface Transaction extends Queryable {
/**
* Commit the transaction
*/
commit(): Promise<void>
commit(): Promise<Result<void>>
/**
* Rolls back the transaction.
*/
rollback(): Promise<void>
rollback(): Promise<Result<void>>
}

export interface ErrorCapturingConnector extends Connector {
readonly errorRegistry: ErrorRegistry
}

/**
Expand All @@ -86,3 +103,9 @@ export type ConnectorConfig = {
*/
url: string,
}

export interface ErrorRegistry {
consumeError(id: number): ErrorRecord | undefined
}

export type ErrorRecord = { error: unknown }
32 changes: 18 additions & 14 deletions query-engine/js-connectors/js/neon-js-connector/src/neon.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { FullQueryResults, PoolClient, neon, neonConfig } from '@neondatabase/serverless'
import { NeonConfig, NeonQueryFunction, Pool, QueryResult } from '@neondatabase/serverless'
import ws from 'ws'
import { bindConnector, bindTransaction, Debug } from '@jkomyno/prisma-js-connector-utils'
import type { Connector, ResultSet, Query, ConnectorConfig, Queryable, Transaction } from '@jkomyno/prisma-js-connector-utils'
import { bindConnector, Debug } from '@jkomyno/prisma-js-connector-utils'
import type { Connector, ResultSet, Query, ConnectorConfig, Queryable, Transaction, Result, ErrorCapturingConnector } from '@jkomyno/prisma-js-connector-utils'
import { fieldToColumnType } from './conversion'

neonConfig.webSocketConstructor = ws
Expand All @@ -22,7 +22,7 @@ type PerformIOResult = QueryResult<any> | FullQueryResults<ARRAY_MODE_DISABLED>
abstract class NeonQueryable implements Queryable {
flavour = 'postgres' as const

async queryRaw(query: Query): Promise<ResultSet> {
async queryRaw(query: Query): Promise<Result<ResultSet>> {
const tag = '[js::query_raw]'
debug(`${tag} %O`, query)

Expand All @@ -35,15 +35,15 @@ abstract class NeonQueryable implements Queryable {
rows: results.map(result => columns.map(column => result[column])),
}

return resultSet
return { ok: true, value: resultSet }
}

async executeRaw(query: Query): Promise<number> {
async executeRaw(query: Query): Promise<Result<number>> {
const tag = '[js::execute_raw]'
debug(`${tag} %O`, query)

const { rowCount: rowsAffected } = await this.performIO(query)
return rowsAffected
return { ok: true, value: rowsAffected }
}

abstract performIO(query: Query): Promise<PerformIOResult>
Expand Down Expand Up @@ -71,17 +71,19 @@ class NeonWsQueryable<ClientT extends Pool|PoolClient> extends NeonQueryable {
}

class NeonTransaction extends NeonWsQueryable<PoolClient> implements Transaction {
async commit(): Promise<void> {
async commit(): Promise<Result<void>> {
try {
await this.client.query('COMMIT');
return { ok: true, value: undefined }
} finally {
this.client.release()
}
}

async rollback(): Promise<void> {
async rollback(): Promise<Result<void>> {
try {
await this.client.query('ROLLBACK');
return { ok: true, value: undefined }
} finally {
this.client.release()
}
Expand All @@ -96,22 +98,22 @@ class NeonWsConnector extends NeonWsQueryable<Pool> implements Connector {
super(new Pool({ connectionString, ...rest }))
}

async startTransaction(isolationLevel?: string | undefined): Promise<Transaction> {
async startTransaction(isolationLevel?: string | undefined): Promise<Result<Transaction>> {
const connection = await this.client.connect()
await connection.query('BEGIN')
if (isolationLevel) {
await connection.query(`SET TRANSACTION ISOLATION LEVEL ${isolationLevel}`)
}

return bindTransaction(new NeonTransaction(connection))
return { ok: true, value: new NeonTransaction(connection) }
}

async close() {
this.client.on('error', e => console.log(e))
if (this.isRunning) {
await this.client.end()
this.isRunning = false
}
return { ok: true as const, value: undefined }
}
}

Expand All @@ -129,15 +131,17 @@ class NeonHttpConnector extends NeonQueryable implements Connector {
return await this.client(sql, values)
}

startTransaction(): Promise<Transaction> {
startTransaction(): Promise<Result<Transaction>> {
return Promise.reject(new Error('Transactions are not supported in HTTP mode'))
}

async close() {}
async close() {
return { ok: true as const, value: undefined }
}

}

export const createNeonConnector = (config: PrismaNeonConfig): Connector => {
export const createNeonConnector = (config: PrismaNeonConfig): ErrorCapturingConnector => {
const db = config.httpMode ? new NeonHttpConnector(config) : new NeonWsConnector(config)
return bindConnector(db)
}
Loading

0 comments on commit bb89459

Please sign in to comment.