Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(refactor): use writeStream.destroy() instead #125

Merged
merged 1 commit into from
Apr 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
59 changes: 27 additions & 32 deletions lib/moveAndMaybeCompressFile.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ const moveAndMaybeCompressFile = async (
);
if (options.compress) {
await new Promise((resolve, reject) => {
let error = false;
// to avoid concurrency, the process which can create the file will proceed (using flags wx)
// to avoid concurrency, the forked process which can create the file will proceed (using flags wx)
const writeStream = fs.createWriteStream(targetFilePath, {mode: options.mode, flags: "wx"})
// wait until writable stream is valid before proceeding to read
.on("open", () => {
Expand All @@ -44,49 +43,45 @@ const moveAndMaybeCompressFile = async (
})
.on("error", (e) => {
debug(`moveAndMaybeCompressFile: error reading ${sourceFilePath}`, e);
error = e;
// manually close writable: https://nodejs.org/api/stream.html#readablepipedestination-options
writeStream.close();
writeStream.destroy(e);
});
})
// wait until writable stream finishes (called even if closed prematurely) before deleting/truncating
.on("finish", () => {
if (error) {
debug(`moveAndMaybeCompressFile: error compressing ${targetFilePath}, deleting ${targetFilePath}`);
debug(`moveAndMaybeCompressFile: finished compressing ${targetFilePath}, deleting ${sourceFilePath}`);
// delete sourceFilePath
fs.unlink(sourceFilePath)
.then(resolve)
.catch((e) => {
debug(`moveAndMaybeCompressFile: error deleting ${sourceFilePath}, truncating instead`, e);
// fallback to truncate
fs.truncate(sourceFilePath)
.then(resolve)
.catch((e) => {
debug(`moveAndMaybeCompressFile: error truncating ${sourceFilePath}`, e);
reject(e);
});
});
})
.on("error", (e) => {
if (e.code === 'EEXIST') {
debug(`moveAndMaybeCompressFile: error creating ${targetFilePath}`, e);
// do not do anything if handled by another forked process
reject(e);
} else {
debug(`moveAndMaybeCompressFile: error writing ${targetFilePath}, deleting`, e);
// delete targetFilePath (taking as nothing happened)
fs.unlink(targetFilePath)
.then(() => { reject(error); })
.then(() => { reject(e); })
.catch((e) => {
debug(`moveAndMaybeCompressFile: error deleting ${targetFilePath}`);
debug(`moveAndMaybeCompressFile: error deleting ${targetFilePath}`, e);
reject(e);
});
} else {
debug(`moveAndMaybeCompressFile: finished compressing ${targetFilePath}, deleting ${sourceFilePath}`);
// delete sourceFilePath
fs.unlink(sourceFilePath)
.then(resolve)
.catch((e) => {
debug(`moveAndMaybeCompressFile: error deleting ${sourceFilePath}, truncating instead`, e);
// fallback to truncate
fs.truncate(sourceFilePath)
.then(resolve)
.catch((e) => {
debug(`moveAndMaybeCompressFile: error truncating ${sourceFilePath}`, e);
reject(e);
});
});
}
})
.on("error", (e) => {
debug(`moveAndMaybeCompressFile: error writing ${targetFilePath}`, e);
error = e;
reject(e);
});
}).catch(() => {});
} else {
debug(
`moveAndMaybeCompressFile: renaming ${sourceFilePath} to ${targetFilePath}`
);
debug(`moveAndMaybeCompressFile: renaming ${sourceFilePath} to ${targetFilePath}`);
try {
await fs.move(sourceFilePath, targetFilePath, { overwrite: true });
} catch (e) {
Expand Down
34 changes: 25 additions & 9 deletions test/moveAndMaybeCompressFile-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,43 +70,59 @@ describe('moveAndMaybeCompressFile', () => {
const destination = path.join(TEST_DIR, 'moved-test.log.gz');
await fs.outputFile(source, 'This is the test file.');
// simulate another process has already started writing the destination file
await fs.outputFile(destination, 'This is the compressed file.');
await fs.outputFile(destination, 'Compressed file.');
const options = {compress: true};
await moveAndMaybeCompressFile(source, destination, options);

(await fs.readFile(source, 'utf8')).should.equal('This is the test file.');
(await fs.readFile(destination, 'utf8')).should.equal('This is the compressed file.');
(await fs.readFile(source, 'utf8')).should.equal('This is the test file.', 'source file should remain intact');
(await fs.readFile(destination, 'utf8')).should.equal('Compressed file.', 'destination file should remain');
});

it('should remove destination file if readstream error', async () => {
const moveWithMock = proxyquire('../lib/moveAndMaybeCompressFile', {
"fs-extra": {
pathExists: () => Promise.resolve(true)
createReadStream: (...args) => {
if (args[0]) {
// replace test.log with a non-existent file to simulate readstream error
args[0] = args[0].replace(new RegExp('test.log' + '$'), 'non-exist.log');
}
return fs.createReadStream(...args)
}
}
});

const source = path.join(TEST_DIR, 'test.log');
const destination = path.join(TEST_DIR, 'moved-test.log.gz');
await fs.outputFile(source, 'This is the test file.');
const options = {compress: true};
await moveWithMock(source, destination, options);

(await fs.pathExists(destination)).should.be.false();
(await fs.readFile(source, 'utf8')).should.equal('This is the test file.', 'source file should remain intact');
(await fs.pathExists(destination)).should.be.false('destination file should be removed');
});

it('should have an empty destination file if readstream error and remove fails', async () => {
it('should have destination file if readstream error and remove fails', async () => {
const moveWithMock = proxyquire('../lib/moveAndMaybeCompressFile', {
"fs-extra": {
pathExists: () => Promise.resolve(true),
createReadStream: (...args) => {
if (args[0]) {
// replace test.log with a non-existent file to simulate readstream error
args[0] = args[0].replace(new RegExp('test.log' + '$'), 'non-exist.log');
}
return fs.createReadStream(...args)
},
unlink: () => Promise.reject({ code: 'EBUSY', message: 'all gone wrong'}),
}
});

const source = path.join(TEST_DIR, 'test.log');
const destination = path.join(TEST_DIR, 'moved-test.log.gz');
await fs.outputFile(source, 'This is the test file.');
const options = {compress: true};
await moveWithMock(source, destination, options);

(await fs.readFile(destination, 'utf8')).should.equal('');
(await fs.readFile(source, 'utf8')).should.equal('This is the test file.', 'source file should remain intact');
(await fs.readFile(destination, 'utf8')).should.equal('', 'destination file should remain');
});

it('should use copy+truncate if source file is locked (windows)', async () => {
Expand Down Expand Up @@ -184,7 +200,7 @@ describe('moveAndMaybeCompressFile', () => {
contents.should.equal('This is the test file.');

// won't delete or truncate the source
(await fs.readFile(source, 'utf8')).should.equal('This is the test file.');
(await fs.readFile(source, 'utf8')).should.equal('This is the test file.', 'source file should remain intact');
});

it('should compress the source file at the new destination with 0o744 rights', async () => {
Expand Down