From baf26a27a985922265b3d210100df4d4380e1878 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 4 Apr 2024 15:58:39 +0100 Subject: [PATCH] feat: new hooks API --- lib/core/request.js | 104 ++++++++++++++++++++++++++++++------ lib/dispatcher/client-h1.js | 5 +- types/dispatcher.d.ts | 31 +++++++++++ 3 files changed, 122 insertions(+), 18 deletions(-) diff --git a/lib/core/request.js b/lib/core/request.js index 37839d3c949..043e58ac081 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -2,9 +2,11 @@ const { InvalidArgumentError, - NotSupportedError + NotSupportedError, + AbortError } = require('./errors') const assert = require('node:assert') +const { parseHeaders } = require('./util') const { isValidHTTPToken, isValidHeaderChar, @@ -25,6 +27,45 @@ const { headerNameLowerCasedRecord } = require('./constants') const invalidPathRegex = /[^\u0021-\u00ff]/ const kHandler = Symbol('handler') +const kResume = Symbol('resume') +const kAbort = Symbol('abort') + +class Controller { + #paused = false + #reason = null + #abort = null + + constructor (abort) { + this.#abort = abort + this[kResume] = null + } + + pause () { + this.#paused = true + } + + resume () { + this.#paused = false + this[kResume]?.() + } + + abort (reason) { + this.#reason = reason ?? new AbortError() + this[kAbort]?.(this.#reason) + } + + get paused () { + return this.#paused + } + + get aborted () { + return this.#reason !== null + } + + get reason () { + return this.#reason + } +} class Request { constructor (origin, { @@ -91,6 +132,8 @@ class Request { this.abort = null + this.controller = new Controller() + if (body == null) { this.body = null } else if (isStream(body)) { @@ -192,12 +235,11 @@ class Request { } onBodySent (chunk) { - if (this[kHandler].onBodySent) { - try { - return this[kHandler].onBodySent(chunk) - } catch (err) { - this.abort(err) - } + try { + this[kHandler].onRequestData?.(chunk) + this[kHandler].onBodySent?.(chunk) + } catch (err) { + this.abort(err) } } @@ -206,12 +248,11 @@ class Request { channels.bodySent.publish({ request: this }) } - if (this[kHandler].onRequestSent) { - try { - return this[kHandler].onRequestSent() - } catch (err) { - this.abort(err) - } + try { + this[kHandler].onRequestEnd?.() + this[kHandler].onRequestSent?.() + } catch (err) { + this.abort(err) } } @@ -219,6 +260,8 @@ class Request { assert(!this.aborted) assert(!this.completed) + this.controller[kAbort] = abort + if (this.error) { abort(this.error) } else { @@ -228,7 +271,17 @@ class Request { } onResponseStarted () { - return this[kHandler].onResponseStarted?.() + try { + this[kHandler].onResponseStart?.(this.controller) + + if (this[kHandler].onResponseStarted?.() === false) { + this.controller.pause() + } + + return !this.controller.paused + } catch (err) { + this.abort(err) + } } onHeaders (statusCode, headers, resume, statusText) { @@ -239,8 +292,16 @@ class Request { channels.headers.publish({ request: this, response: { statusCode, headers, statusText } }) } + this.controller[kResume] = resume + try { - return this[kHandler].onHeaders(statusCode, headers, resume, statusText) + this[kHandler].onResponseHeaders?.(parseHeaders(headers), statusCode, statusText) + + if (this[kHandler].onHeaders?.(statusCode, headers, () => this.controller.resume(), statusText) === false) { + this.controller.pause() + } + + return !this.controller.paused } catch (err) { this.abort(err) } @@ -251,7 +312,13 @@ class Request { assert(!this.completed) try { - return this[kHandler].onData(chunk) + this[kHandler].onResponseData?.(chunk) + + if (this[kHandler].onData?.(chunk) === false) { + this.controller.pause() + } + + return !this.controller.paused } catch (err) { this.abort(err) return false @@ -276,7 +343,10 @@ class Request { } try { - return this[kHandler].onComplete(trailers) + this[kHandler].onResponseTrailers?.(parseHeaders(trailers)) + this[kHandler].onResponseEnd?.() + + this[kHandler].onComplete(trailers) } catch (err) { // TODO (fix): This might be a bad idea? this.onError(err) diff --git a/lib/dispatcher/client-h1.js b/lib/dispatcher/client-h1.js index 62a3e29ef24..eb64cf20b20 100644 --- a/lib/dispatcher/client-h1.js +++ b/lib/dispatcher/client-h1.js @@ -325,7 +325,10 @@ class Parser { if (!request) { return -1 } - request.onResponseStarted() + + const pause = request.onResponseStarted() === false + + return pause ? constants.ERROR.PAUSED : 0 } onHeaderField (buf) { diff --git a/types/dispatcher.d.ts b/types/dispatcher.d.ts index 21f9b456aa8..b369edf1c3b 100644 --- a/types/dispatcher.d.ts +++ b/types/dispatcher.d.ts @@ -215,6 +215,15 @@ declare namespace Dispatcher { context: object; } export type StreamFactory = (data: StreamFactoryData) => Writable; + export interface Controller { + readonly aborted: boolean; + readonly reason: Error | null; + readonly paused: boolean; + + pause(): void; + resume(): void; + abort(reason: Error): void; + } export interface DispatchHandlers { /** Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. */ onConnect?(abort: () => void): void; @@ -232,6 +241,28 @@ declare namespace Dispatcher { onComplete?(trailers: string[] | null): void; /** Invoked when a body chunk is sent to the server. May be invoked multiple times for chunked requests */ onBodySent?(chunkSize: number, totalBytesSent: number): void; + + // New API + + /** Invoked after request is starting to be processed */ + onRequestStart?(/* controller: Controller */): void; + /** Invoked after headers data is sent */ + // onRequestHeaders?(headers: Record): void; + /** Invoked after payload data is sent. */ + onRequestData?(chunk: Buffer | string): void; + /** Invoked after request has finished sending */ + onRequestEnd?(): void; + + /** Invoked after response is starting to be processed */ + onResponseStart?(controller: Controller): void; + /** Invoked after headers data has been received */ + onResponseHeaders?(headers: Record, statusCode: number, statusText?: string): void; + /** Invoked after response payload data is received. */ + onResponseData?(chunk: Buffer | string): void; + /** Invoked after trailers data has been received */ + onResponseTrailers?(trailers: Record): void; + /** Invoked after response has finished */ + onResponseEnd?(): void; } export type PipelineHandler = (data: PipelineHandlerData) => Readable; export type HttpMethod = 'GET' | 'HEAD' | 'POST' | 'PUT' | 'DELETE' | 'CONNECT' | 'OPTIONS' | 'TRACE' | 'PATCH';