Skip to content

Commit

Permalink
add post-query custom attributes hook for spans
Browse files Browse the repository at this point in the history
  • Loading branch information
Alon-Katz committed Nov 30, 2021
1 parent 29e4ab0 commit 105e837
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 9 deletions.
1 change: 1 addition & 0 deletions plugins/node/opentelemetry-instrumentation-pg/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ PostgreSQL instrumentation has few options available to choose from. You can set
| Options | Type | Description |
| ------- | ---- | ----------- |
| [`enhancedDatabaseReporting`](https://github.com/open-telemetry/opentelemetry-js-contrib/blob/main/plugins/node/opentelemetry-instrumentation-pg/src/pg.ts#L48) | `boolean` | If true, additional information about query parameters and results will be attached (as `attributes`) to spans representing database operations |
| `postQueryHook` | `PgPostQueryHookFunction` (function) | Function for adding custom attributes before the query is resolved |
| `responseHook` | `PgInstrumentationExecutionResponseHook` (function) | Function for adding custom attributes from db response |

## Supported Versions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
PgPoolExtended,
PgPoolCallback,
PgInstrumentationConfig,
QueryContext,
} from './types';
import * as utils from './utils';
import { AttributeNames } from './enums/AttributeNames';
Expand Down Expand Up @@ -114,7 +115,9 @@ export class PgInstrumentation extends InstrumentationBase {
`Patching ${PgInstrumentation.COMPONENT}.Client.prototype.query`
);
return function query(this: PgClientExtended, ...args: unknown[]) {
const pluginConfig = plugin.getConfig() as PgInstrumentationConfig;
let span: Span;
let postQueryHookParams: QueryContext;

// Handle different client.query(...) signatures
if (typeof args[0] === 'string') {
Expand All @@ -124,21 +127,24 @@ export class PgInstrumentation extends InstrumentationBase {
span = utils.handleParameterizedQuery.call(
this,
plugin.tracer,
plugin.getConfig() as PgInstrumentationConfig,
pluginConfig,
query,
params
);
postQueryHookParams = { span, query, params };
} else {
span = utils.handleTextQuery.call(this, plugin.tracer, query);
postQueryHookParams = { span, query };
}
} else if (typeof args[0] === 'object') {
const queryConfig = args[0] as NormalizedQueryConfig;
span = utils.handleConfigQuery.call(
this,
plugin.tracer,
plugin.getConfig() as PgInstrumentationConfig,
pluginConfig,
queryConfig
);
postQueryHookParams = { span, config: queryConfig };
} else {
return utils.handleInvalidQuery.call(
this,
Expand All @@ -154,7 +160,7 @@ export class PgInstrumentation extends InstrumentationBase {
if (typeof args[args.length - 1] === 'function') {
// Patch ParameterQuery callback
args[args.length - 1] = utils.patchCallback(
plugin.getConfig() as PgInstrumentationConfig,
pluginConfig,
span,
args[args.length - 1] as PostgresCallback
);
Expand All @@ -170,7 +176,7 @@ export class PgInstrumentation extends InstrumentationBase {
) {
// Patch ConfigQuery callback
let callback = utils.patchCallback(
plugin.getConfig() as PgInstrumentationConfig,
pluginConfig,
span,
(args[0] as NormalizedQueryConfig).callback!
);
Expand All @@ -185,6 +191,8 @@ export class PgInstrumentation extends InstrumentationBase {
}
}

utils.handlePostQueryHook(pluginConfig, postQueryHookParams);

// Perform the original query
const result: unknown = original.apply(this, args as never);

Expand All @@ -194,11 +202,7 @@ export class PgInstrumentation extends InstrumentationBase {
.then((result: unknown) => {
// Return a pass-along promise which ends the span and then goes to user's orig resolvers
return new Promise(resolve => {
utils.handleExecutionResult(
plugin.getConfig() as PgInstrumentationConfig,
span,
result
);
utils.handleExecutionResult(pluginConfig, span, result);
span.end();
resolve(result);
});
Expand Down
16 changes: 16 additions & 0 deletions plugins/node/opentelemetry-instrumentation-pg/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,28 @@ export interface PgInstrumentationExecutionResponseHook {
(span: api.Span, responseInfo: PgResponseHookInformation): void;
}

export interface QueryContext {
span: api.Span;
query?: string;
config?: NormalizedQueryConfig;
params?: unknown[];
}

export interface PgPostQueryHookFunction {
(ctx: QueryContext): void;
}

export interface PgInstrumentationConfig extends InstrumentationConfig {
/**
* If true, additional information about query parameters will be attached (as `attributes`) to spans representing
*/
enhancedDatabaseReporting?: boolean;

/**
* Function for adding custom attributes before the query is resolved
*/
postQueryHook?: PgPostQueryHookFunction;

/**
* Hook that allows adding custom span attributes based on the data
* returned from "query" Pg actions.
Expand Down
20 changes: 20 additions & 0 deletions plugins/node/opentelemetry-instrumentation-pg/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
PgPoolCallback,
PgPoolExtended,
PgInstrumentationConfig,
QueryContext,
} from './types';
import * as pgTypes from 'pg';
import { PgInstrumentation } from './';
Expand Down Expand Up @@ -169,6 +170,25 @@ export function handleInvalidQuery(
return result;
}

export function handlePostQueryHook(
config: PgInstrumentationConfig,
ctx: QueryContext
) {
if (typeof config.postQueryHook === 'function') {
safeExecuteInTheMiddle(
() => {
config.postQueryHook!(ctx);
},
err => {
if (err) {
diag.error('Error running post query hook', err);
}
},
true
);
}
}

export function handleExecutionResult(
config: PgInstrumentationConfig,
span: Span,
Expand Down
106 changes: 106 additions & 0 deletions plugins/node/opentelemetry-instrumentation-pg/test/pg.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -546,5 +546,111 @@ describe('pg@7.x', () => {
client.query('SELECT NOW()').then(queryHandler);
});
});

it('should call postQueryHook with query text if set', async () => {
instrumentation.disable();
let called = false;
const query = 'SELECT NOW()';
const config: PgInstrumentationConfig = {
postQueryHook: ctx => {
called = true;
assert.strictEqual(ctx.query, query);
assert.strictEqual(ctx.params, undefined);
},
};
instrumentation.setConfig(config);
instrumentation.enable();

const attributes = {
...DEFAULT_ATTRIBUTES,
[SemanticAttributes.DB_STATEMENT]: query,
};
const events: TimedEvent[] = [];
const span = tracer.startSpan('test span');
await context.with(trace.setSpan(context.active(), span), async () => {
try {
const resPromise = await client.query(query);
assert.ok(resPromise);
runCallbackTest(span, attributes, events);
} catch (e) {
assert.ok(false, e.message);
}
});
assert.strictEqual(called, true);
});
it('should call postQueryHook with query text and params if set', async () => {
instrumentation.disable();
let called = false;
const values = ['0'];
const query = 'SELECT $1::text';
const config: PgInstrumentationConfig = {
postQueryHook: ctx => {
called = true;
assert.strictEqual(ctx.query, query);
assert.strictEqual(ctx.params, values);
},
};
instrumentation.setConfig(config);
instrumentation.enable();

const attributes = {
...DEFAULT_ATTRIBUTES,
[SemanticAttributes.DB_STATEMENT]: query,
};
const events: TimedEvent[] = [];
const span = tracer.startSpan('test span');
await context.with(trace.setSpan(context.active(), span), async () => {
const resPromise = await client.query(query, values);
try {
assert.ok(resPromise);
runCallbackTest(span, attributes, events);
} catch (e) {
assert.ok(false, e.message);
}
});
assert.strictEqual(called, true);
});
it('should call postQueryHook with query config if set', async () => {
instrumentation.disable();
const name = 'fetch-text';
const query = 'SELECT $1::text';
const values = ['0'];
let called = false;
const config: PgInstrumentationConfig = {
postQueryHook: ctx => {
called = true;
if (!ctx.config) {
assert.ok(false, 'ctx.config was undefined');
}
assert.strictEqual(ctx.config.text, query);
assert.strictEqual(ctx.config.values, values);
},
};
instrumentation.setConfig(config);
instrumentation.enable();

const attributes = {
...DEFAULT_ATTRIBUTES,
[AttributeNames.PG_PLAN]: name,
[SemanticAttributes.DB_STATEMENT]: query,
};
const events: TimedEvent[] = [];
const span = tracer.startSpan('test span');

await context.with(trace.setSpan(context.active(), span), async () => {
try {
const resPromise = await client.query({
name: name,
text: query,
values: values,
});
assert.strictEqual(resPromise.command, 'SELECT');
runCallbackTest(span, attributes, events);
} catch (e) {
assert.ok(false, e.message);
}
});
assert.strictEqual(called, true);
});
});
});

0 comments on commit 105e837

Please sign in to comment.