From 9c04cfa349998dba72117a05fe0c720416196fbc Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 31 Mar 2021 16:36:19 +0200 Subject: [PATCH] refactor: Unify Client, Pool & Agent (#620) * refactor: Client.clients & Agent is a Client. Refs: https://github.com/nodejs/undici/issues/616 * fixup: rename clientt to dispatcher * fixup: keep promise chain * fixup * fixup * fixup --- README.md | 104 ++++ docs/api/Agent.md | 122 ++--- docs/api/Client.md | 727 ++------------------------- docs/api/Dispatcher.md | 754 +++++++++++++++++++++++++++++ docs/api/Pool.md | 40 +- index.js | 59 ++- lib/agent.js | 227 +++++++++ lib/{core => }/client.js | 50 +- lib/dispatcher.js | 19 + lib/{agent => handler}/redirect.js | 6 - lib/{client-pool.js => pool.js} | 83 ++-- test/agent.js | 165 ++----- test/client-upgrade.js | 2 +- test/client.js | 11 +- test/content-length.js | 2 +- test/pool.js | 59 +-- test/redirect-request.js | 2 +- test/utils/esm-wrapper.mjs | 4 +- 18 files changed, 1395 insertions(+), 1041 deletions(-) create mode 100644 docs/api/Dispatcher.md create mode 100644 lib/agent.js rename lib/{core => }/client.js (97%) create mode 100644 lib/dispatcher.js rename lib/{agent => handler}/redirect.js (98%) rename lib/{client-pool.js => pool.js} (79%) diff --git a/README.md b/README.md index 767adc1d413..0acba5b0686 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,110 @@ for await (const data of body) { console.log('trailers', trailers) ``` +## `undici.request(url[, options]): Promise` + +Arguments: + +* **url** `string | URL | object` +* **options** [`RequestOptions`] + * **dispatcher** `Dispatcher` - Default: [getGlobalDispatcher] + * **method** `String` - Default: `GET` +* **maxRedirections** `Integer` - Default: `0` + +Returns a promise with the result of the `Dispatcher.request` method. + +`url` may contain pathname. `options` may not contain path. + +Calls `options.dispatcher.request(options)`. + +See [Dispatcher.request] for more details. + +## `undici.stream(url, options, factory): Promise` + +Arguments: + +* **url** `string | URL | object` +* **options** [`StreamOptions`] + * **dispatcher** `Dispatcher` - Default: [getGlobalDispatcher] + * **method** `String` - Default: `GET` +* **factory** `Dispatcher.stream.factory` + +Returns a promise with the result of the `Dispatcher.stream` method. + +`url` may contain pathname. `options` may not contain path. + +Calls `options.dispatcher.stream(options, factory)`. + +See [Dispatcher.stream](docs/api/Dispatcher.md#dispatcherstream) for more details. + +## `undici.pipeline(url, options, handler): Duplex` + +Arguments: + +* **url** `string | URL | object` +* **options** [`PipelineOptions`] + * **dispatcher** `Dispatcher` - Default: [getGlobalDispatcher] + * **method** `String` - Default: `GET` +* **handler** `Dispatcher.pipeline.handler` + +Returns: `stream.Duplex` + +`url` may contain pathname. `options` may not contain path. + +Calls `options.dispatch.pipeline(options, handler)`. + +See [Dispatcher.pipeline](docs/api/Dispatcher.md#dispatcherpipeline) for more details. + +### `undici.connect(options[, callback])` + +Starts two-way communications with the requested resource using [HTTP CONNECT](https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/CONNECT). + +Arguments: + +* **options** [`ConnectOptions`] + * **dispatcher** `Dispatcher` - Default: [getGlobalDispatcher] + * **method** `String` - Default: `GET` +* **callback** `(err: Error | null, data: ConnectData | null) => void` (optional) + +Returns a promise with the result of the `Dispatcher.connect` method. + +`url` may contain pathname. `options` may not contain path. + +Calls `options.dispatch.connect(options)`. + +See [Dispatcher.connect](docs/api/Dispatcher.md#dispatcherconnect) for more details. + +### `undici.upgrade(options[, callback])` + +Upgrade to a different protocol. See [MDN - HTTP - Protocol upgrade mechanism](https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism) for more details. + +Arguments: + +* **options** [`UpgradeOptions`] + * **dispatcher** `Dispatcher` - Default: [getGlobalDispatcher] + * **method** `String` - Default: `GET` +* **callback** `(error: Error | null, data: UpgradeData) => void` (optional) + +Returns a promise with the result of the `Dispatcher.upgrade` method. + +`url` may contain pathname. `options` may not contain path. + +Calls `options.dispatcher.upgrade(options)`. + +See [Dispatcher.upgrade](docs/api/Dispatcher.md#clientpipelining) for more details. + +## `undici.setGlobalDispatcher(dispatcher)` + +* dispatcher `Dispatcher` + +Sets the global dispatcher used by global API methods. + +## `undici.getGlobalDispatcher()` + +Gets the global dispatcher used by global API methods. + +Returns: `Dispatcher` + ## Specification Compliance This section documents parts of the HTTP/1.1 specification which Undici does diff --git a/docs/api/Agent.md b/docs/api/Agent.md index 00f158a9abd..027085cafea 100644 --- a/docs/api/Agent.md +++ b/docs/api/Agent.md @@ -1,119 +1,95 @@ # Agent -## `new undici.Agent(opts)` +Extends: `undici.Dispatcher` -Arguments: +Agent allow dispatching requests against multiple different origins. -* **factory** - Default: `(origin, opts) => new Pool(origin, opts)` -* // TODO: document rest opts? +Requests are not guaranteed to be dispatched in order of invocation. -Returns: `Agent` +## `new undici.Agent([options])` -Returns a new Agent instance used for dispatching requests. +Arguments: -### `Agent.get(origin)` +* **options** `AgentOptions` (optional) -* origin `string` - A origin to be retrieved from the Agent. +Returns: `Agent` -This method retrieves Client instances from the Agent. If the client does not exist it is automatically added by calling -the `factory` method passed through the `Agent` constructor. +### Parameter: `AgentOptions` -### `Agent.dispatch(options, handlers)` +Extends: [`ClientOptions`](docs/api/Pool.md#parameter-pooloptions) -Dispatches a request. +* **factory** `(origin: URL, opts: Object) => Dispatcher` - Default: `(origin, opts) => new Pool(origin, opts)` +* **maxRedirections** `Integer` - Default: `0`. -This API is expected to evolve through semver-major versions and is less stable than the preceding higher level APIs. It is primarily intended for library developers who implement higher level APIs on top of this. +## Instance Properties -Arguments: +### `Agent.closed` -* **options** `DispatchOptions` -* **handlers** `DispatchHandlers` +Implements [Client.closed](docs/api/Client.md#clientclosed) -Returns: `void` +### `Agent.connected` -#### Parameter: `DispatchOptions` +Implements [Client.connected](docs/api/Client.md#clientconnected) -* **origin** `string | URL` -* **path** `string` -* **method** `string` -* **body** `string | Buffer | Uint8Array | stream.Readable | null` (optional) - Default: `null` -* **headers** `UndiciHeaders` (optional) - Default: `null` -* **idempotent** `boolean` (optional) - Default: `true` if `method` is `'HEAD'` or `'GET'` - Whether the requests can be safely retried or not. If `false` the request won't be sent until all preceeding requests in the pipeline has completed. -* **upgrade** `string | null` (optional) - Default: `method === 'CONNECT' || null` - Upgrade the request. Should be used to specify the kind of upgrade i.e. `'Websocket'`. +### `Agent.destroyed` -#### Parameter: `DispatchHandlers` +Implements [Client.destroyed](docs/api/Client.md#clientdestroyed) -* **onConnect** `(abort: () => void) => void` - Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. -* **onError** `(error: Error) => void` - Invoked when an error has occurred. -* **onUpgrade** `(statusCode: number, headers: string[] | null, socket: Duplex) => void` (optional) - Invoked when request is upgraded. Required if `DispatchOptions.upgrade` is defined or `DispatchOptions.method === 'CONNECT'`. -* **onHeaders** `(statusCode: number, headers: string[] | null, resume: () => void) => boolean` - Invoked when statusCode and headers have been received. May be invoked multiple times due to 1xx informational headers. Not required for `upgrade` requests. -* **onData** `(chunk: Buffer) => boolean` - Invoked when response payload data is received. Not required for `upgrade` requests. -* **onComplete** `(trailers: string[] | null) => void` - Invoked when response payload and trailers have been received and the request has completed. Not required for `upgrade` requests. +### `Agent.pending` -## `agent.close(): Promise` +Implements [Client.pending](docs/api/Client.md#clientpending) -Returns a `Promise.all` operation closing all of the pool instances in the Agent instance. This calls `pool.close` under the hood. +### `Agent.running` -## `agent.destroy(): Promise` +Implements [Client.running](docs/api/Client.md#clientrunning) -Returns a `Promise.all` operation destroying all of the pool instances in the Agent instance. This calls `pool.destroy` under the hood. +### `Agent.size` -## `undici.setGlobalAgent(agent)` +Implements [Client.size](docs/api/Client.md#clientsize) -* agent `Agent` +## Instance Methods -Sets the global agent used by `request`, `pipeline`, and `stream` methods. -The default global agent creates `undici.Pool`s with no max number of -connections. +### `Agent.close([callback])` -The agent must only **implement** the `Agent` API; not necessary extend from it. +Implements [`Dispatcher.close([callback])`](docs/api/Dispatcher.md#clientclose-callback-). -## `undici.getGlobalAgent(agent)` +### `Agent.destroy([error, callback])` -TODO: document +Implements [`Dispatcher.destroy([error, callback])`](docs/api/Dispatcher.md#dispatcher-callback-). -## `undici.request(url[, opts]): Promise` +### `Agent.dispatch(options, handlers: AgentDispatchOptions)` -* url `string | URL | object` -* opts `{ agent: Agent } & client.request.opts` -* // TODO: document maxRedirections? +Implements [`Dispatcher.dispatch(options, handlers)`](docs/api/Dispatcher.md#clientdispatchoptions-handlers). -`url` may contain path. `opts` may not contain path. `opts.method` is `GET` by default. -Calls `pool.request(opts)` on the pool returned from either the globalAgent (see [setGlobalAgent](#undicisetglobalagentagent)) or the agent passed to the `opts` argument. +#### Parameter: `AgentDispatchOptions` -Returns a promise with the result of the `request` method. +Extends: [`DispatchOptions``](docs/api/Dispatcher.md#parameter-dispatchoptions) -## `undici.stream(url, opts, factory): Promise` +* **origin** `string | URL` +* **maxRedirections** `Integer`. -* url `string | URL | object` -* opts `{ agent: Agent } & client.stream.opts` -* factory `client.stream.factory` -* // TODO: document maxRedirections? +Implements [`Dispatcher.destroy([error, callback])`](docs/api/Dispatcher.md#dispatcher-callback-). -`url` may contain path. `opts` may not contain path. -See [client.stream](docs/api/Client.md#clientstreamoptions-factory--callback) for details on the `opts` and `factory` arguments. -Calls `pool.stream(opts, factory)` on the pool returned from either the globalAgent (see [setGlobalAgent](#undicisetglobalagentagent)) or the agent passed to the `opts` argument. -Result is returned in the factory function. See [client.stream](docs/api/Client.md#clientstreamoptions-factory--callback) for more details. +### `Agent.connect(options[, callback])` -## `undici.pipeline(url, opts, handler): Duplex` +See [`Dispatcher.connect(options[, callback])`](docs/api/Dispatcher.md#clientconnectoptions--callback). + +### `Agent.dispatch(options, handlers)` -* url `string | URL | object` -* opts `{ agent: Agent } & client.pipeline.opts` -* handler `client.pipeline.handler` -* // TODO: document maxRedirections? +Implements [`Dispatcher.dispatch(options, handlers)`](docs/api/Dispatcher.md#clientdispatchoptions-handlers). -`url` may contain path. `opts` may not contain path. +### `Agent.pipeline(options, handler)` -See [client.pipeline](docs/api/Client.md#clientpipelining) for details on the `opts` and `handler` arguments. +See [`Dispatcher.pipeline(options, handler)`](docs/api/Dispatcher.md#clientpipelineoptions-handler). -Calls `pool.pipeline(opts, factory)` on the pool returned from either the globalAgent (see [setGlobalAgent](#undicisetglobalagentagent)) or the agent passed to the `opts` argument. +### `Agent.request(options[, callback])` -See [client.pipeline](docs/api/Client.md#clientpipelining) for more details. +See [`Dispatcher.request(options [, callback])`](docs/api/Dispatcher.md#clientrequestoptions--callback). -### `undici.connect(options[, callback])` +### `Agent.stream(options, factory[, callback])` -TODO: document +See [`Dispatcher.stream(options, factory[, callback])`](docs/api/Dispatcher.md#clientstreamoptions-factory--callback). -### `undici.upgrade(options[, callback])` +### `Agent.upgrade(options[, callback])` -TODO: document +See [`Dispatcher.upgrade(options[, callback])`](docs/api/Dispatcher.md#clientupgradeoptions-callback). diff --git a/docs/api/Client.md b/docs/api/Client.md index eb09bc2c248..9b605c12531 100644 --- a/docs/api/Client.md +++ b/docs/api/Client.md @@ -1,16 +1,16 @@ # Class: Client -Extends: `events.EventEmitter` +Extends: `undici.Dispatcher` A basic HTTP/1.1 client, mapped on top a single TCP/TLS connection. Pipelining is disabled by default. -Imports: `http`, `stream`, `events` +Requests are not guaranteed to be dispatched in order of invocation. ## `new Client(url[, options])` Arguments: -* **url** `URL | string` - It should only include the **protocol, hostname, and port**. +* **url** `URL | string` - Should only include the **protocol, hostname, and port**. * **options** `ClientOptions` (optional) Returns: `Client` @@ -43,718 +43,39 @@ const client = new Client('http://localhost:3000') ### `Client.close([callback])` -Closes the client and gracefully waits for enqueued requests to complete before resolving. +Implements [`Dispatcher.close([callback])`](docs/api/Dispatcher.md#clientclose-callback-). -Arguments: - -* **callback** `(error: Error | null, data: null) => void` (optional) - -Returns: `void | Promise` - Only returns a `Promise` if no `callback` argument was passed - -#### Example - Request resolves before Client closes - -```js -'use strict' -const { createServer } = require('http') -const { Client } = require('undici') - -const server = createServer((request, response) => { - response.end('undici') -}) +### `Client.destroy([error, callback])` -server.listen(() => { - const client = new Client(`http://localhost:${server.address().port}`) +Implements [`Dispatcher.destroy([error, callback])`](docs/api/Dispatcher.md#dispatcher-callback-). - const request = client.request({ - path: '/', - method: 'GET' - }) +Waits until socket is closed before invoking the callback (or returning a promise if no callback is provided). - client.close() - .then(() => { - // This waits for the previous request to complete - console.log('Client closed') - server.close() - }) - - request.then(({ body }) => { - body.setEncoding('utf8') - body.on('data', console.log) // This logs before 'Client closed' - }) -}) -``` +Implements [`Dispatcher.destroy([error, callback])`](docs/api/Dispatcher.md#dispatcher-callback-). ### `Client.connect(options[, callback])` -Starts two-way communications with the requested resource using [HTTP CONNECT](https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/CONNECT). - -Arguments: - -* **options** `ConnectOptions` -* **callback** `(err: Error | null, data: ConnectData | null) => void` (optional) - -Returns: `void | Promise` - Only returns a `Promise` if no `callback` argument was passed - -#### Parameter: `ConnectOptions` - -* **path** `string` -* **headers** `UndiciHeaders` (optional) - Default: `null` -* **signal** `AbortSignal | events.EventEmitter | null` (optional) - Default: `null` -* **opaque** `unknown` (optional) - This argument parameter is passed through to `ConnectData` - -#### Parameter: `ConnectData` - -* **statusCode** `number` -* **headers** `http.IncomingHttpHeaders` -* **socket** `stream.Duplex` -* **opaque** `unknown` - -#### Example - Connect request with echo - -```js -'use strict' -const { createServer } = require('http') -const { Client } = require('undici') - -const server = createServer((request, response) => { - throw Error('should never get here') -}) - -server.on('connect', (req, socket, head) => { - socket.write('HTTP/1.1 200 Connection established\r\n\r\n') - - let data = head.toString() - socket.on('data', (buf) => { - data += buf.toString() - }) - - socket.on('end', () => { - socket.end(data) - }) -}) - -server.listen(() => { - const client = new Client(`http://localhost:${server.address().port}`) - - client - .connect({ path: '/' }) - .then(({ socket }) => { - const wanted = 'Body' - let data = '' - socket.on('data', d => { data += d }) - socket.on('end', () => { - console.log(`Data received: ${data.toString()} | Data wanted: ${wanted}`) - client.close() - server.close() - }) - socket.write(wanted) - socket.end() - }) -}) -``` - -### `Client.destroy([error][, callback])` - -Destroy the client abruptly with the given error. All the pending and running requests will be asynchronously aborted and error. Waits until socket is closed before invoking the callback (or returning a promise if no callback is provided). Since this operation is asynchronously dispatched there might still be some progress on dispatched requests. - -Both arguments are optional; the method can be called in four different ways: - -```js -client.destroy() // -> Promise -client.destroy(new Error()) // -> Promise -client.destroy(() => {}) // -> void -client.destroy(new Error(), () => {}) // -> void -``` - -Arguments: - -* **error** `Error | null` (optional) -* **callback** `() => void` (optional) - -Returns: `void | Promise` - Only returns a `Promise` if no `callback` argument was passed - -#### Example - Request is aborted when Client is destroyed - -```js -'use strict' -const { createServer } = require('http') -const { Client } = require('undici') - -const server = createServer((request, response) => { - response.end('undici') -}) - -server.listen(() => { - const client = new Client(`http://localhost:${server.address().port}`) - - const request = client.request({ - path: '/', - method: 'GET' - }) - - client.destroy() - .then(() => { - // Still waits for requests to complete - console.log('Client destroyed') - server.close() - }) - - // The request promise will reject with an Undici Client Destroyed error - request.catch(error => { - console.error(error) - }) -}) -``` +See [`Dispatcher.connect(options[, callback])`](docs/api/Dispatcher.md#clientconnectoptions--callback). ### `Client.dispatch(options, handlers)` -This is the low level API which all the preceding APIs are implemented on top of. - -This API is expected to evolve through semver-major versions and is less stable than the preceding higher level APIs. It is primarily intended for library developers who implement higher level APIs on top of this. - -Arguments: - -* **options** `DispatchOptions` -* **handlers** `DispatchHandlers` - -Returns: `void` - -#### Parameter: `DispatchOptions` - -* **path** `string` -* **method** `string` -* **body** `string | Buffer | Uint8Array | stream.Readable | null` (optional) - Default: `null` -* **headers** `UndiciHeaders` (optional) - Default: `null` -* **idempotent** `boolean` (optional) - Default: `true` if `method` is `'HEAD'` or `'GET'` - Whether the requests can be safely retried or not. If `false` the request won't be sent until all preceding requests in the pipeline has completed. -* **upgrade** `string | null` (optional) - Default: `method === 'CONNECT' || null` - Upgrade the request. Should be used to specify the kind of upgrade i.e. `'Websocket'`. - -#### Parameter: `DispatchHandlers` - -* **onConnect** `(abort: () => void) => void` - Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. -* **onError** `(error: Error) => void` - Invoked when an error has occurred. -* **onUpgrade** `(statusCode: number, headers: string[] | null, socket: Duplex) => void` (optional) - Invoked when request is upgraded. Required if `DispatchOptions.upgrade` is defined or `DispatchOptions.method === 'CONNECT'`. -* **onHeaders** `(statusCode: number, headers: string[] | null, resume: () => void) => boolean` - Invoked when statusCode and headers have been received. May be invoked multiple times due to 1xx informational headers. Not required for `upgrade` requests. -* **onData** `(chunk: Buffer) => boolean` - Invoked when response payload data is received. Not required for `upgrade` requests. -* **onComplete** `(trailers: string[] | null) => void` - Invoked when response payload and trailers have been received and the request has completed. Not required for `upgrade` requests. - -#### Example 1 - Dispatch GET request - -```js -'use strict' -const { createServer } = require('http') -const { Client } = require('undici') - -const server = createServer((request, response) => { - response.end('Hello, World!') -}) -server.listen(() => { - const client = new Client(`http://localhost:${server.address().port}`) - - const data = [] - - client.dispatch({ - path: '/', - method: 'GET', - headers: { - 'x-foo': 'bar' - } - }, { - onConnect: () => { - console.log('Connected!') - }, - onError: (error) => { - console.error(error) - }, - onHeaders: (statusCode, headers) => { - console.log(`onHeaders | statusCode: ${statusCode} | headers: ${headers}`) - }, - onData: (chunk) => { - console.log('onData : chunk received') - data.push(chunk) - }, - onComplete: (trailers) => { - console.log(`onComplete | trailers: ${trailers}`) - const res = Buffer.concat(data).toString('utf8') - console.log(`Data: ${res}`) - client.close() - server.close() - } - }) -}) -``` - -#### Example 2 - Dispatch Upgrade Request - -```js -'use strict' -const { createServer } = require('http') -const { Client } = require('undici') - -const server = createServer((request, response) => { - response.end() -}) - -server.on('upgrade', (request, socket, head) => { - console.log('Node.js Server - upgrade event') - socket.write('HTTP/1.1 101 Web Socket Protocol Handshake\r\n') - socket.write('Upgrade: WebSocket\r\n') - socket.write('Connection: Upgrade\r\n') - socket.write('\r\n') - socket.end() -}) - -server.listen(() => { - const client = new Client(`http://localhost:${server.address().port}`) - - client.dispatch({ - path: '/', - method: 'GET', - upgrade: 'websocket' - }, { - onConnect: () => { - console.log('Undici Client - onConnect') - }, - onError: (error) => { - console.log('onError') // shouldn't print - }, - onUpgrade: (statusCode, headers, socket) => { - console.log('Undici Client - onUpgrade') - console.log(`onUpgrade Headers: ${headers}`) - socket.on('data', buffer => { - console.log(buffer.toString('utf8')) - }) - socket.on('end', () => { - client.close() - server.close() - }) - socket.end() - } - }) -}) -``` +Implements [`Dispatcher.dispatch(options, handlers)`](docs/api/Dispatcher.md#clientdispatchoptions-handlers). ### `Client.pipeline(options, handler)` -For easy use with [stream.pipeline](https://nodejs.org/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback). The `handler` argument should return a `Readable` from which the result will be read. Usually it should just return the `body` argument unless some kind of transformation needs to be performed based on e.g. `headers` or `statusCode`. The `handler` should validate the response and save any required state. If there is an error, it should be thrown. The function returns a `Duplex` which writes to the request and reads from the response. - -Arguments: - -* **options** `PipelineOptions` -* **handler** `(data: PipelineHandlerData) => stream.Readable` - -Returns: `stream.Duplex` - -#### Parameter: PipelineOptions - -Extends: [`RequestOptions`](#parameter-requestoptions) - -* **objectMode** `boolean` (optional) - Default: `false` - Set to `true` if the `handler` will return an object stream. - -#### Parameter: PipelineHandlerData - -* **statusCode** `number` -* **headers** `IncomingHttpHeaders` -* **opaque** `unknown` -* **body** `stream.Readable` - -#### Example 1 - Pipeline Echo - -```js -'use strict' -const { Readable, Writable, PassThrough, pipeline } = require('stream') -const { createServer } = require('http') -const { Client } = require('undici') - - -const server = createServer((request, response) => { - request.pipe(response) -}) - -server.listen(() => { - const client = new Client(`http://localhost:${server.address().port}`) - - let res = '' - - pipeline( - new Readable({ - read () { - this.push(Buffer.from('undici')) - this.push(null) - } - }), - client.pipeline({ - path: '/', - method: 'GET' - }, ({ statusCode, headers, body }) => { - console.log(`response received ${statusCode}`) - console.log('headers', headers) - return pipeline(body, new PassThrough(), () => {}) - }), - new Writable({ - write (chunk, _, callback) { - res += chunk.toString() - callback() - }, - final (callback) { - console.log(`Response pipelined to writable: ${res}`) - callback() - } - }), - error => { - if (error) { - console.error(error) - } - - client.close() - server.close() - } - ) -}) -``` +See [`Dispatcher.pipeline(options, handler)`](docs/api/Dispatcher.md#clientpipelineoptions-handler). ### `Client.request(options[, callback])` -Performs a HTTP request. - -Non-idempotent requests will not be pipelined in order -to avoid indirect failures. - -Idempotent requests will be automatically retried if -they fail due to indirect failure from the request -at the head of the pipeline. This does not apply to -idempotent requests with a stream request body. - -Arguments: - -* **options** `RequestOptions` -* **callback** `(error: Error | null, data: ResponseData) => void` (optional) - -Returns: `void | Promise` - Only returns a `Promise` if no `callback` argument was passed - -#### Parameter: `RequestOptions` - -Extends: [`DispatchOptions`](#parameter-dispatchoptions) - -* **opaque** `unknown` (optional) - Default: `null` - Used for passing through context to `ResponseData` -* **signal** `AbortSignal | events.EventEmitter | null` (optional) - Default: `null` - -The `RequestOptions.method` property should not be value `'CONNECT'`. - -#### Parameter: `ResponseData` - -* **statusCode** `number` -* **headers** `http.IncomingHttpHeaders` -* **body** `stream.Readable` -* **trailers** `Record` - This object starts out - as empty and will be mutated to contain trailers after `body` has emitted `'end'`. -* **opaque** `unknown` - -#### Example 1 - Basic GET Request - -```js -'use strict' -const { createServer } = require('http') -const { Client } = require('undici') - -const server = createServer((request, response) => { - response.end('Hello, World!') -}) - -server.listen(() => { - const client = new Client(`http://localhost:${server.address().port}`) - - client.request({ - path: '/', - method: 'GET' - }).then(({ body, headers, statusCode, trailers }) => { - console.log(`response received ${statusCode}`) - console.log('headers', headers) - body.setEncoding('utf8') - body.on('data', console.log) - body.on('end', () => { - console.log('trailers', trailers) - }) - - client.close() - server.close() - }).catch(error => { - console.error(error) - }) -}) -``` - -#### Example 2 - Aborting a request - -> Node.js v15+ is required to run this example - -```js -'use strict' -const { createServer } = require('http') -const { Client } = require('undici') - -const server = createServer((request, response) => { - response.end('Hello, World!') -}) - -server.listen(() => { - const client = new Client(`http://localhost:${server.address().port}`) - const abortController = new AbortController() - - client.request({ - path: '/', - method: 'GET', - signal: abortController.signal - }).catch(error => { - console.error(error) // should print an RequestAbortedError - client.close() - server.close() - }) - - abortController.abort() - -}) -``` - -Alternatively, any `EventEmitter` that emits an `'abort'` event may be used as an abort controller: - -```js -'use strict' -const EventEmitter = require('events') -const { createServer } = require('http') -const { Client } = require('undici') - -const server = createServer((request, response) => { - response.end('Hello, World!') -}) - -server.listen(() => { - const client = new Client(`http://localhost:${server.address().port}`) - const ee = new EventEmitter() - - client.request({ - path: '/', - method: 'GET', - signal: ee - }).catch(error => { - console.error(error) // should print an RequestAbortedError - client.close() - server.close() - }) - - ee.emit('abort') -}) -``` - -Destroying the request or response body will have the same effect. - -```js -'use strict' -const { createServer } = require('http') -const { Client } = require('undici') - -const server = createServer((request, response) => { - response.end('Hello, World!') -}) - -server.listen(() => { - const client = new Client(`http://localhost:${server.address().port}`) - - client.request({ - path: '/', - method: 'GET', - }).then(({ body }) => { - body.destroy() - }).catch(error => { - console.error(error) // should print an RequestAbortedError - client.close() - server.close() - }) -}) -``` +See [`Dispatcher.request(options [, callback])`](docs/api/Dispatcher.md#clientrequestoptions--callback). ### `Client.stream(options, factory[, callback])` -A faster version of `Client.request`. This method expects the second argument `factory` to return a [`stream.Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable) stream which the response will be written to. This improves performance by avoiding creating an intermediate [`stream.Readable`](https://nodejs.org/api/stream.html#stream_readable_streams) stream when the user expects to directly pipe the response body to a [`stream.Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable) stream. - -As demonstrated in [Example 1 - Basic GET stream request](#example-1---basic-get-stream-request), it is recommended to use the `option.opaque` property to avoid creating a closure for the `factory` method. This pattern works well with Node.js Web Frameworks such as [Fastify](https://fastify.io). See [Example 2 - Stream to Fastify Response](#example-2---stream-to-fastify-response) for more details. - -Arguments: - -* **options** `RequestOptions` -* **factory** `(data: StreamFactoryData) => stream.Writable` -* **callback** `(error: Error | null, data: StreamData) => void` (optional) - -Returns: `void | Promise` - Only returns a `Promise` if no `callback` argument was passed - -#### Parameter: `StreamFactoryData` - -* **statusCode** `number` -* **headers** `http.IncomingHttpHeaders` -* **opaque** `unknown` - -#### Parameter: `StreamData` - -* **opaque** `unknown` -* **trailers** `Record` - -#### Example 1 - Basic GET stream request - -```js -'use strict' -const { createServer } = require('http') -const { Client } = require('undici') -const { Writable } = require('stream') - -const server = createServer((request, response) => { - response.end('Hello, World!') -}) - -server.listen(() => { - const client = new Client(`http://localhost:${server.address().port}`) - - const bufs = [] - - client.stream({ - path: '/', - method: 'GET', - opaque: { bufs } - }, ({ statusCode, headers, opaque: { bufs } }) => { - console.log(`response received ${statusCode}`) - console.log('headers', headers) - return new Writable({ - write (chunk, encoding, callback) { - bufs.push(chunk) - callback() - } - }) - }).then(({ opaque: { bufs } }) => { - console.log(Buffer.concat(bufs).toString('utf-8')) - - client.close() - server.close() - }).catch(error => { - console.error(error) - }) -}) -``` - -#### Example 2 - Stream to Fastify Response - -In this example, a (fake) request is made to the fastify server using `fastify.inject()`. This request then executes the fastify route handler which makes a subsequent request to the raw Node.js http server using `undici.client.stream()`. The fastify response is passed to the `opaque` option so that undici can tap into the underlying writable stream using `response.raw`. This methodology demonstrates how one could use undici and fastify together to create fast-as-possible requests from one backend server to another. - -```js -'use strict' - -const { createServer } = require('http') -const undici = require('undici') -const fastify = require('fastify') - -const nodeServer = createServer((request, response) => { - response.end('Hello, World! From Node.js HTTP Server') -}) - -nodeServer.listen(() => { - console.log('Node Server listening') - - const nodeServerUndiciClient = new undici.Client(`http://localhost:${nodeServer.address().port}`) - - const fastifyServer = fastify() - - fastifyServer.route({ - url: '/', - method: 'GET', - handler: (request, response) => { - nodeServerUndiciClient.stream({ - path: '/', - method: 'GET', - opaque: response - }, ({ opaque }) => opaque.raw) - } - }) - - fastifyServer - .listen() - .then(() => { - console.log('Fastify Server listening') - const fastifyServerUndiciClient = new undici.Client(`http://localhost:${fastifyServer.server.address().port}`) - - fastifyServerUndiciClient.request({ - path: '/', - method: 'GET' - }).then(({ statusCode, body }) => { - console.log(`response received ${statusCode}`) - body.setEncoding('utf8') - body.on('data', console.log) - - nodeServerUndiciClient.close() - fastifyServerUndiciClient.close() - fastifyServer.close() - nodeServer.close() - }) - }) -}) -``` +See [`Dispatcher.stream(options, factory[, callback])`](docs/api/Dispatcher.md#clientstreamoptions-factory--callback). ### `Client.upgrade(options[, callback])` -Upgrade the client to a different protocol. Visit [MDN - HTTP - Protocol upgrade mechanism](https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism) for more details. - -Arguments: - -* **options** `UpgradeOptions` - -* **callback** `(error: Error | null, data: UpgradeData) => void` (optional) - -Returns: `void | Promise` - Only returns a `Promise` if no `callback` argument was passed - -#### Parameter: `UpgradeOptions` - -* **path** `string` -* **method** `string` (optional) - Default: `'GET'` -* **headers** `UndiciHeaders` (optional) - Default: `null` -* **protocol** `string` (optional) - Default: `'Websocket'` - A string of comma separated protocols, in descending preference order. -* **signal** `AbortSignal | EventEmitter | null` (optional) - Default: `null` - -#### Parameter: `UpgradeData` - -* **headers** `http.IncomingHeaders` -* **socket** `stream.Duplex` -* **opaque** `unknown` - -#### Example 1 - Basic Upgrade Request - -```js -'use strict' -const { Client } = require('undici') -const { createServer } = require('http') - -const server = createServer((request, response) => { - response.statusCode = 101 - response.setHeader('connection', 'upgrade') - response.setHeader('upgrade', request.headers.upgrade) - response.end() -}) - -server.listen(() => { - const client = new Client(`http://localhost:${server.address().port}`) - - client - .upgrade({ path: '/' }) - .then(({ headers, socket }) => { - socket.on('end', () => { - console.log(`upgrade: ${headers.upgrade}`) - client.close() - server.close() - }) - socket.end() - }) - .catch(error => { - console.error(error) - client.close() - server.close() - }) -}) -``` +See [`Dispatcher.upgrade(options[, callback])`](docs/api/Dispatcher.md#clientupgradeoptions-callback). ## Instance Properties @@ -816,9 +137,12 @@ The URL of the Client instance. ### Event: `'connect'` +See [Dispatcher Event: `'connect'`](docs/api/Dispatcher.md#event-connect). + Parameters: -* **client** `Client` +* **origin** `URL` +* **targets** `Array` Emitted when a socket has been created and connected. The client will connect once `client.size > 0`. @@ -836,8 +160,8 @@ const server = createServer((request, response) => { server.listen(() => { const client = new Client(`http://localhost:${server.address().port}`) - client.on('connect', client => { - console.log(`Connected to ${client.url}`) // should print before the request body statement + client.on('connect', (origin) => { + console.log(`Connected to ${origin}`) // should print before the request body statement }) client.request({ @@ -858,9 +182,12 @@ server.listen(() => { ### Event: `'disconnect'` +See [Dispatcher Event: `'disconnect'`](docs/api/Dispatcher.md#event-disconnect). + Parameters: -* **client** `Client` +* **origin** `URL` +* **targets** `Array` * **error** `Error` Emitted when socket has disconnected. The error argument of the event is the error which caused the socket to disconnect. The client will reconnect if or once `client.size > 0`. @@ -879,8 +206,8 @@ const server = createServer((request, response) => { server.listen(() => { const client = new Client(`http://localhost:${server.address().port}`) - client.on('disconnect', client => { - console.log(`Disconnected from ${client.url}`) // should print before the SocketError + client.on('disconnect', (origin) => { + console.log(`Disconnected from ${origin}`) // should print before the SocketError }) client.request({ @@ -898,6 +225,8 @@ server.listen(() => { Emitted when pipeline is no longer [`busy`](#clientbusy). +See [Dispatcher Event: `'drain'`](docs/api/Dispatcher.md#event-drain). + #### Example - Client drain event ```js diff --git a/docs/api/Dispatcher.md b/docs/api/Dispatcher.md new file mode 100644 index 00000000000..9301abff15a --- /dev/null +++ b/docs/api/Dispatcher.md @@ -0,0 +1,754 @@ +# Dispatcher + +Extends: `events.EventEmitter` + +Dispatcher is the core API used to dispatch requests. + +Requests are not guaranteed to be dispatched in order of invocation. + +### `Dispatcher.dispatch(options, handler)` + +#### Example 1 - Dispatch GET request + +```js +'use strict' +const { createServer } = require('http') +const { Client } = require('undici') + +const server = createServer((request, response) => { + response.end('Hello, World!') +}) +server.listen(() => { + const client = new Client(`http://localhost:${server.address().port}`) + + const data = [] + + client.dispatch({ + path: '/', + method: 'GET', + headers: { + 'x-foo': 'bar' + } + }, { + onConnect: () => { + console.log('Connected!') + }, + onError: (error) => { + console.error(error) + }, + onHeaders: (statusCode, headers) => { + console.log(`onHeaders | statusCode: ${statusCode} | headers: ${headers}`) + }, + onData: (chunk) => { + console.log('onData : chunk received') + data.push(chunk) + }, + onComplete: (trailers) => { + console.log(`onComplete | trailers: ${trailers}`) + const res = Buffer.concat(data).toString('utf8') + console.log(`Data: ${res}`) + client.close() + server.close() + } + }) +}) +``` + +#### Example 2 - Dispatch Upgrade Request + +```js +'use strict' +const { createServer } = require('http') +const { Client } = require('undici') + +const server = createServer((request, response) => { + response.end() +}) + +server.on('upgrade', (request, socket, head) => { + console.log('Node.js Server - upgrade event') + socket.write('HTTP/1.1 101 Web Socket Protocol Handshake\r\n') + socket.write('Upgrade: WebSocket\r\n') + socket.write('Connection: Upgrade\r\n') + socket.write('\r\n') + socket.end() +}) + +server.listen(() => { + const client = new Client(`http://localhost:${server.address().port}`) + + client.dispatch({ + path: '/', + method: 'GET', + upgrade: 'websocket' + }, { + onConnect: () => { + console.log('Undici Client - onConnect') + }, + onError: (error) => { + console.log('onError') // shouldn't print + }, + onUpgrade: (statusCode, headers, socket) => { + console.log('Undici Client - onUpgrade') + console.log(`onUpgrade Headers: ${headers}`) + socket.on('data', buffer => { + console.log(buffer.toString('utf8')) + }) + socket.on('end', () => { + client.close() + server.close() + }) + socket.end() + } + }) +}) +``` +### `Dispatcher.connect(options[, callback])` + +Starts two-way communications with the requested resource using [HTTP CONNECT](https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/CONNECT). + +Arguments: + +* **options** `ConnectOptions` +* **callback** `(err: Error | null, data: ConnectData | null) => void` (optional) + +Returns: `void | Promise` - Only returns a `Promise` if no `callback` argument was passed + +#### Parameter: `ConnectOptions` + +* **path** `string` +* **headers** `UndiciHeaders` (optional) - Default: `null` +* **signal** `AbortSignal | events.EventEmitter | null` (optional) - Default: `null` +* **opaque** `unknown` (optional) - This argument parameter is passed through to `ConnectData` + +#### Parameter: `ConnectData` + +* **statusCode** `number` +* **headers** `http.IncomingHttpHeaders` +* **socket** `stream.Duplex` +* **opaque** `unknown` + +#### Example - Connect request with echo + +```js +'use strict' +const { createServer } = require('http') +const { Client } = require('undici') + +const server = createServer((request, response) => { + throw Error('should never get here') +}) + +server.on('connect', (req, socket, head) => { + socket.write('HTTP/1.1 200 Connection established\r\n\r\n') + + let data = head.toString() + socket.on('data', (buf) => { + data += buf.toString() + }) + + socket.on('end', () => { + socket.end(data) + }) +}) + +server.listen(() => { + const client = new Client(`http://localhost:${server.address().port}`) + + client + .connect({ path: '/' }) + .then(({ socket }) => { + const wanted = 'Body' + let data = '' + socket.on('data', d => { data += d }) + socket.on('end', () => { + console.log(`Data received: ${data.toString()} | Data wanted: ${wanted}`) + client.close() + server.close() + }) + socket.write(wanted) + socket.end() + }) +}) +``` + +### `Dispatcher.pipeline(options, handler)` + +For easy use with [stream.pipeline](https://nodejs.org/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback). The `handler` argument should return a `Readable` from which the result will be read. Usually it should just return the `body` argument unless some kind of transformation needs to be performed based on e.g. `headers` or `statusCode`. The `handler` should validate the response and save any required state. If there is an error, it should be thrown. The function returns a `Duplex` which writes to the request and reads from the response. + +Arguments: + +* **options** `PipelineOptions` +* **handler** `(data: PipelineHandlerData) => stream.Readable` + +Returns: `stream.Duplex` + +#### Parameter: PipelineOptions + +Extends: [`RequestOptions`](#parameter-requestoptions) + +* **objectMode** `boolean` (optional) - Default: `false` - Set to `true` if the `handler` will return an object stream. + +#### Parameter: PipelineHandlerData + +* **statusCode** `number` +* **headers** `IncomingHttpHeaders` +* **opaque** `unknown` +* **body** `stream.Readable` + +#### Example 1 - Pipeline Echo + +```js +'use strict' +const { Readable, Writable, PassThrough, pipeline } = require('stream') +const { createServer } = require('http') +const { Client } = require('undici') + + +const server = createServer((request, response) => { + request.pipe(response) +}) + +server.listen(() => { + const client = new Client(`http://localhost:${server.address().port}`) + + let res = '' + + pipeline( + new Readable({ + read () { + this.push(Buffer.from('undici')) + this.push(null) + } + }), + client.pipeline({ + path: '/', + method: 'GET' + }, ({ statusCode, headers, body }) => { + console.log(`response received ${statusCode}`) + console.log('headers', headers) + return pipeline(body, new PassThrough(), () => {}) + }), + new Writable({ + write (chunk, _, callback) { + res += chunk.toString() + callback() + }, + final (callback) { + console.log(`Response pipelined to writable: ${res}`) + callback() + } + }), + error => { + if (error) { + console.error(error) + } + + client.close() + server.close() + } + ) +}) +``` + +### `Dispatcher.request(options[, callback])` + +Performs a HTTP request. + +Non-idempotent requests will not be pipelined in order +to avoid indirect failures. + +Idempotent requests will be automatically retried if +they fail due to indirect failure from the request +at the head of the pipeline. This does not apply to +idempotent requests with a stream request body. + +Arguments: + +* **options** `RequestOptions` +* **callback** `(error: Error | null, data: ResponseData) => void` (optional) + +Returns: `void | Promise` - Only returns a `Promise` if no `callback` argument was passed + +#### Parameter: `RequestOptions` + +Extends: [`DispatchOptions`](#parameter-dispatchoptions) + +* **opaque** `unknown` (optional) - Default: `null` - Used for passing through context to `ResponseData` +* **signal** `AbortSignal | events.EventEmitter | null` (optional) - Default: `null` + +The `RequestOptions.method` property should not be value `'CONNECT'`. + +#### Parameter: `ResponseData` + +* **statusCode** `number` +* **headers** `http.IncomingHttpHeaders` +* **body** `stream.Readable` +* **trailers** `Record` - This object starts out + as empty and will be mutated to contain trailers after `body` has emitted `'end'`. +* **opaque** `unknown` + +#### Example 1 - Basic GET Request + +```js +'use strict' +const { createServer } = require('http') +const { Client } = require('undici') + +const server = createServer((request, response) => { + response.end('Hello, World!') +}) + +server.listen(() => { + const client = new Client(`http://localhost:${server.address().port}`) + + client.request({ + path: '/', + method: 'GET' + }).then(({ body, headers, statusCode, trailers }) => { + console.log(`response received ${statusCode}`) + console.log('headers', headers) + body.setEncoding('utf8') + body.on('data', console.log) + body.on('end', () => { + console.log('trailers', trailers) + }) + + client.close() + server.close() + }).catch(error => { + console.error(error) + }) +}) +``` + +#### Example 2 - Aborting a request + +> Node.js v15+ is required to run this example + +```js +'use strict' +const { createServer } = require('http') +const { Client } = require('undici') + +const server = createServer((request, response) => { + response.end('Hello, World!') +}) + +server.listen(() => { + const client = new Client(`http://localhost:${server.address().port}`) + const abortController = new AbortController() + + client.request({ + path: '/', + method: 'GET', + signal: abortController.signal + }).catch(error => { + console.error(error) // should print an RequestAbortedError + client.close() + server.close() + }) + + abortController.abort() + +}) +``` + +Alternatively, any `EventEmitter` that emits an `'abort'` event may be used as an abort controller: + +```js +'use strict' +const EventEmitter = require('events') +const { createServer } = require('http') +const { Client } = require('undici') + +const server = createServer((request, response) => { + response.end('Hello, World!') +}) + +server.listen(() => { + const client = new Client(`http://localhost:${server.address().port}`) + const ee = new EventEmitter() + + client.request({ + path: '/', + method: 'GET', + signal: ee + }).catch(error => { + console.error(error) // should print an RequestAbortedError + client.close() + server.close() + }) + + ee.emit('abort') +}) +``` + +Destroying the request or response body will have the same effect. + +```js +'use strict' +const { createServer } = require('http') +const { Client } = require('undici') + +const server = createServer((request, response) => { + response.end('Hello, World!') +}) + +server.listen(() => { + const client = new Client(`http://localhost:${server.address().port}`) + + client.request({ + path: '/', + method: 'GET', + }).then(({ body }) => { + body.destroy() + }).catch(error => { + console.error(error) // should print an RequestAbortedError + client.close() + server.close() + }) +}) +``` + +### `Dispatcher.stream(options, factory[, callback])` + +A faster version of `Dispatcher.request`. This method expects the second argument `factory` to return a [`stream.Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable) stream which the response will be written to. This improves performance by avoiding creating an intermediate [`stream.Readable`](https://nodejs.org/api/stream.html#stream_readable_streams) stream when the user expects to directly pipe the response body to a [`stream.Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable) stream. + +As demonstrated in [Example 1 - Basic GET stream request](#example-1---basic-get-stream-request), it is recommended to use the `option.opaque` property to avoid creating a closure for the `factory` method. This pattern works well with Node.js Web Frameworks such as [Fastify](https://fastify.io). See [Example 2 - Stream to Fastify Response](#example-2---stream-to-fastify-response) for more details. + +Arguments: + +* **options** `RequestOptions` +* **factory** `(data: StreamFactoryData) => stream.Writable` +* **callback** `(error: Error | null, data: StreamData) => void` (optional) + +Returns: `void | Promise` - Only returns a `Promise` if no `callback` argument was passed + +#### Parameter: `StreamFactoryData` + +* **statusCode** `number` +* **headers** `http.IncomingHttpHeaders` +* **opaque** `unknown` + +#### Parameter: `StreamData` + +* **opaque** `unknown` +* **trailers** `Record` + +#### Example 1 - Basic GET stream request + +```js +'use strict' +const { createServer } = require('http') +const { Client } = require('undici') +const { Writable } = require('stream') + +const server = createServer((request, response) => { + response.end('Hello, World!') +}) + +server.listen(() => { + const client = new Client(`http://localhost:${server.address().port}`) + + const bufs = [] + + client.stream({ + path: '/', + method: 'GET', + opaque: { bufs } + }, ({ statusCode, headers, opaque: { bufs } }) => { + console.log(`response received ${statusCode}`) + console.log('headers', headers) + return new Writable({ + write (chunk, encoding, callback) { + bufs.push(chunk) + callback() + } + }) + }).then(({ opaque: { bufs } }) => { + console.log(Buffer.concat(bufs).toString('utf-8')) + + client.close() + server.close() + }).catch(error => { + console.error(error) + }) +}) +``` + +#### Example 2 - Stream to Fastify Response + +In this example, a (fake) request is made to the fastify server using `fastify.inject()`. This request then executes the fastify route handler which makes a subsequent request to the raw Node.js http server using `undici.dispatcher.stream()`. The fastify response is passed to the `opaque` option so that undici can tap into the underlying writable stream using `response.raw`. This methodology demonstrates how one could use undici and fastify together to create fast-as-possible requests from one backend server to another. + +```js +'use strict' + +const { createServer } = require('http') +const undici = require('undici') +const fastify = require('fastify') + +const nodeServer = createServer((request, response) => { + response.end('Hello, World! From Node.js HTTP Server') +}) + +nodeServer.listen(() => { + console.log('Node Server listening') + + const nodeServerUndiciClient = new undici.Client(`http://localhost:${nodeServer.address().port}`) + + const fastifyServer = fastify() + + fastifyServer.route({ + url: '/', + method: 'GET', + handler: (request, response) => { + nodeServerUndiciClient.stream({ + path: '/', + method: 'GET', + opaque: response + }, ({ opaque }) => opaque.raw) + } + }) + + fastifyServer + .listen() + .then(() => { + console.log('Fastify Server listening') + const fastifyServerUndiciClient = new undici.Client(`http://localhost:${fastifyServer.server.address().port}`) + + fastifyServerUndiciClient.request({ + path: '/', + method: 'GET' + }).then(({ statusCode, body }) => { + console.log(`response received ${statusCode}`) + body.setEncoding('utf8') + body.on('data', console.log) + + nodeServerUndiciClient.close() + fastifyServerUndiciClient.close() + fastifyServer.close() + nodeServer.close() + }) + }) +}) +``` + +### `Dispatcher.upgrade(options[, callback])` + +Upgrade to a different protocol. Visit [MDN - HTTP - Protocol upgrade mechanism](https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism) for more details. + +Arguments: + +* **options** `UpgradeOptions` + +* **callback** `(error: Error | null, data: UpgradeData) => void` (optional) + +Returns: `void | Promise` - Only returns a `Promise` if no `callback` argument was passed + +#### Parameter: `UpgradeOptions` + +* **path** `string` +* **method** `string` (optional) - Default: `'GET'` +* **headers** `UndiciHeaders` (optional) - Default: `null` +* **protocol** `string` (optional) - Default: `'Websocket'` - A string of comma separated protocols, in descending preference order. +* **signal** `AbortSignal | EventEmitter | null` (optional) - Default: `null` + +#### Parameter: `UpgradeData` + +* **headers** `http.IncomingHeaders` +* **socket** `stream.Duplex` +* **opaque** `unknown` + +#### Example 1 - Basic Upgrade Request + +```js +'use strict' +const { Client } = require('undici') +const { createServer } = require('http') + +const server = createServer((request, response) => { + response.statusCode = 101 + response.setHeader('connection', 'upgrade') + response.setHeader('upgrade', request.headers.upgrade) + response.end() +}) + +server.listen(() => { + const client = new Client(`http://localhost:${server.address().port}`) + + client + .upgrade({ path: '/' }) + .then(({ headers, socket }) => { + socket.on('end', () => { + console.log(`upgrade: ${headers.upgrade}`) + client.close() + server.close() + }) + socket.end() + }) + .catch(error => { + console.error(error) + client.close() + server.close() + }) +}) +``` + +## `Dispatcher.close([callback]): Promise` + +Closes the dispatcher and gracefully waits for enqueued requests to complete before resolving. + +Arguments: + +* **callback** `(error: Error | null, data: null) => void` (optional) + +Returns: `void | Promise` - Only returns a `Promise` if no `callback` argument was passed + +```js +dispatcher.close() // -> Promise +dispatcher.close(() => {}) // -> void +``` + +#### Example - Request resolves before Client closes + +```js +'use strict' +const { createServer } = require('http') +const { Client } = require('undici') + +const server = createServer((request, response) => { + response.end('undici') +}) + +server.listen(() => { + const client = new Client(`http://localhost:${server.address().port}`) + + const request = client.request({ + path: '/', + method: 'GET' + }) + + client.close() + .then(() => { + // This waits for the previous request to complete + console.log('Client closed') + server.close() + }) + + request.then(({ body }) => { + body.setEncoding('utf8') + body.on('data', console.log) // This logs before 'Client closed' + }) +}) +``` + +## `Dispatcher.destroy([error, callback]): Promise` + +Destroy the dispatcher abruptly with the given error. All the pending and running requests will be asynchronously aborted and error. Since this operation is asynchronously dispatched there might still be some progress on dispatched requests. + +Both arguments are optional; the method can be called in four different ways: + +Arguments: + +* **error** `Error | null` (optional) +* **callback** `(error: Error | null, data: null) => void` (optional) + +Returns: `void | Promise` - Only returns a `Promise` if no `callback` argument was passed + +```js +dispatcher.destroy() // -> Promise +dispatcher.destroy(new Error()) // -> Promise +dispatcher.destroy(() => {}) // -> void +dispatcher.destroy(new Error(), () => {}) // -> void +``` + +#### Example - Request is aborted when Client is destroyed + +```js +'use strict' +const { createServer } = require('http') +const { Client } = require('undici') + +const server = createServer((request, response) => { + response.end('undici') +}) + +server.listen(() => { + const client = new Client(`http://localhost:${server.address().port}`) + + const request = client.request({ + path: '/', + method: 'GET' + }) + + client.destroy() + .then(() => { + // Still waits for requests to complete + console.log('Client destroyed') + server.close() + }) + + // The request promise will reject with an Undici Client Destroyed error + request.catch(error => { + console.error(error) + }) +}) +``` + +### `Dispatcher.dispatch(options, handlers)` + +Dispatches a request. + +This API is expected to evolve through semver-major versions and is less stable than the preceding higher level APIs. It is primarily intended for library developers who implement higher level APIs on top of this. + +Arguments: + +* **options** `DispatchOptions` +* **handlers** `DispatchHandlers` + +Returns: `Boolean`, `false` if user should wait for `'drain'` event before calling `Dispatcher.dispatch` again. + +#### Parameter: `DispatchOptions` + +* **origin** `string | URL` +* **path** `string` +* **method** `string` +* **body** `string | Buffer | Uint8Array | stream.Readable | null` (optional) - Default: `null` +* **headers** `UndiciHeaders` (optional) - Default: `null` +* **idempotent** `boolean` (optional) - Default: `true` if `method` is `'HEAD'` or `'GET'` - Whether the requests can be safely retried or not. If `false` the request won't be sent until all preceeding requests in the pipeline has completed. +* **upgrade** `string | null` (optional) - Default: `method === 'CONNECT' || null` - Upgrade the request. Should be used to specify the kind of upgrade i.e. `'Websocket'`. + +#### Parameter: `DispatchHandlers` + +* **onConnect** `(abort: () => void) => void` - Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. +* **onError** `(error: Error) => void` - Invoked when an error has occurred. +* **onUpgrade** `(statusCode: number, headers: string[] | null, socket: Duplex) => void` (optional) - Invoked when request is upgraded. Required if `DispatchOptions.upgrade` is defined or `DispatchOptions.method === 'CONNECT'`. +* **onHeaders** `(statusCode: number, headers: string[] | null, resume: () => void) => boolean` - Invoked when statusCode and headers have been received. May be invoked multiple times due to 1xx informational headers. Not required for `upgrade` requests. +* **onData** `(chunk: Buffer) => boolean` - Invoked when response payload data is received. Not required for `upgrade` requests. +* **onComplete** `(trailers: string[] | null) => void` - Invoked when response payload and trailers have been received and the request has completed. Not required for `upgrade` requests. + +## Instance Events + +### Event: `'connect'` + +Parameters: + +* **origin** `URL` +* **targets** `Array` + +### Event: `'disconnect'` + +Parameters: + +* **origin** `URL` +* **targets** `Array` +* **error** `Error` + +### Event: `'drain'` + +Parameters: + +* **origin** `URL` + +Emitted when dispatcher is no longer busy. diff --git a/docs/api/Pool.md b/docs/api/Pool.md index 4283faa5e3c..e381f6e1c34 100644 --- a/docs/api/Pool.md +++ b/docs/api/Pool.md @@ -1,8 +1,8 @@ # Class: Pool -Extends: `events.EventEmitter` +Extends: `undici.Dispatcher` -A pool of [Client](docs/api/Client.md) instances connected to the same upstream target. Implements the same api as [Client](docs/api/Client.md). +A pool of [Client](docs/api/Client.md) instances connected to the same upstream target. Requests are not guaranteed to be dispatched in order of invocation. @@ -17,6 +17,7 @@ Arguments: Extends: [`ClientOptions`](docs/api/Client.md#parameter-clientoptions) +* **factory** `(origin: URL, opts: Object) => Dispatcher` - Default: `(origin, opts) => new Client(origin, opts)` * **connections** `number | null` (optional) - Default: `null` - The number of `Client` instances to create. When set to `null`, the `Pool` instance will create an unlimited amount of `Client` instances. ## Instance Properties @@ -41,11 +42,6 @@ Implements [Client.destroyed](docs/api/Client.md#clientdestroyed) Implements [Client.pending](docs/api/Client.md#clientpending) - - ### `Pool.running` Implements [Client.running](docs/api/Client.md#clientrunning) @@ -56,52 +52,52 @@ Implements [Client.size](docs/api/Client.md#clientsize) ### `Pool.url` -Implements [Client.url](docs/api/Client.md#clienturl) +Implements [Client.url](docs/api/Client.md#clienturl). ## Instance Methods -### `Pool.close(callback)` +### `Pool.close([callback])` -Implements [`Client.close([ callback ])`](docs/api/Client.md#clientclose-callback-) +Implements [`Dispatcher.close([callback])`](docs/api/Dispatcher.md#clientclose-callback-). -### `Pool.connect(options[, callback])` +### `Pool.destroy([error, callback])` -Implements [`Client.connect(options [, callback])`](docs/api/Client.md#clientconnectoptions--callback) +Implements [`Dispatcher.destroy([error, callback])`](docs/api/Dispatcher.md#dispatcher-callback-). -### `Pool.destroy(error)` +### `Pool.connect(options[, callback])` -Implements [`Client.destroy(error)`](docs/api/Client.md#clientdestroyerror) +See [`Dispatcher.connect(options[, callback])`](docs/api/Dispatcher.md#clientconnectoptions--callback). ### `Pool.dispatch(options, handlers)` -Implements [`Client.dispatch(options, handlers)`](docs/api/Client.md#clientdispatchoptions-handlers) +Implements [`Dispatcher.dispatch(options, handlers)`](docs/api/Dispatcher.md#clientdispatchoptions-handlers). ### `Pool.pipeline(options, handler)` -Implements [`Client.pipeline(options, handler)`](docs/api/Client.md#clientpipelineoptions-handler) +See [`Dispatcher.pipeline(options, handler)`](docs/api/Dispatcher.md#clientpipelineoptions-handler). ### `Pool.request(options[, callback])` -Implements [`Client.request(options [, callback])`](docs/api/Client.md#clientrequestoptions--callback) +See [`Dispatcher.request(options [, callback])`](docs/api/Dispatcher.md#clientrequestoptions--callback). ### `Pool.stream(options, factory[, callback])` -Implements [`Client.stream(options, factory [, callback])`](docs/api/Client.md#clientstreamoptions-factory--callback) +See [`Dispatcher.stream(options, factory[, callback])`](docs/api/Dispatcher.md#clientstreamoptions-factory--callback). ### `Pool.upgrade(options[, callback])` -Implements [`Client.upgrade(options[, callback])`](docs/api/Client.md#clientupgradeoptions-callback) +See [`Dispatcher.upgrade(options[, callback])`](docs/api/Dispatcher.md#clientupgradeoptions-callback). ## Instance Events ### Event: `'connect'` -Implements [Client Event: `'connect'`](docs/api/Client.md#event-connect) +See [Dispatcher Event: `'connect'`](docs/api/Dispatcher.md#event-connect). ### Event: `'disconnect'` -Implements [Client Event: `'disconnect'`](docs/api/Client.md#event-connect) +See [Dispatcher Event: `'disconnect'`](docs/api/Dispatcher.md#event-connect). ### Event: `'drain'` -Implements [Client Event: `'drain'`](docs/api/Client.md#event-connect) +See [Dispatcher Event: `'drain'`](docs/api/Dispatcher.md#event-connect). diff --git a/index.js b/index.js index f1caae4da03..ba1d24fc56e 100644 --- a/index.js +++ b/index.js @@ -1,45 +1,58 @@ 'use strict' -const Client = require('./lib/core/client') +const Client = require('./lib/client') +const Dispatcher = require('./lib/dispatcher') const errors = require('./lib/core/errors') -const Pool = require('./lib/client-pool') -const { Agent, getGlobalAgent, setGlobalAgent } = require('./lib/agent') +const Pool = require('./lib/pool') +const Agent = require('./lib/agent') const util = require('./lib/core/util') const { InvalidArgumentError } = require('./lib/core/errors') const api = require('./lib/api') -Object.assign(Client.prototype, api) -Object.assign(Pool.prototype, api) +Object.assign(Dispatcher.prototype, api) -function undici (url, opts) { - return new Pool(url, opts) -} - -module.exports = undici - -module.exports.Pool = Pool +module.exports.Dispatcher = Dispatcher module.exports.Client = Client +module.exports.Pool = Pool +module.exports.Agent = Agent + module.exports.errors = errors -module.exports.Agent = Agent -module.exports.setGlobalAgent = setGlobalAgent -module.exports.getGlobalAgent = getGlobalAgent +let globalDispatcher = new Agent() -function dispatchFromAgent (fn) { - return (url, { agent = getGlobalAgent(), method = 'GET', ...opts } = {}, ...additionalArgs) => { +function setGlobalDispatcher (agent) { + if (!agent || typeof agent.dispatch !== 'function') { + throw new InvalidArgumentError('Argument agent must implement Agent') + } + globalDispatcher = agent +} + +function getGlobalDispatcher () { + return globalDispatcher +} + +function makeDispatcher (fn) { + return (url, { agent, dispatcher = getGlobalDispatcher(), method = 'GET', ...opts } = {}, ...additionalArgs) => { if (opts.path != null) { throw new InvalidArgumentError('unsupported opts.path') } + if (agent) { + throw new InvalidArgumentError('unsupported opts.agent. Did you mean opts.client?') + } + const { origin, pathname, search } = util.parseURL(url) const path = search ? `${pathname}${search}` : pathname - return fn.call(agent, { ...opts, origin, method, path }, ...additionalArgs) + return fn.call(dispatcher, { ...opts, origin, method, path }, ...additionalArgs) } } -module.exports.request = dispatchFromAgent(api.request) -module.exports.stream = dispatchFromAgent(api.stream) -module.exports.pipeline = dispatchFromAgent(api.pipeline) -module.exports.connect = dispatchFromAgent(api.connect) -module.exports.upgrade = dispatchFromAgent(api.upgrade) +module.exports.setGlobalDispatcher = setGlobalDispatcher +module.exports.getGlobalDispatcher = getGlobalDispatcher + +module.exports.request = makeDispatcher(api.request) +module.exports.stream = makeDispatcher(api.stream) +module.exports.pipeline = makeDispatcher(api.pipeline) +module.exports.connect = makeDispatcher(api.connect) +module.exports.upgrade = makeDispatcher(api.upgrade) diff --git a/lib/agent.js b/lib/agent.js new file mode 100644 index 00000000000..4e8ee0f5e2b --- /dev/null +++ b/lib/agent.js @@ -0,0 +1,227 @@ +'use strict' + +const { + ClientClosedError, + InvalidArgumentError, + ClientDestroyedError +} = require('./core/errors') +const Dispatcher = require('./dispatcher') +const Pool = require('./pool') +const Client = require('./client') +const util = require('./core/util') +const assert = require('assert') +const RedirectHandler = require('./handler/redirect') + +const kDestroyed = Symbol('destroyed') +const kClosed = Symbol('closed') +const kOnConnect = Symbol('onConnect') +const kOnDisconnect = Symbol('onDisconnect') +const kClients = Symbol('clients') +const kMaxRedirections = Symbol('maxRedirections') +const kOnDrain = Symbol('onDrain') +const kFactory = Symbol('factory') +const kFinalizer = Symbol('finalizer') +const kOptions = Symbol('options') + +function defaultFactory (origin, opts) { + return opts && opts.connections === 1 + ? new Client(origin, opts) + : new Pool(origin, opts) +} + +const WeakRef = global.WeakRef || class CompatWeakRef { + constructor (value) { + this.value = value + } + + deref () { + return this.value.connected === 0 && this.value.size === 0 + ? undefined + : this.value + } +} +const FinalizationRegistry = global.FinalizationRegistry || class CompatFinalizer { + constructor (finalizer) { + this.finalizer = finalizer + } + + register (dispatcher, key) { + dispatcher.on('disconnect', () => { + if (dispatcher.connected === 0 && dispatcher.size === 0) { + this.finalizer(key) + } + }) + } +} + +class Agent extends Dispatcher { + constructor ({ factory = defaultFactory, maxRedirections = 0, ...options } = {}) { + super() + + if (typeof factory !== 'function') { + throw new InvalidArgumentError('factory must be a function.') + } + + if (!Number.isInteger(maxRedirections) || maxRedirections < 0) { + throw new InvalidArgumentError('maxRedirections must be a positive number') + } + + this[kOptions] = JSON.parse(JSON.stringify(options)) + this[kMaxRedirections] = maxRedirections + this[kFactory] = factory + this[kClients] = new Map() + this[kFinalizer] = new FinalizationRegistry(key => { + const ref = this[kClients].get(key) + if (ref !== undefined && ref.deref() === undefined) { + this[kClients].delete(key) + } + }) + this[kClosed] = false + this[kDestroyed] = false + + const agent = this + + this[kOnDrain] = (origin, targets) => { + agent.emit('drain', origin, [agent, ...targets]) + } + + this[kOnConnect] = (origin, targets) => { + agent.emit('connect', origin, [agent, ...targets]) + } + + this[kOnDisconnect] = (origin, targets, err) => { + agent.emit('disconnect', origin, [agent, ...targets], err) + } + } + + get connected () { + let ret = 0 + for (const { connected } of this[kClients].values()) { + ret += connected + } + return ret + } + + get size () { + let ret = 0 + for (const { size } of this[kClients].values()) { + ret += size + } + return ret + } + + get pending () { + let ret = 0 + for (const { pending } of this[kClients].values()) { + ret += pending + } + return ret + } + + get running () { + let ret = 0 + for (const { running } of this[kClients].values()) { + ret += running + } + return ret + } + + dispatch (opts, handler) { + if (!handler || typeof handler !== 'object') { + throw new InvalidArgumentError('handler') + } + + try { + if (!opts || typeof opts !== 'object') { + throw new InvalidArgumentError('opts must be a object.') + } + + if (typeof opts.origin !== 'string' || opts.origin === '') { + throw new InvalidArgumentError('opts.origin must be a non-empty string.') + } + + if (this[kDestroyed]) { + throw new ClientDestroyedError() + } + + if (this[kClosed]) { + throw new ClientClosedError() + } + + const ref = this[kClients].get(opts.origin) + + let dispatcher = ref ? ref.deref() : null + if (!dispatcher) { + dispatcher = this[kFactory](opts.origin, this[kOptions]) + .on('connect', this[kOnConnect]) + .on('disconnect', this[kOnDisconnect]) + .on('drain', this[kOnDrain]) + + this[kClients].set(opts.origin, new WeakRef(dispatcher)) + this[kFinalizer].register(dispatcher, opts.origin) + } + + const { maxRedirections = this[kMaxRedirections] } = opts + + if (!Number.isInteger(maxRedirections) || maxRedirections < 0) { + throw new InvalidArgumentError('maxRedirections must be a positive number') + } + + if (!maxRedirections) { + return dispatcher.dispatch(opts, handler) + } + + if (util.isStream(opts.body) && util.bodyLength(opts.body) !== 0) { + // TODO (fix): Provide some way for the user to cache the file to e.g. /tmp + // so that it can be dispatched again? + // TODO (fix): Do we need 100-expect support to provide a way to do this properly? + return dispatcher.dispatch(opts, handler) + } + + /* istanbul ignore next */ + if (util.isStream(opts.body)) { + opts.body + .on('data', function () { + assert(false) + }) + } + + return dispatcher.dispatch(opts, new RedirectHandler(this, opts, handler)) + } catch (err) { + if (typeof handler.onError !== 'function') { + throw new InvalidArgumentError('invalid onError method') + } + + handler.onError(err) + } + } + + close () { + this[kClosed] = true + + const closePromises = [] + for (const ref of this[kClients].values()) { + const client = ref.deref() + if (client) { + closePromises.push(client.close()) + } + } + return Promise.all(closePromises) + } + + destroy () { + this[kClosed] = true + this[kDestroyed] = true + + const destroyPromises = [] + for (const ref of this[kClients].values()) { + const client = ref.deref() + if (client) { + destroyPromises.push(client.destroy()) + } + } + return Promise.all(destroyPromises) + } +} + +module.exports = Agent diff --git a/lib/core/client.js b/lib/client.js similarity index 97% rename from lib/core/client.js rename to lib/client.js index 055a5346c60..53744690f59 100644 --- a/lib/core/client.js +++ b/lib/client.js @@ -2,11 +2,11 @@ const net = require('net') const tls = require('tls') -const HTTPParser = require('../llhttp/parser') -const EventEmitter = require('events') +const HTTPParser = require('./llhttp/parser') const assert = require('assert') -const util = require('./util') -const Request = require('./request') +const util = require('./core/util') +const Request = require('./core/request') +const Dispatcher = require('./dispatcher') const { ContentLengthMismatchError, TrailerMismatchError, @@ -19,7 +19,7 @@ const { SocketError, InformationalError, BodyTimeoutError -} = require('./errors') +} = require('./core/errors') const { kUrl, kReset, @@ -57,7 +57,7 @@ const { kHeadersTimeout, kBodyTimeout, kStrictContentLength -} = require('./symbols') +} = require('./core/symbols') const insecureHTTPParser = process.execArgv.includes('--insecure-http-parser') @@ -70,7 +70,7 @@ function getServerName (client, host) { ) } -class Client extends EventEmitter { +class Client extends Dispatcher { constructor (url, { maxHeaderSize, headersTimeout, @@ -183,14 +183,17 @@ class Client extends EventEmitter { this[kPendingIdx] = 0 } + // TODO: Make private? get url () { return this[kUrl] } + // TODO: Make private? get pipelining () { return this[kPipelining] } + // TODO: Make private? set pipelining (value) { this[kPipelining] = value resume(this, true) @@ -220,13 +223,9 @@ class Client extends EventEmitter { return this[kQueue].length - this[kRunningIdx] } + // TODO: Make private? get busy () { - const socket = this[kSocket] - return ( - (socket && (socket[kReset] || socket[kWriting])) || - (this.size >= (this[kPipelining] || 1)) || - this.pending > 0 - ) + return this[kNeedDrain] > 1 } get destroyed () { @@ -277,6 +276,10 @@ class Client extends EventEmitter { } else { resume(this, true) } + + if (this[kResuming] && this[kNeedDrain] !== 2 && isBusy(this)) { + this[kNeedDrain] = 2 + } } catch (err) { if (typeof handler.onError !== 'function') { throw new InvalidArgumentError('invalid onError method') @@ -284,6 +287,8 @@ class Client extends EventEmitter { handler.onError(err) } + + return this[kNeedDrain] < 2 } close (callback) { @@ -473,7 +478,7 @@ class Parser extends HTTPParser { detachSocket(socket) client[kSocket] = null client[kQueue][client[kRunningIdx]++] = null - client.emit('disconnect', client, new InformationalError('upgrade')) + client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade')) try { request.onUpgrade(statusCode, headers, socket) @@ -756,7 +761,7 @@ function onSocketConnect () { clearTimeout(this[kConnectTimeout]) this[kConnectTimeout] = null - client.emit('connect', client) + client.emit('connect', client[kUrl], [client]) resume(client) } @@ -844,7 +849,7 @@ function onSocketClose () { // Retry remaining requests. client[kPendingIdx] = client[kRunningIdx] - client.emit('disconnect', client, err) + client.emit('disconnect', client[kUrl], [client], err) } resume(client) @@ -919,7 +924,16 @@ function connect (client) { function emitDrain (client) { client[kNeedDrain] = 0 - client.emit('drain') + client.emit('drain', client[kUrl], [client]) +} + +function isBusy (client) { + const socket = client[kSocket] + return ( + (socket && (socket[kReset] || socket[kWriting])) || + (client.size >= (client[kPipelining] || 1)) || + client.pending > 0 + ) } function resume (client, sync) { @@ -974,7 +988,7 @@ function _resume (client, sync) { } } - if (client.busy) { + if (isBusy(client)) { client[kNeedDrain] = 2 } else if (client[kNeedDrain] === 2) { if (sync) { diff --git a/lib/dispatcher.js b/lib/dispatcher.js new file mode 100644 index 00000000000..9b809d892d0 --- /dev/null +++ b/lib/dispatcher.js @@ -0,0 +1,19 @@ +'use strict' + +const EventEmitter = require('events') + +class Dispatcher extends EventEmitter { + dispatch () { + throw new Error('not implemented') + } + + close () { + throw new Error('not implemented') + } + + destroy () { + throw new Error('not implemented') + } +} + +module.exports = Dispatcher diff --git a/lib/agent/redirect.js b/lib/handler/redirect.js similarity index 98% rename from lib/agent/redirect.js rename to lib/handler/redirect.js index 351169f1c47..5169de557eb 100644 --- a/lib/agent/redirect.js +++ b/lib/handler/redirect.js @@ -4,8 +4,6 @@ const { InvalidArgumentError } = require('../core/errors') const util = require('../core/util') const assert = require('assert') -const kAgent = Symbol('agent') - class RedirectHandler { constructor (agent, opts, handler) { this.agent = agent @@ -26,10 +24,6 @@ class RedirectHandler { this.handler.onError(err) } - get (origin) { - return this[kAgent].get(origin) - } - onConnect (abort) { if (this.destroyed) { abort() diff --git a/lib/client-pool.js b/lib/pool.js similarity index 79% rename from lib/client-pool.js rename to lib/pool.js index 071a74630ee..b4f15cacb33 100644 --- a/lib/client-pool.js +++ b/lib/pool.js @@ -1,7 +1,7 @@ 'use strict' -const EventEmitter = require('events') -const Client = require('./core/client') +const Dispatcher = require('./dispatcher') +const Client = require('./client') const { ClientClosedError, InvalidArgumentError, @@ -10,6 +10,7 @@ const { const FixedQueue = require('./node/fixed-queue') const util = require('./core/util') const { kTLSSession } = require('./core/symbols') +const assert = require('assert') const kClients = Symbol('clients') const kNeedDrain = Symbol('needDrain') @@ -24,7 +25,6 @@ const kOnConnect = Symbol('onConnect') const kOnDisconnect = Symbol('onDisconnect') const kOnTLSSession = Symbol('onTLSSession') const kPending = Symbol('pending') -const kConnected = Symbol('connected') const kConnections = Symbol('connections') const kFactory = Symbol('factory') @@ -32,9 +32,10 @@ function defaultFactory (origin, opts) { return new Client(origin, opts) } -class Pool extends EventEmitter { +class Pool extends Dispatcher { constructor (origin, { connections, factory = defaultFactory, ...options } = {}) { super() + if (connections != null && (!Number.isFinite(connections) || connections < 0)) { throw new InvalidArgumentError('invalid connections') } @@ -53,26 +54,33 @@ class Pool extends EventEmitter { this[kClients] = [] this[kNeedDrain] = false this[kPending] = 0 - this[kConnected] = 0 this[kFactory] = factory const pool = this - this[kOnDrain] = function onDrain () { + this[kOnDrain] = function onDrain (url, targets) { + assert(pool[kUrl].origin === url.origin) + const queue = pool[kQueue] - while (!this.busy) { + this[kNeedDrain] = false + + let needDrain = false + + while (!needDrain) { const item = queue.shift() if (!item) { break } pool[kPending]-- - this.dispatch(item.opts, item.handler) + needDrain = !this.dispatch(item.opts, item.handler) } - if (pool[kNeedDrain] && !this.busy) { + if (needDrain) { + this[kNeedDrain] = true + } else if (pool[kNeedDrain]) { pool[kNeedDrain] = false - pool.emit('drain') + pool.emit('drain', origin, [pool, ...targets]) } if (pool[kClosedResolve] && queue.isEmpty()) { @@ -82,14 +90,12 @@ class Pool extends EventEmitter { } } - this[kOnConnect] = function onConnect (client) { - pool[kConnected]++ - pool.emit('connect', client) + this[kOnConnect] = (origin, targets) => { + pool.emit('connect', origin, [pool, ...targets]) } - this[kOnDisconnect] = function onDisconnect (client, err) { - pool[kConnected]-- - pool.emit('disconnect', client, err) + this[kOnDisconnect] = (origin, targets, err) => { + pool.emit('disconnect', origin, [pool, ...targets], err) } this[kOnTLSSession] = function cacheClientTLSSession (session) { @@ -99,29 +105,22 @@ class Pool extends EventEmitter { } } + // TODO: Make private? get url () { return this[kUrl] } get connected () { - return this[kConnected] + let ret = 0 + for (const { connected } of this[kClients]) { + ret += connected + } + return ret } + // TODO: Make private? get busy () { - if (this[kPending] > 0) { - return true - } - - if (this[kConnections] && this[kClients].length === this[kConnections]) { - for (const { busy } of this[kClients]) { - if (!busy) { - return false - } - } - return true - } - - return false + return this[kNeedDrain] } get pending () { @@ -180,9 +179,9 @@ class Pool extends EventEmitter { throw new ClientClosedError() } - let client = this[kClients].find(client => !client.busy) + let dispatcher = this[kClients].find(dispatcher => !dispatcher[kNeedDrain]) - if (!client) { + if (!dispatcher) { if (!this[kConnections] || this[kClients].length < this[kConnections]) { let options = this[kOptions] @@ -195,28 +194,26 @@ class Pool extends EventEmitter { options = { ...options, tls: { ...options.tls, session: this[kTLSSession] } } } - client = this[kFactory](this[kUrl], options) + dispatcher = this[kFactory](this[kUrl], options) .on('drain', this[kOnDrain]) .on('connect', this[kOnConnect]) .on('disconnect', this[kOnDisconnect]) if (!options.tls || (options.tls.reuseSessions !== false && !options.tls.session)) { - client.on('session', this[kOnTLSSession]) + dispatcher.on('session', this[kOnTLSSession]) } - this[kClients].push(client) + this[kClients].push(dispatcher) } } - if (!client) { + if (!dispatcher) { this[kNeedDrain] = true this[kQueue].push({ opts, handler }) this[kPending]++ - } else { - client.dispatch(opts, handler) - if (client.busy && this.busy) { - this[kNeedDrain] = true - } + } else if (!dispatcher.dispatch(opts, handler)) { + dispatcher[kNeedDrain] = true + this[kNeedDrain] = this[kConnections] && this[kClients].length === this[kConnections] } } catch (err) { if (typeof handler.onError !== 'function') { @@ -225,6 +222,8 @@ class Pool extends EventEmitter { handler.onError(err) } + + return !this[kNeedDrain] } close (cb) { diff --git a/test/agent.js b/test/agent.js index 70448c000c4..7f156dced53 100644 --- a/test/agent.js +++ b/test/agent.js @@ -2,39 +2,35 @@ const { test } = require('tap') const http = require('http') -const { Agent, request, stream, pipeline, setGlobalAgent } = require('../') +const { Agent, request, stream, pipeline, setGlobalDispatcher } = require('../') const { PassThrough } = require('stream') const { InvalidArgumentError } = require('../lib/core/errors') -const { Client, Pool, errors } = require('../index') -const { promisify } = require('util') +const { errors } = require('../index') -test('setGlobalAgent', t => { +test('setGlobalDispatcher', t => { t.plan(2) t.test('fails if agent does not implement `get` method', t => { t.plan(1) - t.throw(() => setGlobalAgent({ get: 'not a function' }), InvalidArgumentError) + t.throw(() => setGlobalDispatcher({ dispatch: 'not a function' }), InvalidArgumentError) }) t.test('sets global agent', t => { t.plan(2) - t.notThrow(() => setGlobalAgent(new Agent())) - t.notThrow(() => setGlobalAgent({ get: () => {} })) + t.notThrow(() => setGlobalDispatcher(new Agent())) + t.notThrow(() => setGlobalDispatcher({ dispatch: () => {} })) }) t.tearDown(() => { // reset globalAgent to a fresh Agent instance for later tests - setGlobalAgent(new Agent()) + setGlobalDispatcher(new Agent()) }) }) test('Agent', t => { - t.plan(4) + t.plan(1) t.notThrow(() => new Agent()) - t.notThrow(() => new Agent({ connections: 5 })) - t.throw(() => new Agent().get(), InvalidArgumentError) - t.throw(() => new Agent().get(''), InvalidArgumentError) }) test('agent should close internal pools', t => { @@ -50,11 +46,11 @@ test('agent should close internal pools', t => { t.tearDown(server.close.bind(server)) server.listen(0, () => { - const agent = new Agent() + const dispatcher = new Agent() const origin = `http://localhost:${server.address().port}` - request(origin, { agent }) + request(origin, { dispatcher }) .then(() => { t.pass('first request should resolve') }) @@ -62,17 +58,15 @@ test('agent should close internal pools', t => { t.fail(err) }) - const pool = agent.get(origin) - pool.once('connect', () => { - agent.close().then(() => { - request(origin, { agent }) - .then(() => { - t.fail('second request should not resolve') - }) - .catch(err => { - t.error(err instanceof errors.ClientClosedError) - }) - }) + dispatcher.once('connect', () => { + dispatcher.close() + .then(() => request(origin, { dispatcher })) + .then(() => { + t.fail('second request should not resolve') + }) + .catch(err => { + t.ok(err instanceof errors.ClientClosedError) + }) }) }) }) @@ -90,11 +84,11 @@ test('agent should destroy internal pools', t => { t.tearDown(server.close.bind(server)) server.listen(0, () => { - const agent = new Agent() + const dispatcher = new Agent() const origin = `http://localhost:${server.address().port}` - request(origin, { agent }) + request(origin, { dispatcher }) .then(() => { t.fail() }) @@ -102,17 +96,15 @@ test('agent should destroy internal pools', t => { t.ok(err instanceof errors.ClientDestroyedError) }) - const pool = agent.get(origin) - pool.once('connect', () => { - agent.destroy().then(() => { - request(origin, { agent }) - .then(() => { - t.fail() - }) - .catch(err => { - t.ok(err instanceof errors.ClientDestroyedError) - }) - }) + dispatcher.once('connect', () => { + dispatcher.destroy() + .then(() => request(origin, { dispatcher })) + .then(() => { + t.fail() + }) + .catch(err => { + t.ok(err instanceof errors.ClientDestroyedError) + }) }) }) }) @@ -132,22 +124,22 @@ test('multiple connections', t => { server.listen(0, async () => { const origin = `http://localhost:${server.address().port}` - const agent = new Agent({ connections }) + const dispatcher = new Agent({ connections }) - t.tearDown(agent.close.bind(agent)) + t.tearDown(dispatcher.close.bind(dispatcher)) - agent.on('connect', (client) => { - t.ok(client) + dispatcher.on('connect', (origin, [dispatcher]) => { + t.ok(dispatcher) }) - agent.on('disconnect', (client, error) => { - t.ok(client) + dispatcher.on('disconnect', (origin, [dispatcher], error) => { + t.ok(dispatcher) t.true(error instanceof errors.InformationalError) t.strictEqual(error.code, 'UND_ERR_INFO') t.strictEqual(error.message, 'reset') }) for (let i = 0; i < connections; i++) { - await request(origin, { agent }) + await request(origin, { dispatcher }) .then(() => { t.pass('should pass') }) @@ -158,73 +150,6 @@ test('multiple connections', t => { }) }) -test('remove disconnect listeners when destroyed', t => { - t.plan(3) - - const server = http.createServer((req, res) => { - res.writeHead(200, { - Connection: 'keep-alive', - 'Keep-Alive': 'timeout=1s' - }) - res.end('ok') - }) - t.tearDown(server.close.bind(server)) - - server.listen(0, async () => { - const origin = `http://localhost:${server.address().port}` - const agent = new Agent() - - t.tearDown(agent.close.bind(agent)) - - const pool = agent.get(origin) - t.true(pool.listeners('disconnect').length === 1) - - agent.on('disconnect', () => { - t.true(pool.listeners('disconnect').length === 0) - }) - - await request(origin, { agent }) - .then(() => { - t.pass('should pass') - }) - .catch(err => { - t.fail(err) - }) - }) -}) - -test('check if pool', async t => { - t.plan(1) - - const server = http.createServer() - t.tearDown(server.close.bind(server)) - await promisify(server.listen.bind(server))(0) - - const origin = `http://localhost:${server.address().port}` - const agent = new Agent() - - t.tearDown(agent.close.bind(agent)) - - const pool = agent.get(origin) - t.true(pool instanceof Pool) -}) - -test('check if client', async t => { - t.plan(1) - - const server = http.createServer() - t.tearDown(server.close.bind(server)) - await promisify(server.listen.bind(server))(0) - - const origin = `http://localhost:${server.address().port}` - const agent = new Agent({ connections: 1 }) - - t.tearDown(agent.close.bind(agent)) - - const pool = agent.get(origin) - t.true(pool instanceof Client) -}) - test('with globalAgent', t => { t.plan(6) const wanted = 'payload' @@ -272,10 +197,10 @@ test('with local agent', t => { t.tearDown(server.close.bind(server)) - const agent = new Agent() + const dispatcher = new Agent() server.listen(0, () => { - request(`http://localhost:${server.address().port}`, { agent }) + request(`http://localhost:${server.address().port}`, { dispatcher }) .then(({ statusCode, headers, body }) => { t.strictEqual(statusCode, 200) t.strictEqual(headers['content-type'], 'text/plain') @@ -361,13 +286,13 @@ test('with a local agent', t => { t.tearDown(server.close.bind(server)) - const agent = new Agent() + const dispatcher = new Agent() server.listen(0, () => { stream( `http://localhost:${server.address().port}`, { - agent, + dispatcher, opaque: new PassThrough() }, ({ statusCode, headers, opaque: pt }) => { @@ -450,14 +375,14 @@ test('with a local agent', t => { t.tearDown(server.close.bind(server)) - const agent = new Agent() + const dispatcher = new Agent() server.listen(0, () => { const bufs = [] pipeline( `http://localhost:${server.address().port}`, - { agent }, + { dispatcher }, ({ statusCode, headers, body }) => { t.strictEqual(statusCode, 200) t.strictEqual(headers['content-type'], 'text/plain') @@ -494,8 +419,8 @@ test('constructor validations', t => { }) test('dispatch validations', t => { - const agent = new Agent() + const dispatcher = new Agent() t.plan(1) - t.throw(() => agent.dispatch('ASD'), InvalidArgumentError, 'throws on invalid opts argument') + t.throw(() => dispatcher.dispatch('ASD'), InvalidArgumentError, 'throws on invalid opts argument') }) diff --git a/test/client-upgrade.js b/test/client-upgrade.js index 4626efddac7..c5c486b4b9f 100644 --- a/test/client-upgrade.js +++ b/test/client-upgrade.js @@ -396,7 +396,7 @@ test('upgrade disconnect', (t) => { const client = new Client(`http://localhost:${server.address().port}`) t.tearDown(client.close.bind(client)) - client.on('disconnect', (self, error) => { + client.on('disconnect', (origin, [self], error) => { t.strictEqual(client, self) t.ok(error instanceof Error) }) diff --git a/test/client.js b/test/client.js index 2276ee35eb6..fbd2d038a9a 100644 --- a/test/client.js +++ b/test/client.js @@ -1001,7 +1001,7 @@ test('busy', (t) => { }) test('connected', (t) => { - t.plan(5) + t.plan(7) const server = createServer((req, res) => { req.pipe(res) @@ -1009,15 +1009,18 @@ test('connected', (t) => { t.tearDown(server.close.bind(server)) server.listen(0, () => { - const client = new Client(`http://localhost:${server.address().port}`, { + const url = new URL(`http://localhost:${server.address().port}`) + const client = new Client(url, { pipelining: 1 }) t.tearDown(client.close.bind(client)) - client.on('connect', self => { + client.on('connect', (origin, [self]) => { + t.strictEqual(origin, url) t.strictEqual(client, self) }) - client.on('disconnect', self => { + client.on('disconnect', (origin, [self]) => { + t.strictEqual(origin, url) t.strictEqual(client, self) }) diff --git a/test/content-length.js b/test/content-length.js index bbc830d17a4..39e2f0bc349 100644 --- a/test/content-length.js +++ b/test/content-length.js @@ -256,7 +256,7 @@ test('response invalid content length with close', (t) => { }) t.teardown(client.destroy.bind(client)) - client.on('disconnect', (client, err) => { + client.on('disconnect', (origin, client, err) => { t.strictEqual(err.code, 'UND_ERR_SOCKET') }) diff --git a/test/pool.js b/test/pool.js index c149856e9a1..618ae0d2fc5 100644 --- a/test/pool.js +++ b/test/pool.js @@ -2,7 +2,6 @@ const proxyquire = require('proxyquire') const { test } = require('tap') -const undici = require('..') const { Client, Pool, errors } = require('..') const { createServer } = require('http') const { EventEmitter } = require('events') @@ -33,10 +32,10 @@ test('connect/disconnect event(s)', (t) => { }) t.tearDown(pool.close.bind(pool)) - pool.on('connect', (client) => { + pool.on('connect', (origin, [pool, client]) => { t.strictEqual(client instanceof Client, true) }) - pool.on('disconnect', (client, error) => { + pool.on('disconnect', (origin, [pool, client], error) => { t.true(client instanceof Client) t.true(error instanceof errors.InformationalError) t.strictEqual(error.code, 'UND_ERR_INFO') @@ -67,7 +66,7 @@ test('basic get', (t) => { t.tearDown(server.close.bind(server)) server.listen(0, async () => { - const client = undici(`http://localhost:${server.address().port}`) + const client = new Pool(`http://localhost:${server.address().port}`) t.tearDown(client.destroy.bind(client)) t.strictEqual(client.url.origin, `http://localhost:${server.address().port}`) @@ -115,7 +114,7 @@ test('URL as arg', (t) => { server.listen(0, async () => { const url = new URL('http://localhost') url.port = server.address().port - const client = undici(url) + const client = new Pool(url) t.tearDown(client.destroy.bind(client)) client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { @@ -152,7 +151,7 @@ test('basic get error async/await', (t) => { t.tearDown(server.close.bind(server)) server.listen(0, async () => { - const client = undici(`http://localhost:${server.address().port}`) + const client = new Pool(`http://localhost:${server.address().port}`) t.tearDown(client.destroy.bind(client)) await client.request({ path: '/', method: 'GET' }) @@ -221,7 +220,7 @@ test('stream get error async/await', (t) => { t.tearDown(server.close.bind(server)) server.listen(0, async () => { - const client = undici(`http://localhost:${server.address().port}`) + const client = new Pool(`http://localhost:${server.address().port}`) t.tearDown(client.destroy.bind(client)) await client.stream({ path: '/', method: 'GET' }, () => { @@ -245,7 +244,7 @@ test('pipeline get', (t) => { t.tearDown(server.close.bind(server)) server.listen(0, async () => { - const client = undici(`http://localhost:${server.address().port}`) + const client = new Pool(`http://localhost:${server.address().port}`) t.tearDown(client.destroy.bind(client)) const bufs = [] @@ -268,48 +267,46 @@ test('backpressure algorithm', (t) => { const seen = [] let total = 0 + let writeMore = true + class FakeClient extends EventEmitter { constructor () { super() this.id = total++ - this._busy = false - } - - get busy () { - return this._busy - } - - get connected () { - return true } - dispatch (req, cb) { - seen.push({ req, cb, client: this, id: this.id }) + dispatch (req, handler) { + seen.push({ req, client: this, id: this.id }) + return writeMore } } - const Pool = proxyquire('../lib/client-pool', { - './core/client': FakeClient + const Pool = proxyquire('../lib/pool', { + './client': FakeClient }) - const noopHandler = {} + const noopHandler = { + onError (err) { + throw err + } + } - const pool = new Pool('http://notanhost') + const pool = new Pool('http://notahost') pool.dispatch({}, noopHandler) pool.dispatch({}, noopHandler) const d1 = seen.shift() // d1 = c0 t.strictEqual(d1.id, 0) - const d2 = seen.shift() // d1 = c0 - t.strictEqual(d1.id, 0) + const d2 = seen.shift() // d2 = c0 + t.strictEqual(d2.id, 0) t.strictEqual(d1.id, d2.id) - pool.dispatch({}, noopHandler) // d3 = c0 + writeMore = false - d1.client._busy = true + pool.dispatch({}, noopHandler) // d3 = c0 pool.dispatch({}, noopHandler) // d4 = c1 @@ -321,9 +318,13 @@ test('backpressure algorithm', (t) => { t.strictEqual(d3.id, d2.id) t.notStrictEqual(d3.id, d4.id) + writeMore = true + + d4.client.emit('drain', new URL('http://notahost')) + pool.dispatch({}, noopHandler) // d5 = c1 - d1.client._busy = false + d3.client.emit('drain', new URL('http://notahost')) pool.dispatch({}, noopHandler) // d6 = c0 @@ -352,7 +353,7 @@ test('busy', (t) => { t.tearDown(server.close.bind(server)) server.listen(0, async () => { - const client = undici(`http://localhost:${server.address().port}`, { + const client = new Pool(`http://localhost:${server.address().port}`, { connections: 2, pipelining: 2 }) diff --git a/test/redirect-request.js b/test/redirect-request.js index c7f5b8180e3..7b84f5888eb 100644 --- a/test/redirect-request.js +++ b/test/redirect-request.js @@ -2,7 +2,7 @@ const t = require('tap') const { request } = require('..') -const RedirectHandler = require('../lib/agent/redirect') +const RedirectHandler = require('../lib/handler/redirect') const { InvalidArgumentError } = require('../lib/core/errors') const { nop } = require('../lib/core/util') const { diff --git a/test/utils/esm-wrapper.mjs b/test/utils/esm-wrapper.mjs index b18f9890566..67e9f8e97d1 100644 --- a/test/utils/esm-wrapper.mjs +++ b/test/utils/esm-wrapper.mjs @@ -1,6 +1,6 @@ import { createServer } from 'http' import tap from 'tap' -import { Agent, Client, errors, pipeline, Pool, request, setGlobalAgent, stream } from '../../index.js' +import { Agent, Client, errors, pipeline, Pool, request, setGlobalDispatcher, stream } from '../../index.js' const { test } = tap @@ -82,6 +82,6 @@ test('name dexports', (t) => { t.is(typeof request, 'function') t.is(typeof stream, 'function') t.is(typeof pipeline, 'function') - t.is(typeof setGlobalAgent, 'function') + t.is(typeof setGlobalDispatcher, 'function') t.end() })