Skip to content

Commit

Permalink
Improved auto-commit operations by detecting sql is a transaction com…
Browse files Browse the repository at this point in the history
…mand
  • Loading branch information
erayhanoglu committed Nov 20, 2020
1 parent bf63a13 commit ab8e698
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 91 deletions.
165 changes: 87 additions & 78 deletions src/IntlConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {GlobalTypeMap} from './DataTypeMap';
import {convertRowToObject, getParsers, parseRow, wrapRowDescription} from './common';
import {escapeLiteral} from './util/escape-literal';
import DataFormat = Protocol.DataFormat;
import {coerceToBoolean} from 'putil-varhelpers';

export class IntlConnection extends SafeEventEmitter {
protected _refCount = 0;
Expand Down Expand Up @@ -101,102 +102,29 @@ export class IntlConnection extends SafeEventEmitter {
async execute(sql: string, options?: ScriptExecuteOptions, cb?: (event: string, ...args: any[]) => void): Promise<ScriptResult> {
this.assertConnected();
return this.statementQueue.enqueue<ScriptResult>(async (): Promise<ScriptResult> => {
this.ref();
try {
const startTime = Date.now();
const result: ScriptResult = {
totalCommands: 0,
totalTime: 0,
results: [],
};
const opts = options || {};
const autoCommit = opts.autoCommit != null ? opts.autoCommit : this.config.autoCommit;
const commit = autoCommit == true && this.inTransaction;
if (autoCommit == false)
sql = 'BEGIN;' + sql;
else if (commit)
sql += ';\nCOMMIT;';

this.socket.sendQueryMessage(sql);
let currentStart = Date.now();
let parsers;
let current: CommandResult = {command: undefined};
let fields: Protocol.RowDescription[];
const typeMap = opts.typeMap || GlobalTypeMap;
let commandIdx = 0;
return this.socket.capture(async (code: Protocol.BackendMessageCode, msg: any,
done: (err?: Error, result?: any) => void) => {
switch (code) {
case Protocol.BackendMessageCode.NoticeResponse:
case Protocol.BackendMessageCode.CopyInResponse:
case Protocol.BackendMessageCode.CopyOutResponse:
case Protocol.BackendMessageCode.EmptyQueryResponse:
break;
case Protocol.BackendMessageCode.RowDescription:
fields = msg.fields;
parsers = getParsers(typeMap, fields);
current.fields = wrapRowDescription(typeMap, fields, DataFormat.text);
current.rows = [];
break;
case Protocol.BackendMessageCode.DataRow:
let row = msg.columns.map((x: Buffer) => x.toString('utf8'));
parseRow(parsers, row, opts);
if (opts.objectRows && current.fields)
row = convertRowToObject(current.fields, row);
if (cb) cb('row', row);
current.rows = current.rows || [];
current.rows.push(row);
break;
case Protocol.BackendMessageCode.CommandComplete:
// Ignore BEGIN command that we added to sql
if (autoCommit == false && commandIdx++ == 0)
break;
current.command = msg.command;
if (current.command === 'DELETE' ||
current.command === 'INSERT' ||
current.command === 'UPDATE')
current.rowsAffected = msg.rowCount;
current.executeTime = Date.now() - currentStart;
result.results.push(current);
result.totalCommands++;
if (cb) cb('command-complete', current);
current = {command: undefined};
currentStart = Date.now();
break;
case Protocol.BackendMessageCode.ReadyForQuery:
this.transactionStatus = msg.status;
result.totalTime = Date.now() - startTime;
// Ignore COMMIT command that we added to sql
if (commit)
result.results.pop();
done(undefined, result);
}
});
} finally {
this.unref();
}
return this._execute(sql, options, cb);
});
}

async startTransaction(): Promise<void> {
if (!this.inTransaction)
await this.execute('BEGIN', {autoCommit: false});
await this.execute('BEGIN');
}

async savepoint(name: string): Promise<void> {
if (!(name && name.match(/^[a-zA-Z]\w+$/)))
throw new Error(`Invalid savepoint "${name}`);
await this.execute('BEGIN; SAVEPOINT ' + name, {autoCommit: false});
await this.execute('BEGIN; SAVEPOINT ' + name);
}

async commit(): Promise<void> {
if (this.inTransaction)
await this.execute('COMMIT', {autoCommit: false});
await this.execute('COMMIT');
}

async rollback(): Promise<void> {
if (this.inTransaction)
await this.execute('ROLLBACK', {autoCommit: false});
await this.execute('ROLLBACK');
}

async rollbackToSavepoint(name: string): Promise<void> {
Expand All @@ -220,6 +148,87 @@ export class IntlConnection extends SafeEventEmitter {
throw new Error('Connection closed');
}

protected async _execute(sql: string, options?: ScriptExecuteOptions, cb?: (event: string, ...args: any[]) => void): Promise<ScriptResult> {
this.ref();
try {
const startTime = Date.now();
const result: ScriptResult = {
totalCommands: 0,
totalTime: 0,
results: [],
};
const opts = options || {};
const transactionCommand = sql.match(/^(\bBEGIN\b|\bCOMMIT\b|\bROLLBACK\b)/i) &&
!sql.match(/^\bROLLBACK TO SAVEPOINT\b/i);
const autoCommit = coerceToBoolean(opts.autoCommit != null ?
opts.autoCommit : this.config.autoCommit, true);
const beginAtFirst = !autoCommit && !transactionCommand;
const commitLast = autoCommit && !transactionCommand;
if (beginAtFirst)
sql = 'BEGIN;\n' + sql;
if (commitLast)
sql += ';\nCOMMIT;';

this.socket.sendQueryMessage(sql);
let currentStart = Date.now();
let parsers;
let current: CommandResult = {command: undefined};
let fields: Protocol.RowDescription[];
const typeMap = opts.typeMap || GlobalTypeMap;
let commandIdx = 0;
return this.socket.capture(async (code: Protocol.BackendMessageCode, msg: any,
done: (err?: Error, result?: any) => void) => {
switch (code) {
case Protocol.BackendMessageCode.NoticeResponse:
case Protocol.BackendMessageCode.CopyInResponse:
case Protocol.BackendMessageCode.CopyOutResponse:
case Protocol.BackendMessageCode.EmptyQueryResponse:
break;
case Protocol.BackendMessageCode.RowDescription:
fields = msg.fields;
parsers = getParsers(typeMap, fields);
current.fields = wrapRowDescription(typeMap, fields, DataFormat.text);
current.rows = [];
break;
case Protocol.BackendMessageCode.DataRow:
let row = msg.columns.map((x: Buffer) => x.toString('utf8'));
parseRow(parsers, row, opts);
if (opts.objectRows && current.fields)
row = convertRowToObject(current.fields, row);
if (cb) cb('row', row);
current.rows = current.rows || [];
current.rows.push(row);
break;
case Protocol.BackendMessageCode.CommandComplete:
// Ignore BEGIN command that we added to sql
if (beginAtFirst && commandIdx++ == 0)
break;
current.command = msg.command;
if (current.command === 'DELETE' ||
current.command === 'INSERT' ||
current.command === 'UPDATE')
current.rowsAffected = msg.rowCount;
current.executeTime = Date.now() - currentStart;
result.results.push(current);
if (cb) cb('command-complete', current);
current = {command: undefined};
currentStart = Date.now();
break;
case Protocol.BackendMessageCode.ReadyForQuery:
this.transactionStatus = msg.status;
result.totalTime = Date.now() - startTime;
// Ignore COMMIT command that we added to sql
if (commitLast)
result.results.pop();
result.totalCommands = result.results.length;
done(undefined, result);
}
});
} finally {
this.unref();
}
}

protected _onError(err: Error): void {
if (this.socket.state !== ConnectionState.READY)
return;
Expand Down
22 changes: 10 additions & 12 deletions src/PreparedStatement.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {Cursor} from './Cursor';
import {Portal} from './Portal';
import {convertRowToObject, getIntlConnection, getParsers, parseRow, wrapRowDescription} from './common';
import {GlobalTypeMap} from './DataTypeMap';
import {coerceToBoolean} from "putil-varhelpers";

let statementCounter = 0;
let portalCounter = 0;
Expand Down Expand Up @@ -102,19 +103,16 @@ export class PreparedStatement extends SafeEventEmitter {

async execute(options: QueryOptions = {}): Promise<QueryResult> {
const intoCon = getIntlConnection(this.connection);
const autoCommit = options.autoCommit != null ? options.autoCommit : intoCon.config.autoCommit;
if (autoCommit == false)
const transactionCommand = this.sql.match(/^(\bBEGIN\b|\bCOMMIT\b|\bROLLBACK\b)/i) &&
!this.sql.match(/^\bROLLBACK TO SAVEPOINT\b/i);
const autoCommit = coerceToBoolean(options.autoCommit != null ?
options.autoCommit : intoCon.config.autoCommit, true);
if (!autoCommit && !transactionCommand)
await intoCon.startTransaction();
try {
const result = await intoCon.statementQueue.enqueue<QueryResult>(() => this._execute(options));
if (autoCommit == true)
await intoCon.commit();
return result;
} catch (e) {
if (autoCommit == true)
await intoCon.rollback();
throw e;
}
const result = await intoCon.statementQueue.enqueue<QueryResult>(() => this._execute(options));
if (autoCommit && !transactionCommand)
await intoCon.commit();
return result;
}

async close(): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion test/B-connection/03-execute-script.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ describe('execute() (Simple Query)', function () {
})

it('should execute sql script', async function () {
const result = await connection.execute(`select 1`);
const result = await connection.execute(`select 1;`);
assert.ok(result);
assert.strictEqual(result.totalCommands, 1);
assert.strictEqual(result.results.length, 1);
Expand Down

0 comments on commit ab8e698

Please sign in to comment.