Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Apr 3, 2024
1 parent c4b1630 commit 6fcd8f4
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 31 deletions.
2 changes: 2 additions & 0 deletions lib/api/abort-signal.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const kSignal = Symbol('kSignal')
function abort (self) {
if (self.abort) {
self.abort(self[kSignal]?.reason)
} else if (self.controller?.abort) {
self.controller.abort(self[kSignal]?.reason)
} else {
self.onError(self[kSignal]?.reason ?? new RequestAbortedError())
}
Expand Down
19 changes: 10 additions & 9 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class RequestHandler extends AsyncResource {
this.opaque = opaque || null
this.callback = callback
this.res = null
this.abort = null
this.body = body
this.trailers = {}
this.context = null
Expand All @@ -68,21 +67,17 @@ class RequestHandler extends AsyncResource {
addSignal(this, signal)
}

onConnect (abort, context) {
onConnect (controller, context) {
if (!this.callback) {
throw new RequestAbortedError()
}

this.abort = abort
this.context = context
}

onStart (controller) {
this.controller = controller
this.context = context
}

onHeaders (statusCode, rawHeaders, resume, statusMessage) {
const { callback, opaque, abort, context, responseHeaders, highWaterMark } = this
const { callback, opaque, context, responseHeaders, highWaterMark } = this

const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)

Expand All @@ -96,7 +91,13 @@ class RequestHandler extends AsyncResource {
const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers
const contentType = parsedHeaders['content-type']
const contentLength = parsedHeaders['content-length']
const body = new Readable({ resume: this.controller?.resume ?? resume, abort, contentType, contentLength, highWaterMark })
const body = new Readable({
resume: this.controller?.resume ?? resume,
abort: this.controller?.abort ?? this.controller,
contentType,
contentLength,
highWaterMark
})

this.callback = null
this.res = body
Expand Down
27 changes: 23 additions & 4 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,31 @@ class Request {
abort(this.error)
} else {
this.abort = abort
return this[kHandler].onConnect(abort)

// Hack for compatibility with the old API.
abort.paused = false
abort.aborted = false
abort.internal = null
abort.abort = function (...args) {
abort.aborted = true
abort(...args)
}
abort.pause = function () {
abort.paused = true
abort.internal?.pause()
}
abort.resume = function () {
abort.paused = false
abort.internal?.resume()
}

this.controller = abort

return this[kHandler].onConnect(this.controller)
}
}

onStart (controller) {
assert(controller)
onStart () {
assert(!this.aborted)
assert(!this.completed)

Expand All @@ -242,7 +261,7 @@ class Request {
}

if (this[kHandler].onStart) {
pause ||= this[kHandler].onStart(controller) === false
pause ||= this[kHandler].onStart() === false
}

return !pause
Expand Down
7 changes: 6 additions & 1 deletion lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,12 @@ class Parser {
return -1
}

if (request.onStart(this.controller) === false) {
request.controller.internal = this.controller
if (request.controller.paused) {
request.controller.internal.pause()
}

if (request.onStart() === false) {
this.controller.pause()
}

Expand Down
7 changes: 6 additions & 1 deletion lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,12 @@ function writeH2 (client, request) {

const resume = stream.resume.bind(stream)

if (request.onStart(new Controller(stream)) === false) {
request.controller.internal = new Controller(stream)
if (request.controller.paused) {
request.controller.internal.pause()
}

if (request.onStart() === false) {
stream.pause()
}

Expand Down
62 changes: 46 additions & 16 deletions lib/mock/mock-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -285,31 +285,61 @@ function mockDispatch (opts, handler) {
const responseHeaders = generateKeyValues(headers)
const responseTrailers = generateKeyValues(trailers)

class Controller extends Function {
#paused = false
#aborted = false

constructor () {
super()
this.resume = this.resume.bind(this)
this.pause = this.pause.bind(this)
}

resume () {
this.#paused = false
}

pause () {
this.#paused = true
}

abort () {
this.#aborted = true
}

get paused () {
return this.#paused
}

get aborted () {
return this.#aborted
}
}

const controller = new Controller()
handler.abort = nop
handler.onStart?.(controller)
handler.onHeaders(statusCode, responseHeaders, controller.resume, getStatusText(statusCode))
handler.onData(Buffer.from(responseData))
handler.onComplete(responseTrailers)
deleteMockDispatch(mockDispatches, key)
}

class Controller {
#paused = false
if (!controller.aborted) {
handler.onConnect?.(controller)
}

if (!controller.aborted) {
handler.onStart?.()
}

resume () {
this.#paused = false
this.resume = this.resume.bind(this)
this.pause = this.pause.bind(this)
if (!controller.aborted) {
handler.onHeaders(statusCode, responseHeaders, controller.resume, getStatusText(statusCode))
}

pause () {
this.#paused = true
if (!controller.aborted) {
handler.onData(Buffer.from(responseData))
}

get paused () {
return this.#paused
if (!controller.aborted) {
handler.onComplete(responseTrailers)
}

deleteMockDispatch(mockDispatches, key)
}

return true
Expand Down

0 comments on commit 6fcd8f4

Please sign in to comment.