Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: export agent pool stats #481

Merged
merged 3 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions src/HttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import {
Dispatcher,
Agent,
getGlobalDispatcher,
Pool,
} from 'undici';
import undiciSymbols from 'undici/lib/core/symbols.js';
import { FormData as FormDataNode } from 'formdata-node';
import { FormDataEncoder } from 'form-data-encoder';
import createUserAgent from 'default-user-agent';
Expand Down Expand Up @@ -136,7 +138,7 @@ function defaultIsRetry(response: HttpClientResponse) {
return response.status >= 500;
}

type RequestContext = {
export type RequestContext = {
retries: number;
socketErrorRetries: number;
requestStartTime?: number;
Expand All @@ -157,6 +159,20 @@ export type ResponseDiagnosticsMessage = {
error?: Error;
};

export interface PoolStat {
/** Number of open socket connections in this pool. */
connected: number;
/** Number of open socket connections in this pool that do not have an active request. */
free: number;
/** Number of pending requests across all clients in this pool. */
pending: number;
/** Number of queued requests across all clients in this pool. */
queued: number;
/** Number of currently active requests across all clients in this pool. */
running: number;
/** Number of active, pending, or queued requests across all clients in this pool. */
size: number;
}

export class HttpClient extends EventEmitter {
#defaultArgs?: RequestOptions;
Expand Down Expand Up @@ -187,11 +203,32 @@ export class HttpClient extends EventEmitter {
this.#dispatcher = dispatcher;
}

getDispatcherPoolStats() {
const agent = this.getDispatcher();
// origin => Pool Instance
const clients: Map<string, WeakRef<Pool>> = agent[undiciSymbols.kClients];
const poolStatsMap: Record<string, PoolStat> = {};
for (const [ key, ref ] of clients) {
const pool = ref.deref();
const stats = pool?.stats;
if (!stats) continue;
poolStatsMap[key] = {
connected: stats.connected,
free: stats.free,
pending: stats.pending,
queued: stats.queued,
running: stats.running,
size: stats.size,
} satisfies PoolStat;
}
return poolStatsMap;
}

async request<T = any>(url: RequestURL, options?: RequestOptions) {
return await this.#requestInternal<T>(url, options);
}

// alias to request, keep compatible with urlib@2 HttpClient.curl
// alias to request, keep compatible with urllib@2 HttpClient.curl
async curl<T = any>(url: RequestURL, options?: RequestOptions) {
return await this.request<T>(url, options);
}
Expand Down
24 changes: 14 additions & 10 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,29 @@
import { HttpClient, HEADER_USER_AGENT } from './HttpClient.js';
import { RequestOptions, RequestURL } from './Request.js';

let httpclient: HttpClient;
const domainSocketHttpclients = new LRU(50);
let httpClient: HttpClient;
const domainSocketHttpClients = new LRU(50);

export function getDefaultHttpClient(): HttpClient {
if (!httpClient) {
httpClient = new HttpClient();
}
return httpClient;
}

export async function request<T = any>(url: RequestURL, options?: RequestOptions) {
if (options?.socketPath) {
let domainSocketHttpclient = domainSocketHttpclients.get<HttpClient>(options.socketPath);
let domainSocketHttpclient = domainSocketHttpClients.get<HttpClient>(options.socketPath);

Check warning on line 17 in src/index.ts

View check run for this annotation

Codecov / codecov/patch

src/index.ts#L17

Added line #L17 was not covered by tests
if (!domainSocketHttpclient) {
domainSocketHttpclient = new HttpClient({
connect: { socketPath: options.socketPath },
});
domainSocketHttpclients.set(options.socketPath, domainSocketHttpclient);
domainSocketHttpClients.set(options.socketPath, domainSocketHttpclient);

Check warning on line 22 in src/index.ts

View check run for this annotation

Codecov / codecov/patch

src/index.ts#L22

Added line #L22 was not covered by tests
}
return await domainSocketHttpclient.request<T>(url, options);
}

if (!httpclient) {
httpclient = new HttpClient({});
}
return await httpclient.request<T>(url, options);
return await getDefaultHttpClient().request<T>(url, options);
}

// export curl method is keep compatible with urllib.curl()
Expand All @@ -36,12 +40,12 @@
MockAgent, ProxyAgent, Agent, Dispatcher,
setGlobalDispatcher, getGlobalDispatcher,
} from 'undici';
// HttpClient2 is keep compatible with urlib@2 HttpClient2
// HttpClient2 is keep compatible with urllib@2 HttpClient2
export {
HttpClient, HttpClient as HttpClient2, HEADER_USER_AGENT as USER_AGENT,
RequestDiagnosticsMessage, ResponseDiagnosticsMessage,
} from './HttpClient.js';
// RequestOptions2 is keep compatible with urlib@2 RequestOptions2
// RequestOptions2 is keep compatible with urllib@2 RequestOptions2
export {
RequestOptions, RequestOptions as RequestOptions2, RequestURL, HttpMethod,
FixJSONCtlCharsHandler, FixJSONCtlChars,
Expand Down
3 changes: 2 additions & 1 deletion test/esm/index.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { strict as assert } from 'assert';
import * as urllibStar from 'urllib';
import urllib from 'urllib';
import { request, HttpClient, USER_AGENT } from 'urllib';
import { request, HttpClient, USER_AGENT, getDefaultHttpClient } from 'urllib';

console.log(urllibStar);
console.log(urllibStar.request, urllibStar.HttpClient);
console.log(urllibStar.request, urllibStar.HttpClient);
console.log(urllibStar.USER_AGENT, urllib.USER_AGENT, USER_AGENT);
console.log(request, HttpClient);
console.log('stats %o', getDefaultHttpClient().getDispatcherPoolStats());

assert(urllibStar);
assert.equal(typeof urllibStar.request, 'function');
Expand Down
15 changes: 14 additions & 1 deletion test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { strict as assert } from 'node:assert';
import { parse as urlparse } from 'node:url';
import { readFileSync } from 'node:fs';
import { describe, it, beforeAll, afterAll, afterEach, beforeEach } from 'vitest';
import urllib, { HttpClient } from '../src';
import urllib, { HttpClient, getDefaultHttpClient } from '../src';
import { MockAgent, setGlobalDispatcher, getGlobalDispatcher } from '../src';
import { startServer } from './fixtures/server';
import { readableToBytes } from './utils';
Expand All @@ -20,6 +20,19 @@ describe('index.test.ts', () => {
await close();
});

describe('getDefaultHttpClient()', () => {
it('should work', async () => {
const response = await getDefaultHttpClient().request(`${_url}html`);
assert.equal(response.status, 200);
assert.equal(response.headers['content-type'], 'text/html');
assert(response.headers.date);
assert.equal(response.url, `${_url}html`);
assert(!response.redirected);
assert.equal(getDefaultHttpClient(), getDefaultHttpClient());
console.log('stats %o', getDefaultHttpClient().getDispatcherPoolStats());
});
});

describe('urllib.request()', () => {
it('should work', async () => {
const response = await urllib.request(`${_url}html`);
Expand Down
58 changes: 39 additions & 19 deletions test/keep-alive-header.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { strict as assert } from 'node:assert';
import { describe, it, beforeAll, afterAll } from 'vitest';
import urllib from '../src';
import { HttpClient } from '../src';
import { startServer } from './fixtures/server';
import { sleep } from './utils';

describe('keep-alive-header.test.ts', () => {
// should shorter than server keepalive timeout
// https://zhuanlan.zhihu.com/p/34147188
const keepAliveTimeout = 2000;
const httpClient = new HttpClient();
let close: any;
let _url: string;
beforeAll(async () => {
Expand All @@ -25,97 +26,116 @@ describe('keep-alive-header.test.ts', () => {
const max = process.env.TEST_KEEPALIVE_COUNT ? parseInt(process.env.TEST_KEEPALIVE_COUNT) : 3;
let otherSideClosed = 0;
let readECONNRESET = 0;
const origin = _url.substring(0, _url.length - 1);
while (count < max) {
count++;
try {
let response = await urllib.request(_url);
const task = httpClient.request(_url);
console.log('after request stats: %o', httpClient.getDispatcherPoolStats());
assert.equal(httpClient.getDispatcherPoolStats()[origin].pending, 1);
assert.equal(httpClient.getDispatcherPoolStats()[origin].size, 1);
let response = await task;
console.log('after response stats: %o', httpClient.getDispatcherPoolStats());
assert.equal(httpClient.getDispatcherPoolStats()[origin].pending, 0);
assert.equal(httpClient.getDispatcherPoolStats()[origin].connected, 1);
// console.log(response.res.socket);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
assert(parseInt(response.headers['x-requests-persocket'] as string) > 1);
await sleep(keepAliveTimeout / 2);
response = await urllib.request(_url);
response = await httpClient.request(_url);
// console.log(response.res.socket);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
assert(parseInt(response.headers['x-requests-persocket'] as string) > 1);
console.log('before sleep stats: %o', httpClient.getDispatcherPoolStats());
// { connected: 2, free: 1, pending: 0, queued: 0, running: 0, size: 0 }
assert.equal(httpClient.getDispatcherPoolStats()[origin].connected, 2);
assert.equal(httpClient.getDispatcherPoolStats()[origin].free, 1);
await sleep(keepAliveTimeout);
console.log('after sleep stats: %o', httpClient.getDispatcherPoolStats());
// { connected: 0, free: 0, pending: 0, queued: 0, running: 0, size: 0 }
// { connected: 1, free: 1, pending: 0, queued: 0, running: 0, size: 0 }
// { connected: 2, free: 2, pending: 0, queued: 0, running: 0, size: 0 }
assert(httpClient.getDispatcherPoolStats()[origin].connected <= 2);
assert(httpClient.getDispatcherPoolStats()[origin].free <= 2);
assert.equal(httpClient.getDispatcherPoolStats()[origin].size, 0);
} catch (err) {
if (err.message === 'other side closed') {
console.log(err);
Expand Down
Loading