From 64aa75d121d63ddae4fa4bd2a6e097773f280dfe Mon Sep 17 00:00:00 2001 From: fengmk2 Date: Sat, 10 Jun 2023 13:38:48 +0800 Subject: [PATCH] feat: export connection and query diagnostics_channel (#111) Drop Node.js < 16.17.0 support --- .eslintrc | 5 ++++- .github/workflows/nodejs.yml | 2 +- package.json | 3 ++- src/channels.ts | 35 ++++++++++++++++++++++++++++++ src/client.ts | 42 ++++++++++++++++++++++++++++++------ src/connection.ts | 2 +- src/operator.ts | 30 +++++++++++++++++++++++--- src/transaction.ts | 2 +- src/types.ts | 2 +- test/PoolConfig.test.ts | 40 +++++++++++++++++++++++++++++++++- test/client.test.ts | 7 +++++- test/operator.test.ts | 7 ++++++ 12 files changed, 160 insertions(+), 17 deletions(-) create mode 100644 src/channels.ts diff --git a/.eslintrc b/.eslintrc index 114c344..9bcdb46 100644 --- a/.eslintrc +++ b/.eslintrc @@ -1,3 +1,6 @@ { - "extends": "eslint-config-egg/typescript" + "extends": [ + "eslint-config-egg/typescript", + "eslint-config-egg/lib/rules/enforce-node-prefix" + ] } diff --git a/.github/workflows/nodejs.yml b/.github/workflows/nodejs.yml index 3a3c864..c6ac5fa 100644 --- a/.github/workflows/nodejs.yml +++ b/.github/workflows/nodejs.yml @@ -13,4 +13,4 @@ jobs: uses: node-modules/github-actions/.github/workflows/node-test-mysql.yml@master with: os: 'ubuntu-latest' - version: '16, 18, 20' + version: '16.17.0, 16, 18, 20' diff --git a/package.json b/package.json index b56f6e0..696eb89 100644 --- a/package.json +++ b/package.json @@ -3,6 +3,7 @@ "version": "6.1.0", "description": "Aliyun RDS client", "main": "lib/client.js", + "types": "lib/client.d.ts", "files": [ "lib" ], @@ -43,7 +44,7 @@ "mysql" ], "engines": { - "node": ">= 14.17.0" + "node": ">= 16.17.0" }, "author": "fengmk2 (https://github.com/fengmk2)", "license": "MIT" diff --git a/src/channels.ts b/src/channels.ts new file mode 100644 index 0000000..e867b39 --- /dev/null +++ b/src/channels.ts @@ -0,0 +1,35 @@ +import diagnosticsChannel from 'node:diagnostics_channel'; +import type { PoolConnectionPromisify } from './types'; +import type { RDSClient } from './client'; + +export default { + // pool events https://github.com/mysqljs/mysql#pool-events + connectionNew: diagnosticsChannel.channel('ali-rds:connection:new'), + connectionAcquire: diagnosticsChannel.channel('ali-rds:connection:acquire'), + connectionRelease: diagnosticsChannel.channel('ali-rds:connection:release'), + connectionEnqueue: diagnosticsChannel.channel('ali-rds:connection:enqueue'), + // query + queryStart: diagnosticsChannel.channel('ali-rds:query:start'), + queryEnd: diagnosticsChannel.channel('ali-rds:query:end'), +}; + +export interface ConnectionMessage { + client: RDSClient; + connection: PoolConnectionPromisify; +} + +export interface ConnectionEnqueueMessage { + client: RDSClient; +} + +export interface QueryStartMessage { + connection: PoolConnectionPromisify; + sql: string; +} + +export interface QueryEndMessage { + connection: PoolConnectionPromisify; + sql: string; + duration: number; + error?: Error; +} diff --git a/src/client.ts b/src/client.ts index a73684f..71ec03d 100644 --- a/src/client.ts +++ b/src/client.ts @@ -8,6 +8,8 @@ import { RDSConnection } from './connection'; import { RDSTransaction } from './transaction'; import { RDSPoolConfig } from './PoolConfig'; import literals from './literals'; +import channels from './channels'; +import type { ConnectionMessage, ConnectionEnqueueMessage } from './channels'; interface PoolPromisify extends Omit { query(sql: string): Promise; @@ -39,8 +41,8 @@ export class RDSClient extends Operator { // get connection options from getConnectionConfig method every time if (mysqlOptions.getConnectionConfig) { // eslint-disable-next-line @typescript-eslint/no-var-requires - const Pool = require('mysql/lib/Pool'); - this.#pool = new Pool({ + const MySQLPool = require('mysql/lib/Pool'); + this.#pool = new MySQLPool({ config: new RDSPoolConfig(mysqlOptions, mysqlOptions.getConnectionConfig), }); // override _needsChangeUser to return false @@ -57,11 +59,39 @@ export class RDSClient extends Operator { }); this.#connectionStorage = connectionStorage || new AsyncLocalStorage(); this.#connectionStorageKey = connectionStorageKey || RDSClient.#DEFAULT_STORAGE_KEY; + // https://github.com/mysqljs/mysql#pool-events + this.#pool.on('connection', (connection: PoolConnectionPromisify) => { + channels.connectionNew.publish({ + client: this, + connection, + } as ConnectionMessage); + }); + this.#pool.on('enqueue', () => { + channels.connectionEnqueue.publish({ + client: this, + } as ConnectionEnqueueMessage); + }); + this.#pool.on('acquire', (connection: PoolConnectionPromisify) => { + channels.connectionAcquire.publish({ + client: this, + connection, + } as ConnectionMessage); + }); + this.#pool.on('release', (connection: PoolConnectionPromisify) => { + channels.connectionRelease.publish({ + client: this, + connection, + } as ConnectionMessage); + }); } - // impl Operator._query - protected async _query(sql: string) { - return await this.#pool.query(sql); + async query(sql: string, values?: object | any[]): Promise { + const conn = await this.getConnection(); + try { + return await conn.query(sql, values); + } finally { + conn.release(); + } } get pool() { @@ -197,7 +227,7 @@ export class RDSClient extends Operator { * @param scope - scope with code * @return {Object} - scope return result */ - async beginTransactionScope(scope: TransactionScope) { + async beginTransactionScope(scope: TransactionScope): Promise { let ctx = this.#connectionStorage.getStore(); if (ctx) { return await this.#beginTransactionScope(scope, ctx); diff --git a/src/connection.ts b/src/connection.ts index edad6a0..3d1f091 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -7,7 +7,7 @@ const kWrapToRDS = Symbol('kWrapToRDS'); export class RDSConnection extends Operator { conn: PoolConnectionPromisify; constructor(conn: PoolConnectionPromisify) { - super(); + super(conn); this.conn = conn; if (!this.conn[kWrapToRDS]) { [ diff --git a/src/operator.ts b/src/operator.ts index ccbc2a8..f5110b0 100644 --- a/src/operator.ts +++ b/src/operator.ts @@ -8,7 +8,10 @@ import { LockResult, LockTableOption, SelectOption, UpdateOption, UpdateResult, UpdateRow, + PoolConnectionPromisify, } from './types'; +import channels from './channels'; +import type { QueryStartMessage, QueryEndMessage } from './channels'; const debug = debuglog('ali-rds:operator'); @@ -16,11 +19,22 @@ const debug = debuglog('ali-rds:operator'); * Operator Interface */ export abstract class Operator { + #connection: PoolConnectionPromisify; + constructor(connection?: PoolConnectionPromisify) { + if (connection) { + this.#connection = connection; + } + } + protected beforeQueryHandlers: BeforeQueryHandler[] = []; protected afterQueryHandlers: AfterQueryHandler[] = []; get literals() { return literals; } + get threadId() { + return this.#connection?.threadId; + } + beforeQuery(beforeQueryHandler: BeforeQueryHandler) { this.beforeQueryHandlers.push(beforeQueryHandler); } @@ -66,9 +80,13 @@ export abstract class Operator { } } debug('query %o', sql); - const queryStart = Date.now(); + const queryStart = performance.now(); let rows: any; let lastError: Error | undefined; + channels.queryStart.publish({ + sql, + connection: this.#connection, + } as QueryStartMessage); try { rows = await this._query(sql); if (Array.isArray(rows)) { @@ -83,10 +101,16 @@ export abstract class Operator { debug('query error: %o', err); throw err; } finally { + const duration = Math.floor((performance.now() - queryStart) * 1000) / 1000; + channels.queryEnd.publish({ + sql, + connection: this.#connection, + duration, + error: lastError, + } as QueryEndMessage); if (this.afterQueryHandlers.length > 0) { - const execDuration = Date.now() - queryStart; for (const afterQueryHandler of this.afterQueryHandlers) { - afterQueryHandler(sql, rows, execDuration, lastError); + afterQueryHandler(sql, rows, duration, lastError); } } } diff --git a/src/transaction.ts b/src/transaction.ts index cae6fb4..7174946 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -6,7 +6,7 @@ export class RDSTransaction extends Operator { isRollback = false; conn: RDSConnection | null; constructor(conn: RDSConnection) { - super(); + super(conn.conn); this.conn = conn; } diff --git a/src/types.ts b/src/types.ts index 6a84e75..06356af 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,4 +1,4 @@ -import { AsyncLocalStorage } from 'async_hooks'; +import { AsyncLocalStorage } from 'node:async_hooks'; import type { PoolConnection, PoolConfig, ConnectionConfig } from 'mysql'; import { RDSTransaction } from './transaction'; diff --git a/test/PoolConfig.test.ts b/test/PoolConfig.test.ts index 4c82c29..551e0ee 100644 --- a/test/PoolConfig.test.ts +++ b/test/PoolConfig.test.ts @@ -1,9 +1,11 @@ import { strict as assert } from 'node:assert'; import fs from 'node:fs/promises'; import path from 'node:path'; +import diagnosticsChannel from 'node:diagnostics_channel'; import mm from 'mm'; import config from './config'; import { RDSClient } from '../src/client'; +import type { ConnectionMessage, QueryEndMessage } from '../src/channels'; describe('test/PoolConfig.test.ts', () => { const prefix = 'prefix-PoolConfig' + process.version + '-'; @@ -11,7 +13,29 @@ describe('test/PoolConfig.test.ts', () => { let db: RDSClient; let index = 0; let newConnectionCount = 0; + let newConnectionCountByDiagnosticsChannel = 0; + let queryCount = 0; + let queryErrorCount = 0; + let end = false; + before(async () => { + diagnosticsChannel.subscribe('ali-rds:connection:new', message => { + if (end) return; + const { connection } = message as ConnectionMessage; + console.log('[diagnosticsChannel] connection threadId %o created', connection.threadId); + newConnectionCountByDiagnosticsChannel++; + }); + diagnosticsChannel.subscribe('ali-rds:query:end', message => { + if (end) return; + const { connection, sql, duration, error } = message as QueryEndMessage; + console.log('[diagnosticsChannel] connection threadId %o query %o, duration: %oms, error: %o', + connection.threadId, sql, duration, error); + queryCount++; + if (error) { + queryErrorCount++; + } + }); + db = new RDSClient({ // test getConnectionConfig connectionLimit: 2, @@ -44,7 +68,9 @@ describe('test/PoolConfig.test.ts', () => { }); after(async () => { - return await db.end(); + await db.end(); + assert.equal(queryCount, 7); + end = true; }); afterEach(() => { @@ -79,6 +105,18 @@ describe('test/PoolConfig.test.ts', () => { assert(Array.isArray(results[2])); assert.equal(index, 3); assert.equal(newConnectionCount, 2); + assert.equal(newConnectionCountByDiagnosticsChannel, 2); + }); + + it('should query error', async () => { + assert.equal(queryErrorCount, 0); + await assert.rejects(async () => { + await db.query('show tables wrong sql'); + }, (err: Error) => { + assert.match(err.message, /You have an error in your SQL syntax/); + return true; + }); + assert.equal(queryErrorCount, 1); }); }); }); diff --git a/test/client.test.ts b/test/client.test.ts index c8d8349..accfe2a 100644 --- a/test/client.test.ts +++ b/test/client.test.ts @@ -1,4 +1,4 @@ -import { AsyncLocalStorage } from 'async_hooks'; +import { AsyncLocalStorage } from 'node:async_hooks'; import { strict as assert } from 'node:assert'; import fs from 'node:fs/promises'; import path from 'node:path'; @@ -1315,6 +1315,8 @@ describe('test/client.test.ts', () => { assert(result.insertId > 0); result = await conn.delete(table); assert(result.affectedRows > 0); + assert.equal(typeof conn.threadId, 'number'); + assert(conn.threadId! > 0); conn.release(); }); }); @@ -1417,6 +1419,9 @@ describe('test/client.test.ts', () => { assert.equal(count, 1); await db.beginTransactionScope(async conn => { + assert.equal(typeof conn.threadId, 'number'); + assert(conn.threadId! > 0); + assert.equal(conn.threadId, conn.conn!.threadId); await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) values(?, ?, now(), now())`, [ table, prefix + 'beginTransactionScope1', prefix + 'm@beginTransactionScope1.com' ]); diff --git a/test/operator.test.ts b/test/operator.test.ts index 7c86a93..d421a7d 100644 --- a/test/operator.test.ts +++ b/test/operator.test.ts @@ -24,6 +24,13 @@ describe('test/operator.test.ts', () => { }); }); + describe('get threadId()', () => { + it('should get return undefined when connection not exists', () => { + const op = new CustomOperator(); + assert.equal(op.threadId, undefined); + }); + }); + describe('format()', () => { it('should get literal string', () => { const op = new CustomOperator();