Skip to content

Commit

Permalink
Merge pull request #463 from vitaly-t/#461-event-receive
Browse files Browse the repository at this point in the history
#461 event receive
  • Loading branch information
vitaly-t committed Feb 24, 2018
2 parents 07d66e0 + af3725b commit 90a410e
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 38 deletions.
33 changes: 17 additions & 16 deletions lib/events.js
Expand Up @@ -142,39 +142,40 @@ const $events = {
* @description
* Global notification of any data received from the database, coming from a regular query or from a stream.
*
* The event is fired before the data reaches the client, and only when receiving 1 or more records.
*
* This event notification serves two purposes:
* The event is fired before the data reaches the client, and it serves two purposes:
* - Providing selective data logging for debugging;
* - Pre-processing data before it reaches the client.
*
* **NOTES:**
* - If you alter the size of `data` directly or through the `result` object, it may affect `QueryResultMask`
* validation for regular queries, which is executed right after this notification.
* - When adding data pre-processing, you should consider possible performance penalty this may bring.
* validation for regular queries, which is executed right after.
* - Any data pre-processing needs to be fast here, to avoid performance penalties.
* - If the event handler throws an error, the original request will be rejected with that error.
*
* When executing methods {@link Database#multi Database.multi} or {@link Database#multiResult Database.multiResult},
* this event is called once for every non-empty result that's returned.
* For methods {@link Database#multi Database.multi} and {@link Database#multiResult Database.multiResult},
* this event is called for every result that's returned.
*
* @param {array} data
* A non-empty array of received data objects/rows.
* @param {Array<Object>} data
* Array of received objects/rows.
*
* If any of those objects are modified during notification, the client will receive the modified data.
*
* From v6.8.0 onwards, maximum number of rows within `data` equals to the `batchSize` value within
* the $[QueryStream] object.
* For method {@link Database#stream Database.stream}, the maximum number of rows equals the `batchSize`
* value within the $[QueryStream] object.
*
* @param {object} result
* - original $[Result] object, if the data comes from a regular query, in which case `data = result.rows`.
* - `undefined` when the data comes from a stream.
* @param {external:Result} result
* - Original $[Result] object, if the data is from a non-stream query, in which case `data = result.rows`.
* For single-query requests, $[Result] object is extended with property `duration` - number of milliseconds
* it took to send the query, execute it and get the result back.
* - It is `undefined` when the data comes from a stream (method {@link Database#stream Database.stream}).
*
* @param {EventContext} e
* Event Context Object.
*
* @example
*
* // Example below shows the fastest way to camelize all column names:
* // Example below shows the fastest way to camelize all column names.
* // NOTE: The example does not do processing for nested JSON objects.
*
* const initOptions = {
*
Expand All @@ -187,7 +188,7 @@ const $events = {
*
* function camelizeColumns(data) {
* const tmp = data[0];
* for (let prop in tmp) {
* for (const prop in tmp) {
* const camel = pgp.utils.camelize(prop);
* if (!(camel in tmp)) {
* for (let i = 0; i < data.length; i++) {
Expand Down
2 changes: 1 addition & 1 deletion lib/helpers/columnSet.js
Expand Up @@ -213,7 +213,7 @@ function ColumnSet(columns, options) {
this.columns = [columns];
} else {
this.columns = [];
for (let name in columns) {
for (const name in columns) {
if (inherit || Object.prototype.hasOwnProperty.call(columns, name)) {
this.columns.push(new npm.Column(name));
}
Expand Down
2 changes: 1 addition & 1 deletion lib/main.js
Expand Up @@ -171,7 +171,7 @@ function $main(options) {
'connect', 'disconnect', 'query', 'receive', 'task', 'transact', 'error', 'extend'];

if (!options.noWarnings) {
for (let prop in options) {
for (const prop in options) {
if (validOptions.indexOf(prop) === -1) {
npm.con.warn('WARNING: Invalid property \'%s\' in initialization options.\n%s\n', prop, npm.utils.getLocalStack(3));
break;
Expand Down
14 changes: 6 additions & 8 deletions lib/query.js
Expand Up @@ -133,6 +133,7 @@ function $query(ctx, query, values, qrm, config) {
return;
}
try {
const start = Date.now();
ctx.db.client.query(query, params, (err, result) => {
let data, multiResult, lastResult = result;
if (err) {
Expand All @@ -143,17 +144,14 @@ function $query(ctx, query, values, qrm, config) {
lastResult = result[result.length - 1];
for (let i = 0; i < result.length; i++) {
const r = result[i];
if (r.rows.length) {
error = npm.events.receive(opt, r.rows, r, getContext());
if (error) {
break;
}
error = npm.events.receive(opt, r.rows, r, getContext());
if (error) {
break;
}
}
} else {
if (result.rows.length) {
error = npm.events.receive(opt, result.rows, result, getContext());
}
result.duration = Date.now() - start;
error = npm.events.receive(opt, result.rows, result, getContext());
}
}
if (!error) {
Expand Down
4 changes: 2 additions & 2 deletions lib/utils/index.js
Expand Up @@ -58,7 +58,7 @@ function lock(obj, freeze, options) {
configurable: false,
enumerable: true
};
for (let p in obj) {
for (const p in obj) {
Object.defineProperty(obj, p, desc);
}
}
Expand All @@ -68,7 +68,7 @@ function lock(obj, freeze, options) {
// Adds properties from source to the target,
// making them read-only and enumerable.
function addReadProperties(target, source) {
for (let p in source) {
for (const p in source) {
addReadProp(target, p, source[p]);
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/utils/public.js
Expand Up @@ -309,7 +309,7 @@ function objectToCode(obj, cb) {
function generate(obj, level) {
let code = '', idx = 0;
const gap = npm.utils.messageGap(level);
for (let prop in obj) {
for (const prop in obj) {
const value = obj[prop];
if (idx) {
code += ',';
Expand Down
19 changes: 12 additions & 7 deletions test/dbSpec.js
Expand Up @@ -107,23 +107,25 @@ describe('Connection', function () {
});
});

describe('for raw queries', function () {
describe('for raw queries', () => {
let result, sco;
beforeEach(function (done) {
beforeEach(done => {
db.connect()
.then(function (obj) {
.then(obj => {
sco = obj;
return sco.result('select * from users');
}, function (reason) {
})
.catch(reason => {
result = null;
return promise.reject(reason);
})
.then(function (data) {
.then(data => {
result = data;
}, function () {
})
.catch(() => {
result = null;
})
.finally(function () {
.finally(() => {
if (sco) {
sco.done();
}
Expand All @@ -134,6 +136,7 @@ describe('Connection', function () {
expect(isResult(result)).toBe(true);
expect(result.rows.length > 0).toBe(true);
expect(typeof(result.rowCount)).toBe('number');
expect(typeof(result.duration)).toBe('number');
expect(result.rows.length === result.rowCount).toBe(true);
});
});
Expand Down Expand Up @@ -1558,6 +1561,7 @@ describe('Method \'result\'', function () {
it('must resolve with a single Result object', () => {
expect(isResult(result)).toBe(true);
expect(result.rows).toEqual([{one: 1}]);
expect(typeof result.duration).toBe('number');
});
});

Expand All @@ -1573,6 +1577,7 @@ describe('Method \'result\'', function () {
it('must resolve with the last Result object', () => {
expect(isResult(result)).toBe(true);
expect(result.rows).toEqual([{two: 2}]);
expect('duration' in result).toBe(false); // must be present in multi-query results
});
});

Expand Down
26 changes: 26 additions & 0 deletions test/eventSpec.js
Expand Up @@ -513,6 +513,32 @@ describe('Receive event', function () {
value: 123
}]);
expect(isResult(res)).toBe(true);
expect(typeof res.duration).toBe('number');
});
});

describe('for empty queries', function () {
let ctx, data, res, counter = 0;
beforeEach(function (done) {
options.receive = function (d, r, e) {
counter++;
data = d;
res = r;
ctx = e;
};
db.none('delete from users where id = $1', 1234567890)
.then(function () {
done();
});
});
it('must pass in correct empty data and context', function () {
expect(counter).toBe(1);
expect(ctx.query).toBe('delete from users where id = 1234567890');
expect(ctx.params).toBeUndefined();
expect(ctx.dc).toBe(testDC);
expect(data).toEqual([]);
expect(isResult(res)).toBe(true);
expect(typeof res.duration).toBe('number');
});
});

Expand Down
11 changes: 9 additions & 2 deletions typescript/pg-promise.d.ts
Expand Up @@ -350,6 +350,13 @@ declare namespace pgPromise {
readonly $pool: any
}

interface IResultExt extends pg.IResult {
// Property 'duration' exists only in the following context:
// - for single-query events 'receive'
// - for method Database.result
duration?: number;
}

type TConfig = pg.TConnectionParameters

// Post-initialization interface;
Expand Down Expand Up @@ -411,7 +418,7 @@ declare namespace pgPromise {
any<T=any>(query: TQuery, values?: any): XPromise<T[]>

// API: http://vitaly-t.github.io/pg-promise/Database.html#result
result<T=pg.IResult>(query: TQuery, values?: any, cb?: (value: pg.IResult) => T, thisArg?: any): XPromise<T>
result<T=IResultExt>(query: TQuery, values?: any, cb?: (value: IResultExt) => T, thisArg?: any): XPromise<T>

// API: http://vitaly-t.github.io/pg-promise/Database.html#multiResult
multiResult(query: TQuery, values?: any): XPromise<pg.IResult[]>
Expand Down Expand Up @@ -600,7 +607,7 @@ declare namespace pgPromise {
connect?: (client: pg.Client, dc: any, fresh: boolean) => void
disconnect?: (client: pg.Client, dc: any) => void
query?: (e: IEventContext) => void
receive?: (data: any[], result: pg.IResult, e: IEventContext) => void
receive?: (data: any[], result: IResultExt, e: IEventContext) => void
task?: (e: IEventContext) => void
transact?: (e: IEventContext) => void
error?: (err: any, e: IEventContext) => void
Expand Down

0 comments on commit 90a410e

Please sign in to comment.