Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Apr 3, 2024
1 parent 9ec58f6 commit dc978ac
Show file tree
Hide file tree
Showing 15 changed files with 156 additions and 198 deletions.
5 changes: 4 additions & 1 deletion lib/api/abort-signal.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ function abort (self) {
} else if (self.controller?.abort) {
self.controller.abort(self[kSignal]?.reason)
} else {
self.onError(self[kSignal]?.reason ?? new RequestAbortedError())
self.reason = self[kSignal]?.reason ?? new RequestAbortedError()
}
removeSignal(self)
}

function addSignal (self, signal) {
self.reason = null

self[kSignal] = null
self[kListener] = null

Expand Down
16 changes: 10 additions & 6 deletions lib/api/api-connect.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
'use strict'

const assert = require('node:assert')
const { AsyncResource } = require('node:async_hooks')
const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors')
const { InvalidArgumentError, SocketError } = require('../core/errors')
const util = require('../core/util')
const { addSignal, removeSignal } = require('./abort-signal')

Expand All @@ -26,17 +27,20 @@ class ConnectHandler extends AsyncResource {
this.opaque = opaque || null
this.responseHeaders = responseHeaders || null
this.callback = callback
this.abort = null
this.controller = null

addSignal(this, signal)
}

onConnect (abort, context) {
if (!this.callback) {
throw new RequestAbortedError()
onConnect (controller, context) {
if (this.reason) {
controller.abort(this.reason)
return
}

this.abort = abort
assert(this.callback)

this.controller = controller
this.context = context
}

Expand Down
22 changes: 12 additions & 10 deletions lib/api/api-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class PipelineHandler extends AsyncResource {
this.opaque = opaque || null
this.responseHeaders = responseHeaders || null
this.handler = handler
this.abort = null
this.controller = null
this.context = null
this.onInfo = onInfo || null

Expand All @@ -114,14 +114,14 @@ class PipelineHandler extends AsyncResource {
}
},
destroy: (err, callback) => {
const { body, req, res, ret, abort } = this
const { body, req, res, ret, controller } = this

if (!err && !ret._readableState.endEmitted) {
err = new RequestAbortedError()
}

if (abort && err) {
abort()
if (controller && err) {
controller.abort()
}

util.destroy(body, err)
Expand All @@ -144,16 +144,18 @@ class PipelineHandler extends AsyncResource {
addSignal(this, signal)
}

onConnect (abort, context) {
onConnect (controller, context) {
const { ret, res } = this

assert(!res, 'pipeline cannot be retried')

if (ret.destroyed) {
throw new RequestAbortedError()
if (this.reason) {
controller.abort(this.reason)
return
}

this.abort = abort
assert(!res, 'pipeline cannot be retried')
assert(!ret.destroyed)

this.controller = controller
this.context = context
}

Expand Down
17 changes: 9 additions & 8 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
'use strict'

const assert = require('node:assert')
const { Readable } = require('./readable')
const {
InvalidArgumentError,
RequestAbortedError
} = require('../core/errors')
const { InvalidArgumentError } = require('../core/errors')
const util = require('../core/util')
const { getResolveErrorBodyCallback } = require('./util')
const { AsyncResource } = require('node:async_hooks')
Expand Down Expand Up @@ -51,6 +49,7 @@ class RequestHandler extends AsyncResource {
this.opaque = opaque || null
this.callback = callback
this.res = null
this.controller = null
this.body = body
this.trailers = {}
this.context = null
Expand All @@ -68,10 +67,13 @@ class RequestHandler extends AsyncResource {
}

onConnect (controller, context) {
if (!this.callback) {
throw new RequestAbortedError()
if (this.reason) {
controller.abort(this.reason)
return
}

assert(this.callback)

this.controller = controller
this.context = context
}
Expand All @@ -92,8 +94,7 @@ class RequestHandler extends AsyncResource {
const contentType = parsedHeaders['content-type']
const contentLength = parsedHeaders['content-length']
const body = new Readable({
resume: this.controller?.resume ?? resume,
abort: this.controller?.abort ?? this.controller,
controller: this.controller,
contentType,
contentLength,
highWaterMark
Expand Down
21 changes: 12 additions & 9 deletions lib/api/api-stream.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
'use strict'

const assert = require('node:assert')
const { finished, PassThrough } = require('node:stream')
const {
InvalidArgumentError,
InvalidReturnValueError,
RequestAbortedError
InvalidReturnValueError
} = require('../core/errors')
const util = require('../core/util')
const { getResolveErrorBodyCallback } = require('./util')
Expand Down Expand Up @@ -53,7 +53,7 @@ class StreamHandler extends AsyncResource {
this.factory = factory
this.callback = callback
this.res = null
this.abort = null
this.controller = null
this.context = null
this.trailers = null
this.body = body
Expand All @@ -69,12 +69,15 @@ class StreamHandler extends AsyncResource {
addSignal(this, signal)
}

onConnect (abort, context) {
if (!this.callback) {
throw new RequestAbortedError()
onConnect (controller, context) {
if (this.reason) {
controller.abort(this.reason)
return
}

this.abort = abort
assert(this.callback)

this.controller = controller
this.context = context
}

Expand Down Expand Up @@ -126,7 +129,7 @@ class StreamHandler extends AsyncResource {

// TODO: Avoid finished. It registers an unnecessary amount of listeners.
finished(res, { readable: false }, (err) => {
const { callback, res, opaque, trailers, abort } = this
const { callback, res, opaque, trailers, controller } = this

this.res = null
if (err || !res.readable) {
Expand All @@ -137,7 +140,7 @@ class StreamHandler extends AsyncResource {
this.runInAsyncScope(callback, null, err || null, { opaque, trailers })

if (err) {
abort()
controller.abort()
}
})
}
Expand Down
15 changes: 9 additions & 6 deletions lib/api/api-upgrade.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors')
const { InvalidArgumentError, SocketError } = require('../core/errors')
const { AsyncResource } = require('node:async_hooks')
const util = require('../core/util')
const { addSignal, removeSignal } = require('./abort-signal')
Expand All @@ -27,18 +27,21 @@ class UpgradeHandler extends AsyncResource {
this.responseHeaders = responseHeaders || null
this.opaque = opaque || null
this.callback = callback
this.abort = null
this.controller = null
this.context = null

addSignal(this, signal)
}

onConnect (abort, context) {
if (!this.callback) {
throw new RequestAbortedError()
onConnect (controller, context) {
if (this.reason) {
controller.abort(this.reason)
return
}

this.abort = abort
assert(this.callback)

this.controller = controller
this.context = null
}

Expand Down
21 changes: 11 additions & 10 deletions lib/api/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,26 @@ const { ReadableStreamFrom } = require('../core/util')
const kConsume = Symbol('kConsume')
const kReading = Symbol('kReading')
const kBody = Symbol('kBody')
const kAbort = Symbol('kAbort')
const kController = Symbol('kController')
const kContentType = Symbol('kContentType')
const kContentLength = Symbol('kContentLength')

const noop = () => {}

class BodyReadable extends Readable {
constructor ({
resume,
abort,
controller,
contentType = '',
contentLength,
highWaterMark = 64 * 1024 // Same as nodejs fs streams.
}) {
super({
autoDestroy: true,
read: resume,
highWaterMark
})
assert(controller)

super({ autoDestroy: true, highWaterMark })

this._readableState.dataEmitted = false

this[kAbort] = abort
this[kController] = controller
this[kConsume] = null
this[kBody] = null
this[kContentType] = contentType
Expand All @@ -52,12 +49,16 @@ class BodyReadable extends Readable {
}

if (err) {
this[kAbort]()
this[kController].abort(err)
}

return super.destroy(err)
}

_read () {
this[kController].resume()
}

_destroy (err, callback) {
// Workaround for Node "bug". If the stream is destroyed in same
// tick as it is created, then a user who is waiting for a
Expand Down
Loading

0 comments on commit dc978ac

Please sign in to comment.