From ab8e698f205dfe81b81b30fe7a7520db2430006c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eray=20Hano=C4=9Flu?= Date: Fri, 20 Nov 2020 10:31:14 +0300 Subject: [PATCH] Improved auto-commit operations by detecting sql is a transaction command --- src/IntlConnection.ts | 165 +++++++++++--------- src/PreparedStatement.ts | 22 ++- test/B-connection/03-execute-script.spec.ts | 2 +- 3 files changed, 98 insertions(+), 91 deletions(-) diff --git a/src/IntlConnection.ts b/src/IntlConnection.ts index d6494d2..96b208e 100644 --- a/src/IntlConnection.ts +++ b/src/IntlConnection.ts @@ -15,6 +15,7 @@ import {GlobalTypeMap} from './DataTypeMap'; import {convertRowToObject, getParsers, parseRow, wrapRowDescription} from './common'; import {escapeLiteral} from './util/escape-literal'; import DataFormat = Protocol.DataFormat; +import {coerceToBoolean} from 'putil-varhelpers'; export class IntlConnection extends SafeEventEmitter { protected _refCount = 0; @@ -101,102 +102,29 @@ export class IntlConnection extends SafeEventEmitter { async execute(sql: string, options?: ScriptExecuteOptions, cb?: (event: string, ...args: any[]) => void): Promise { this.assertConnected(); return this.statementQueue.enqueue(async (): Promise => { - this.ref(); - try { - const startTime = Date.now(); - const result: ScriptResult = { - totalCommands: 0, - totalTime: 0, - results: [], - }; - const opts = options || {}; - const autoCommit = opts.autoCommit != null ? opts.autoCommit : this.config.autoCommit; - const commit = autoCommit == true && this.inTransaction; - if (autoCommit == false) - sql = 'BEGIN;' + sql; - else if (commit) - sql += ';\nCOMMIT;'; - - this.socket.sendQueryMessage(sql); - let currentStart = Date.now(); - let parsers; - let current: CommandResult = {command: undefined}; - let fields: Protocol.RowDescription[]; - const typeMap = opts.typeMap || GlobalTypeMap; - let commandIdx = 0; - return this.socket.capture(async (code: Protocol.BackendMessageCode, msg: any, - done: (err?: Error, result?: any) => void) => { - switch (code) { - case Protocol.BackendMessageCode.NoticeResponse: - case Protocol.BackendMessageCode.CopyInResponse: - case Protocol.BackendMessageCode.CopyOutResponse: - case Protocol.BackendMessageCode.EmptyQueryResponse: - break; - case Protocol.BackendMessageCode.RowDescription: - fields = msg.fields; - parsers = getParsers(typeMap, fields); - current.fields = wrapRowDescription(typeMap, fields, DataFormat.text); - current.rows = []; - break; - case Protocol.BackendMessageCode.DataRow: - let row = msg.columns.map((x: Buffer) => x.toString('utf8')); - parseRow(parsers, row, opts); - if (opts.objectRows && current.fields) - row = convertRowToObject(current.fields, row); - if (cb) cb('row', row); - current.rows = current.rows || []; - current.rows.push(row); - break; - case Protocol.BackendMessageCode.CommandComplete: - // Ignore BEGIN command that we added to sql - if (autoCommit == false && commandIdx++ == 0) - break; - current.command = msg.command; - if (current.command === 'DELETE' || - current.command === 'INSERT' || - current.command === 'UPDATE') - current.rowsAffected = msg.rowCount; - current.executeTime = Date.now() - currentStart; - result.results.push(current); - result.totalCommands++; - if (cb) cb('command-complete', current); - current = {command: undefined}; - currentStart = Date.now(); - break; - case Protocol.BackendMessageCode.ReadyForQuery: - this.transactionStatus = msg.status; - result.totalTime = Date.now() - startTime; - // Ignore COMMIT command that we added to sql - if (commit) - result.results.pop(); - done(undefined, result); - } - }); - } finally { - this.unref(); - } + return this._execute(sql, options, cb); }); } async startTransaction(): Promise { if (!this.inTransaction) - await this.execute('BEGIN', {autoCommit: false}); + await this.execute('BEGIN'); } async savepoint(name: string): Promise { if (!(name && name.match(/^[a-zA-Z]\w+$/))) throw new Error(`Invalid savepoint "${name}`); - await this.execute('BEGIN; SAVEPOINT ' + name, {autoCommit: false}); + await this.execute('BEGIN; SAVEPOINT ' + name); } async commit(): Promise { if (this.inTransaction) - await this.execute('COMMIT', {autoCommit: false}); + await this.execute('COMMIT'); } async rollback(): Promise { if (this.inTransaction) - await this.execute('ROLLBACK', {autoCommit: false}); + await this.execute('ROLLBACK'); } async rollbackToSavepoint(name: string): Promise { @@ -220,6 +148,87 @@ export class IntlConnection extends SafeEventEmitter { throw new Error('Connection closed'); } + protected async _execute(sql: string, options?: ScriptExecuteOptions, cb?: (event: string, ...args: any[]) => void): Promise { + this.ref(); + try { + const startTime = Date.now(); + const result: ScriptResult = { + totalCommands: 0, + totalTime: 0, + results: [], + }; + const opts = options || {}; + const transactionCommand = sql.match(/^(\bBEGIN\b|\bCOMMIT\b|\bROLLBACK\b)/i) && + !sql.match(/^\bROLLBACK TO SAVEPOINT\b/i); + const autoCommit = coerceToBoolean(opts.autoCommit != null ? + opts.autoCommit : this.config.autoCommit, true); + const beginAtFirst = !autoCommit && !transactionCommand; + const commitLast = autoCommit && !transactionCommand; + if (beginAtFirst) + sql = 'BEGIN;\n' + sql; + if (commitLast) + sql += ';\nCOMMIT;'; + + this.socket.sendQueryMessage(sql); + let currentStart = Date.now(); + let parsers; + let current: CommandResult = {command: undefined}; + let fields: Protocol.RowDescription[]; + const typeMap = opts.typeMap || GlobalTypeMap; + let commandIdx = 0; + return this.socket.capture(async (code: Protocol.BackendMessageCode, msg: any, + done: (err?: Error, result?: any) => void) => { + switch (code) { + case Protocol.BackendMessageCode.NoticeResponse: + case Protocol.BackendMessageCode.CopyInResponse: + case Protocol.BackendMessageCode.CopyOutResponse: + case Protocol.BackendMessageCode.EmptyQueryResponse: + break; + case Protocol.BackendMessageCode.RowDescription: + fields = msg.fields; + parsers = getParsers(typeMap, fields); + current.fields = wrapRowDescription(typeMap, fields, DataFormat.text); + current.rows = []; + break; + case Protocol.BackendMessageCode.DataRow: + let row = msg.columns.map((x: Buffer) => x.toString('utf8')); + parseRow(parsers, row, opts); + if (opts.objectRows && current.fields) + row = convertRowToObject(current.fields, row); + if (cb) cb('row', row); + current.rows = current.rows || []; + current.rows.push(row); + break; + case Protocol.BackendMessageCode.CommandComplete: + // Ignore BEGIN command that we added to sql + if (beginAtFirst && commandIdx++ == 0) + break; + current.command = msg.command; + if (current.command === 'DELETE' || + current.command === 'INSERT' || + current.command === 'UPDATE') + current.rowsAffected = msg.rowCount; + current.executeTime = Date.now() - currentStart; + result.results.push(current); + if (cb) cb('command-complete', current); + current = {command: undefined}; + currentStart = Date.now(); + break; + case Protocol.BackendMessageCode.ReadyForQuery: + this.transactionStatus = msg.status; + result.totalTime = Date.now() - startTime; + // Ignore COMMIT command that we added to sql + if (commitLast) + result.results.pop(); + result.totalCommands = result.results.length; + done(undefined, result); + } + }); + } finally { + this.unref(); + } + } + protected _onError(err: Error): void { if (this.socket.state !== ConnectionState.READY) return; diff --git a/src/PreparedStatement.ts b/src/PreparedStatement.ts index 93c1531..3c92147 100644 --- a/src/PreparedStatement.ts +++ b/src/PreparedStatement.ts @@ -15,6 +15,7 @@ import {Cursor} from './Cursor'; import {Portal} from './Portal'; import {convertRowToObject, getIntlConnection, getParsers, parseRow, wrapRowDescription} from './common'; import {GlobalTypeMap} from './DataTypeMap'; +import {coerceToBoolean} from "putil-varhelpers"; let statementCounter = 0; let portalCounter = 0; @@ -102,19 +103,16 @@ export class PreparedStatement extends SafeEventEmitter { async execute(options: QueryOptions = {}): Promise { const intoCon = getIntlConnection(this.connection); - const autoCommit = options.autoCommit != null ? options.autoCommit : intoCon.config.autoCommit; - if (autoCommit == false) + const transactionCommand = this.sql.match(/^(\bBEGIN\b|\bCOMMIT\b|\bROLLBACK\b)/i) && + !this.sql.match(/^\bROLLBACK TO SAVEPOINT\b/i); + const autoCommit = coerceToBoolean(options.autoCommit != null ? + options.autoCommit : intoCon.config.autoCommit, true); + if (!autoCommit && !transactionCommand) await intoCon.startTransaction(); - try { - const result = await intoCon.statementQueue.enqueue(() => this._execute(options)); - if (autoCommit == true) - await intoCon.commit(); - return result; - } catch (e) { - if (autoCommit == true) - await intoCon.rollback(); - throw e; - } + const result = await intoCon.statementQueue.enqueue(() => this._execute(options)); + if (autoCommit && !transactionCommand) + await intoCon.commit(); + return result; } async close(): Promise { diff --git a/test/B-connection/03-execute-script.spec.ts b/test/B-connection/03-execute-script.spec.ts index 5d8384b..defdf60 100644 --- a/test/B-connection/03-execute-script.spec.ts +++ b/test/B-connection/03-execute-script.spec.ts @@ -17,7 +17,7 @@ describe('execute() (Simple Query)', function () { }) it('should execute sql script', async function () { - const result = await connection.execute(`select 1`); + const result = await connection.execute(`select 1;`); assert.ok(result); assert.strictEqual(result.totalCommands, 1); assert.strictEqual(result.results.length, 1);