From 196a3b6e037c0b8f5c8aa50137e869f8e3bd626c Mon Sep 17 00:00:00 2001 From: fengmk2 Date: Thu, 21 Dec 2023 10:07:09 +0800 Subject: [PATCH 1/3] feat: export agent pool stats --- src/HttpClient.ts | 39 ++++++++++++++++++++++- src/index.ts | 8 ++--- test/keep-alive-header.test.ts | 56 ++++++++++++++++++++++------------ 3 files changed, 79 insertions(+), 24 deletions(-) diff --git a/src/HttpClient.ts b/src/HttpClient.ts index 5e9a3bdd..6fd1f1b0 100644 --- a/src/HttpClient.ts +++ b/src/HttpClient.ts @@ -22,7 +22,9 @@ import { Dispatcher, Agent, getGlobalDispatcher, + Pool, } from 'undici'; +import { kClients } from 'undici/lib/core/symbols.js'; import { FormData as FormDataNode } from 'formdata-node'; import { FormDataEncoder } from 'form-data-encoder'; import createUserAgent from 'default-user-agent'; @@ -136,7 +138,7 @@ function defaultIsRetry(response: HttpClientResponse) { return response.status >= 500; } -type RequestContext = { +export type RequestContext = { retries: number; socketErrorRetries: number; requestStartTime?: number; @@ -157,6 +159,20 @@ export type ResponseDiagnosticsMessage = { error?: Error; }; +export interface PoolStat { + /** Number of open socket connections in this pool. */ + connected: number; + /** Number of open socket connections in this pool that do not have an active request. */ + free: number; + /** Number of pending requests across all clients in this pool. */ + pending: number; + /** Number of queued requests across all clients in this pool. */ + queued: number; + /** Number of currently active requests across all clients in this pool. */ + running: number; + /** Number of active, pending, or queued requests across all clients in this pool. */ + size: number; +} export class HttpClient extends EventEmitter { #defaultArgs?: RequestOptions; @@ -187,6 +203,27 @@ export class HttpClient extends EventEmitter { this.#dispatcher = dispatcher; } + getDispatcherPoolStats() { + const agent = this.getDispatcher(); + // origin => Pool Instance + const clients: Map> = agent[kClients]; + const poolStatsMap: Record = {}; + for (const [ key, ref ] of clients) { + const pool = ref.deref(); + const stats = pool?.stats; + if (!stats) continue; + poolStatsMap[key] = { + connected: stats.connected, + free: stats.free, + pending: stats.pending, + queued: stats.queued, + running: stats.running, + size: stats.size, + } satisfies PoolStat; + } + return poolStatsMap; + } + async request(url: RequestURL, options?: RequestOptions) { return await this.#requestInternal(url, options); } diff --git a/src/index.ts b/src/index.ts index 3f6832ee..bec5316c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,7 +2,7 @@ import LRU from 'ylru'; import { HttpClient, HEADER_USER_AGENT } from './HttpClient.js'; import { RequestOptions, RequestURL } from './Request.js'; -let httpclient: HttpClient; +let httpClient: HttpClient; const domainSocketHttpclients = new LRU(50); export async function request(url: RequestURL, options?: RequestOptions) { @@ -17,10 +17,10 @@ export async function request(url: RequestURL, options?: RequestOptions return await domainSocketHttpclient.request(url, options); } - if (!httpclient) { - httpclient = new HttpClient({}); + if (!httpClient) { + httpClient = new HttpClient({}); } - return await httpclient.request(url, options); + return await httpClient.request(url, options); } // export curl method is keep compatible with urllib.curl() diff --git a/test/keep-alive-header.test.ts b/test/keep-alive-header.test.ts index 3e45b4de..3acee10f 100644 --- a/test/keep-alive-header.test.ts +++ b/test/keep-alive-header.test.ts @@ -1,6 +1,6 @@ import { strict as assert } from 'node:assert'; import { describe, it, beforeAll, afterAll } from 'vitest'; -import urllib from '../src'; +import { HttpClient } from '../src'; import { startServer } from './fixtures/server'; import { sleep } from './utils'; @@ -8,6 +8,7 @@ describe('keep-alive-header.test.ts', () => { // should shorter than server keepalive timeout // https://zhuanlan.zhihu.com/p/34147188 const keepAliveTimeout = 2000; + const httpClient = new HttpClient(); let close: any; let _url: string; beforeAll(async () => { @@ -25,97 +26,114 @@ describe('keep-alive-header.test.ts', () => { const max = process.env.TEST_KEEPALIVE_COUNT ? parseInt(process.env.TEST_KEEPALIVE_COUNT) : 3; let otherSideClosed = 0; let readECONNRESET = 0; + const origin = _url.substring(0, _url.length - 1); while (count < max) { count++; try { - let response = await urllib.request(_url); + const task = httpClient.request(_url); + console.log('after request stats: %o', httpClient.getDispatcherPoolStats()); + assert.equal(httpClient.getDispatcherPoolStats()[origin].pending, 1); + assert.equal(httpClient.getDispatcherPoolStats()[origin].size, 1); + let response = await task; + console.log('after response stats: %o', httpClient.getDispatcherPoolStats()); + assert.equal(httpClient.getDispatcherPoolStats()[origin].pending, 0); + assert.equal(httpClient.getDispatcherPoolStats()[origin].connected, 1); // console.log(response.res.socket); assert.equal(response.status, 200); // console.log(response.headers); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); - response = await urllib.request(_url); + response = await httpClient.request(_url); assert.equal(response.status, 200); // console.log(response.headers); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); - response = await urllib.request(_url); + response = await httpClient.request(_url); assert.equal(response.status, 200); // console.log(response.headers); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); - response = await urllib.request(_url); + response = await httpClient.request(_url); assert.equal(response.status, 200); // console.log(response.headers); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); - response = await urllib.request(_url); + response = await httpClient.request(_url); assert.equal(response.status, 200); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); - response = await urllib.request(_url); + response = await httpClient.request(_url); assert.equal(response.status, 200); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); - response = await urllib.request(_url); + response = await httpClient.request(_url); assert.equal(response.status, 200); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); - response = await urllib.request(_url); + response = await httpClient.request(_url); assert.equal(response.status, 200); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); - response = await urllib.request(_url); + response = await httpClient.request(_url); assert.equal(response.status, 200); // console.log(response.headers); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); assert(parseInt(response.headers['x-requests-persocket'] as string) > 1); await sleep(keepAliveTimeout / 2); - response = await urllib.request(_url); + response = await httpClient.request(_url); // console.log(response.res.socket); assert.equal(response.status, 200); // console.log(response.headers); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); - response = await urllib.request(_url); + response = await httpClient.request(_url); assert.equal(response.status, 200); // console.log(response.headers); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); - response = await urllib.request(_url); + response = await httpClient.request(_url); assert.equal(response.status, 200); // console.log(response.headers); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); - response = await urllib.request(_url); + response = await httpClient.request(_url); assert.equal(response.status, 200); // console.log(response.headers); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); - response = await urllib.request(_url); + response = await httpClient.request(_url); assert.equal(response.status, 200); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); - response = await urllib.request(_url); + response = await httpClient.request(_url); assert.equal(response.status, 200); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); - response = await urllib.request(_url); + response = await httpClient.request(_url); assert.equal(response.status, 200); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); - response = await urllib.request(_url); + response = await httpClient.request(_url); assert.equal(response.status, 200); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); - response = await urllib.request(_url); + response = await httpClient.request(_url); assert.equal(response.status, 200); // console.log(response.headers); assert.equal(response.headers.connection, 'keep-alive'); assert.equal(response.headers['keep-alive'], 'timeout=2'); assert(parseInt(response.headers['x-requests-persocket'] as string) > 1); + console.log('before sleep stats: %o', httpClient.getDispatcherPoolStats()); + // { connected: 2, free: 1, pending: 0, queued: 0, running: 0, size: 0 } + assert.equal(httpClient.getDispatcherPoolStats()[origin].connected, 2); + assert.equal(httpClient.getDispatcherPoolStats()[origin].free, 1); await sleep(keepAliveTimeout); + console.log('after sleep stats: %o', httpClient.getDispatcherPoolStats()); + // { connected: 0, free: 0, pending: 0, queued: 0, running: 0, size: 0 } + assert.equal(httpClient.getDispatcherPoolStats()[origin].connected, 0); + assert.equal(httpClient.getDispatcherPoolStats()[origin].free, 0); + assert.equal(httpClient.getDispatcherPoolStats()[origin].size, 0); } catch (err) { if (err.message === 'other side closed') { console.log(err); From ba4bd435eae7df97d10dc9ea5e4c93595c34aaaf Mon Sep 17 00:00:00 2001 From: fengmk2 Date: Fri, 22 Dec 2023 00:18:23 +0800 Subject: [PATCH 2/3] f --- src/HttpClient.ts | 6 +++--- src/index.ts | 22 +++++++++++++--------- test/esm/index.js | 3 ++- test/index.test.ts | 15 ++++++++++++++- 4 files changed, 32 insertions(+), 14 deletions(-) diff --git a/src/HttpClient.ts b/src/HttpClient.ts index 6fd1f1b0..ac865067 100644 --- a/src/HttpClient.ts +++ b/src/HttpClient.ts @@ -24,7 +24,7 @@ import { getGlobalDispatcher, Pool, } from 'undici'; -import { kClients } from 'undici/lib/core/symbols.js'; +import undiciSymbols from 'undici/lib/core/symbols.js'; import { FormData as FormDataNode } from 'formdata-node'; import { FormDataEncoder } from 'form-data-encoder'; import createUserAgent from 'default-user-agent'; @@ -206,7 +206,7 @@ export class HttpClient extends EventEmitter { getDispatcherPoolStats() { const agent = this.getDispatcher(); // origin => Pool Instance - const clients: Map> = agent[kClients]; + const clients: Map> = agent[undiciSymbols.kClients]; const poolStatsMap: Record = {}; for (const [ key, ref ] of clients) { const pool = ref.deref(); @@ -228,7 +228,7 @@ export class HttpClient extends EventEmitter { return await this.#requestInternal(url, options); } - // alias to request, keep compatible with urlib@2 HttpClient.curl + // alias to request, keep compatible with urllib@2 HttpClient.curl async curl(url: RequestURL, options?: RequestOptions) { return await this.request(url, options); } diff --git a/src/index.ts b/src/index.ts index bec5316c..cc3cec4d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,24 +3,28 @@ import { HttpClient, HEADER_USER_AGENT } from './HttpClient.js'; import { RequestOptions, RequestURL } from './Request.js'; let httpClient: HttpClient; -const domainSocketHttpclients = new LRU(50); +const domainSocketHttpClients = new LRU(50); + +export function getDefaultHttpClient(): HttpClient { + if (!httpClient) { + httpClient = new HttpClient(); + } + return httpClient; +} export async function request(url: RequestURL, options?: RequestOptions) { if (options?.socketPath) { - let domainSocketHttpclient = domainSocketHttpclients.get(options.socketPath); + let domainSocketHttpclient = domainSocketHttpClients.get(options.socketPath); if (!domainSocketHttpclient) { domainSocketHttpclient = new HttpClient({ connect: { socketPath: options.socketPath }, }); - domainSocketHttpclients.set(options.socketPath, domainSocketHttpclient); + domainSocketHttpClients.set(options.socketPath, domainSocketHttpclient); } return await domainSocketHttpclient.request(url, options); } - if (!httpClient) { - httpClient = new HttpClient({}); - } - return await httpClient.request(url, options); + return await getDefaultHttpClient().request(url, options); } // export curl method is keep compatible with urllib.curl() @@ -36,12 +40,12 @@ export { MockAgent, ProxyAgent, Agent, Dispatcher, setGlobalDispatcher, getGlobalDispatcher, } from 'undici'; -// HttpClient2 is keep compatible with urlib@2 HttpClient2 +// HttpClient2 is keep compatible with urllib@2 HttpClient2 export { HttpClient, HttpClient as HttpClient2, HEADER_USER_AGENT as USER_AGENT, RequestDiagnosticsMessage, ResponseDiagnosticsMessage, } from './HttpClient.js'; -// RequestOptions2 is keep compatible with urlib@2 RequestOptions2 +// RequestOptions2 is keep compatible with urllib@2 RequestOptions2 export { RequestOptions, RequestOptions as RequestOptions2, RequestURL, HttpMethod, FixJSONCtlCharsHandler, FixJSONCtlChars, diff --git a/test/esm/index.js b/test/esm/index.js index 5a26442c..095c7100 100644 --- a/test/esm/index.js +++ b/test/esm/index.js @@ -1,13 +1,14 @@ import { strict as assert } from 'assert'; import * as urllibStar from 'urllib'; import urllib from 'urllib'; -import { request, HttpClient, USER_AGENT } from 'urllib'; +import { request, HttpClient, USER_AGENT, getDefaultHttpClient } from 'urllib'; console.log(urllibStar); console.log(urllibStar.request, urllibStar.HttpClient); console.log(urllibStar.request, urllibStar.HttpClient); console.log(urllibStar.USER_AGENT, urllib.USER_AGENT, USER_AGENT); console.log(request, HttpClient); +console.log('stats %o', getDefaultHttpClient().getDispatcherPoolStats()); assert(urllibStar); assert.equal(typeof urllibStar.request, 'function'); diff --git a/test/index.test.ts b/test/index.test.ts index 630313f3..b6a6c8ef 100644 --- a/test/index.test.ts +++ b/test/index.test.ts @@ -2,7 +2,7 @@ import { strict as assert } from 'node:assert'; import { parse as urlparse } from 'node:url'; import { readFileSync } from 'node:fs'; import { describe, it, beforeAll, afterAll, afterEach, beforeEach } from 'vitest'; -import urllib, { HttpClient } from '../src'; +import urllib, { HttpClient, getDefaultHttpClient } from '../src'; import { MockAgent, setGlobalDispatcher, getGlobalDispatcher } from '../src'; import { startServer } from './fixtures/server'; import { readableToBytes } from './utils'; @@ -20,6 +20,19 @@ describe('index.test.ts', () => { await close(); }); + describe('getDefaultHttpClient()', () => { + it('should work', async () => { + const response = await getDefaultHttpClient().request(`${_url}html`); + assert.equal(response.status, 200); + assert.equal(response.headers['content-type'], 'text/html'); + assert(response.headers.date); + assert.equal(response.url, `${_url}html`); + assert(!response.redirected); + assert.equal(getDefaultHttpClient(), getDefaultHttpClient()); + console.log('stats %o', getDefaultHttpClient().getDispatcherPoolStats()); + }); + }); + describe('urllib.request()', () => { it('should work', async () => { const response = await urllib.request(`${_url}html`); From 16e93b57b42814d65b55e609e5a54d503058afb9 Mon Sep 17 00:00:00 2001 From: fengmk2 Date: Fri, 22 Dec 2023 00:22:32 +0800 Subject: [PATCH 3/3] f --- test/keep-alive-header.test.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/keep-alive-header.test.ts b/test/keep-alive-header.test.ts index 3acee10f..00387cee 100644 --- a/test/keep-alive-header.test.ts +++ b/test/keep-alive-header.test.ts @@ -131,8 +131,10 @@ describe('keep-alive-header.test.ts', () => { await sleep(keepAliveTimeout); console.log('after sleep stats: %o', httpClient.getDispatcherPoolStats()); // { connected: 0, free: 0, pending: 0, queued: 0, running: 0, size: 0 } - assert.equal(httpClient.getDispatcherPoolStats()[origin].connected, 0); - assert.equal(httpClient.getDispatcherPoolStats()[origin].free, 0); + // { connected: 1, free: 1, pending: 0, queued: 0, running: 0, size: 0 } + // { connected: 2, free: 2, pending: 0, queued: 0, running: 0, size: 0 } + assert(httpClient.getDispatcherPoolStats()[origin].connected <= 2); + assert(httpClient.getDispatcherPoolStats()[origin].free <= 2); assert.equal(httpClient.getDispatcherPoolStats()[origin].size, 0); } catch (err) { if (err.message === 'other side closed') {