Skip to content

Commit

Permalink
WriteEntry backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
isaacs committed Aug 9, 2021
1 parent 0dcc5b2 commit 83bb22c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 7 deletions.
28 changes: 22 additions & 6 deletions lib/write-entry.js
Expand Up @@ -24,6 +24,8 @@ const OPENFILE = Symbol('openfile')
const ONOPENFILE = Symbol('onopenfile')
const CLOSE = Symbol('close')
const MODE = Symbol('mode')
const AWAITDRAIN = Symbol('awaitDrain')
const ONDRAIN = Symbol('ondrain')
const warner = require('./warn-mixin.js')
const winchars = require('./winchars.js')
const stripAbsolutePath = require('./strip-absolute-path.js')
Expand Down Expand Up @@ -226,7 +228,7 @@ const WriteEntry = warner(class WriteEntry extends MiniPass {
this.pos = 0
this.remain = this.stat.size
this.length = this.buf.length
this[READ](this.stat.size)
this[READ]()
}

[READ] () {
Expand Down Expand Up @@ -278,13 +280,23 @@ const WriteEntry = warner(class WriteEntry extends MiniPass {

const writeBuf = this.offset === 0 && bytesRead === this.buf.length ?
this.buf : this.buf.slice(this.offset, this.offset + bytesRead)
this.remain -= bytesRead
this.blockRemain -= bytesRead
this.pos += bytesRead
this.offset += bytesRead
this.remain -= writeBuf.length
this.blockRemain -= writeBuf.length
this.pos += writeBuf.length
this.offset += writeBuf.length

const flushed = this.write(writeBuf)
if (!flushed)
this[AWAITDRAIN](() => this[ONDRAIN]())
else
this[ONDRAIN]()
}

this.write(writeBuf)
[AWAITDRAIN] (cb) {
this.once('drain', cb)
}

[ONDRAIN] () {
if (!this.remain) {
if (this.blockRemain)
this.write(Buffer.alloc(this.blockRemain))
Expand Down Expand Up @@ -338,6 +350,10 @@ class WriteEntrySync extends WriteEntry {
}
}

[AWAITDRAIN] (cb) {
cb()
}

[CLOSE] (cb) {
fs.closeSync(this.fd)
cb()
Expand Down
18 changes: 17 additions & 1 deletion test/write-entry.js
Expand Up @@ -229,6 +229,22 @@ t.test('zero-byte file', t => {
})
})

t.test('zero-byte file, but close fails', t => {
const poop = new Error('poop')
t.tearDown(mutateFS.fail('close', poop))

const ws = new WriteEntry('files/1024-bytes.txt', { cwd: fixtures })

ws.on('end', _ =>
t.fail('should not get an end, because the close fails'))

ws.on('error', er => {
t.match(er, { message: 'poop' })
t.end()
})
ws.resume()
})

t.test('hardlinks', t => {
const h1 = 'hardlink-1'
const h2 = 'hardlink-2'
Expand Down Expand Up @@ -573,7 +589,7 @@ t.test('read overflow expectation', t => {
new WriteEntry(f, { cwd: files, maxReadSize: 2 }).on('error', er => {
t.match(er, expect)
t.end()
})
}).resume()
})

t.test('short reads', t => {
Expand Down

0 comments on commit 83bb22c

Please sign in to comment.