Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retry on body support #3294

Merged
merged 7 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module.exports = {
kHost: Symbol('host'),
kNoRef: Symbol('no ref'),
kBodyUsed: Symbol('used'),
kBody: Symbol('abstracted request body'),
kRunning: Symbol('running'),
kBlocking: Symbol('blocking'),
kPending: Symbol('pending'),
Expand Down
60 changes: 57 additions & 3 deletions lib/core/util.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,72 @@
'use strict'

const assert = require('node:assert')
const { kDestroyed, kBodyUsed, kListeners } = require('./symbols')
const { kDestroyed, kBodyUsed, kListeners, kBody } = require('./symbols')
const { IncomingMessage } = require('node:http')
const stream = require('node:stream')
const net = require('node:net')
const { InvalidArgumentError } = require('./errors')
const { Blob } = require('node:buffer')
const nodeUtil = require('node:util')
const { stringify } = require('node:querystring')
const { EventEmitter: EE } = require('node:events')
const { InvalidArgumentError } = require('./errors')
const { headerNameLowerCasedRecord } = require('./constants')
const { tree } = require('./tree')

const [nodeMajor, nodeMinor] = process.versions.node.split('.').map(v => Number(v))

class BodyAsyncIterable {
constructor (body) {
this[kBody] = body
this[kBodyUsed] = false
}

async * [Symbol.asyncIterator] () {
assert(!this[kBodyUsed], 'disturbed')
this[kBodyUsed] = true
yield * this[kBody]
}
}

function wrapRequestBody (body) {
if (isStream(body)) {
// TODO (fix): Provide some way for the user to cache the file to e.g. /tmp
// so that it can be dispatched again?
// TODO (fix): Do we need 100-expect support to provide a way to do this properly?
if (bodyLength(body) === 0) {
body
.on('data', function () {
assert(false)
})
}

if (typeof body.readableDidRead !== 'boolean') {
body[kBodyUsed] = false
EE.prototype.on.call(body, 'data', function () {
this[kBodyUsed] = true
})
}

return body
} else if (body && typeof body.pipeTo === 'function') {
// TODO (fix): We can't access ReadableStream internal state
// to determine whether or not it has been disturbed. This is just
// a workaround.
return new BodyAsyncIterable(body)
} else if (
body &&
typeof body !== 'string' &&
!ArrayBuffer.isView(body) &&
isIterable(body)
) {
// TODO: Should we allow re-using iterable if !this.opts.idempotent
// or through some other flag?
ronag marked this conversation as resolved.
Show resolved Hide resolved
return new BodyAsyncIterable(body)
} else {
return body
}
}

function nop () {}

function isStream (obj) {
Expand Down Expand Up @@ -634,5 +687,6 @@ module.exports = {
isHttpOrHttpsPrefixed,
nodeMajor,
nodeMinor,
safeHTTPMethods: ['GET', 'HEAD', 'OPTIONS', 'TRACE']
safeHTTPMethods: ['GET', 'HEAD', 'OPTIONS', 'TRACE'],
wrapRequestBody
}
14 changes: 11 additions & 3 deletions lib/handler/retry-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ const assert = require('node:assert')

const { kRetryHandlerDefaultRetry } = require('../core/symbols')
const { RequestRetryError } = require('../core/errors')
const { isDisturbed, parseHeaders, parseRangeHeader } = require('../core/util')
const {
isDisturbed,
parseHeaders,
parseRangeHeader,
wrapRequestBody
} = require('../core/util')

function calculateRetryAfterHeader (retryAfter) {
const current = Date.now()
Expand Down Expand Up @@ -63,6 +68,7 @@ class RetryHandler {
this.end = null
this.etag = null
this.resume = null
this.opts.body = wrapRequestBody(this.opts.body)
ronag marked this conversation as resolved.
Show resolved Hide resolved
metcoder95 marked this conversation as resolved.
Show resolved Hide resolved

// Handle possible onConnect duplication
this.handler.onConnect(reason => {
Expand Down Expand Up @@ -174,7 +180,9 @@ class RetryHandler {
this.abort(
new RequestRetryError('Request failed', statusCode, {
headers,
count: this.retryCount
data: {
count: this.retryCount
}
})
)
return false
Expand Down Expand Up @@ -278,7 +286,7 @@ class RetryHandler {

const err = new RequestRetryError('Request failed', statusCode, {
headers,
count: this.retryCount
data: { count: this.retryCount }
})

this.abort(err)
Expand Down
211 changes: 211 additions & 0 deletions test/retry-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const { tspl } = require('@matteo.collina/tspl')
const { test, after } = require('node:test')
const { createServer } = require('node:http')
const { once } = require('node:events')
const { Readable } = require('node:stream')

const { RetryHandler, Client } = require('..')
const { RequestHandler } = require('../lib/api/api-request')
Expand Down Expand Up @@ -204,6 +205,75 @@ test('Should account for network and response errors', async t => {
await t.completed
})

test('Issue #3288', async t => {
t = tspl(t, { plan: 6 })
const server = createServer()
const dispatchOptions = {
method: 'POST',
path: '/',
headers: {
'content-type': 'application/json'
},
body: (function * () {
yield 'hello'
yield 'world'
})()
}

server.on('request', (req, res) => {
res.writeHead(500, {
'content-type': 'application/json'
})

res.end('{"message": "failed"}')
})

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
const handler = new RetryHandler(dispatchOptions, {
dispatch: client.dispatch.bind(client),
handler: {
onConnect () {
t.ok(true, 'pass')
},
onBodySent () {
t.ok(true, 'pass')
},
onHeaders (status, _rawHeaders, resume, _statusMessage) {
t.strictEqual(status, 500)
return true
},
onData (chunk) {
console.log('chunk', chunk)
return true
},
onComplete () {
t.fail()
},
onError (err) {
t.equal(err.message, 'Request failed')
t.equal(err.statusCode, 500)
t.equal(err.data.count, 1)
}
}
})

after(async () => {
await client.close()
server.close()

await once(server, 'close')
})

client.dispatch(
dispatchOptions,
handler
)
})

await t.completed
})

test('Should use retry-after header for retries', async t => {
t = tspl(t, { plan: 4 })

Expand Down Expand Up @@ -685,6 +755,7 @@ test('retrying a request with a body', async t => {
t = tspl(t, { plan: 1 })

server.on('request', (req, res) => {
console.log({ counter })
switch (counter) {
case 0:
req.destroy()
Expand Down Expand Up @@ -734,6 +805,146 @@ test('retrying a request with a body', async t => {
await t.completed
})

test('retrying a request with a body (stream)', async t => {
let counter = 0
const server = createServer()
const dispatchOptions = {
retryOptions: {
retry: (err, { state, opts }, done) => {
counter++

if (
err.statusCode === 500 ||
err.message.includes('other side closed')
) {
setTimeout(done, 500)
return
}

return done(err)
}
},
method: 'POST',
path: '/',
headers: {
'content-type': 'application/json'
},
body: Readable.from(Buffer.from(JSON.stringify({ hello: 'world' })))
}

t = tspl(t, { plan: 3 })

server.on('request', (req, res) => {
console.log({ counter })
switch (counter) {
case 0:
res.writeHead(500)
res.end('failed')
return
default:
t.fail()
}
})

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
const handler = new RetryHandler(dispatchOptions, {
dispatch: client.dispatch.bind(client),
handler: new RequestHandler(dispatchOptions, (err, data) => {
t.equal(err.statusCode, 500)
t.equal(err.data.count, 1)
t.equal(err.code, 'UND_ERR_REQ_RETRY')
})
})

after(async () => {
await client.close()
server.close()

await once(server, 'close')
})

client.dispatch(
dispatchOptions,
handler
)
})

await t.completed
})

test('retrying a request with a body (buffer)', async t => {
let counter = 0
const server = createServer()
const dispatchOptions = {
retryOptions: {
retry: (err, { state, opts }, done) => {
counter++

if (
err.statusCode === 500 ||
err.message.includes('other side closed')
) {
setTimeout(done, 500)
return
}

return done(err)
}
},
method: 'POST',
path: '/',
headers: {
'content-type': 'application/json'
},
body: Buffer.from(JSON.stringify({ hello: 'world' }))
}

t = tspl(t, { plan: 1 })

server.on('request', (req, res) => {
switch (counter) {
case 0:
req.destroy()
return
case 1:
res.writeHead(500)
res.end('failed')
return
case 2:
res.writeHead(200)
res.end('hello world!')
return
default:
t.fail()
}
})

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
const handler = new RetryHandler(dispatchOptions, {
dispatch: client.dispatch.bind(client),
handler: new RequestHandler(dispatchOptions, (err, data) => {
t.ifError(err)
})
})

after(async () => {
await client.close()
server.close()

await once(server, 'close')
})

client.dispatch(
dispatchOptions,
handler
)
})

await t.completed
})

test('should not error if request is not meant to be retried', async t => {
t = tspl(t, { plan: 3 })

Expand Down
Loading