Skip to content

Commit

Permalink
fix: remove abort handler on close (#3211)
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed May 7, 2024
1 parent 9f26aff commit 8dc6a7e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 22 deletions.
39 changes: 19 additions & 20 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,18 @@ class RequestHandler extends AsyncResource {
this.reason = this.signal.reason ?? new RequestAbortedError()
} else {
this.removeAbortListener = util.addAbortListener(this.signal, () => {
this.removeAbortListener?.()
this.removeAbortListener = null

this.reason = this.signal.reason ?? new RequestAbortedError()
if (this.res) {
util.destroy(this.res, this.reason)
} else if (this.abort) {
this.abort(this.reason)
}

if (this.removeAbortListener) {
this.res?.off('close', this.removeAbortListener)
this.removeAbortListener()
this.removeAbortListener = null
}
})
}
}
Expand Down Expand Up @@ -111,54 +114,44 @@ class RequestHandler extends AsyncResource {
const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers
const contentType = parsedHeaders['content-type']
const contentLength = parsedHeaders['content-length']
const body = new Readable({ resume, abort, contentType, contentLength, highWaterMark })
const res = new Readable({ resume, abort, contentType, contentLength, highWaterMark })

if (this.removeAbortListener) {
// TODO (fix): 'close' is sufficient but breaks tests.
body
.on('end', this.removeAbortListener)
.on('error', this.removeAbortListener)
res.on('close', this.removeAbortListener)
}

this.callback = null
this.res = body
this.res = res
if (callback !== null) {
if (this.throwOnError && statusCode >= 400) {
this.runInAsyncScope(getResolveErrorBodyCallback, null,
{ callback, body, contentType, statusCode, statusMessage, headers }
{ callback, body: res, contentType, statusCode, statusMessage, headers }
)
} else {
this.runInAsyncScope(callback, null, null, {
statusCode,
headers,
trailers: this.trailers,
opaque,
body,
body: res,
context
})
}
}
}

onData (chunk) {
const { res } = this
return res.push(chunk)
return this.res.push(chunk)
}

onComplete (trailers) {
const { res } = this

util.parseHeaders(trailers, this.trailers)

res.push(null)
this.res.push(null)
}

onError (err) {
const { res, callback, body, opaque } = this

this.removeAbortListener?.()
this.removeAbortListener = null

if (callback) {
// TODO: Does this need queueMicrotask?
this.callback = null
Expand All @@ -179,6 +172,12 @@ class RequestHandler extends AsyncResource {
this.body = null
util.destroy(body, err)
}

if (this.removeAbortListener) {
res?.off('close', this.removeAbortListener)
this.removeAbortListener()
this.removeAbortListener = null
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions test/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ test('basic get', async (t) => {
body.on('data', (buf) => {
bufs.push(buf)
})
body.on('end', () => {
body.on('close', () => {
t.strictEqual(signal.listenerCount('abort'), 0)
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8'))
})
Expand Down Expand Up @@ -135,7 +135,7 @@ test('basic get with custom request.reset=true', async (t) => {
body.on('data', (buf) => {
bufs.push(buf)
})
body.on('end', () => {
body.on('close', () => {
t.strictEqual(signal.listenerCount('abort'), 0)
t.strictEqual('hello', Buffer.concat(bufs).toString('utf8'))
})
Expand Down

0 comments on commit 8dc6a7e

Please sign in to comment.