Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Apr 3, 2024
1 parent 1be5231 commit c4b1630
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 29 deletions.
12 changes: 9 additions & 3 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ class RequestHandler extends AsyncResource {
this.context = context
}

onStart (controller) {
this.controller = controller
}

onHeaders (statusCode, rawHeaders, resume, statusMessage) {
const { callback, opaque, abort, context, responseHeaders, highWaterMark } = this

Expand All @@ -92,7 +96,7 @@ class RequestHandler extends AsyncResource {
const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers
const contentType = parsedHeaders['content-type']
const contentLength = parsedHeaders['content-length']
const body = new Readable({ resume, abort, contentType, contentLength, highWaterMark })
const body = new Readable({ resume: this.controller?.resume ?? resume, abort, contentType, contentLength, highWaterMark })

this.callback = null
this.res = body
Expand All @@ -115,8 +119,10 @@ class RequestHandler extends AsyncResource {
}

onData (chunk) {
const { res } = this
return res.push(chunk)
const { res, controller } = this
if (!res.push(chunk)) {
controller.pause()
}
}

onComplete (trailers) {
Expand Down
9 changes: 7 additions & 2 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ class Request {
}

onConnect (abort) {
assert(abort)
assert(!this.aborted)
assert(!this.completed)

Expand All @@ -227,7 +228,11 @@ class Request {
}
}

onStart (resume) {
onStart (controller) {
assert(controller)
assert(!this.aborted)
assert(!this.completed)

try {
let pause = false

Expand All @@ -237,7 +242,7 @@ class Request {
}

if (this[kHandler].onStart) {
pause ||= this[kHandler].onStart(resume) === false
pause ||= this[kHandler].onStart(controller) === false
}

return !pause
Expand Down
42 changes: 38 additions & 4 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,30 @@ const TIMEOUT_HEADERS = 1
const TIMEOUT_BODY = 2
const TIMEOUT_IDLE = 3

class Controller {
#parser
#paused = false

constructor (parser) {
this.#parser = parser
this.resume = this.resume.bind(this)
this.pause = this.pause.bind(this)
}

pause () {
this.#paused = true
}

resume () {
this.#paused = false
this.#parser.resume()
}

get paused () {
return this.#paused
}
}

class Parser {
constructor (client, socket, { exports }) {
assert(Number.isFinite(client[kMaxHeadersSize]) && client[kMaxHeadersSize] > 0)
Expand All @@ -169,6 +193,8 @@ class Parser {
this.paused = false
this.resume = this.resume.bind(this)

this.controller = new Controller(this)

this.bytesRead = 0

this.keepAlive = ''
Expand Down Expand Up @@ -326,7 +352,11 @@ class Parser {
return -1
}

return request.onStart(this.resume) === false ? constants.ERROR.PAUSED : 0
if (request.onStart(this.controller) === false) {
this.controller.pause()
}

return this.controller.paused ? constants.ERROR.PAUSED : 0
}

onHeaderField (buf) {
Expand Down Expand Up @@ -505,7 +535,9 @@ class Parser {
socket[kReset] = true
}

const pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false
if (request.onHeaders(statusCode, headers, this.controller.resume, statusText) === false) {
this.controller.pause()
}

if (request.aborted) {
return -1
Expand All @@ -524,7 +556,7 @@ class Parser {
client[kResume]()
}

return pause ? constants.ERROR.PAUSED : 0
return this.controller.paused ? constants.ERROR.PAUSED : 0
}

onBody (buf) {
Expand Down Expand Up @@ -555,8 +587,10 @@ class Parser {
this.bytesRead += buf.length

if (request.onData(buf) === false) {
return constants.ERROR.PAUSED
this.controller.pause()
}

return this.controller.paused ? constants.ERROR.PAUSED : 0
}

onMessageComplete () {
Expand Down
27 changes: 26 additions & 1 deletion lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,31 @@ try {
http2 = { constants: {} }
}

class Controller {
#stream
#paused = false

constructor (stream) {
this.#stream = stream
this.resume = this.resume.bind(this)
this.pause = this.pause.bind(this)
}

pause () {
this.#paused = true
this.#stream.pause()
}

resume () {
this.#paused = false
this.#stream.resume()
}

get paused () {
return this.#paused
}
}

const {
constants: {
HTTP2_HEADER_AUTHORITY,
Expand Down Expand Up @@ -392,7 +417,7 @@ function writeH2 (client, request) {

const resume = stream.resume.bind(stream)

if (request.onStart(resume) === false) {
if (request.onStart(new Controller(stream)) === false) {
stream.pause()
}

Expand Down
18 changes: 11 additions & 7 deletions lib/handler/decorator-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,34 @@ module.exports = class DecoratorHandler {
}

onConnect (...args) {
return this.handler.onConnect(...args)
return this.handler.onConnect?.(...args)
}

onStart (...args) {
return this.handler.onStart?.(...args)
}

onError (...args) {
return this.handler.onError(...args)
return this.handler.onError?.(...args)
}

onUpgrade (...args) {
return this.handler.onUpgrade(...args)
return this.handler.onUpgrade?.(...args)
}

onHeaders (...args) {
return this.handler.onHeaders(...args)
return this.handler.onHeaders?.(...args)
}

onData (...args) {
return this.handler.onData(...args)
return this.handler.onData?.(...args)
}

onComplete (...args) {
return this.handler.onComplete(...args)
return this.handler.onComplete?.(...args)
}

onBodySent (...args) {
return this.handler.onBodySent(...args)
return this.handler.onBodySent?.(...args)
}
}
16 changes: 10 additions & 6 deletions lib/handler/redirect-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,19 @@ class RedirectHandler {

onConnect (abort) {
this.abort = abort
this.handler.onConnect(abort, { history: this.history })
this.handler.onConnect?.(abort, { history: this.history })
}

onStart (controller) {
this.handler.onStart?.(controller)
}

onUpgrade (statusCode, headers, socket) {
this.handler.onUpgrade(statusCode, headers, socket)
this.handler.onUpgrade?.(statusCode, headers, socket)
}

onError (error) {
this.handler.onError(error)
this.handler.onError?.(error)
}

onHeaders (statusCode, headers, resume, statusText) {
Expand All @@ -107,7 +111,7 @@ class RedirectHandler {
}

if (!this.location) {
return this.handler.onHeaders(statusCode, headers, resume, statusText)
return this.handler.onHeaders?.(statusCode, headers, resume, statusText)
}

const { origin, pathname, search } = util.parseURL(new URL(this.location, this.opts.origin && new URL(this.opts.path, this.opts.origin)))
Expand Down Expand Up @@ -150,7 +154,7 @@ class RedirectHandler {
servers and browsers implementors, we ignore the body as there is no specified way to eventually parse it.
*/
} else {
return this.handler.onData(chunk)
return this.handler.onData?.(chunk)
}
}

Expand All @@ -170,7 +174,7 @@ class RedirectHandler {

this.dispatch(this.opts, this)
} else {
this.handler.onComplete(trailers)
this.handler.onComplete?.(trailers)
}
}

Expand Down
4 changes: 4 additions & 0 deletions lib/handler/retry-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ class RetryHandler {
}
}

onStart (controller) {
this.handler.onStart?.(controller)
}

onBodySent (chunk) {
if (this.handler.onBodySent) return this.handler.onBodySent(chunk)
}
Expand Down
22 changes: 20 additions & 2 deletions lib/mock/mock-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,32 @@ function mockDispatch (opts, handler) {
const responseHeaders = generateKeyValues(headers)
const responseTrailers = generateKeyValues(trailers)

const controller = new Controller()
handler.abort = nop
handler.onHeaders(statusCode, responseHeaders, resume, getStatusText(statusCode))
handler.onStart?.(controller)
handler.onHeaders(statusCode, responseHeaders, controller.resume, getStatusText(statusCode))
handler.onData(Buffer.from(responseData))
handler.onComplete(responseTrailers)
deleteMockDispatch(mockDispatches, key)
}

function resume () {}
class Controller {
#paused = false

resume () {
this.#paused = false
this.resume = this.resume.bind(this)
this.pause = this.pause.bind(this)
}

pause () {
this.#paused = true
}

get paused () {
return this.#paused
}
}

return true
}
Expand Down
10 changes: 6 additions & 4 deletions test/node-test/client-dispatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -881,8 +881,9 @@ test('dispatches in expected order', async (t) => {
onBodySent () {
dispatches.push('onBodySent')
},
onStart (resume) {
assert(typeof resume === 'function')
onStart (controller) {
assert(typeof controller.resume === 'function')
assert(typeof controller.pause === 'function')
dispatches.push('onStart')
},
onResponseStarted () {
Expand Down Expand Up @@ -942,8 +943,9 @@ test('dispatches in expected order for http2', async (t) => {
onBodySent () {
dispatches.push('onBodySent')
},
onStart (resume) {
assert(typeof resume === 'function')
onStart (controller) {
assert(typeof controller.resume === 'function')
assert(typeof controller.pause === 'function')
dispatches.push('onStart')
},
onResponseStarted () {
Expand Down
9 changes: 9 additions & 0 deletions types/dispatcher.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,18 @@ declare namespace Dispatcher {
context: object;
}
export type StreamFactory = (data: StreamFactoryData) => Writable;

export interface Controller {
pause (): void;
resume (): void;
readonly paused: boolean;
}

export interface DispatchHandlers {
/** Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. */
onConnect?(abort: () => void): void;
/** */
onStart?(controller: Controller): void;
/** Invoked when an error has occurred. */
onError?(err: Error): void;
/** Invoked when request is upgraded either due to a `Upgrade` header or `CONNECT` method. */
Expand Down

0 comments on commit c4b1630

Please sign in to comment.