From f5fc07adcc8e9da4578d9400d2bbe93bd5032cdd Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 29 Aug 2024 12:03:27 +0200 Subject: [PATCH 1/5] Refactor driver api. --- packages/driver/src/driver-api.ts | 63 ++++--- packages/driver/src/node/impl.ts | 158 ++++++------------ packages/driver/src/util/ErrorStatement.ts | 44 +++-- .../src/worker_threads/WorkerDriverAdapter.ts | 49 ++---- .../src/worker_threads/async-commands.ts | 69 +++----- .../src/worker_threads/worker-driver.ts | 99 ++++++----- 6 files changed, 217 insertions(+), 265 deletions(-) diff --git a/packages/driver/src/driver-api.ts b/packages/driver/src/driver-api.ts index 15e2266..0396078 100644 --- a/packages/driver/src/driver-api.ts +++ b/packages/driver/src/driver-api.ts @@ -5,14 +5,11 @@ export type SqliteArguments = | null | undefined; -export type SqliteRowRaw = SqliteValue[]; -export type SqliteRowObject = Record; -export type SqliteRow = SqliteRowRaw | SqliteRowObject; +export type SqliteArrayRow = SqliteValue[]; +export type SqliteObjectRow = Record; export interface PrepareOptions { - bigint?: boolean; - rawResults?: boolean; - persist?: boolean; + autoFinalize?: boolean; } export interface ResetOptions { @@ -32,42 +29,56 @@ export interface SqliteDriverConnection { options?: { tables?: string[]; batchLimit?: number } ): () => void; - getLastChanges(): Promise; - close(): Promise; } export type SqliteParameterBinding = - | (SqliteValue | undefined)[] + | SqliteValue[] | Record | null | undefined; -export interface SqliteStepResult { - rows?: SqliteRow[]; - done?: boolean; +export interface QueryOptions { + requireTransaction?: boolean; + bigint?: boolean; } -export interface SqliteDriverStatement { - getColumns(): Promise; +export interface StreamQueryOptions extends QueryOptions { + chunkMaxRows?: number; + chunkMaxSize?: number; +} - bind(parameters: SqliteParameterBinding): void; - step(n?: number, options?: StepOptions): Promise; - finalize(): void; - reset(options?: ResetOptions): void; +export interface SqliteDriverStatement { + all( + parameters?: SqliteParameterBinding, + options?: QueryOptions + ): Promise; + allArray( + parameters: SqliteParameterBinding, + options?: QueryOptions + ): Promise; + + stream( + parameters?: SqliteParameterBinding, + options?: StreamQueryOptions + ): AsyncIterator; + streamArray( + parameters?: SqliteParameterBinding, + options?: StreamQueryOptions + ): AsyncIterator; /** - * Similar to step, followed by reset, and returning number of changed rows. - * - * Avoids the need to use a separate statement to get changes. + * Run a query, and return the number of changed rows, and last insert id. */ - run(options?: StepOptions): Promise; + run( + parameters?: SqliteParameterBinding, + options?: QueryOptions + ): Promise; - [Symbol.dispose](): void; -} + getColumns(): Promise; -export interface StepOptions { - requireTransaction?: boolean; + finalize(): void; + [Symbol.dispose](): void; } export interface SqliteDriverConnectionPool { diff --git a/packages/driver/src/node/impl.ts b/packages/driver/src/node/impl.ts index fc457f1..b0d1197 100644 --- a/packages/driver/src/node/impl.ts +++ b/packages/driver/src/node/impl.ts @@ -2,16 +2,15 @@ import type * as sqlite from './node-sqlite.js'; import { PrepareOptions, - ResetOptions, + QueryOptions, + SqliteArrayRow, SqliteChanges, SqliteDriverConnection, SqliteDriverConnectionPool, SqliteDriverStatement, + SqliteObjectRow, SqliteParameterBinding, - SqliteRow, - SqliteStepResult, - SqliteValue, - StepOptions, + StreamQueryOptions, UpdateListener } from '../driver-api.js'; @@ -43,11 +42,6 @@ interface InternalStatement extends SqliteDriverStatement { class NodeSqliteSyncStatement implements InternalStatement { public statement: sqlite.StatementSync; - private options: PrepareOptions; - private bindPositional: SqliteValue[] = []; - private bindNamed: Record = {}; - private statementDone = false; - private iterator: Iterator | undefined = undefined; readonly persisted: boolean; @@ -55,8 +49,7 @@ class NodeSqliteSyncStatement implements InternalStatement { constructor(statement: sqlite.StatementSync, options: PrepareOptions) { this.statement = statement; - this.options = options; - this.persisted = options.persist ?? false; + this.persisted = options.autoFinalize ?? false; if (typeof Symbol.dispose != 'undefined') { this[Symbol.dispose] = () => this.finalize(); @@ -72,120 +65,69 @@ class NodeSqliteSyncStatement implements InternalStatement { return []; } - bind(parameters: SqliteParameterBinding): void { - if (parameters == null) { - return; - } - if (Array.isArray(parameters)) { - let bindArray = this.bindPositional; - - for (let i = 0; i < parameters.length; i++) { - if (typeof parameters[i] != 'undefined') { - bindArray[i] = parameters[i]!; - } - } - } else { - let previous = this.bindNamed; - this.bindNamed = { ...previous, ...parameters }; - } - } - - async run(options?: StepOptions): Promise { + async run( + parameters: SqliteParameterBinding, + options?: QueryOptions + ): Promise { try { if (options?.requireTransaction) { // TODO: Implement } const statement = this.statement; - this.reset(); - - try { - const bindNamed = this.bindNamed; - const bindPositional = this.bindPositional; - - statement.setReadBigInts(true); - const r = statement.run(bindNamed, ...bindPositional); - return { - changes: Number(r.changes), - lastInsertRowId: r.lastInsertRowid as bigint - }; - } finally { - this.reset(); - } + statement.setReadBigInts(true); + const r = statement.run(...convertParameters(parameters)); + return { + changes: Number(r.changes), + lastInsertRowId: r.lastInsertRowid as bigint + }; } catch (e) { throw mapError(e); } } - async step(n?: number, options?: StepOptions): Promise { + async all( + parameters: SqliteParameterBinding, + options?: QueryOptions + ): Promise { try { - const all = n == null; - - const statement = this.statement; - if (this.statementDone) { - return { done: true }; - } - if (options?.requireTransaction) { - // TODO: implement + // TODO: Implement } - const bindNamed = this.bindNamed; - const bindPositional = this.bindPositional; - - let iterator = this.iterator; - const num_rows = n ?? 1; - if (iterator == null) { - if (this.options.rawResults) { - // Not supported - } - if (this.options.bigint) { - statement.setReadBigInts(true); - } - iterator = statement - .all(bindNamed, ...bindPositional) - [Symbol.iterator](); - this.iterator = iterator; - } - let rows: SqliteRow[] = []; - let isDone = false; - for (let i = 0; i < num_rows || all; i++) { - const { value, done } = iterator.next(); - if (done) { - isDone = true; - break; - } - rows.push(value as SqliteRow); - } - if (isDone) { - this.statementDone = true; - } - return { rows, done: isDone }; + const statement = this.statement; + statement.setReadBigInts(options?.bigint ?? false); + const rows = statement.all(...convertParameters(parameters)); + return rows; } catch (e) { throw mapError(e); } } - finalize(): void { - const existingIter = this.iterator; - if (existingIter != null) { - existingIter.return?.(); - } - this.iterator = undefined; - this.statementDone = false; + allArray( + parameters: SqliteParameterBinding, + options: QueryOptions + ): Promise { + throw new Error('array rows are not supported'); } - reset(options?: ResetOptions): void { - if (this.iterator) { - const iter = this.iterator; - iter.return?.(); - this.iterator = undefined; - } - if (options?.clearBindings) { - this.bindNamed = {}; - this.bindPositional = []; - } - this.statementDone = false; + async *stream( + parameters: SqliteParameterBinding, + options: StreamQueryOptions + ): AsyncIterator { + const rows = await this.all(parameters, options); + yield rows; + } + + streamArray( + parameters: SqliteParameterBinding, + options: StreamQueryOptions + ): AsyncIterator { + throw new Error('array rows are not supported'); + } + + finalize(): void { + // We don't use any iterators internally - nothing to cancel here } } @@ -243,3 +185,11 @@ export class NodeSqliteConnection implements SqliteDriverConnection { throw new Error('not supported yet'); } } + +function convertParameters(parameters: SqliteParameterBinding): any[] { + if (Array.isArray(parameters)) { + return parameters; + } else { + return [parameters]; + } +} diff --git a/packages/driver/src/util/ErrorStatement.ts b/packages/driver/src/util/ErrorStatement.ts index 6f401d7..b3d227b 100644 --- a/packages/driver/src/util/ErrorStatement.ts +++ b/packages/driver/src/util/ErrorStatement.ts @@ -1,11 +1,12 @@ import { PrepareOptions, - ResetOptions, + QueryOptions, + SqliteArrayRow, SqliteChanges, SqliteDriverStatement, + SqliteObjectRow, SqliteParameterBinding, - SqliteStepResult, - StepOptions + StreamQueryOptions } from '../driver-api.js'; import { SqliteDriverError } from '../worker_threads/async-commands.js'; @@ -27,28 +28,45 @@ export class ErrorStatement implements SqliteDriverStatement { ) { this.error = error; this.source = source; - this.persisted = options.persist ?? false; + this.persisted = options.autoFinalize ?? false; } - async getColumns(): Promise { + all( + parameters: SqliteParameterBinding, + options: QueryOptions + ): Promise { throw this.error; } - bind(parameters: SqliteParameterBinding): void { - // no-op + allArray( + parameters: SqliteParameterBinding, + options: QueryOptions + ): Promise { + throw this.error; } - async step(n?: number, options?: StepOptions): Promise { + stream( + parameters: SqliteParameterBinding, + options: StreamQueryOptions + ): AsyncIterator { throw this.error; } - - async run(options?: StepOptions): Promise { + streamArray( + parameters: SqliteParameterBinding, + options: StreamQueryOptions + ): AsyncIterator { throw this.error; } - finalize(): void { - // no-op + async getColumns(): Promise { + throw this.error; + } + async run( + parameters: SqliteParameterBinding, + options?: QueryOptions + ): Promise { + throw this.error; } - reset(options?: ResetOptions): void { + finalize(): void { // no-op } diff --git a/packages/driver/src/worker_threads/WorkerDriverAdapter.ts b/packages/driver/src/worker_threads/WorkerDriverAdapter.ts index 85f867b..c17ce25 100644 --- a/packages/driver/src/worker_threads/WorkerDriverAdapter.ts +++ b/packages/driver/src/worker_threads/WorkerDriverAdapter.ts @@ -2,13 +2,11 @@ import { SqliteChanges, SqliteDriverConnection, SqliteDriverStatement, - SqliteStepResult, UpdateListener } from '../driver-api.js'; import { mapError } from '../util/errors.js'; import { InferBatchResult, - SqliteBind, SqliteCommand, SqliteCommandResponse, SqliteCommandType, @@ -16,9 +14,9 @@ import { SqliteParse, SqliteParseResult, SqlitePrepare, - SqliteReset, + SqliteQuery, + SqliteQueryResult, SqliteRun, - SqliteStep, WorkerDriver } from './async-commands.js'; @@ -50,9 +48,7 @@ export class WorkerConnectionAdapter implements WorkerDriver { } const statement = this.connnection.prepare(sql, { - bigint: command.bigint, - persist: command.persist, - rawResults: command.rawResults + autoFinalize: command.autoFinalize }); this.statements.set(id, statement); } @@ -63,28 +59,25 @@ export class WorkerConnectionAdapter implements WorkerDriver { return { columns: await statement.getColumns() }; } - private _bind(command: SqliteBind): void { - const { id, parameters } = command; - const statement = this.requireStatement(id); - statement.bind(parameters); - } - - private _step(command: SqliteStep): Promise { - const { id, n, requireTransaction } = command; - const statement = this.requireStatement(id); - return statement.step(n, { requireTransaction }); - } - private _run(command: SqliteRun): Promise { const { id } = command; const statement = this.requireStatement(id); - return statement.run(command); + return statement.run(command.parameters, command.options); } - private _reset(command: SqliteReset): void { + private async _query(command: SqliteQuery): Promise { const { id } = command; const statement = this.requireStatement(id); - statement.reset(command); + if (command.array) { + const results = await statement.allArray( + command.parameters, + command.options + ); + return { rows: results }; + } else { + const results = await statement.all(command.parameters, command.options); + return { rows: results }; + } } private _finalize(command: SqliteFinalize): void { @@ -98,22 +91,16 @@ export class WorkerConnectionAdapter implements WorkerDriver { switch (command.type) { case SqliteCommandType.prepare: return this._prepare(command); - case SqliteCommandType.bind: - return this._bind(command); - case SqliteCommandType.step: - return this._step(command); + case SqliteCommandType.query: + return this._query(command); case SqliteCommandType.run: return this._run(command); - case SqliteCommandType.reset: - return this._reset(command); case SqliteCommandType.finalize: return this._finalize(command); case SqliteCommandType.parse: return this._parse(command); - case SqliteCommandType.changes: - return this.connnection.getLastChanges(); default: - throw new Error(`Unknown command: ${command.type}`); + throw new Error(`Unknown command: ${(command as SqliteCommand).type}`); } } diff --git a/packages/driver/src/worker_threads/async-commands.ts b/packages/driver/src/worker_threads/async-commands.ts index 70a352f..e9450f6 100644 --- a/packages/driver/src/worker_threads/async-commands.ts +++ b/packages/driver/src/worker_threads/async-commands.ts @@ -1,20 +1,19 @@ import { SqliteParameterBinding, SqliteChanges, - SqliteStepResult + QueryOptions, + StreamQueryOptions, + SqliteArrayRow, + SqliteObjectRow } from '../driver-api.js'; import { SerializedDriverError } from '../sqlite-error.js'; -export enum SqliteCommandType { +export const enum SqliteCommandType { prepare = 1, - bind = 2, - step = 3, - reset = 4, finalize = 5, - sync = 6, parse = 7, run = 8, - changes = 9 + query = 9 } export type SqliteDriverError = SerializedDriverError; @@ -37,43 +36,35 @@ export interface SqlitePrepare extends SqliteBaseCommand { type: SqliteCommandType.prepare; id: number; sql: string; - bigint?: boolean; - persist?: boolean; - rawResults?: boolean; + autoFinalize?: boolean; } export interface SqliteParseResult { columns: string[]; } -export interface SqliteBind extends SqliteBaseCommand { - type: SqliteCommandType.bind; - id: number; - parameters: SqliteParameterBinding; -} - export interface SqliteParse extends SqliteBaseCommand { type: SqliteCommandType.parse; id: number; } -export interface SqliteStep extends SqliteBaseCommand { - type: SqliteCommandType.step; - id: number; - n?: number; - requireTransaction?: boolean; -} - export interface SqliteRun extends SqliteBaseCommand { type: SqliteCommandType.run; id: number; - requireTransaction?: boolean; + parameters?: SqliteParameterBinding; + options?: QueryOptions; } -export interface SqliteReset extends SqliteBaseCommand { - type: SqliteCommandType.reset; +export interface SqliteQueryResult { + rows: SqliteArrayRow[] | SqliteObjectRow[]; +} + +export interface SqliteQuery extends SqliteBaseCommand { + type: SqliteCommandType.query; id: number; - clearBindings?: boolean; + parameters?: SqliteParameterBinding; + options?: StreamQueryOptions; + array?: boolean; } export interface SqliteFinalize extends SqliteBaseCommand { @@ -81,34 +72,20 @@ export interface SqliteFinalize extends SqliteBaseCommand { id: number; } -export interface SqliteSync { - type: SqliteCommandType.sync; -} - -export interface SqliteGetChanges { - type: SqliteCommandType.changes; -} - export type SqliteCommand = | SqlitePrepare - | SqliteBind - | SqliteStep | SqliteRun - | SqliteReset | SqliteFinalize - | SqliteSync - | SqliteParse - | SqliteGetChanges; + | SqliteQuery + | SqliteParse; export type InferCommandResult = T extends SqliteRun ? SqliteChanges - : T extends SqliteStep - ? SqliteStepResult + : T extends SqliteQuery + ? SqliteQueryResult : T extends SqliteParse ? SqliteParseResult - : T extends SqliteGetChanges - ? SqliteChanges - : void; + : void; export type InferBatchResult = { [i in keyof T]: diff --git a/packages/driver/src/worker_threads/worker-driver.ts b/packages/driver/src/worker_threads/worker-driver.ts index a0392a8..7ce3016 100644 --- a/packages/driver/src/worker_threads/worker-driver.ts +++ b/packages/driver/src/worker_threads/worker-driver.ts @@ -6,9 +6,11 @@ import { SqliteDriverStatement, SqliteParameterBinding, SqliteChanges, - SqliteStepResult, - StepOptions, - UpdateListener + UpdateListener, + QueryOptions, + SqliteArrayRow, + SqliteObjectRow, + StreamQueryOptions } from '../driver-api.js'; import { Deferred } from '../deferred.js'; @@ -78,27 +80,13 @@ export class WorkerDriverConnection implements SqliteDriverConnection { cmd: { type: SqliteCommandType.prepare, id, - bigint: options?.bigint, - persist: options?.persist, - rawResults: options?.rawResults, + autoFinalize: options?.autoFinalize, sql } }); return new WorkerDriverStatement(this, id); } - async getLastChanges(): Promise { - return await this._push({ - type: SqliteCommandType.changes - }); - } - - async sync(): Promise { - await this._push({ - type: SqliteCommandType.sync - }); - } - _push(cmd: T): Promise> { const d = new Deferred(); this.buffer.push({ cmd, resolve: d.resolve, reject: d.reject }); @@ -206,38 +194,67 @@ class WorkerDriverStatement implements SqliteDriverStatement { this[Symbol.dispose] = () => this.finalize(); } } + all( + parameters: SqliteParameterBinding, + options?: QueryOptions + ): Promise { + return this.driver + ._push({ + type: SqliteCommandType.query, + id: this.id, + parameters: parameters, + options: options + }) + .then((r) => r.rows as SqliteObjectRow[]); + } - async getColumns(): Promise { + allArray( + parameters: SqliteParameterBinding, + options?: QueryOptions + ): Promise { return this.driver ._push({ - type: SqliteCommandType.parse, - id: this.id + type: SqliteCommandType.query, + id: this.id, + parameters: parameters, + options: options, + array: true }) - .then((r) => r.columns); + .then((r) => r.rows as SqliteArrayRow[]); } - bind(parameters: SqliteParameterBinding): void { - this.driver._send({ - type: SqliteCommandType.bind, - id: this.id, - parameters: parameters - }); + stream( + parameters: SqliteParameterBinding, + options?: StreamQueryOptions + ): AsyncIterator { + throw new Error('Method not implemented.'); } - async step(n?: number, options?: StepOptions): Promise { - return this.driver._push({ - type: SqliteCommandType.step, - id: this.id, - n: n, - requireTransaction: options?.requireTransaction - }); + streamArray( + parameters: SqliteParameterBinding, + options?: StreamQueryOptions + ): AsyncIterator { + throw new Error('Method not implemented.'); } - async run(options?: StepOptions): Promise { + async getColumns(): Promise { + return this.driver + ._push({ + type: SqliteCommandType.parse, + id: this.id + }) + .then((r) => r.columns); + } + + async run( + parameters?: SqliteParameterBinding, + options?: QueryOptions + ): Promise { return this.driver._push({ type: SqliteCommandType.run, id: this.id, - requireTransaction: options?.requireTransaction + parameters: parameters, + options: options }); } @@ -247,14 +264,6 @@ class WorkerDriverStatement implements SqliteDriverStatement { id: this.id }); } - - reset(options?: ResetOptions): void { - this.driver._send({ - type: SqliteCommandType.reset, - id: this.id, - clearBindings: options?.clearBindings - }); - } } interface CommandQueueItem { From d03f87efcaa3a555b56e9edb6fdef592ac5e4ee7 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 29 Aug 2024 12:23:19 +0200 Subject: [PATCH 2/5] Update tests. --- packages/api/src/api.ts | 14 +- packages/api/src/impl.ts | 58 +++--- .../better-sqlite3-driver/src/sync-driver.ts | 2 +- packages/driver-tests/src/driver-tests.ts | 167 ++++-------------- packages/driver/src/driver-api.ts | 2 +- packages/driver/src/node/impl.ts | 4 +- packages/driver/src/util/ErrorStatement.ts | 8 +- 7 files changed, 83 insertions(+), 172 deletions(-) diff --git a/packages/api/src/api.ts b/packages/api/src/api.ts index 4dfc6b3..41e2e45 100644 --- a/packages/api/src/api.ts +++ b/packages/api/src/api.ts @@ -1,4 +1,4 @@ -import { SqliteArguments, SqliteRowObject } from '@sqlite-js/driver'; +import { SqliteArguments, SqliteObjectRow } from '@sqlite-js/driver'; export type SqliteDatabase = SqliteConnectionPool & SqliteConnection; @@ -61,7 +61,7 @@ export interface ReservedSqliteConnection extends SqliteConnection { } export interface QueryInterface { - prepare( + prepare( query: string, args?: SqliteArguments, options?: QueryOptions @@ -73,7 +73,7 @@ export interface QueryInterface { options?: ReserveConnectionOptions ): Promise; - stream( + stream( query: string, args: SqliteArguments, options?: StreamOptions & ReserveConnectionOptions @@ -84,7 +84,7 @@ export interface QueryInterface { * * When called on a connection pool, uses readonly: true by default. */ - select( + select( query: string, args?: SqliteArguments, options?: QueryOptions & ReserveConnectionOptions @@ -99,7 +99,7 @@ export interface QueryInterface { * @param args * @param options */ - get( + get( query: string, args?: SqliteArguments, options?: QueryOptions & ReserveConnectionOptions @@ -114,7 +114,7 @@ export interface QueryInterface { * @param args * @param options */ - getOptional( + getOptional( query: string, args?: SqliteArguments, options?: QueryOptions & ReserveConnectionOptions @@ -236,7 +236,7 @@ export interface RunResult { lastInsertRowId: bigint; } -export interface PreparedQuery { +export interface PreparedQuery { parse(): Promise<{ columns: string[] }>; /** diff --git a/packages/api/src/impl.ts b/packages/api/src/impl.ts index 8edff3d..6e66d8d 100644 --- a/packages/api/src/impl.ts +++ b/packages/api/src/impl.ts @@ -26,7 +26,7 @@ import { SqliteDriverConnection, SqliteDriverConnectionPool, SqliteDriverStatement, - SqliteRowObject + SqliteObjectRow } from '@sqlite-js/driver'; export class ConnectionPoolImpl @@ -54,7 +54,7 @@ export class ConnectionPoolImpl throw new Error('Method not implemented.'); } - prepare( + prepare( sql: string, args?: SqliteArguments ): PreparedQuery { @@ -103,7 +103,7 @@ export class ConnectionPoolImpl return tx; } - async *stream( + async *stream( query: string, args?: SqliteArguments, options?: (StreamOptions & ReserveConnectionOptions) | undefined @@ -116,7 +116,7 @@ export class ConnectionPoolImpl } } - async select( + async select( query: string, args?: SqliteArguments | undefined, options?: (QueryOptions & ReserveConnectionOptions) | undefined @@ -129,7 +129,7 @@ export class ConnectionPoolImpl } } - async get( + async get( query: string, args?: SqliteArguments | undefined, options?: (QueryOptions & ReserveConnectionOptions) | undefined @@ -142,7 +142,7 @@ export class ConnectionPoolImpl } } - async getOptional( + async getOptional( query: string, args?: SqliteArguments | undefined, options?: (QueryOptions & ReserveConnectionOptions) | undefined @@ -202,7 +202,7 @@ export class ReservedConnectionImpl implements ReservedSqliteConnection { } } - prepare( + prepare( sql: string, args?: SqliteArguments, options?: QueryOptions @@ -252,7 +252,7 @@ export class ReservedConnectionImpl implements ReservedSqliteConnection { return this.connection.run(query, args); } - stream( + stream( query: string, args: SqliteArguments | undefined, options?: StreamOptions | undefined @@ -260,7 +260,7 @@ export class ReservedConnectionImpl implements ReservedSqliteConnection { return this.connection.stream(query, args, options); } - select( + select( query: string, args?: SqliteArguments | undefined, options?: QueryOptions | undefined @@ -268,7 +268,7 @@ export class ReservedConnectionImpl implements ReservedSqliteConnection { return this.connection.select(query, args, options); } - get( + get( query: string, args?: SqliteArguments, options?: QueryOptions @@ -276,7 +276,7 @@ export class ReservedConnectionImpl implements ReservedSqliteConnection { return this.connection.get(query, args, options); } - getOptional( + getOptional( query: string, args?: SqliteArguments, options?: QueryOptions @@ -295,11 +295,13 @@ export class ConnectionImpl implements SqliteConnection { private init() { this._beginExclusive ??= this.prepare('BEGIN EXCLUSIVE', undefined, { - persist: true + autoFinalize: true + }); + this._begin ??= this.prepare('BEGIN', undefined, { autoFinalize: true }); + this.commit ??= this.prepare('COMMIT', undefined, { autoFinalize: true }); + this.rollback ??= this.prepare('ROLLBACK', undefined, { + autoFinalize: true }); - this._begin ??= this.prepare('BEGIN', undefined, { persist: true }); - this.commit ??= this.prepare('COMMIT', undefined, { persist: true }); - this.rollback ??= this.prepare('ROLLBACK', undefined, { persist: true }); } async begin(options?: TransactionOptions): Promise { @@ -362,7 +364,7 @@ export class ConnectionImpl implements SqliteConnection { this.rollback?.dispose(); } - prepare( + prepare( sql: string, args?: SqliteArguments, options?: PrepareOptions @@ -392,7 +394,7 @@ export class ConnectionImpl implements SqliteConnection { return await statement.run(); } - async *stream( + async *stream( query: string | PreparedQuery, args: SqliteArguments | undefined, options?: StreamOptions | undefined @@ -416,7 +418,7 @@ export class ConnectionImpl implements SqliteConnection { } } - async select( + async select( query: string, args?: SqliteArguments, options?: (QueryOptions & ReserveConnectionOptions) | undefined @@ -432,7 +434,7 @@ export class ConnectionImpl implements SqliteConnection { return rows as T[]; } - async get( + async get( query: string, args?: SqliteArguments, options?: (QueryOptions & ReserveConnectionOptions) | undefined @@ -444,7 +446,7 @@ export class ConnectionImpl implements SqliteConnection { return row; } - async getOptional( + async getOptional( query: string, args?: SqliteArguments, options?: (QueryOptions & ReserveConnectionOptions) | undefined @@ -467,7 +469,7 @@ export class TransactionImpl implements SqliteTransaction { await this.con.rollback!.select(); } - prepare( + prepare( sql: string, args?: SqliteArguments, options?: QueryOptions @@ -486,7 +488,7 @@ export class TransactionImpl implements SqliteTransaction { return this.con.run(query, args); } - stream( + stream( query: string, args: SqliteArguments, options?: StreamOptions | undefined @@ -494,7 +496,7 @@ export class TransactionImpl implements SqliteTransaction { return this.con.stream(query, args, options); } - select( + select( query: string, args?: SqliteArguments, options?: QueryOptions | undefined @@ -502,7 +504,7 @@ export class TransactionImpl implements SqliteTransaction { return this.con.select(query, args, options); } - get( + get( query: string, args?: SqliteArguments, options?: QueryOptions | undefined @@ -510,7 +512,7 @@ export class TransactionImpl implements SqliteTransaction { return this.con.get(query, args, options); } - getOptional( + getOptional( query: string, args?: SqliteArguments, options?: QueryOptions | undefined @@ -556,7 +558,7 @@ class BeginTransactionImpl } } - async select( + async select( query: string, args?: SqliteArguments, options?: (QueryOptions & ReserveConnectionOptions) | undefined @@ -590,7 +592,7 @@ class BeginTransactionImpl } } -class ConnectionPoolPreparedQueryImpl +class ConnectionPoolPreparedQueryImpl implements PreparedQuery { [Symbol.dispose]: () => void = undefined as any; @@ -668,7 +670,7 @@ class ConnectionPoolPreparedQueryImpl } } -class ConnectionPreparedQueryImpl +class ConnectionPreparedQueryImpl implements PreparedQuery { [Symbol.dispose]: () => void = undefined as any; diff --git a/packages/better-sqlite3-driver/src/sync-driver.ts b/packages/better-sqlite3-driver/src/sync-driver.ts index 8a0bac9..d50da3e 100644 --- a/packages/better-sqlite3-driver/src/sync-driver.ts +++ b/packages/better-sqlite3-driver/src/sync-driver.ts @@ -36,7 +36,7 @@ class BetterSqlitePreparedStatement implements InternalStatement { constructor(statement: bsqlite.Statement, options: PrepareOptions) { this.statement = statement; this.options = options; - this.persisted = options.persist ?? false; + this.persisted = options.autoFinalize ?? false; if (typeof Symbol.dispose != 'undefined') { this[Symbol.dispose] = () => this.finalize(); diff --git a/packages/driver-tests/src/driver-tests.ts b/packages/driver-tests/src/driver-tests.ts index 6e185e5..241c32d 100644 --- a/packages/driver-tests/src/driver-tests.ts +++ b/packages/driver-tests/src/driver-tests.ts @@ -38,13 +38,11 @@ export function describeDriverTests( dbPath = `test-db/${testNameSanitized}.db`; }); - test.skipIf(!features.rawResults)('basic select - raw', async () => { + test.skipIf(!features.rawResults)('basic select - array', async () => { await using driver = await open(); await using connection = await driver.reserveConnection(); - using s = connection.prepare('select 1 as one', { - rawResults: true - }); - const { rows } = await s.step(); + using s = connection.prepare('select 1 as one'); + const rows = await s.allArray(); expect(rows).toEqual([[1]]); @@ -58,7 +56,7 @@ export function describeDriverTests( await using driver = await open(); await using connection = await driver.reserveConnection(); using s = connection.prepare('select 1 as one'); - const { rows } = await s.step(); + const rows = await s.all(); expect(rows).toEqual([{ one: 1 }]); }); @@ -66,13 +64,12 @@ export function describeDriverTests( await using driver = await open(); await using connection = await driver.reserveConnection(); using s = connection.prepare('select 9223372036854775807 as bignumber'); - const { rows } = await s.step(); + const rows = await s.all(); expect(rows).toEqual([{ bignumber: 9223372036854776000 }]); using s2 = connection.prepare('select ? as bignumber'); - s2.bind([9223372036854775807n]); - const { rows: rows2 } = await s2.step(); + const rows2 = await s2.all([9223372036854775807n]); expect(rows2).toEqual([{ bignumber: 9223372036854776000 }]); }); @@ -80,15 +77,15 @@ export function describeDriverTests( test('bigint', async () => { await using driver = await open(); await using connection = await driver.reserveConnection(); - using s = connection.prepare('select ? as bignumber', { bigint: true }); - s.bind([9223372036854775807n]); - const { rows: rows1 } = await s.step(); + using s = connection.prepare('select ? as bignumber'); + const rows1 = await s.all([9223372036854775807n], { bigint: true }); expect(rows1).toEqual([{ bignumber: 9223372036854775807n }]); - using s2 = connection.prepare('select 9223372036854775807 as bignumber', { - bigint: true - }); - const { rows: rows2 } = await s2.step(); + using s2 = connection.prepare( + 'select 9223372036854775807 as bignumber', + {} + ); + const rows2 = await s2.all(undefined, { bigint: true }); expect(rows2).toEqual([{ bignumber: 9223372036854775807n }]); }); @@ -98,19 +95,14 @@ export function describeDriverTests( using s1 = connection.prepare( 'create table test_data(id integer primary key, data text)' ); - await s1.step(); + await s1.run(); using s2 = connection.prepare( 'insert into test_data(data) values(123) returning id' ); - const { rows } = await s2.step(); + const rows = await s2.all(); expect(rows).toEqual([{ id: 1 }]); - expect(await connection.connection.getLastChanges()).toEqual({ - changes: 1, - lastInsertRowId: 1n - }); - if (features.getColumns) { const columns = await s2.getColumns(); expect(columns).toEqual(['id']); @@ -121,8 +113,7 @@ export function describeDriverTests( await using driver = await open(); await using connection = await driver.reserveConnection(); using s = connection.prepare('select :one as one, :two as two'); - s.bind({ one: 1, two: 2 }); - const { rows } = await s.step(); + const rows = await s.all({ one: 1, two: 2 }); expect(rows).toEqual([{ one: 1, two: 2 }]); }); @@ -130,8 +121,7 @@ export function describeDriverTests( await using driver = await open(); await using connection = await driver.reserveConnection(); using s = connection.prepare('select $one as one, $two as two'); - s.bind({ $one: 1, $two: 2 }); - const { rows } = await s.step(); + const rows = await s.all({ $one: 1, $two: 2 }); expect(rows).toEqual([{ one: 1, two: 2 }]); }); @@ -141,39 +131,17 @@ export function describeDriverTests( await using driver = await open(); await using connection = await driver.reserveConnection(); using s = connection.prepare('select :one as one, :two as two'); - s.bind({ two: 2 }); - const { rows } = await s.step(); + const rows = await s.all({ two: 2 }); expect(rows).toEqual([{ one: null, two: 2 }]); } ); - test('rebind arg', async () => { - await using driver = await open(); - await using connection = await driver.reserveConnection(); - using s = connection.prepare('select :one as one, :two as two'); - s.bind({ one: 1, two: 2 }); - s.bind({ one: 11, two: 22 }); - const { rows } = await s.step(); - expect(rows).toEqual([{ one: 11, two: 22 }]); - }); - - test('partial rebind', async () => { - await using driver = await open(); - await using connection = await driver.reserveConnection(); - using s = connection.prepare('select :one as one, :two as two'); - s.bind({ one: 1, two: 2 }); - s.bind({ two: 22 }); - const { rows } = await s.step(); - expect(rows).toEqual([{ one: 1, two: 22 }]); - }); - test('positional parameters', async () => { await using driver = await open(); await using connection = await driver.reserveConnection(); using s = connection.prepare('select ? as one, ? as two'); - s.bind([1, 2]); - const { rows } = await s.step(); + const rows = await s.all([1, 2]); expect(rows).toEqual([{ one: 1, two: 2 }]); }); @@ -181,96 +149,35 @@ export function describeDriverTests( await using driver = await open(); await using connection = await driver.reserveConnection(); using s = connection.prepare('select ?2 as two, ?1 as one'); - s.bind({ '1': 1, '2': 2 }); - const { rows } = await s.step(); + const rows = await s.all({ '1': 1, '2': 2 }); expect(rows).toEqual([{ two: 2, one: 1 }]); }); - test('positional parameters partial rebind', async () => { - await using driver = await open(); - await using connection = await driver.reserveConnection(); - using s = connection.prepare('select ? as one, ? as two'); - s.bind([1, 2]); - s.bind([undefined, 22]); - const { rows } = await s.step(); - expect(rows).toEqual([{ one: 1, two: 22 }]); - }); - - test('named and positional parameters', async () => { + test.skip('named and positional parameters', async () => { + // TODO: Specify the behavior for this await using driver = await open(); await using connection = await driver.reserveConnection(); using s = connection.prepare( 'select ? as one, @three as three, ? as two' ); - s.bind([1, 2]); - s.bind({ three: 3 }); - const { rows } = await s.step(); + const rows = await s.all({ 1: 1, 2: 2, three: 3 }); expect(rows).toEqual([{ one: 1, three: 3, two: 2 }]); }); - test('reset parameters', async () => { - await using driver = await open(); - await using connection = await driver.reserveConnection(); - using s = connection.prepare('select ? as one, ? as two'); - s.bind([1, 2]); - const { rows: rows1 } = await s.step(); - s.reset(); - const { rows: rows2 } = await s.step(); - s.reset({ clearBindings: true }); + test.skipIf(!features.allowsMissingParameters)( + 'reset parameters', + async () => { + await using driver = await open(); + await using connection = await driver.reserveConnection(); + using s = connection.prepare('select ? as one, ? as two'); + const rows1 = await s.all([1, 2]); - expect(rows1).toEqual([{ one: 1, two: 2 }]); - expect(rows2).toEqual([{ one: 1, two: 2 }]); + expect(rows1).toEqual([{ one: 1, two: 2 }]); - if (features.allowsMissingParameters) { - const { rows: rows3 } = await s.step(); - expect(rows3).toEqual([{ one: null, two: null }]); + const rows2 = await s.all(); + expect(rows2).toEqual([{ one: null, two: null }]); } - }); - - test('partial reset', async () => { - await using driver = await open(); - await using connection = await driver.reserveConnection(); - using s = connection.prepare( - "select json_each.value as v from json_each('[1,2,3,4,5]')" - ); - const { rows: rows1 } = await s.step(3); - s.reset(); - const { rows: rows2 } = await s.step(3); - const { rows: rows3 } = await s.step(3); - const { rows: rows4 } = await s.step(3); - s.reset(); - const { rows: rows5 } = await s.step(); - - expect(rows1).toEqual([{ v: 1 }, { v: 2 }, { v: 3 }]); - expect(rows2).toEqual([{ v: 1 }, { v: 2 }, { v: 3 }]); - expect(rows3).toEqual([{ v: 4 }, { v: 5 }]); - expect(rows4).toBe(undefined); - expect(rows5).toEqual([{ v: 1 }, { v: 2 }, { v: 3 }, { v: 4 }, { v: 5 }]); - }); - - test('multiple insert step', async () => { - await using driver = await open(); - await using connection = await driver.reserveConnection(); - - using s1 = connection.prepare( - 'create table test_data(id integer primary key, data text)' - ); - await s1.step(); - using s2 = connection.prepare( - "insert into test_data(data) values('test')" - ); - const { rows: rows1 } = await s2.step(); - const { rows: rows2 } = await s2.step(); - s2.reset(); - const { rows: rows3 } = await s2.step(); - using s3 = connection.prepare('select count(*) as count from test_data'); - const { rows: rows4 } = await s3.step(); - - expect(rows1).toEqual([]); - expect(rows2).toBe(undefined); - expect(rows3).toEqual([]); - expect(rows4).toEqual([{ count: 2 }]); - }); + ); test('error handling - prepare', async () => { await using driver = await open(); @@ -280,13 +187,13 @@ export function describeDriverTests( code: 'SQLITE_ERROR', message: 'no such column: foobar' }); - expect(await s.step().catch((e) => e)).toMatchObject({ + expect(await s.all().catch((e) => e)).toMatchObject({ code: 'SQLITE_ERROR', message: 'no such column: foobar' }); }); - test('error handling - step', async () => { + test('error handling - query', async () => { await using driver = await open(); await using connection = await driver.reserveConnection(); using s = connection.prepare( @@ -295,7 +202,7 @@ export function describeDriverTests( if (features.getColumns) { expect(await s.getColumns()).toEqual(['value']); } - expect(await s.step().catch((e) => e)).toMatchObject({ + expect(await s.all().catch((e) => e)).toMatchObject({ code: 'SQLITE_ERROR', message: 'malformed JSON' }); diff --git a/packages/driver/src/driver-api.ts b/packages/driver/src/driver-api.ts index 0396078..7cf4784 100644 --- a/packages/driver/src/driver-api.ts +++ b/packages/driver/src/driver-api.ts @@ -54,7 +54,7 @@ export interface SqliteDriverStatement { options?: QueryOptions ): Promise; allArray( - parameters: SqliteParameterBinding, + parameters?: SqliteParameterBinding, options?: QueryOptions ): Promise; diff --git a/packages/driver/src/node/impl.ts b/packages/driver/src/node/impl.ts index b0d1197..d3b2798 100644 --- a/packages/driver/src/node/impl.ts +++ b/packages/driver/src/node/impl.ts @@ -187,7 +187,9 @@ export class NodeSqliteConnection implements SqliteDriverConnection { } function convertParameters(parameters: SqliteParameterBinding): any[] { - if (Array.isArray(parameters)) { + if (parameters == null) { + return []; + } else if (Array.isArray(parameters)) { return parameters; } else { return [parameters]; diff --git a/packages/driver/src/util/ErrorStatement.ts b/packages/driver/src/util/ErrorStatement.ts index b3d227b..7d47d7b 100644 --- a/packages/driver/src/util/ErrorStatement.ts +++ b/packages/driver/src/util/ErrorStatement.ts @@ -31,25 +31,25 @@ export class ErrorStatement implements SqliteDriverStatement { this.persisted = options.autoFinalize ?? false; } - all( + async all( parameters: SqliteParameterBinding, options: QueryOptions ): Promise { throw this.error; } - allArray( + async allArray( parameters: SqliteParameterBinding, options: QueryOptions ): Promise { throw this.error; } - stream( + async *stream( parameters: SqliteParameterBinding, options: StreamQueryOptions ): AsyncIterator { throw this.error; } - streamArray( + async *streamArray( parameters: SqliteParameterBinding, options: StreamQueryOptions ): AsyncIterator { From 6994452612cc857e0ddb97f202020402ebd41bff Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 29 Aug 2024 16:54:24 +0200 Subject: [PATCH 3/5] Update better-sqlite3-driver. --- .../better-sqlite3-driver/src/sync-driver.ts | 226 +++++++++--------- 1 file changed, 114 insertions(+), 112 deletions(-) diff --git a/packages/better-sqlite3-driver/src/sync-driver.ts b/packages/better-sqlite3-driver/src/sync-driver.ts index d50da3e..0509ea5 100644 --- a/packages/better-sqlite3-driver/src/sync-driver.ts +++ b/packages/better-sqlite3-driver/src/sync-driver.ts @@ -1,13 +1,14 @@ import { PrepareOptions, - ResetOptions, + QueryOptions, + SqliteArrayRow, SqliteChanges, SqliteDriverConnection, SqliteDriverStatement, + SqliteObjectRow, SqliteParameterBinding, - SqliteStepResult, SqliteValue, - StepOptions, + StreamQueryOptions, UpdateListener } from '@sqlite-js/driver'; import type * as bsqlite from 'better-sqlite3'; @@ -23,11 +24,6 @@ interface InternalStatement extends SqliteDriverStatement { class BetterSqlitePreparedStatement implements InternalStatement { public statement: bsqlite.Statement; - private options: PrepareOptions; - private bindPositional: SqliteValue[] = []; - private bindNamed: Record = {}; - private statementDone = false; - private iterator: Iterator | undefined = undefined; readonly persisted: boolean; @@ -35,7 +31,6 @@ class BetterSqlitePreparedStatement implements InternalStatement { constructor(statement: bsqlite.Statement, options: PrepareOptions) { this.statement = statement; - this.options = options; this.persisted = options.autoFinalize ?? false; if (typeof Symbol.dispose != 'undefined') { @@ -57,140 +52,125 @@ class BetterSqlitePreparedStatement implements InternalStatement { } } - bind(parameters: SqliteParameterBinding): void { - if (parameters == null) { - return; - } - if (Array.isArray(parameters)) { - let bindArray = this.bindPositional; - - for (let i = 0; i < parameters.length; i++) { - if (typeof parameters[i] != 'undefined') { - bindArray[i] = parameters[i]!; - } - } - } else { - for (let key in parameters) { - const value = parameters[key]; - let name = key; - const prefix = key[0]; - // better-sqlite doesn't support the explicit prefix - strip it - if (prefix == ':' || prefix == '?' || prefix == '$' || prefix == '@') { - name = key.substring(1); - } - this.bindNamed[name] = value; + private checkTransaction(options: QueryOptions | undefined) { + if (options?.requireTransaction) { + if (!this.statement.database.inTransaction) { + throw new Error('Transaction has been rolled back'); } } } - async step(n?: number, options?: StepOptions): Promise { + _all( + parameters: SqliteParameterBinding, + options: QueryOptions | undefined, + array: boolean + ): unknown[] { + this.checkTransaction(options); + + const statement = this.statement; + + statement.safeIntegers(options?.bigint ?? false); + statement.raw(array); + const r = statement.all(sanitizeParameters(parameters)); + return r; + } + + async all( + parameters?: SqliteParameterBinding, + options?: QueryOptions + ): Promise { try { - return this.stepSync(n, options); + return this._all(parameters, options, false) as SqliteObjectRow[]; } catch (e) { throw mapError(e); } } - async run(options?: StepOptions): Promise { + async allArray( + parameters?: SqliteParameterBinding, + options?: QueryOptions + ): Promise { try { - return this.runSync(options); + return this._all(parameters, options, true) as SqliteArrayRow[]; } catch (e) { throw mapError(e); } } - runSync(options?: StepOptions): SqliteChanges { - if (options?.requireTransaction) { - if (!this.statement.database.inTransaction) { - throw new Error('Transaction has been rolled back'); - } - } + *_stream( + parameters: SqliteParameterBinding, + options: StreamQueryOptions | undefined, + array: boolean + ) { + this.checkTransaction(options); const statement = this.statement; - this.reset(); - try { - const bindNamed = this.bindNamed; - const bindPositional = this.bindPositional; - const bind = [bindPositional, bindNamed].filter((b) => b != null); - - statement.safeIntegers(true); - const r = statement.run(...bind); - return { - changes: r.changes, - lastInsertRowId: r.lastInsertRowid as bigint - }; - } finally { - this.reset(); + statement.safeIntegers(options?.bigint ?? false); + statement.raw(array); + const iter = statement.iterate(sanitizeParameters(parameters)); + const maxBuffer = options?.chunkMaxRows ?? 100; + let buffer: any[] = []; + for (let row of iter) { + buffer.push(row as any); + if (buffer.length >= maxBuffer) { + yield buffer; + buffer = []; + } } } - stepSync(n?: number, options?: StepOptions): SqliteStepResult { - const all = n == null; - - const statement = this.statement; - if (this.statementDone) { - return { done: true } as SqliteStepResult; + async *stream( + parameters?: SqliteParameterBinding, + options?: StreamQueryOptions + ): AsyncIterator { + try { + yield* this._stream(parameters, options, false); + } catch (e) { + throw mapError(e); } + } - if (options?.requireTransaction) { - if (!this.statement.database.inTransaction) { - throw new Error('Transaction has been rolled back'); - } + async *streamArray( + parameters?: SqliteParameterBinding, + options?: StreamQueryOptions + ): AsyncIterator { + try { + yield* this._stream(parameters, options, true); + } catch (e) { + throw mapError(e); } + } - const bindNamed = this.bindNamed; - const bindPositional = this.bindPositional; - const bind = [bindPositional, bindNamed].filter((b) => b != null); - if (!statement.reader) { - statement.run(...bind); - this.statementDone = true; - return { rows: [], done: true } as SqliteStepResult; - } - let iterator = this.iterator; - const num_rows = n ?? 1; - if (iterator == null) { - statement.raw(this.options.rawResults ?? false); - statement.safeIntegers(this.options.bigint ?? false); - iterator = statement.iterate(...bind); - this.iterator = iterator; - } - let rows = []; - let isDone = false; - for (let i = 0; i < num_rows || all; i++) { - const { value, done } = iterator.next(); - if (done) { - isDone = true; - break; - } - rows.push(value); - } - if (isDone) { - this.statementDone = true; + async run( + parameters?: SqliteParameterBinding, + options?: QueryOptions + ): Promise { + try { + return this._run(parameters, options); + } catch (e) { + throw mapError(e); } - return { rows, done: isDone } as SqliteStepResult; } - finalize(): void { - const existingIter = this.iterator; - if (existingIter != null) { - existingIter.return?.(); - } - this.iterator = undefined; - this.statementDone = false; + _run( + parameters: SqliteParameterBinding, + options?: QueryOptions + ): SqliteChanges { + this.checkTransaction(options); + + const statement = this.statement; + + statement.safeIntegers(true); + const r = statement.run(sanitizeParameters(parameters)); + return { + changes: r.changes, + lastInsertRowId: r.lastInsertRowid as bigint + }; } - reset(options?: ResetOptions): void { - if (this.iterator) { - const iter = this.iterator; - iter.return!(); - this.iterator = undefined; - } - if (options?.clearBindings) { - this.bindNamed = {}; - this.bindPositional = []; - } - this.statementDone = false; + finalize(): void { + // TODO: cancel iterators } } @@ -286,3 +266,25 @@ export class BetterSqliteConnection implements SqliteDriverConnection { return () => {}; } } + +function sanitizeParameters( + parameters: SqliteParameterBinding +): SqliteParameterBinding { + if (parameters == null) { + return []; + } else if (Array.isArray(parameters)) { + return parameters; + } + let result: Record = {}; + for (let key in parameters) { + const value = parameters[key]; + let name = key; + const prefix = key[0]; + // better-sqlite doesn't support the explicit prefix - strip it + if (prefix == ':' || prefix == '?' || prefix == '$' || prefix == '@') { + name = key.substring(1); + } + result[name] = value; + } + return result; +} From 8c1ea50de075f9eff8e3a0eca74607a861a4f5f3 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 29 Aug 2024 17:12:52 +0200 Subject: [PATCH 4/5] Update high-level API implementation. --- packages/api/src/api.ts | 6 +- packages/api/src/impl.ts | 136 +++++------------- .../better-sqlite3-driver/src/sync-driver.ts | 29 ++-- packages/driver/src/driver-api.ts | 4 +- packages/driver/src/node/impl.ts | 4 +- packages/driver/src/util/ErrorStatement.ts | 4 +- .../src/worker_threads/worker-driver.ts | 4 +- 7 files changed, 65 insertions(+), 122 deletions(-) diff --git a/packages/api/src/api.ts b/packages/api/src/api.ts index 41e2e45..9ad22ee 100644 --- a/packages/api/src/api.ts +++ b/packages/api/src/api.ts @@ -61,11 +61,7 @@ export interface ReservedSqliteConnection extends SqliteConnection { } export interface QueryInterface { - prepare( - query: string, - args?: SqliteArguments, - options?: QueryOptions - ): PreparedQuery; + prepare(query: string): PreparedQuery; run( query: string, diff --git a/packages/api/src/impl.ts b/packages/api/src/impl.ts index 6e66d8d..6bcc783 100644 --- a/packages/api/src/impl.ts +++ b/packages/api/src/impl.ts @@ -54,11 +54,8 @@ export class ConnectionPoolImpl throw new Error('Method not implemented.'); } - prepare( - sql: string, - args?: SqliteArguments - ): PreparedQuery { - return new ConnectionPoolPreparedQueryImpl(this, sql, args); + prepare(sql: string): PreparedQuery { + return new ConnectionPoolPreparedQueryImpl(this, sql); } pipeline(options?: ReserveConnectionOptions | undefined): QueryPipeline { @@ -202,12 +199,8 @@ export class ReservedConnectionImpl implements ReservedSqliteConnection { } } - prepare( - sql: string, - args?: SqliteArguments, - options?: QueryOptions - ): PreparedQuery { - return this.connection.prepare(sql, args, options); + prepare(sql: string): PreparedQuery { + return this.connection.prepare(sql); } pipeline(): QueryPipeline { @@ -294,12 +287,12 @@ export class ConnectionImpl implements SqliteConnection { constructor(private driver: SqliteDriverConnection) {} private init() { - this._beginExclusive ??= this.prepare('BEGIN EXCLUSIVE', undefined, { + this._beginExclusive ??= this.prepare('BEGIN EXCLUSIVE', { autoFinalize: true }); - this._begin ??= this.prepare('BEGIN', undefined, { autoFinalize: true }); - this.commit ??= this.prepare('COMMIT', undefined, { autoFinalize: true }); - this.rollback ??= this.prepare('ROLLBACK', undefined, { + this._begin ??= this.prepare('BEGIN', { autoFinalize: true }); + this.commit ??= this.prepare('COMMIT', { autoFinalize: true }); + this.rollback ??= this.prepare('ROLLBACK', { autoFinalize: true }); } @@ -366,20 +359,10 @@ export class ConnectionImpl implements SqliteConnection { prepare( sql: string, - args?: SqliteArguments, options?: PrepareOptions ): PreparedQuery { const statement = this.driver.prepare(sql, options); - if (args) { - statement.bind(args); - } - return new ConnectionPreparedQueryImpl( - this, - this.driver, - statement, - sql, - args - ); + return new ConnectionPreparedQueryImpl(this, this.driver, statement, sql); } pipeline(): QueryPipeline { @@ -388,34 +371,22 @@ export class ConnectionImpl implements SqliteConnection { async run(query: string, args: SqliteArguments): Promise { using statement = this.driver.prepare(query); - if (args != null) { - statement.bind(args); - } - return await statement.run(); + return await statement.run(args); } async *stream( - query: string | PreparedQuery, + query: string, args: SqliteArguments | undefined, options?: StreamOptions | undefined ): AsyncGenerator { - using statement = this.driver.prepare(query as string, { + using statement = this.driver.prepare(query); + const chunkSize = options?.chunkSize; + + const iter = statement.stream(args, { + chunkMaxRows: chunkSize, bigint: options?.bigint }); - if (args != null) { - statement.bind(args); - } - const chunkSize = options?.chunkSize ?? 100; - - while (true) { - const { rows, done } = await statement.step(chunkSize); - if (rows != null) { - yield rows as T[]; - } - if (done) { - break; - } - } + yield* iter as AsyncGenerator; } async select( @@ -423,14 +394,8 @@ export class ConnectionImpl implements SqliteConnection { args?: SqliteArguments, options?: (QueryOptions & ReserveConnectionOptions) | undefined ): Promise { - using statement = this.driver.prepare(query, { - bigint: options?.bigint, - rawResults: false - }); - if (args != null) { - statement.bind(args); - } - const { rows } = await statement.step(); + using statement = this.driver.prepare(query); + const rows = await statement.all(args, { bigint: options?.bigint }); return rows as T[]; } @@ -469,12 +434,8 @@ export class TransactionImpl implements SqliteTransaction { await this.con.rollback!.select(); } - prepare( - sql: string, - args?: SqliteArguments, - options?: QueryOptions - ): PreparedQuery { - const q = this.con.prepare(sql, args, options); + prepare(sql: string): PreparedQuery { + const q = this.con.prepare(sql); // FIXME: auto-dispose these after transaction commit / rollback this.preparedQueries.push(q); return q; @@ -601,8 +562,7 @@ class ConnectionPoolPreparedQueryImpl constructor( private context: ConnectionPoolImpl, - public sql: string, - public args: SqliteArguments + public sql: string ) { if (typeof Symbol.dispose != 'undefined') { this[Symbol.dispose] = () => this.dispose(); @@ -663,7 +623,7 @@ class ConnectionPoolPreparedQueryImpl const cimpl = connection as ConnectionImpl; let sub = this.byConnection.get(cimpl); if (sub == null) { - sub = cimpl.prepare(this.sql, this.args); + sub = cimpl.prepare(this.sql); this.byConnection.set(cimpl, sub); } return sub; @@ -681,8 +641,7 @@ class ConnectionPreparedQueryImpl private context: ConnectionImpl, private driver: SqliteDriverConnection, public statement: SqliteDriverStatement, - public sql: string, - public args: SqliteArguments + public sql: string ) { if (typeof Symbol.dispose != 'undefined') { this[Symbol.dispose] = () => this.dispose(); @@ -700,42 +659,22 @@ class ConnectionPreparedQueryImpl args?: SqliteArguments, options?: StreamOptions | undefined ): AsyncGenerator { - const chunkSize = options?.chunkSize ?? 10; - if (args != null) { - this.statement.bind(args); - } - try { - while (true) { - const { rows, done } = await this.statement.step(chunkSize); - if (rows != null) { - yield rows as T[]; - } - if (done) { - break; - } - } - } finally { - this.statement.reset(); + const chunkSize = options?.chunkSize; + const iter = this.statement.stream(args, { + chunkMaxRows: chunkSize + }); + for await (let chunk of iter) { + yield chunk as T[]; } } async run(args?: SqliteArguments): Promise { - if (args != null) { - this.statement.bind(args); - } - return await this.statement.run(); + return await this.statement.run(args); } async select(args?: SqliteArguments): Promise { - try { - if (args != null) { - this.statement.bind(args); - } - const { rows } = await this.statement.step(); - return rows as T[]; - } finally { - this.statement.reset(); - } + const rows = await this.statement.all(args); + return rows as T[]; } dispose(): void { @@ -753,19 +692,14 @@ class QueryPipelineImpl implements QueryPipeline { this.count += 1; if (typeof query == 'string') { using statement = this.driver.prepare(query); - if (args) { - statement.bind(args); - } - this.lastPromise = statement.step(undefined, { + this.lastPromise = statement.run(args, { requireTransaction: true }); } else if (query instanceof ConnectionPreparedQueryImpl) { const statement = query.statement; - statement.bind(args ?? []); - this.lastPromise = statement.step(undefined, { + this.lastPromise = statement.run(args, { requireTransaction: true }); - statement.reset(); } else { throw new Error('not implemented yet'); } diff --git a/packages/better-sqlite3-driver/src/sync-driver.ts b/packages/better-sqlite3-driver/src/sync-driver.ts index 0509ea5..865362a 100644 --- a/packages/better-sqlite3-driver/src/sync-driver.ts +++ b/packages/better-sqlite3-driver/src/sync-driver.ts @@ -69,10 +69,15 @@ class BetterSqlitePreparedStatement implements InternalStatement { const statement = this.statement; - statement.safeIntegers(options?.bigint ?? false); - statement.raw(array); - const r = statement.all(sanitizeParameters(parameters)); - return r; + if (statement.reader) { + statement.safeIntegers(options?.bigint ?? false); + statement.raw(array); + const rows = statement.all(sanitizeParameters(parameters)); + return rows; + } else { + statement.run(sanitizeParameters(parameters)); + return []; + } } async all( @@ -106,8 +111,13 @@ class BetterSqlitePreparedStatement implements InternalStatement { const statement = this.statement; - statement.safeIntegers(options?.bigint ?? false); - statement.raw(array); + if (statement.reader) { + statement.safeIntegers(options?.bigint ?? false); + statement.raw(array); + } else { + statement.run(sanitizeParameters(parameters)); + return; + } const iter = statement.iterate(sanitizeParameters(parameters)); const maxBuffer = options?.chunkMaxRows ?? 100; let buffer: any[] = []; @@ -118,12 +128,15 @@ class BetterSqlitePreparedStatement implements InternalStatement { buffer = []; } } + if (buffer.length > 0) { + yield buffer; + } } async *stream( parameters?: SqliteParameterBinding, options?: StreamQueryOptions - ): AsyncIterator { + ): AsyncIterableIterator { try { yield* this._stream(parameters, options, false); } catch (e) { @@ -134,7 +147,7 @@ class BetterSqlitePreparedStatement implements InternalStatement { async *streamArray( parameters?: SqliteParameterBinding, options?: StreamQueryOptions - ): AsyncIterator { + ): AsyncIterableIterator { try { yield* this._stream(parameters, options, true); } catch (e) { diff --git a/packages/driver/src/driver-api.ts b/packages/driver/src/driver-api.ts index 7cf4784..38050fb 100644 --- a/packages/driver/src/driver-api.ts +++ b/packages/driver/src/driver-api.ts @@ -61,11 +61,11 @@ export interface SqliteDriverStatement { stream( parameters?: SqliteParameterBinding, options?: StreamQueryOptions - ): AsyncIterator; + ): AsyncIterableIterator; streamArray( parameters?: SqliteParameterBinding, options?: StreamQueryOptions - ): AsyncIterator; + ): AsyncIterableIterator; /** * Run a query, and return the number of changed rows, and last insert id. diff --git a/packages/driver/src/node/impl.ts b/packages/driver/src/node/impl.ts index d3b2798..e1a6a63 100644 --- a/packages/driver/src/node/impl.ts +++ b/packages/driver/src/node/impl.ts @@ -114,7 +114,7 @@ class NodeSqliteSyncStatement implements InternalStatement { async *stream( parameters: SqliteParameterBinding, options: StreamQueryOptions - ): AsyncIterator { + ): AsyncIterableIterator { const rows = await this.all(parameters, options); yield rows; } @@ -122,7 +122,7 @@ class NodeSqliteSyncStatement implements InternalStatement { streamArray( parameters: SqliteParameterBinding, options: StreamQueryOptions - ): AsyncIterator { + ): AsyncIterableIterator { throw new Error('array rows are not supported'); } diff --git a/packages/driver/src/util/ErrorStatement.ts b/packages/driver/src/util/ErrorStatement.ts index 7d47d7b..17e7ae9 100644 --- a/packages/driver/src/util/ErrorStatement.ts +++ b/packages/driver/src/util/ErrorStatement.ts @@ -46,13 +46,13 @@ export class ErrorStatement implements SqliteDriverStatement { async *stream( parameters: SqliteParameterBinding, options: StreamQueryOptions - ): AsyncIterator { + ): AsyncIterableIterator { throw this.error; } async *streamArray( parameters: SqliteParameterBinding, options: StreamQueryOptions - ): AsyncIterator { + ): AsyncIterableIterator { throw this.error; } diff --git a/packages/driver/src/worker_threads/worker-driver.ts b/packages/driver/src/worker_threads/worker-driver.ts index 7ce3016..b8d1c74 100644 --- a/packages/driver/src/worker_threads/worker-driver.ts +++ b/packages/driver/src/worker_threads/worker-driver.ts @@ -226,14 +226,14 @@ class WorkerDriverStatement implements SqliteDriverStatement { stream( parameters: SqliteParameterBinding, options?: StreamQueryOptions - ): AsyncIterator { + ): AsyncIterableIterator { throw new Error('Method not implemented.'); } streamArray( parameters: SqliteParameterBinding, options?: StreamQueryOptions - ): AsyncIterator { + ): AsyncIterableIterator { throw new Error('Method not implemented.'); } From 842db77ecc697ebbf42e8bfc3963a4dd9ef4b648 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 29 Aug 2024 17:19:48 +0200 Subject: [PATCH 5/5] Document APIs. --- DRIVER-API.md | 4 --- packages/driver/src/driver-api.ts | 32 ++++++++++++++++--- .../src/worker_threads/worker-driver.ts | 13 ++++---- 3 files changed, 33 insertions(+), 16 deletions(-) diff --git a/DRIVER-API.md b/DRIVER-API.md index 8e57090..a61cc2e 100644 --- a/DRIVER-API.md +++ b/DRIVER-API.md @@ -110,10 +110,6 @@ Returning an array of cells for each row, along with a separate "columns" array, However, many current SQLite bindings do not expose the raw array calls. Even if they do, this path may be slower than using objects from the start. Since using the results as an array is quite rare in practice, this is left as an optional configuration, rather than a requirement for the all queries. -### Separate bind/step/reset - -This allows a lot of flexibility, for example partial rebinding of parameters instead of specifying all parameters each time a prepared statement is used. However, those type of use cases are rare, and this is not important in the overall architecture. These could all be combined into a single "query with parameters" call, but would need to take into account optional streaming of results. - ### bigint SQLite supports up to 8-byte signed integers (up to 2^64-1), while JavaScript's number is limited to 2^53-1. General approaches include: diff --git a/packages/driver/src/driver-api.ts b/packages/driver/src/driver-api.ts index 38050fb..39932b9 100644 --- a/packages/driver/src/driver-api.ts +++ b/packages/driver/src/driver-api.ts @@ -12,10 +12,6 @@ export interface PrepareOptions { autoFinalize?: boolean; } -export interface ResetOptions { - clearBindings?: boolean; -} - export interface SqliteDriverConnection { /** * Prepare a statement. @@ -49,19 +45,43 @@ export interface StreamQueryOptions extends QueryOptions { } export interface SqliteDriverStatement { + /** + * Run a query, and return results as an array of row objects. + * + * If the query does not return results, an empty array is returned. + */ all( parameters?: SqliteParameterBinding, options?: QueryOptions ): Promise; + + /** + * Run a query, and return results as an array of row arrays. + * + * If the query does not return results, an empty array is returned. + */ allArray( parameters?: SqliteParameterBinding, options?: QueryOptions ): Promise; + /** + * Run a query, and return as an iterator of array of row object chunks. + * + * It is an error to call any other query methods on the same statement + * before the iterator has returned. + */ stream( parameters?: SqliteParameterBinding, options?: StreamQueryOptions ): AsyncIterableIterator; + + /** + * Run a query, and return as an iterator of array of row array chunks. + * + * It is an error to call any other query methods on the same statement + * before the iterator has returned. + */ streamArray( parameters?: SqliteParameterBinding, options?: StreamQueryOptions @@ -75,6 +95,9 @@ export interface SqliteDriverStatement { options?: QueryOptions ): Promise; + /** + * Get the column names of the data returned by the query. + */ getColumns(): Promise; finalize(): void; @@ -86,7 +109,6 @@ export interface SqliteDriverConnectionPool { * Reserve a connection for exclusive use. * * If there is no available connection, this will wait until one is available. - * @param options */ reserveConnection( options?: ReserveConnectionOptions diff --git a/packages/driver/src/worker_threads/worker-driver.ts b/packages/driver/src/worker_threads/worker-driver.ts index b8d1c74..d6722ab 100644 --- a/packages/driver/src/worker_threads/worker-driver.ts +++ b/packages/driver/src/worker_threads/worker-driver.ts @@ -1,16 +1,15 @@ import * as worker_threads from 'worker_threads'; import { PrepareOptions, - ResetOptions, - SqliteDriverConnection, - SqliteDriverStatement, - SqliteParameterBinding, - SqliteChanges, - UpdateListener, QueryOptions, SqliteArrayRow, + SqliteChanges, + SqliteDriverConnection, + SqliteDriverStatement, SqliteObjectRow, - StreamQueryOptions + SqliteParameterBinding, + StreamQueryOptions, + UpdateListener } from '../driver-api.js'; import { Deferred } from '../deferred.js';