Skip to content

Commit

Permalink
fixuP
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Apr 3, 2024
1 parent c23cb57 commit 714d50f
Show file tree
Hide file tree
Showing 13 changed files with 81 additions and 87 deletions.
8 changes: 4 additions & 4 deletions lib/api/api-connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ class ConnectHandler extends AsyncResource {
this.opaque = opaque || null
this.responseHeaders = responseHeaders || null
this.callback = callback
this.controller = null
this.abort = null

addSignal(this, signal)
}

onConnect (controller, context) {
onConnect (abort, context) {
if (this.reason) {
controller.abort(this.reason)
abort(this.reason)
return
}

assert(this.callback)

this.controller = controller
this.abort = abort
this.context = context
}

Expand Down
14 changes: 7 additions & 7 deletions lib/api/api-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class PipelineHandler extends AsyncResource {
this.opaque = opaque || null
this.responseHeaders = responseHeaders || null
this.handler = handler
this.controller = null
this.abort = null
this.context = null
this.onInfo = onInfo || null

Expand All @@ -114,14 +114,14 @@ class PipelineHandler extends AsyncResource {
}
},
destroy: (err, callback) => {
const { body, req, res, ret, controller } = this
const { body, req, res, ret, abort } = this

if (!err && !ret._readableState.endEmitted) {
err = new RequestAbortedError()
}

if (controller && err) {
controller.abort()
if (abort && err) {
abort()
}

util.destroy(body, err)
Expand All @@ -144,18 +144,18 @@ class PipelineHandler extends AsyncResource {
addSignal(this, signal)
}

onConnect (controller, context) {
onConnect (abort, context) {
const { ret, res } = this

if (this.reason) {
controller.abort(this.reason)
abort(this.reason)
return
}

assert(!res, 'pipeline cannot be retried')
assert(!ret.destroyed)

this.controller = controller
this.abort = abort
this.context = context
}

Expand Down
17 changes: 8 additions & 9 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class RequestHandler extends AsyncResource {
this.opaque = opaque || null
this.callback = callback
this.res = null
this.controller = null
this.abort = null
this.body = body
this.trailers = {}
this.context = null
Expand All @@ -66,15 +66,15 @@ class RequestHandler extends AsyncResource {
addSignal(this, signal)
}

onConnect (controller, context) {
onConnect (abort, context) {
if (this.reason) {
controller.abort(this.reason)
abort(this.reason)
return
}

assert(this.callback)

this.controller = controller
this.abort = abort
this.context = context
}

Expand All @@ -94,7 +94,8 @@ class RequestHandler extends AsyncResource {
const contentType = parsedHeaders['content-type']
const contentLength = parsedHeaders['content-length']
const body = new Readable({
controller: this.controller,
resume,
abort: this.abort,
contentType,
contentLength,
highWaterMark
Expand All @@ -121,10 +122,8 @@ class RequestHandler extends AsyncResource {
}

onData (chunk) {
const { res, controller } = this
if (!res.push(chunk)) {
controller.pause()
}
const { res } = this
return res.push(chunk)
}

onComplete (trailers) {
Expand Down
12 changes: 6 additions & 6 deletions lib/api/api-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class StreamHandler extends AsyncResource {
this.factory = factory
this.callback = callback
this.res = null
this.controller = null
this.abort = null
this.context = null
this.trailers = null
this.body = body
Expand All @@ -69,15 +69,15 @@ class StreamHandler extends AsyncResource {
addSignal(this, signal)
}

onConnect (controller, context) {
onConnect (abort, context) {
if (this.reason) {
controller.abort(this.reason)
abort(this.reason)
return
}

assert(this.callback)

this.controller = controller
this.abort = abort
this.context = context
}

Expand Down Expand Up @@ -129,7 +129,7 @@ class StreamHandler extends AsyncResource {

// TODO: Avoid finished. It registers an unnecessary amount of listeners.
finished(res, { readable: false }, (err) => {
const { callback, res, opaque, trailers, controller } = this
const { callback, res, opaque, trailers, abort } = this

this.res = null
if (err || !res.readable) {
Expand All @@ -140,7 +140,7 @@ class StreamHandler extends AsyncResource {
this.runInAsyncScope(callback, null, err || null, { opaque, trailers })

if (err) {
controller.abort()
abort()
}
})
}
Expand Down
8 changes: 4 additions & 4 deletions lib/api/api-upgrade.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ class UpgradeHandler extends AsyncResource {
this.responseHeaders = responseHeaders || null
this.opaque = opaque || null
this.callback = callback
this.controller = null
this.abort = null
this.context = null

addSignal(this, signal)
}

onConnect (controller, context) {
onConnect (abort, context) {
if (this.reason) {
controller.abort(this.reason)
abort(this.reason)
return
}

assert(this.callback)

this.controller = controller
this.abort = abort
this.context = null
}

Expand Down
21 changes: 10 additions & 11 deletions lib/api/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,29 @@ const { ReadableStreamFrom } = require('../core/util')
const kConsume = Symbol('kConsume')
const kReading = Symbol('kReading')
const kBody = Symbol('kBody')
const kController = Symbol('kController')
const kAbort = Symbol('kAbort')
const kContentType = Symbol('kContentType')
const kContentLength = Symbol('kContentLength')

const noop = () => {}

class BodyReadable extends Readable {
constructor ({
controller,
resume,
abort,
contentType = '',
contentLength,
highWaterMark = 64 * 1024 // Same as nodejs fs streams.
}) {
assert(controller)

super({ autoDestroy: true, highWaterMark })
super({
autoDestroy: true,
read: resume,
highWaterMark
})

this._readableState.dataEmitted = false

this[kController] = controller
this[kAbort] = abort
this[kConsume] = null
this[kBody] = null
this[kContentType] = contentType
Expand All @@ -49,16 +52,12 @@ class BodyReadable extends Readable {
}

if (err) {
this[kController].abort(err)
this[kAbort]()
}

return super.destroy(err)
}

_read () {
this[kController].resume()
}

_destroy (err, callback) {
// Workaround for Node "bug". If the stream is destroyed in same
// tick as it is created, then a user who is waiting for a
Expand Down
46 changes: 24 additions & 22 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class Request {

this.method = method

this.abort = null
this.controller = null

if (body == null) {
this.body = null
Expand All @@ -105,7 +105,7 @@ class Request {
}

this.errorHandler = err => {
if (this.abort) {
if (this.controller) {
this.controller.abort(err)
} else {
this.error = err
Expand Down Expand Up @@ -223,24 +223,24 @@ class Request {
// TODO(fix): Hack for compatibility with the old API.
// Remove and use proper class with encapsulation in
// semver major.
abort.paused = false
abort.aborted = false
abort._internal = null
abort._abort = abort
abort.abort = function (err) {
this.aborted = true
this._abort(err)
}
abort.pause = function () {
this.paused = true
this._internal?.pause?.()
}
abort.resume = function () {
this.paused = false
this._internal?.resume?.()
this.controller = abort.controller = {
paused: false,
aborted: false,
_internal: null,
_abort: abort,
abort (err) {
this.aborted = true
this._abort(err)
},
pause () {
this.paused = true
this._internal?.pause?.()
},
resume () {
this.paused = false
this._internal?.resume?.()
}
}
abort.controller = abort
const controller = abort

// TODO (fix): This should be enabled. Needs some thinking
// in regards to pipeline retries.
Expand All @@ -250,16 +250,16 @@ class Request {
// }

if (this.error) {
controller.abort(this.error)
abort(this.error)
} else {
this.controller = controller
this[kHandler].onConnect(abort)
}
}

onStart () {
assert(!this.aborted)
assert(!this.completed)
assert(this.controller)

try {
// Compatibility with the old API.
Expand All @@ -273,6 +273,7 @@ class Request {
onHeaders (statusCode, statusText, headers) {
assert(!this.aborted)
assert(!this.completed)
assert(this.controller)

if (channels.headers.hasSubscribers) {
channels.headers.publish({ request: this, response: { statusCode, headers, statusText } })
Expand All @@ -290,6 +291,7 @@ class Request {
onData (chunk) {
assert(!this.aborted)
assert(!this.completed)
assert(this.controller)

try {
if (this[kHandler].onData?.(chunk) === false) {
Expand All @@ -303,6 +305,7 @@ class Request {
onUpgrade (statusCode, headers, socket) {
assert(!this.aborted)
assert(!this.completed)
assert(this.controller)

this[kHandler].onUpgrade(statusCode, headers, socket)
}
Expand All @@ -320,7 +323,6 @@ class Request {
try {
this[kHandler].onComplete(trailers)
} catch (err) {
// TODO (fix): This might be a bad idea?
this.onError(err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions lib/handler/redirect-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ function shouldRemoveHeader (header, removeContent, unknownOrigin) {
if (removeContent && util.headerNameToString(header).startsWith('content-')) {
return true
}
if (unknownOrigin && (header.length === 13 || header.length === 6)) {
if (unknownOrigin && (header.length === 13 || header.length === 6 || header.length === 19)) {
const name = util.headerNameToString(header)
return name === 'authorization' || name === 'cookie'
return name === 'authorization' || name === 'cookie' || name === 'proxy-authorization'
}
return false
}
Expand Down
4 changes: 2 additions & 2 deletions lib/mock/mock-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -322,15 +322,15 @@ function mockDispatch (opts, handler) {
handler.abort = nop

if (!controller.aborted) {
handler.onConnect?.(controller)
handler.onConnect?.(err => controller.abort(err))
}

if (!controller.aborted) {
handler.onStart?.()
}

if (!controller.aborted) {
handler.onHeaders(statusCode, responseHeaders, controller.resume, getStatusText(statusCode))
handler.onHeaders(statusCode, responseHeaders, () => controller.resume(), getStatusText(statusCode))
}

if (!controller.aborted) {
Expand Down
Loading

0 comments on commit 714d50f

Please sign in to comment.