diff --git a/packages/opentelemetry-plugin-postgres/package.json b/packages/opentelemetry-plugin-postgres/package.json index a9e9163ad6..49462f3181 100644 --- a/packages/opentelemetry-plugin-postgres/package.json +++ b/packages/opentelemetry-plugin-postgres/package.json @@ -8,7 +8,7 @@ "repository": "open-telemetry/opentelemetry-js", "scripts": { "test": "nyc ts-mocha -p tsconfig.json 'test/**/*.test.ts'", - "debug": "ts-mocha --inspect-brk --no-timeouts -p tsconfig.json 'test/**/*.test.ts'", + "test:debug": "ts-mocha --inspect-brk --no-timeouts -p tsconfig.json 'test/**/*.test.ts'", "tdd": "yarn test -- --watch-extensions ts --watch", "clean": "rimraf build/*", "check": "gts check", diff --git a/packages/opentelemetry-plugin-postgres/src/enums.ts b/packages/opentelemetry-plugin-postgres/src/enums.ts index 1d3944c172..92578970ef 100644 --- a/packages/opentelemetry-plugin-postgres/src/enums.ts +++ b/packages/opentelemetry-plugin-postgres/src/enums.ts @@ -15,10 +15,22 @@ */ export enum AttributeNames { + // required by https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/data-semantic-conventions.md#databases-client-calls COMPONENT = 'component', - PG_HOST = 'pg.host', - PG_PORT = 'pg.port', - PG_TEXT = 'pg.text', + DB_TYPE = 'db.type', + DB_INSTANCE = 'db.instance', + DB_STATEMENT = 'db.statement', + PEER_ADDRESS = 'peer.address', + PEER_HOST = 'peer.host', + + // optional + DB_USER = 'db.user', + PEER_PORT = 'peer.port', + PEER_IPV4 = 'peer.ipv4', + PEER_IPV6 = 'peer.ipv6', + PEER_SERVICE = 'peer.service', + + // PG specific -- not specified by spec PG_VALUES = 'pg.values', PG_PLAN = 'pg.plan', } diff --git a/packages/opentelemetry-plugin-postgres/src/pg.ts b/packages/opentelemetry-plugin-postgres/src/pg.ts index 6a0763f144..81bf478575 100644 --- a/packages/opentelemetry-plugin-postgres/src/pg.ts +++ b/packages/opentelemetry-plugin-postgres/src/pg.ts @@ -15,15 +15,27 @@ */ import { BasePlugin } from '@opentelemetry/core'; -import { SpanKind } from '@opentelemetry/types'; +import { SpanKind, Span, CanonicalCode } from '@opentelemetry/types'; import { AttributeNames } from './enums'; -import { PostgresCallback, PostgresPluginOptions } from './types'; +import { PostgresPluginOptions, PgClientConnectionParams, PgPluginQueryConfig } from './types'; import * as path from 'path'; import * as pgTypes from 'pg'; import * as shimmer from 'shimmer'; +// Helper function to get a low cardinality command name from the full text query +function getCommandFromText(text?: string): string { + if (text) { + const words = text.split(' '); + if (words && words.length > 0) { + return words[0]; + } + } + return 'unknown'; +} + export class PostgresPlugin extends BasePlugin { - static readonly component = 'pg'; + static readonly COMPONENT = 'pg'; + static readonly BASE_SPAN_NAME = PostgresPlugin.COMPONENT + '.query'; readonly supportedVersions = ['^7.12.1']; protected _config: PostgresPluginOptions; @@ -48,113 +60,131 @@ export class PostgresPlugin extends BasePlugin { } } + // Private helper function to start a span + private _pgStartSpan(client: pgTypes.Client & PgClientConnectionParams) { + return this._tracer.startSpan( + PostgresPlugin.BASE_SPAN_NAME, + { + kind: SpanKind.CLIENT, + parent: this._tracer.getCurrentSpan() || undefined, + attributes: { + [AttributeNames.COMPONENT]: PostgresPlugin.COMPONENT, + [AttributeNames.PEER_HOST]: client.connectionParameters.host, + [AttributeNames.PEER_PORT]: client.connectionParameters.port, + }, + } + ); + } + private _getClientQueryPatch() { const plugin = this; return (original: typeof pgTypes.Client.prototype.query) => { plugin._logger.debug( - `Patching ${PostgresPlugin.component}.Client.prototype.query` + `Patching ${PostgresPlugin.COMPONENT}.Client.prototype.query` ); - return function query(this: pgTypes.Client, ...args: unknown[]) { - // setup span - let callbackProvided: boolean = - args.length > 1 && typeof args[args.length - 1] === 'function'; - const span = plugin._tracer.startSpan( - `${PostgresPlugin.component}.query`, - { - kind: SpanKind.CLIENT, - parent: plugin._tracer.getCurrentSpan() || undefined, - attributes: { - [AttributeNames.COMPONENT]: PostgresPlugin.component, - [AttributeNames.PG_HOST]: (this as any).connectionParameters.host, - [AttributeNames.PG_PORT]: (this as any).connectionParameters.port, - }, - } - ); - - try { - if (typeof args[0] === 'string') { - span.setAttribute(AttributeNames.PG_TEXT, args[0]); - if (args[1] instanceof Array) { - span.setAttribute(AttributeNames.PG_VALUES, args[1]); - if (callbackProvided) { - args[2] = plugin._tracer.bind(args[2]); - } - } else { - if (callbackProvided) { - args[1] = plugin._tracer.bind(args[1]); - } - } + return function query(this: pgTypes.Client & PgClientConnectionParams, ...args: unknown[]) { + let callbackProvided = false; + const span = plugin._pgStartSpan(this); + + // Handle different client.query(...) signatures + if (typeof args[0] === 'string') { + if (args.length > 1 && args[1] instanceof Array) { + _handleParameterizedQuery.call(this, span, ...args); } else { - const config = args[0] as pgTypes.QueryConfig & { - callback?: PostgresCallback; - }; - if (typeof config.name === 'string') { - span.setAttribute(AttributeNames.PG_PLAN, config.name); - } else { - if (typeof config.text === 'string') { - span.setAttribute(AttributeNames.PG_TEXT, config.text); - } - if (config.values instanceof Array) { - span.setAttribute(AttributeNames.PG_VALUES, config.values); - } - } - - if (callbackProvided) { - if (typeof args[1] === 'function') { - args[1] = plugin._tracer.bind(args[1]); - } else if (typeof args[2] === 'function') { - args[2] = plugin._tracer.bind(args[2]); - } - } else if ( - config.callback && - typeof config.callback === 'function' - ) { - callbackProvided = true; - config.callback = plugin._tracer.bind(config.callback); - } + _handleTextQuery.call(this, span, ...args); + } + } else if (typeof args[0] === 'object') { + _handleConfigQuery.call(this, span, ...args); + } + + // Bind callback to parent span + // TODO: end the span + if (args.length > 0) { + if (typeof args[args.length - 1] === 'function') { + args[args.length - 1] = plugin._tracer.bind(args[args.length - 1]); + } else if (typeof (args[0] as PgPluginQueryConfig).callback === 'function') { + (args[0] as PgPluginQueryConfig).callback = plugin._tracer.bind((args[0] as PgPluginQueryConfig).callback); } - } catch (e) { - plugin._logger.warn( - `pg Plugin failed to trace query: error: ${e.message}` - ); - const result = original.apply(this, arguments as any); - span.end(); - return result; } - const queryResult = original.apply(this, args as any); + // Perform the original query + const result: unknown = original.apply(this, args as never); - // No callback was provided, return a promise instead (new as of pg@7.x) + // Bind promise to parent span and end the span + if (result instanceof Promise) { + return plugin._tracer.bind(result + .then((result: unknown) => { + // Return a pass-along promise which ends the span and then goes to user's orig resolvers + return new Promise((resolve, _) => { + span.setStatus({ code: CanonicalCode.OK }); + span.end(); + resolve(result); + }); + }) + .catch((error: Error) => { + return new Promise((_, reject) => { + span.setStatus({ code: CanonicalCode.UNKNOWN }) + span.end(); + reject(error); + }); + })); + } + // else returns void if (!callbackProvided) { - const queryResultPromise = (queryResult as unknown) as Promise< - unknown - >; - return plugin._tracer.bind( - queryResultPromise - .then((result: any) => { - // Return a pass-along promise which ends the span and then goes to user's orig resolvers - return new Promise((resolve, _) => { - span.end(); - resolve(result); - }); - }) - .catch((error: Error) => { - return new Promise((_, reject) => { - span.end(); - reject(error); - }); - }) - ); + span.setStatus({ + code: CanonicalCode.INVALID_ARGUMENT, + message: 'Invalid query provided to the driver' + }); + span.end(); } - - // Else a callback was provided, so just return the result - span.end(); - return queryResult; + return result; // void }; + }; } } + +// Queries where args[0] is a text query and 'values' was not specified +function _handleTextQuery(this: pgTypes.Client & PgClientConnectionParams, span: Span, ...args: unknown[]) { + // Set child span name + const queryCommand = getCommandFromText(args[0] as string); + span.updateName(PostgresPlugin.BASE_SPAN_NAME + ':' + queryCommand); + + // Set attributes + span.setAttribute(AttributeNames.DB_STATEMENT, args[0]); + +} + +// Queries where args[1] is a 'values' array +function _handleParameterizedQuery(this: pgTypes.Client & PgClientConnectionParams, span: Span, ...args: unknown[]) { + // Set child span name + const queryCommand = getCommandFromText(args[0] as string); + span.updateName(PostgresPlugin.BASE_SPAN_NAME + ':' + queryCommand); + + // Set attributes + span.setAttribute(AttributeNames.DB_STATEMENT, args[0]); + span.setAttribute(AttributeNames.PG_VALUES, args[1]); +} + +// Queries where args[0] is a QueryConfig +function _handleConfigQuery(this: pgTypes.Client & PgClientConnectionParams, span: Span, ...args: unknown[]) { + const config = args[0] as PgPluginQueryConfig; + + // Set attributes + span.setAttribute(AttributeNames.DB_STATEMENT, config.text); + if (config.values) { + span.setAttribute(AttributeNames.PG_VALUES, config.values); + } + if (config.name) { + span.setAttribute(AttributeNames.PG_PLAN, config.name); + } + + // Update span name with query command; prefer plan name, if available + const queryCommand = getCommandFromText(config.name || config.text); + span.updateName(PostgresPlugin.BASE_SPAN_NAME + ':' + queryCommand); +} + const basedir = path.dirname(require.resolve('pg')); const version = require(path.join(basedir, '../', 'package.json')).version; -export const plugin = new PostgresPlugin(PostgresPlugin.component, version); +export const plugin = new PostgresPlugin(PostgresPlugin.COMPONENT, version); diff --git a/packages/opentelemetry-plugin-postgres/src/types.ts b/packages/opentelemetry-plugin-postgres/src/types.ts index e74906d6ea..bdc348b2c5 100644 --- a/packages/opentelemetry-plugin-postgres/src/types.ts +++ b/packages/opentelemetry-plugin-postgres/src/types.ts @@ -14,6 +14,21 @@ * limitations under the License. */ +import * as pgTypes from 'pg'; + export interface PostgresPluginOptions {} export type PostgresCallback = (err: Error, res: object) => unknown; + +// These are not included in @types/pg, so manually define them. +// https://github.com/brianc/node-postgres/blob/fde5ec586e49258dfc4a2fcd861fcdecb4794fc3/lib/client.js#L25 +export interface PgClientConnectionParams { + connectionParameters: { + host: string, + port: number + } +} + +export interface PgPluginQueryConfig extends pgTypes.QueryConfig { + callback?: PostgresCallback; +} \ No newline at end of file diff --git a/packages/opentelemetry-plugin-postgres/test/assertionUtils.ts b/packages/opentelemetry-plugin-postgres/test/assertionUtils.ts index 0533d1093f..7daed9c11d 100644 --- a/packages/opentelemetry-plugin-postgres/test/assertionUtils.ts +++ b/packages/opentelemetry-plugin-postgres/test/assertionUtils.ts @@ -42,7 +42,7 @@ export const assertSpan = ( assert.strictEqual( span.attributes[AttributeNames.COMPONENT], - PostgresPlugin.component + PostgresPlugin.COMPONENT ); assert.ok(span.endTime); assert.strictEqual(span.links.length, 0); diff --git a/packages/opentelemetry-plugin-postgres/test/pg.test.ts b/packages/opentelemetry-plugin-postgres/test/pg.test.ts index c8a71ba02e..7bb92e0e0f 100644 --- a/packages/opentelemetry-plugin-postgres/test/pg.test.ts +++ b/packages/opentelemetry-plugin-postgres/test/pg.test.ts @@ -36,7 +36,7 @@ const CONFIG = { password: 'test', database: 'postgres', host: '127.0.0.1', - port: 5432, + port: 54320, }; const runCallbackTest = ( @@ -59,7 +59,7 @@ describe('pg@7.x', () => { const logger = new NoopLogger(); const testPostgres = process.env.TEST_POSTGRES; // For CI: assumes local postgres db is already available const testPostgresLocally = process.env.TEST_POSTGRES_LOCAL; // For local: spins up local postgres db via docker - const shouldTest = testPostgres || testPostgresLocally; // Skips these tests if false (default) + const shouldTest = true || testPostgres || testPostgresLocally; // Skips these tests if false (default) before(function(ready) { if (!shouldTest) { @@ -85,13 +85,11 @@ describe('pg@7.x', () => { } connect(); }); - after(done => { + after((done) => { if (testPostgresLocally) { testUtils.cleanUpDocker(); } - client.end(() => { - done(); - }); + client.end(done); }); beforeEach(function() { @@ -115,7 +113,7 @@ describe('pg@7.x', () => { assert.strictEqual(plugin.moduleName, 'pg'); }); - it('should let the pg module throw its own errors with bad arguments', () => { + it('should maintain pg module error throwing behavior with bad arguments', () => { const assertPgError = (e: Error) => { const src = e.stack!.split('\n').map(line => line.trim())[1]; return /node_modules[/\\]pg/.test(src); @@ -143,12 +141,22 @@ describe('pg@7.x', () => { assert.strictEqual(res, undefined, 'No promise is returned'); }); + it('should return a promise if callback is provided', done => { + const resPromise = client.query('SELECT NOW()'); + resPromise.then(res => { + assert.ok(res); + done(); + }).catch((err: Error) => { + assert.ok(false, err.message); + }); + }); + it('it should intercept client.query(text, callback)', done => { const attributes = { - [AttributeNames.COMPONENT]: PostgresPlugin.component, - [AttributeNames.PG_HOST]: CONFIG.host, - [AttributeNames.PG_PORT]: CONFIG.port, - [AttributeNames.PG_TEXT]: 'SELECT NOW()', + [AttributeNames.COMPONENT]: PostgresPlugin.COMPONENT, + [AttributeNames.PEER_HOST]: CONFIG.host, + [AttributeNames.PEER_PORT]: CONFIG.port, + [AttributeNames.DB_STATEMENT]: 'SELECT NOW()', }; const events: TimedEvent[] = []; const span = tracer.startSpan('test span'); @@ -167,10 +175,10 @@ describe('pg@7.x', () => { const query = 'SELECT $1::text'; const values = ['0']; const attributes = { - [AttributeNames.COMPONENT]: PostgresPlugin.component, - [AttributeNames.PG_HOST]: CONFIG.host, - [AttributeNames.PG_PORT]: CONFIG.port, - [AttributeNames.PG_TEXT]: query, + [AttributeNames.COMPONENT]: PostgresPlugin.COMPONENT, + [AttributeNames.PEER_HOST]: CONFIG.host, + [AttributeNames.PEER_PORT]: CONFIG.port, + [AttributeNames.DB_STATEMENT]: query, [AttributeNames.PG_VALUES]: values, }; const events: TimedEvent[] = []; @@ -189,10 +197,10 @@ describe('pg@7.x', () => { it('should intercept client.query({text, callback})', done => { const query = 'SELECT NOW()'; const attributes = { - [AttributeNames.COMPONENT]: PostgresPlugin.component, - [AttributeNames.PG_HOST]: CONFIG.host, - [AttributeNames.PG_PORT]: CONFIG.port, - [AttributeNames.PG_TEXT]: query, + [AttributeNames.COMPONENT]: PostgresPlugin.COMPONENT, + [AttributeNames.PEER_HOST]: CONFIG.host, + [AttributeNames.PEER_PORT]: CONFIG.port, + [AttributeNames.DB_STATEMENT]: query, }; const events: TimedEvent[] = []; const span = tracer.startSpan('test span'); @@ -213,10 +221,10 @@ describe('pg@7.x', () => { it('should intercept client.query({text}, callback)', done => { const query = 'SELECT NOW()'; const attributes = { - [AttributeNames.COMPONENT]: PostgresPlugin.component, - [AttributeNames.PG_HOST]: CONFIG.host, - [AttributeNames.PG_PORT]: CONFIG.port, - [AttributeNames.PG_TEXT]: query, + [AttributeNames.COMPONENT]: PostgresPlugin.COMPONENT, + [AttributeNames.PEER_HOST]: CONFIG.host, + [AttributeNames.PEER_PORT]: CONFIG.port, + [AttributeNames.DB_STATEMENT]: query, }; const events: TimedEvent[] = []; const span = tracer.startSpan('test span'); @@ -231,14 +239,14 @@ describe('pg@7.x', () => { }); }); - it('should intercept client.query(text, values', async () => { + it('should intercept client.query(text, values)', async () => { const query = 'SELECT $1::text'; const values = ['0']; const attributes = { - [AttributeNames.COMPONENT]: PostgresPlugin.component, - [AttributeNames.PG_HOST]: CONFIG.host, - [AttributeNames.PG_PORT]: CONFIG.port, - [AttributeNames.PG_TEXT]: query, + [AttributeNames.COMPONENT]: PostgresPlugin.COMPONENT, + [AttributeNames.PEER_HOST]: CONFIG.host, + [AttributeNames.PEER_PORT]: CONFIG.port, + [AttributeNames.DB_STATEMENT]: query, [AttributeNames.PG_VALUES]: values, }; const events: TimedEvent[] = []; @@ -258,10 +266,10 @@ describe('pg@7.x', () => { const query = 'SELECT $1::text'; const values = ['0']; const attributes = { - [AttributeNames.COMPONENT]: PostgresPlugin.component, - [AttributeNames.PG_HOST]: CONFIG.host, - [AttributeNames.PG_PORT]: CONFIG.port, - [AttributeNames.PG_TEXT]: query, + [AttributeNames.COMPONENT]: PostgresPlugin.COMPONENT, + [AttributeNames.PEER_HOST]: CONFIG.host, + [AttributeNames.PEER_PORT]: CONFIG.port, + [AttributeNames.DB_STATEMENT]: query, [AttributeNames.PG_VALUES]: values, }; const events: TimedEvent[] = []; @@ -283,10 +291,10 @@ describe('pg@7.x', () => { it('should intercept client.query(text)', async () => { const query = 'SELECT NOW()'; const attributes = { - [AttributeNames.COMPONENT]: PostgresPlugin.component, - [AttributeNames.PG_HOST]: CONFIG.host, - [AttributeNames.PG_PORT]: CONFIG.port, - [AttributeNames.PG_TEXT]: query, + [AttributeNames.COMPONENT]: PostgresPlugin.COMPONENT, + [AttributeNames.PEER_HOST]: CONFIG.host, + [AttributeNames.PEER_PORT]: CONFIG.port, + [AttributeNames.DB_STATEMENT]: query, }; const events: TimedEvent[] = []; const span = tracer.startSpan('test span');