From 63b7794af7c282d543a56d13521f9d5e92fa61ae Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 7 May 2024 17:14:44 +0200 Subject: [PATCH] use FinalizationRegistry to cancel the body if response is collected (#3199) * use FinalizationRegistry to cancel the body if response is collected Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * fixup Signed-off-by: Matteo Collina * Update lib/dispatcher/client-h2.js Co-authored-by: Aras Abbasi --------- Signed-off-by: Matteo Collina Co-authored-by: Aras Abbasi --- lib/web/fetch/index.js | 34 ++++++++++++++++------ lib/web/fetch/response.js | 19 +++++++++++++ package.json | 2 +- test/fetch/fire-and-forget.js | 53 +++++++++++++++++++++++++++++++++++ 4 files changed, 98 insertions(+), 10 deletions(-) create mode 100644 test/fetch/fire-and-forget.js diff --git a/lib/web/fetch/index.js b/lib/web/fetch/index.js index e7e7acfb0ea..5fbef495ad6 100644 --- a/lib/web/fetch/index.js +++ b/lib/web/fetch/index.js @@ -120,12 +120,16 @@ class Fetch extends EE { } } +function handleFetchDone (response) { + finalizeAndReportTiming(response, 'fetch') +} + // https://fetch.spec.whatwg.org/#fetch-method function fetch (input, init = undefined) { webidl.argumentLengthCheck(arguments, 1, 'globalThis.fetch') // 1. Let p be a new promise. - const p = createDeferredPromise() + let p = createDeferredPromise() // 2. Let requestObject be the result of invoking the initial value of // Request as constructor with input and init as arguments. If this throws @@ -185,16 +189,17 @@ function fetch (input, init = undefined) { // 3. Abort controller with requestObject’s signal’s abort reason. controller.abort(requestObject.signal.reason) + const realResponse = responseObject?.deref() + // 4. Abort the fetch() call with p, request, responseObject, // and requestObject’s signal’s abort reason. - abortFetch(p, request, responseObject, requestObject.signal.reason) + abortFetch(p, request, realResponse, requestObject.signal.reason) } ) // 12. Let handleFetchDone given response response be to finalize and // report timing with response, globalObject, and "fetch". - const handleFetchDone = (response) => - finalizeAndReportTiming(response, 'fetch') + // see function handleFetchDone // 13. Set controller to the result of calling fetch given request, // with processResponseEndOfBody set to handleFetchDone, and processResponse @@ -228,10 +233,11 @@ function fetch (input, init = undefined) { // 4. Set responseObject to the result of creating a Response object, // given response, "immutable", and relevantRealm. - responseObject = fromInnerResponse(response, 'immutable') + responseObject = new WeakRef(fromInnerResponse(response, 'immutable')) // 5. Resolve p with responseObject. - p.resolve(responseObject) + p.resolve(responseObject.deref()) + p = null } controller = fetching({ @@ -314,7 +320,10 @@ const markResourceTiming = performance.markResourceTiming // https://fetch.spec.whatwg.org/#abort-fetch function abortFetch (p, request, responseObject, error) { // 1. Reject promise with error. - p.reject(error) + if (p) { + // We might have already resolved the promise at this stage + p.reject(error) + } // 2. If request’s body is not null and is readable, then cancel request’s // body with error. @@ -1066,7 +1075,10 @@ function fetchFinale (fetchParams, response) { // 4. If fetchParams’s process response is non-null, then queue a fetch task to run fetchParams’s // process response given response, with fetchParams’s task destination. if (fetchParams.processResponse != null) { - queueMicrotask(() => fetchParams.processResponse(response)) + queueMicrotask(() => { + fetchParams.processResponse(response) + fetchParams.processResponse = null + }) } // 5. Let internalResponse be response, if response is a network error; otherwise response’s internal response. @@ -1884,7 +1896,11 @@ async function httpNetworkFetch ( // 12. Let cancelAlgorithm be an algorithm that aborts fetchParams’s // controller with reason, given reason. const cancelAlgorithm = (reason) => { - fetchParams.controller.abort(reason) + // If the aborted fetch was already terminated, then we do not + // need to do anything. + if (!isCancelled(fetchParams)) { + fetchParams.controller.abort(reason) + } } // 13. Let highWaterMark be a non-negative, non-NaN number, chosen by diff --git a/lib/web/fetch/response.js b/lib/web/fetch/response.js index 222a9a5b2f7..81c32fe3e51 100644 --- a/lib/web/fetch/response.js +++ b/lib/web/fetch/response.js @@ -26,9 +26,23 @@ const { URLSerializer } = require('./data-url') const { kHeadersList, kConstruct } = require('../../core/symbols') const assert = require('node:assert') const { types } = require('node:util') +const { isDisturbed, isErrored } = require('node:stream') const textEncoder = new TextEncoder('utf-8') +const hasFinalizationRegistry = globalThis.FinalizationRegistry && process.version.indexOf('v18') !== 0 +let registry + +if (hasFinalizationRegistry) { + registry = new FinalizationRegistry((stream) => { + if (!stream.locked && !isDisturbed(stream) && !isErrored(stream)) { + stream.cancel('Response object has been garbage collected').catch(noop) + } + }) +} + +function noop () {} + // https://fetch.spec.whatwg.org/#response-class class Response { // Creates network error Response. @@ -510,6 +524,11 @@ function fromInnerResponse (innerResponse, guard) { response[kHeaders] = new Headers(kConstruct) response[kHeaders][kHeadersList] = innerResponse.headersList response[kHeaders][kGuard] = guard + + if (hasFinalizationRegistry && innerResponse.body?.stream) { + registry.register(response, innerResponse.body.stream) + } + return response } diff --git a/package.json b/package.json index 0b8f491a59b..29f3d88ec95 100644 --- a/package.json +++ b/package.json @@ -77,7 +77,7 @@ "test:eventsource:nobuild": "borp --expose-gc -p \"test/eventsource/*.js\"", "test:fuzzing": "node test/fuzzing/fuzzing.test.js", "test:fetch": "npm run build:node && npm run test:fetch:nobuild", - "test:fetch:nobuild": "borp --expose-gc -p \"test/fetch/*.js\" && npm run test:webidl && npm run test:busboy", + "test:fetch:nobuild": "borp --timeout 180000 --expose-gc --concurrency 1 -p \"test/fetch/*.js\" && npm run test:webidl && npm run test:busboy", "test:interceptors": "borp -p \"test/interceptors/*.js\"", "test:jest": "cross-env NODE_V8_COVERAGE= jest", "test:unit": "borp --expose-gc -p \"test/*.js\"", diff --git a/test/fetch/fire-and-forget.js b/test/fetch/fire-and-forget.js new file mode 100644 index 00000000000..b72365c4adb --- /dev/null +++ b/test/fetch/fire-and-forget.js @@ -0,0 +1,53 @@ +'use strict' + +const { randomFillSync } = require('node:crypto') +const { setTimeout: sleep } = require('timers/promises') +const { test } = require('node:test') +const { fetch, Agent, setGlobalDispatcher } = require('../..') +const { createServer } = require('node:http') +const { closeServerAsPromise } = require('../utils/node-http') + +const blob = randomFillSync(new Uint8Array(1024 * 512)) + +// Enable when/if FinalizationRegistry in Node.js 18 becomes stable again +const isNode18 = process.version.startsWith('v18') + +test('does not need the body to be consumed to continue', { timeout: 180_000, skip: isNode18 }, async (t) => { + const agent = new Agent({ + keepAliveMaxTimeout: 10, + keepAliveTimeoutThreshold: 10 + }) + setGlobalDispatcher(agent) + const server = createServer((req, res) => { + res.writeHead(200) + res.end(blob) + }) + t.after(closeServerAsPromise(server)) + + await new Promise((resolve) => { + server.listen(0, resolve) + }) + + const url = new URL(`http://127.0.0.1:${server.address().port}`) + + const batch = 50 + const delay = 0 + let total = 0 + while (total < 10000) { + // eslint-disable-next-line no-undef + gc(true) + const array = new Array(batch) + for (let i = 0; i < batch; i++) { + array[i] = fetch(url).catch(() => {}) + } + await Promise.all(array) + await sleep(delay) + + console.log( + 'RSS', + (process.memoryUsage.rss() / 1024 / 1024) | 0, + 'MB after', + (total += batch) + ' fetch() requests' + ) + } +})