Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 66 additions & 16 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ const EventEmitter = require('events');
const { StringDecoder } = require('string_decoder');
const { kFSWatchStart, watch } = require('internal/fs/watchers');
const nonNativeWatcher = require('internal/fs/recursive_watch');
const { isIterable } = require('internal/streams/utils');
const {
isIterable,
isReadableErrored,
isReadableNodeStream,
} = require('internal/streams/utils');
const assert = require('internal/assert');

const permission = require('internal/process/permission');
Expand Down Expand Up @@ -1116,24 +1120,60 @@ function checkAborted(signal) {
throw new AbortError(undefined, { cause: signal.reason });
}

async function writeFileHandle(filehandle, data, signal, encoding) {
checkAborted(signal);
function makeWriteFileStreamErrorHandler(data) {
if (!isReadableNodeStream(data))
return undefined;

let error;
let errored = false;
function onError(err) {
error = err;
errored = true;
}
const streamError = isReadableErrored(data);
if (streamError != null)
onError(streamError);
data.on('error', onError);

return {
__proto__: null,
check() {
if (errored)
throw error;
},
cleanup() {
data.removeListener('error', onError);
},
};
}

async function writeFileHandle(filehandle, data, signal, encoding, streamErrorHandler) {
if (isCustomIterable(data)) {
for await (const buf of data) {
streamErrorHandler ??= makeWriteFileStreamErrorHandler(data);
try {
checkAborted(signal);
const toWrite =
isArrayBufferView(buf) ? buf : Buffer.from(buf, encoding || 'utf8');
let remaining = toWrite.byteLength;
while (remaining > 0) {
const writeSize = MathMin(kWriteFileMaxChunkSize, remaining);
const { bytesWritten } = await write(
filehandle, toWrite, toWrite.byteLength - remaining, writeSize);
remaining -= bytesWritten;
streamErrorHandler?.check();
for await (const buf of data) {
checkAborted(signal);
streamErrorHandler?.check();
const toWrite =
isArrayBufferView(buf) ? buf : Buffer.from(buf, encoding || 'utf8');
let remaining = toWrite.byteLength;
while (remaining > 0) {
const writeSize = MathMin(kWriteFileMaxChunkSize, remaining);
const { bytesWritten } = await write(
filehandle, toWrite, toWrite.byteLength - remaining, writeSize);
remaining -= bytesWritten;
checkAborted(signal);
streamErrorHandler?.check();
}
}
} finally {
streamErrorHandler?.cleanup();
}
return;
}
checkAborted(signal);
data = new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
let remaining = data.byteLength;
if (remaining === 0) return;
Expand Down Expand Up @@ -1891,13 +1931,23 @@ async function writeFile(path, data, options) {
}

validateAbortSignal(options.signal);
checkAborted(options.signal);
const streamErrorHandler = makeWriteFileStreamErrorHandler(data);

if (path instanceof FileHandle)
return writeFileHandle(path, data, options.signal, options.encoding);
return writeFileHandle(
path, data, options.signal, options.encoding, streamErrorHandler);

checkAborted(options.signal);
let fd;
try {
fd = await open(path, flag, options.mode);
} catch (err) {
streamErrorHandler?.cleanup();
throw err;
}

const fd = await open(path, flag, options.mode);
let writeOp = writeFileHandle(fd, data, options.signal, options.encoding);
let writeOp = writeFileHandle(
fd, data, options.signal, options.encoding, streamErrorHandler);

if (flush) {
writeOp = handleFdSync(writeOp, fd);
Expand Down
30 changes: 30 additions & 0 deletions test/parallel/test-fs-promises-file-handle-writeFile.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ async function doWriteAndCancel() {

const dest = path.resolve(tmpDir, 'tmp.txt');
const otherDest = path.resolve(tmpDir, 'tmp-2.txt');
const errorDest = path.resolve(tmpDir, 'tmp-error.txt');
const stream = Readable.from(['a', 'b', 'c']);
const stream2 = Readable.from(['ümlaut', ' ', 'sechzig']);
const iterable = {
Expand All @@ -65,6 +66,15 @@ function iterableWith(value) {
}
};
}

function createEarlyErrorStream(error) {
const stream = new Readable({
read() {}
});
process.nextTick(() => stream.destroy(error));
return stream;
}

const bufferIterable = {
expected: 'abc',
*[Symbol.iterator]() {
Expand Down Expand Up @@ -94,6 +104,25 @@ async function doWriteStream() {
}
}

async function doWriteStreamError() {
const fileHandle = await open(errorDest, 'w+');
const error = new Error('early file handle writeFile stream error');
const stream = createEarlyErrorStream(error);
const uncaughtException = common.mustNotCall(
'stream errors should reject FileHandle.writeFile()');

process.once('uncaughtException', uncaughtException);
try {
await assert.rejects(
fileHandle.writeFile(stream),
{ message: error.message }
);
} finally {
process.removeListener('uncaughtException', uncaughtException);
await fileHandle.close();
}
}

async function doWriteStreamWithCancel() {
const controller = new AbortController();
const { signal } = controller;
Expand Down Expand Up @@ -190,6 +219,7 @@ async function doWriteInvalidValues() {
await validateWriteFile();
await doWriteAndCancel();
await doWriteStream();
await doWriteStreamError();
await doWriteStreamWithCancel();
await doWriteIterable();
await doWriteInvalidIterable();
Expand Down
40 changes: 40 additions & 0 deletions test/parallel/test-fs-promises-writefile.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ tmpdir.refresh();

const dest = path.resolve(tmpDir, 'tmp.txt');
const otherDest = path.resolve(tmpDir, 'tmp-2.txt');
const errorDest = path.resolve(tmpDir, 'tmp-error.txt');
const buffer = Buffer.from('abc'.repeat(1000));
const buffer2 = Buffer.from('xyz'.repeat(1000));
const stream = Readable.from(['a', 'b', 'c']);
Expand Down Expand Up @@ -40,6 +41,15 @@ function iterableWith(value) {
}
};
}

function createEarlyErrorStream(error) {
const stream = new Readable({
read() {}
});
process.nextTick(() => stream.destroy(error));
return stream;
}

const bufferIterable = {
expected: 'abc',
*[Symbol.iterator]() {
Expand Down Expand Up @@ -70,6 +80,34 @@ async function doWriteStream() {
assert.deepStrictEqual(data, expected);
}

async function doWriteStreamError() {
const error = new Error('early writeFile stream error');
const stream = createEarlyErrorStream(error);
const uncaughtException = common.mustNotCall(
'stream errors should reject writeFile()');

process.once('uncaughtException', uncaughtException);
try {
await assert.rejects(
fsPromises.writeFile(errorDest, stream),
{ message: error.message }
);
assert.strictEqual(stream.listenerCount('error'), 0);
} finally {
process.removeListener('uncaughtException', uncaughtException);
}
}

async function doWriteStreamOpenError() {
const stream = Readable.from(['a']);

await assert.rejects(
fsPromises.writeFile(path.resolve(tmpDir, 'not-found', 'tmp.txt'), stream),
{ code: 'ENOENT' }
);
assert.strictEqual(stream.listenerCount('error'), 0);
}

async function doWriteStreamWithCancel() {
const controller = new AbortController();
const { signal } = controller;
Expand Down Expand Up @@ -168,6 +206,8 @@ async function doReadWithEncoding() {
await doRead();
await doReadWithEncoding();
await doWriteStream();
await doWriteStreamError();
await doWriteStreamOpenError();
await doWriteStreamWithCancel();
await doWriteIterable();
await doWriteInvalidIterable();
Expand Down
Loading