Skip to content

Commit

Permalink
fix: start retrying immediately, stop after 60 seconds
Browse files Browse the repository at this point in the history
PR-URL: #217
Credit: @nlf
Close: #217
Reviewed-by: @isaacs, @wraithgar
  • Loading branch information
nlf authored and isaacs committed Aug 5, 2021
1 parent 00474f6 commit 569a726
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 30 deletions.
100 changes: 77 additions & 23 deletions graceful-fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ if (!fs[gracefulQueue]) {
return fs$close.call(fs, fd, function (err) {
// This function uses the graceful-fs shared queue
if (!err) {
retry()
resetQueue()
}

if (typeof cb === 'function')
Expand All @@ -72,7 +72,7 @@ if (!fs[gracefulQueue]) {
function closeSync (fd) {
// This function uses the graceful-fs shared queue
fs$closeSync.apply(fs, arguments)
retry()
resetQueue()
}

Object.defineProperty(closeSync, previousSymbol, {
Expand Down Expand Up @@ -114,10 +114,10 @@ function patch (fs) {

return go$readFile(path, options, cb)

function go$readFile (path, options, cb, attempts = 0) {
function go$readFile (path, options, cb, startTime) {
return fs$readFile(path, options, function (err) {
if (err && (err.code === 'EMFILE' || err.code === 'ENFILE'))
enqueue([go$readFile, [path, options, cb], attempts + 1, err])
enqueue([go$readFile, [path, options, cb], err, startTime || Date.now(), Date.now()])
else {
if (typeof cb === 'function')
cb.apply(this, arguments)
Expand All @@ -134,10 +134,10 @@ function patch (fs) {

return go$writeFile(path, data, options, cb)

function go$writeFile (path, data, options, cb, attempts = 0) {
function go$writeFile (path, data, options, cb, startTime) {
return fs$writeFile(path, data, options, function (err) {
if (err && (err.code === 'EMFILE' || err.code === 'ENFILE'))
enqueue([go$writeFile, [path, data, options, cb], attempts + 1, err])
enqueue([go$writeFile, [path, data, options, cb], err, startTime || Date.now(), Date.now()])
else {
if (typeof cb === 'function')
cb.apply(this, arguments)
Expand All @@ -155,10 +155,10 @@ function patch (fs) {

return go$appendFile(path, data, options, cb)

function go$appendFile (path, data, options, cb, attempts = 0) {
function go$appendFile (path, data, options, cb, startTime) {
return fs$appendFile(path, data, options, function (err) {
if (err && (err.code === 'EMFILE' || err.code === 'ENFILE'))
enqueue([go$appendFile, [path, data, options, cb], attempts + 1, err])
enqueue([go$appendFile, [path, data, options, cb], err, startTime || Date.now(), Date.now()])
else {
if (typeof cb === 'function')
cb.apply(this, arguments)
Expand All @@ -177,10 +177,10 @@ function patch (fs) {
}
return go$copyFile(src, dest, flags, cb)

function go$copyFile (src, dest, flags, cb, attempts = 0) {
function go$copyFile (src, dest, flags, cb, startTime) {
return fs$copyFile(src, dest, flags, function (err) {
if (err && (err.code === 'EMFILE' || err.code === 'ENFILE'))
enqueue([go$copyFile, [src, dest, flags, cb], attempts + 1, err])
enqueue([go$copyFile, [src, dest, flags, cb], err, startTime || Date.now(), Date.now()])
else {
if (typeof cb === 'function')
cb.apply(this, arguments)
Expand All @@ -197,10 +197,10 @@ function patch (fs) {

return go$readdir(path, options, cb)

function go$readdir (path, options, cb, attempts = 0) {
function go$readdir (path, options, cb, startTime) {
return fs$readdir(path, options, function (err, files) {
if (err && (err.code === 'EMFILE' || err.code === 'ENFILE'))
enqueue([go$readdir, [path, options, cb], attempts + 1, err])
enqueue([go$readdir, [path, options, cb], err, startTime || Date.now(), Date.now()])
else {
if (files && files.sort)
files.sort()
Expand Down Expand Up @@ -334,10 +334,10 @@ function patch (fs) {

return go$open(path, flags, mode, cb)

function go$open (path, flags, mode, cb, attempts = 0) {
function go$open (path, flags, mode, cb, startTime) {
return fs$open(path, flags, mode, function (err, fd) {
if (err && (err.code === 'EMFILE' || err.code === 'ENFILE'))
enqueue([go$open, [path, flags, mode, cb], attempts + 1, err])
enqueue([go$open, [path, flags, mode, cb], err, startTime || Date.now(), Date.now()])
else {
if (typeof cb === 'function')
cb.apply(this, arguments)
Expand All @@ -355,21 +355,75 @@ function enqueue (elem) {
retry()
}

// keep track of the timeout between retry() calls
var retryTimer

// reset the startTime and lastTime to now
// this resets the start of the 60 second overall timeout as well as the
// delay between attempts so that we'll retry these jobs sooner
function resetQueue () {
var now = Date.now()
for (var i = 0; i < fs[gracefulQueue].length; ++i) {
// entries that are only a length of 2 are from an older version, don't
// bother modifying those since they'll be retried anyway.
if (fs[gracefulQueue][i].length > 2) {
fs[gracefulQueue][i][3] = now // startTime
fs[gracefulQueue][i][4] = now // lastTime
}
}
// call retry to make sure we're actively processing the queue
retry()
}

function retry () {
// clear the timer and remove it to help prevent unintended concurrency
clearTimeout(retryTimer)
retryTimer = undefined

if (fs[gracefulQueue].length === 0)
return

var elem = fs[gracefulQueue].shift()
if (elem) {
const [fn, args, attempts, err] = elem
if (attempts < 10) {
debug('RETRY', fn.name, args, `ATTEMPT #${attempts}`)
fn.call(null, ...args, attempts)
var fn = elem[0]
var args = elem[1]
// these items may be unset if they were added by an older graceful-fs
var err = elem[2]
var startTime = elem[3]
var lastTime = elem[4]

// if we don't have a startTime we have no way of knowing if we've waited
// long enough, so go ahead and retry this item now
if (startTime === undefined) {
debug('RETRY', fn.name, args)
fn.apply(null, args)
} else if (Date.now() - startTime >= 60000) {
// it's been more than 60 seconds total, bail now
debug('TIMEOUT', fn.name, args)
var cb = args.pop()
if (typeof cb === 'function')
cb.call(null, err)
} else {
// the amount of time between the last attempt and right now
var sinceAttempt = Date.now() - lastTime
// the amount of time between when we first tried, and when we last tried
// rounded up to at least 1
var sinceStart = Math.max(lastTime - startTime, 1)
// backoff. wait longer than the total time we've been retrying, but only
// up to a maximum of 100ms
var desiredDelay = Math.min(sinceStart * 1.2, 100)
// it's been long enough since the last retry, do it again
if (sinceAttempt >= desiredDelay) {
debug('RETRY', fn.name, args)
fn.apply(null, args.concat([startTime]))
} else {
const cb = args.pop()
if (typeof cb === 'function')
cb.call(null, err)
// if we can't do this job yet, push it to the end of the queue
// and let the next iteration check again
fs[gracefulQueue].push(elem)
}
}
setImmediate(retry)

// schedule our next run if one isn't already scheduled
if (retryTimer === undefined) {
retryTimer = setTimeout(retry, 0)
}
}
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 19 additions & 6 deletions test/retry.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,30 @@ var test = require('tap').test

var EMFILE = Object.assign(new Error('FAKE EMFILE'), { code: 'EMFILE' })

test('retries 10 times before erroring', function (t) {
var attempts = 0
test('eventually times out and returns error', function (t) {
var readFile = realFs.readFile
var realNow = Date.now

t.teardown(function () {
realFs.readFile = readFile
Date.now = realNow
})

realFs.readFile = function (path, options, cb) {
++attempts
process.nextTick(cb, EMFILE)
process.nextTick(function () {
cb(EMFILE)
// hijack Date.now _after_ we call the callback, the callback will
// call it when adding the job to the queue, we want to capture it
// any time after that first call so we can pretend it's been 60s
Date.now = function () {
return realNow() + 60000
}
})
}
var fs = importFresh(path.dirname(__dirname))

var fs = importFresh(path.dirname(__dirname))
fs.readFile('literally anything', function (err) {
t.equal(err.code, 'EMFILE', 'eventually got the EMFILE')
t.equal(attempts, 10, 'tried 10 times')
t.done()
})
})

0 comments on commit 569a726

Please sign in to comment.