Skip to content

Commit

Permalink
feat: new hooks API
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Apr 4, 2024
1 parent cae5625 commit 16dea7e
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 17 deletions.
101 changes: 84 additions & 17 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

const {
InvalidArgumentError,
NotSupportedError
NotSupportedError,
AbortError
} = require('./errors')
const assert = require('node:assert')
const { parseHeaders } = require('./util')
const {
isValidHTTPToken,
isValidHeaderChar,
Expand All @@ -25,6 +27,45 @@ const { headerNameLowerCasedRecord } = require('./constants')
const invalidPathRegex = /[^\u0021-\u00ff]/

const kHandler = Symbol('handler')
const kResume = Symbol('resume')
const kAbort = Symbol('abort')

class Controller {
#paused = false
#reason = null
#abort = null

constructor (abort) {
this.#abort = abort
this[kResume] = null
}

pause () {
this.#paused = true
}

resume () {
this.#paused = false
this[kResume]?.()
}

abort (reason) {
this.#reason = reason ?? new AbortError()
this[kAbort]?.(this.#reason)
}

get paused () {
return this.#paused
}

get aborted () {
return this.#reason !== null
}

get reason () {
return this.#reason
}
}

class Request {
constructor (origin, {
Expand Down Expand Up @@ -91,6 +132,8 @@ class Request {

this.abort = null

this.controller = new Controller()

if (body == null) {
this.body = null
} else if (isStream(body)) {
Expand Down Expand Up @@ -192,12 +235,11 @@ class Request {
}

onBodySent (chunk) {
if (this[kHandler].onBodySent) {
try {
return this[kHandler].onBodySent(chunk)
} catch (err) {
this.abort(err)
}
try {
this[kHandler].onRequestData?.(chunk)
this[kHandler].onBodySent?.(chunk)
} catch (err) {
this.abort(err)
}
}

Expand All @@ -206,19 +248,20 @@ class Request {
channels.bodySent.publish({ request: this })
}

if (this[kHandler].onRequestSent) {
try {
return this[kHandler].onRequestSent()
} catch (err) {
this.abort(err)
}
try {
this[kHandler].onRequestEnd?.()
this[kHandler].onRequestSent?.()
} catch (err) {
this.abort(err)
}
}

onConnect (abort) {
assert(!this.aborted)
assert(!this.completed)

this.controller[kAbort] = abort

if (this.error) {
abort(this.error)
} else {
Expand All @@ -228,7 +271,12 @@ class Request {
}

onResponseStarted () {
return this[kHandler].onResponseStarted?.()
try {
this[kHandler].onResponseStart?.(this.controller)
this[kHandler].onResponseStarted?.()
} catch (err) {
this.abort(err)
}
}

onHeaders (statusCode, headers, resume, statusText) {
Expand All @@ -239,8 +287,18 @@ class Request {
channels.headers.publish({ request: this, response: { statusCode, headers, statusText } })
}

this.controller[kResume] = resume

try {
return this[kHandler].onHeaders(statusCode, headers, resume, statusText)
// TODO (fix): what if this.controller.paused?

this[kHandler].onResponseHeaders?.(parseHeaders(headers), statusCode, statusText)

if (this[kHandler].onHeaders?.(statusCode, headers, () => this.controller.resume(), statusText) === false) {
this.controller.pause()
}

return !this.controller.paused
} catch (err) {
this.abort(err)
}
Expand All @@ -251,7 +309,13 @@ class Request {
assert(!this.completed)

try {
return this[kHandler].onData(chunk)
this[kHandler].onResponseData?.(chunk)

if (this[kHandler].onData?.(chunk) === false) {
this.controller.pause()
}

return !this.controller.paused
} catch (err) {
this.abort(err)
return false
Expand All @@ -276,7 +340,10 @@ class Request {
}

try {
return this[kHandler].onComplete(trailers)
this[kHandler].onResponseTrailers?.(parseHeaders(trailers))
this[kHandler].onResponseEnd?.()

this[kHandler].onComplete(trailers)
} catch (err) {
// TODO (fix): This might be a bad idea?
this.onError(err)
Expand Down
31 changes: 31 additions & 0 deletions types/dispatcher.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,15 @@ declare namespace Dispatcher {
context: object;
}
export type StreamFactory = (data: StreamFactoryData) => Writable;
export interface Controller {
readonly aborted: boolean;
readonly reason: Error | null;
readonly paused: boolean;

pause(): void;
resume(): void;
abort(reason: Error): void;
}
export interface DispatchHandlers {
/** Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. */
onConnect?(abort: () => void): void;
Expand All @@ -232,6 +241,28 @@ declare namespace Dispatcher {
onComplete?(trailers: string[] | null): void;
/** Invoked when a body chunk is sent to the server. May be invoked multiple times for chunked requests */
onBodySent?(chunkSize: number, totalBytesSent: number): void;

// New API

/** Invoked after request is starting to be processed */
onRequestStart?(/* controller: Controller */): void;
/** Invoked after headers data is sent */
// onRequestHeaders?(headers: Record<string, string>): void;
/** Invoked after payload data is sent. */
onRequestData?(chunk: Buffer | string): void;
/** Invoked after request has finished sending */
onRequestEnd?(): void;

/** Invoked after response is starting to be processed */
onResponseStart?(controller: Controller): void;
/** Invoked after headers data has been received */
onResponseHeaders?(headers: Record<string, string>, statusCode: number, statusText?: string): void;
/** Invoked after response payload data is received. */
onResponseData?(chunk: Buffer | string): void;
/** Invoked after trailers data has been received */
onResponseTrailers?(trailers: Record<string, string>): void;
/** Invoked after response has finished */
onResponseEnd?(): void;
}
export type PipelineHandler = (data: PipelineHandlerData) => Readable;
export type HttpMethod = 'GET' | 'HEAD' | 'POST' | 'PUT' | 'DELETE' | 'CONNECT' | 'OPTIONS' | 'TRACE' | 'PATCH';
Expand Down

0 comments on commit 16dea7e

Please sign in to comment.