From c4b4e021b3beb4e0e0ec11acdc31d8383a35036e Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Wed, 22 May 2024 12:40:56 +0200 Subject: [PATCH 1/4] test: add testing --- lib/handler/retry-handler.js | 31 ++++- test/retry-handler.js | 213 ++++++++++++++++++++++++++++++++++- 2 files changed, 239 insertions(+), 5 deletions(-) diff --git a/lib/handler/retry-handler.js b/lib/handler/retry-handler.js index 56ea4be79be..2056f361892 100644 --- a/lib/handler/retry-handler.js +++ b/lib/handler/retry-handler.js @@ -3,7 +3,14 @@ const assert = require('node:assert') const { kRetryHandlerDefaultRetry } = require('../core/symbols') const { RequestRetryError } = require('../core/errors') -const { isDisturbed, parseHeaders, parseRangeHeader } = require('../core/util') +const { + isStream, + isDisturbed, + isIterable, + isAsyncIterable, + parseHeaders, + parseRangeHeader +} = require('../core/util') function calculateRetryAfterHeader (retryAfter) { const current = Date.now() @@ -11,6 +18,7 @@ function calculateRetryAfterHeader (retryAfter) { } class RetryHandler { + #isBodyIteratorLike = false constructor (opts, handlers) { const { retryOptions, ...dispatchOpts } = opts const { @@ -63,6 +71,10 @@ class RetryHandler { this.end = null this.etag = null this.resume = null + this.#isBodyIteratorLike = + typeof this.opts.body === 'object' && + !isStream(this.opts.body) && + (isIterable(this.opts.body) || isAsyncIterable(this.opts.body)) // Handle possible onConnect duplication this.handler.onConnect(reason => { @@ -174,7 +186,9 @@ class RetryHandler { this.abort( new RequestRetryError('Request failed', statusCode, { headers, - count: this.retryCount + data: { + count: this.retryCount + } }) ) return false @@ -278,7 +292,7 @@ class RetryHandler { const err = new RequestRetryError('Request failed', statusCode, { headers, - count: this.retryCount + data: { count: this.retryCount } }) this.abort(err) @@ -298,7 +312,16 @@ class RetryHandler { } onError (err) { - if (this.aborted || isDisturbed(this.opts.body)) { + console.log( + this.aborted, + isDisturbed(this.opts.body), + this.#isBodyIteratorLike + ) + if ( + this.aborted || + isDisturbed(this.opts.body) || + this.#isBodyIteratorLike + ) { return this.handler.onError(err) } diff --git a/test/retry-handler.js b/test/retry-handler.js index 8894ea86668..ef994ebeb10 100644 --- a/test/retry-handler.js +++ b/test/retry-handler.js @@ -4,6 +4,7 @@ const { tspl } = require('@matteo.collina/tspl') const { test, after } = require('node:test') const { createServer } = require('node:http') const { once } = require('node:events') +const { Readable } = require('node:stream') const { RetryHandler, Client } = require('..') const { RequestHandler } = require('../lib/api/api-request') @@ -204,6 +205,75 @@ test('Should account for network and response errors', async t => { await t.completed }) +test('Issue #3288', { only: true }, async t => { + t = tspl(t, { plan: 6 }) + const server = createServer() + const dispatchOptions = { + method: 'POST', + path: '/', + headers: { + 'content-type': 'application/json' + }, + body: (function * () { + yield 'hello' + yield 'world' + })() + } + + server.on('request', (req, res) => { + res.writeHead(500, { + 'content-type': 'application/json' + }) + + res.end('{"message": "failed"}') + }) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + const handler = new RetryHandler(dispatchOptions, { + dispatch: client.dispatch.bind(client), + handler: { + onConnect () { + t.ok(true, 'pass') + }, + onBodySent () { + t.ok(true, 'pass') + }, + onHeaders (status, _rawHeaders, resume, _statusMessage) { + t.strictEqual(status, 500) + return true + }, + onData (chunk) { + console.log('chunk', chunk) + return true + }, + onComplete () { + t.fail() + }, + onError (err) { + t.equal(err.message, 'Request failed') + t.equal(err.statusCode, 500) + t.equal(err.data.count, 1) + } + } + }) + + after(async () => { + await client.close() + server.close() + + await once(server, 'close') + }) + + client.dispatch( + dispatchOptions, + handler + ) + }) + + await t.completed +}) + test('Should use retry-after header for retries', async t => { t = tspl(t, { plan: 4 }) @@ -655,7 +725,7 @@ test('Should handle 206 partial content - bad-etag', async t => { await t.completed }) -test('retrying a request with a body', async t => { +test('retrying a request with a body', { only: true }, async t => { let counter = 0 const server = createServer() const dispatchOptions = { @@ -685,6 +755,7 @@ test('retrying a request with a body', async t => { t = tspl(t, { plan: 1 }) server.on('request', (req, res) => { + console.log({ counter }) switch (counter) { case 0: req.destroy() @@ -734,6 +805,146 @@ test('retrying a request with a body', async t => { await t.completed }) +test('retrying a request with a body (stream)', { only: true }, async t => { + let counter = 0 + const server = createServer() + const dispatchOptions = { + retryOptions: { + retry: (err, { state, opts }, done) => { + counter++ + + if ( + err.statusCode === 500 || + err.message.includes('other side closed') + ) { + setTimeout(done, 500) + return + } + + return done(err) + } + }, + method: 'POST', + path: '/', + headers: { + 'content-type': 'application/json' + }, + body: Readable.from(Buffer.from(JSON.stringify({ hello: 'world' }))) + } + + t = tspl(t, { plan: 3 }) + + server.on('request', (req, res) => { + console.log({ counter }) + switch (counter) { + case 0: + res.writeHead(500) + res.end('failed') + return + default: + t.fail() + } + }) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + const handler = new RetryHandler(dispatchOptions, { + dispatch: client.dispatch.bind(client), + handler: new RequestHandler(dispatchOptions, (err, data) => { + t.equal(err.statusCode, 500) + t.equal(err.data.count, 1) + t.equal(err.code, 'UND_ERR_REQ_RETRY') + }) + }) + + after(async () => { + await client.close() + server.close() + + await once(server, 'close') + }) + + client.dispatch( + dispatchOptions, + handler + ) + }) + + await t.completed +}) + +test('retrying a request with a body (buffer)', { only: true }, async t => { + let counter = 0 + const server = createServer() + const dispatchOptions = { + retryOptions: { + retry: (err, { state, opts }, done) => { + counter++ + + if ( + err.statusCode === 500 || + err.message.includes('other side closed') + ) { + setTimeout(done, 500) + return + } + + return done(err) + } + }, + method: 'POST', + path: '/', + headers: { + 'content-type': 'application/json' + }, + body: Buffer.from(JSON.stringify({ hello: 'world' })) + } + + t = tspl(t, { plan: 1 }) + + server.on('request', (req, res) => { + switch (counter) { + case 0: + req.destroy() + return + case 1: + res.writeHead(500) + res.end('failed') + return + case 2: + res.writeHead(200) + res.end('hello world!') + return + default: + t.fail() + } + }) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + const handler = new RetryHandler(dispatchOptions, { + dispatch: client.dispatch.bind(client), + handler: new RequestHandler(dispatchOptions, (err, data) => { + t.ifError(err) + }) + }) + + after(async () => { + await client.close() + server.close() + + await once(server, 'close') + }) + + client.dispatch( + dispatchOptions, + handler + ) + }) + + await t.completed +}) + test('should not error if request is not meant to be retried', async t => { t = tspl(t, { plan: 3 }) From 38e6a6c0c073bfa80189261148094f738e983ff3 Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Fri, 24 May 2024 12:39:35 +0200 Subject: [PATCH 2/4] refactor: enhance body wrapping --- lib/core/symbols.js | 1 + lib/core/util.js | 60 ++++++++++++++++++++++++++++++++++-- lib/handler/retry-handler.js | 23 +++----------- test/retry-handler.js | 8 ++--- 4 files changed, 66 insertions(+), 26 deletions(-) diff --git a/lib/core/symbols.js b/lib/core/symbols.js index b58fc90a69f..c8ba5dd8ec5 100644 --- a/lib/core/symbols.js +++ b/lib/core/symbols.js @@ -20,6 +20,7 @@ module.exports = { kHost: Symbol('host'), kNoRef: Symbol('no ref'), kBodyUsed: Symbol('used'), + kBody: Symbol('abstracted request body'), kRunning: Symbol('running'), kBlocking: Symbol('blocking'), kPending: Symbol('pending'), diff --git a/lib/core/util.js b/lib/core/util.js index 2bad24df2f6..ddb72d226ce 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -1,19 +1,72 @@ 'use strict' const assert = require('node:assert') -const { kDestroyed, kBodyUsed, kListeners } = require('./symbols') +const { kDestroyed, kBodyUsed, kListeners, kBody } = require('./symbols') const { IncomingMessage } = require('node:http') const stream = require('node:stream') const net = require('node:net') -const { InvalidArgumentError } = require('./errors') const { Blob } = require('node:buffer') const nodeUtil = require('node:util') const { stringify } = require('node:querystring') +const { EventEmitter: EE } = require('node:events') +const { InvalidArgumentError } = require('./errors') const { headerNameLowerCasedRecord } = require('./constants') const { tree } = require('./tree') const [nodeMajor, nodeMinor] = process.versions.node.split('.').map(v => Number(v)) +class BodyAsyncIterable { + constructor (body) { + this[kBody] = body + this[kBodyUsed] = false + } + + async * [Symbol.asyncIterator] () { + assert(!this[kBodyUsed], 'disturbed') + this[kBodyUsed] = true + yield * this[kBody] + } +} + +function wrapRequestBody (body) { + if (isStream(body)) { + // TODO (fix): Provide some way for the user to cache the file to e.g. /tmp + // so that it can be dispatched again? + // TODO (fix): Do we need 100-expect support to provide a way to do this properly? + if (bodyLength(body) === 0) { + body + .on('data', function () { + assert(false) + }) + } + + if (typeof body.readableDidRead !== 'boolean') { + body[kBodyUsed] = false + EE.prototype.on.call(body, 'data', function () { + this[kBodyUsed] = true + }) + } + + return body + } else if (body && typeof body.pipeTo === 'function') { + // TODO (fix): We can't access ReadableStream internal state + // to determine whether or not it has been disturbed. This is just + // a workaround. + return new BodyAsyncIterable(body) + } else if ( + body && + typeof body !== 'string' && + !ArrayBuffer.isView(body) && + isIterable(body) + ) { + // TODO: Should we allow re-using iterable if !this.opts.idempotent + // or through some other flag? + return new BodyAsyncIterable(body) + } else { + return body + } +} + function nop () {} function isStream (obj) { @@ -634,5 +687,6 @@ module.exports = { isHttpOrHttpsPrefixed, nodeMajor, nodeMinor, - safeHTTPMethods: ['GET', 'HEAD', 'OPTIONS', 'TRACE'] + safeHTTPMethods: ['GET', 'HEAD', 'OPTIONS', 'TRACE'], + wrapRequestBody } diff --git a/lib/handler/retry-handler.js b/lib/handler/retry-handler.js index 2056f361892..e75a9aa29ab 100644 --- a/lib/handler/retry-handler.js +++ b/lib/handler/retry-handler.js @@ -4,12 +4,10 @@ const assert = require('node:assert') const { kRetryHandlerDefaultRetry } = require('../core/symbols') const { RequestRetryError } = require('../core/errors') const { - isStream, isDisturbed, - isIterable, - isAsyncIterable, parseHeaders, - parseRangeHeader + parseRangeHeader, + wrapRequestBody } = require('../core/util') function calculateRetryAfterHeader (retryAfter) { @@ -18,7 +16,6 @@ function calculateRetryAfterHeader (retryAfter) { } class RetryHandler { - #isBodyIteratorLike = false constructor (opts, handlers) { const { retryOptions, ...dispatchOpts } = opts const { @@ -71,10 +68,7 @@ class RetryHandler { this.end = null this.etag = null this.resume = null - this.#isBodyIteratorLike = - typeof this.opts.body === 'object' && - !isStream(this.opts.body) && - (isIterable(this.opts.body) || isAsyncIterable(this.opts.body)) + this.opts.body = wrapRequestBody(this.opts.body) // Handle possible onConnect duplication this.handler.onConnect(reason => { @@ -312,16 +306,7 @@ class RetryHandler { } onError (err) { - console.log( - this.aborted, - isDisturbed(this.opts.body), - this.#isBodyIteratorLike - ) - if ( - this.aborted || - isDisturbed(this.opts.body) || - this.#isBodyIteratorLike - ) { + if (this.aborted || isDisturbed(this.opts.body)) { return this.handler.onError(err) } diff --git a/test/retry-handler.js b/test/retry-handler.js index ef994ebeb10..85f5b5c0379 100644 --- a/test/retry-handler.js +++ b/test/retry-handler.js @@ -205,7 +205,7 @@ test('Should account for network and response errors', async t => { await t.completed }) -test('Issue #3288', { only: true }, async t => { +test('Issue #3288', async t => { t = tspl(t, { plan: 6 }) const server = createServer() const dispatchOptions = { @@ -725,7 +725,7 @@ test('Should handle 206 partial content - bad-etag', async t => { await t.completed }) -test('retrying a request with a body', { only: true }, async t => { +test('retrying a request with a body', async t => { let counter = 0 const server = createServer() const dispatchOptions = { @@ -805,7 +805,7 @@ test('retrying a request with a body', { only: true }, async t => { await t.completed }) -test('retrying a request with a body (stream)', { only: true }, async t => { +test('retrying a request with a body (stream)', async t => { let counter = 0 const server = createServer() const dispatchOptions = { @@ -873,7 +873,7 @@ test('retrying a request with a body (stream)', { only: true }, async t => { await t.completed }) -test('retrying a request with a body (buffer)', { only: true }, async t => { +test('retrying a request with a body (buffer)', async t => { let counter = 0 const server = createServer() const dispatchOptions = { From 82d7fb30e47e6c4b25eaaeec833299060a047de2 Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Wed, 29 May 2024 10:15:20 +0200 Subject: [PATCH 3/4] fix: do not mutate original opts --- lib/handler/retry-handler.js | 3 +-- test/retry-handler.js | 8 ++------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/lib/handler/retry-handler.js b/lib/handler/retry-handler.js index e75a9aa29ab..fcd9f0df513 100644 --- a/lib/handler/retry-handler.js +++ b/lib/handler/retry-handler.js @@ -34,7 +34,7 @@ class RetryHandler { this.dispatch = handlers.dispatch this.handler = handlers.handler - this.opts = dispatchOpts + this.opts = { ...dispatchOpts, body: wrapRequestBody(opts.body) } this.abort = null this.aborted = false this.retryOpts = { @@ -68,7 +68,6 @@ class RetryHandler { this.end = null this.etag = null this.resume = null - this.opts.body = wrapRequestBody(this.opts.body) // Handle possible onConnect duplication this.handler.onConnect(reason => { diff --git a/test/retry-handler.js b/test/retry-handler.js index 85f5b5c0379..83e222861ae 100644 --- a/test/retry-handler.js +++ b/test/retry-handler.js @@ -205,7 +205,7 @@ test('Should account for network and response errors', async t => { await t.completed }) -test('Issue #3288', async t => { +test('Issue #3288 - request with body (asynciterable)', async t => { t = tspl(t, { plan: 6 }) const server = createServer() const dispatchOptions = { @@ -244,7 +244,6 @@ test('Issue #3288', async t => { return true }, onData (chunk) { - console.log('chunk', chunk) return true }, onComplete () { @@ -755,7 +754,6 @@ test('retrying a request with a body', async t => { t = tspl(t, { plan: 1 }) server.on('request', (req, res) => { - console.log({ counter }) switch (counter) { case 0: req.destroy() @@ -835,7 +833,6 @@ test('retrying a request with a body (stream)', async t => { t = tspl(t, { plan: 3 }) server.on('request', (req, res) => { - console.log({ counter }) switch (counter) { case 0: res.writeHead(500) @@ -988,8 +985,7 @@ test('should not error if request is not meant to be retried', async t => { t.strictEqual(Buffer.concat(chunks).toString('utf-8'), 'Bad request') }, onError (err) { - console.log({ err }) - t.fail() + t.fail(err) } } }) From 735a72f43c93a113fdd00ca75b4f584c5ba8b4e2 Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Wed, 29 May 2024 10:20:54 +0200 Subject: [PATCH 4/4] docs: extend documentation --- docs/docs/api/RetryHandler.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/docs/api/RetryHandler.md b/docs/docs/api/RetryHandler.md index 6dbc5077d02..8988ee53010 100644 --- a/docs/docs/api/RetryHandler.md +++ b/docs/docs/api/RetryHandler.md @@ -46,6 +46,9 @@ It represents the retry state for a given request. - **dispatch** `(options: Dispatch.DispatchOptions, handlers: Dispatch.DispatchHandlers) => Promise` (required) - Dispatch function to be called after every retry. - **handler** Extends [`Dispatch.DispatchHandlers`](Dispatcher.md#dispatcherdispatchoptions-handler) (required) - Handler function to be called after the request is successful or the retries are exhausted. +>__Note__: The `RetryHandler` does not retry over stateful bodies (e.g. streams, AsyncIterable) as those, once consumed, are left in an state that cannot be reutilized. For these situations the `RetryHandler` will identify +>the body as stateful and will not retry the request rejecting with the error `UND_ERR_REQ_RETRY`. + Examples: ```js