Skip to content

Commit

Permalink
fs: synchronize close with other I/O for streams
Browse files Browse the repository at this point in the history
Part of the flakiness in the
parallel/test-readline-async-iterators-destroy test comes from
fs streams starting `_read()` and `_destroy()` without waiting
for the other to finish, which can lead to the `fs.read()` call
resulting in `EBADF` if timing is bad.

Fix this by synchronizing write and read operations with `close()`.

Refs: #30660

PR-URL: #30837
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
addaleax authored and BethGriggs committed Feb 6, 2020
1 parent 6725fa1 commit 578d12f
Showing 1 changed file with 46 additions and 6 deletions.
52 changes: 46 additions & 6 deletions lib/internal/fs/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ const {
NumberIsSafeInteger,
ObjectDefineProperty,
ObjectSetPrototypeOf,
Symbol,
} = primordials;

const {
ERR_OUT_OF_RANGE
ERR_OUT_OF_RANGE,
ERR_STREAM_DESTROYED
} = require('internal/errors').codes;
const { validateNumber } = require('internal/validators');
const fs = require('fs');
Expand All @@ -21,6 +23,8 @@ const {
} = require('internal/fs/utils');
const { Readable, Writable } = require('stream');
const { toPathIfFileURL } = require('internal/url');
const kIoDone = Symbol('kIoDone');
const kIsPerformingIO = Symbol('kIsPerformingIO');

const kMinPoolSpace = 128;

Expand Down Expand Up @@ -85,6 +89,7 @@ function ReadStream(path, options) {
this.pos = undefined;
this.bytesRead = 0;
this.closed = false;
this[kIsPerformingIO] = false;

if (this.start !== undefined) {
checkPosition(this.start, 'start');
Expand Down Expand Up @@ -143,6 +148,8 @@ ReadStream.prototype._read = function(n) {
});
}

if (this.destroyed) return;

if (!pool || pool.length - pool.used < kMinPoolSpace) {
// Discard the old pool.
allocNewPool(this.readableHighWaterMark);
Expand All @@ -166,7 +173,12 @@ ReadStream.prototype._read = function(n) {
return this.push(null);

// the actual read.
this[kIsPerformingIO] = true;
fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) return this.emit(kIoDone, er);

if (er) {
if (this.autoClose) {
this.destroy();
Expand Down Expand Up @@ -212,8 +224,12 @@ ReadStream.prototype._destroy = function(err, cb) {
return;
}

if (this[kIsPerformingIO]) {
this.once(kIoDone, (er) => closeFsStream(this, cb, err || er));
return;
}

closeFsStream(this, cb, err);
this.fd = null;
};

function closeFsStream(stream, cb, err) {
Expand All @@ -224,6 +240,8 @@ function closeFsStream(stream, cb, err) {
if (!er)
stream.emit('close');
});

stream.fd = null;
}

ReadStream.prototype.close = function(cb) {
Expand Down Expand Up @@ -262,6 +280,7 @@ function WriteStream(path, options) {
this.pos = undefined;
this.bytesWritten = 0;
this.closed = false;
this[kIsPerformingIO] = false;

if (this.start !== undefined) {
checkPosition(this.start, 'start');
Expand Down Expand Up @@ -316,7 +335,17 @@ WriteStream.prototype._write = function(data, encoding, cb) {
});
}

if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));

this[kIsPerformingIO] = true;
fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
cb(er);
return this.emit(kIoDone, er);
}

if (er) {
if (this.autoClose) {
this.destroy();
Expand All @@ -339,7 +368,8 @@ WriteStream.prototype._writev = function(data, cb) {
});
}

const self = this;
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));

const len = data.length;
const chunks = new Array(len);
let size = 0;
Expand All @@ -351,12 +381,22 @@ WriteStream.prototype._writev = function(data, cb) {
size += chunk.length;
}

fs.writev(this.fd, chunks, this.pos, function(er, bytes) {
this[kIsPerformingIO] = true;
fs.writev(this.fd, chunks, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
cb(er);
return this.emit(kIoDone, er);
}

if (er) {
self.destroy();
if (this.autoClose) {
this.destroy();
}
return cb(er);
}
self.bytesWritten += bytes;
this.bytesWritten += bytes;
cb();
});

Expand Down

0 comments on commit 578d12f

Please sign in to comment.