diff --git a/lib/moveAndMaybeCompressFile.js b/lib/moveAndMaybeCompressFile.js index 69240ae..5646712 100644 --- a/lib/moveAndMaybeCompressFile.js +++ b/lib/moveAndMaybeCompressFile.js @@ -32,8 +32,7 @@ const moveAndMaybeCompressFile = async ( ); if (options.compress) { await new Promise((resolve, reject) => { - let error = false; - // to avoid concurrency, the process which can create the file will proceed (using flags wx) + // to avoid concurrency, the forked process which can create the file will proceed (using flags wx) const writeStream = fs.createWriteStream(targetFilePath, {mode: options.mode, flags: "wx"}) // wait until writable stream is valid before proceeding to read .on("open", () => { @@ -44,49 +43,45 @@ const moveAndMaybeCompressFile = async ( }) .on("error", (e) => { debug(`moveAndMaybeCompressFile: error reading ${sourceFilePath}`, e); - error = e; // manually close writable: https://nodejs.org/api/stream.html#readablepipedestination-options - writeStream.close(); + writeStream.destroy(e); }); }) - // wait until writable stream finishes (called even if closed prematurely) before deleting/truncating .on("finish", () => { - if (error) { - debug(`moveAndMaybeCompressFile: error compressing ${targetFilePath}, deleting ${targetFilePath}`); + debug(`moveAndMaybeCompressFile: finished compressing ${targetFilePath}, deleting ${sourceFilePath}`); + // delete sourceFilePath + fs.unlink(sourceFilePath) + .then(resolve) + .catch((e) => { + debug(`moveAndMaybeCompressFile: error deleting ${sourceFilePath}, truncating instead`, e); + // fallback to truncate + fs.truncate(sourceFilePath) + .then(resolve) + .catch((e) => { + debug(`moveAndMaybeCompressFile: error truncating ${sourceFilePath}`, e); + reject(e); + }); + }); + }) + .on("error", (e) => { + if (e.code === 'EEXIST') { + debug(`moveAndMaybeCompressFile: error creating ${targetFilePath}`, e); + // do not do anything if handled by another forked process + reject(e); + } else { + debug(`moveAndMaybeCompressFile: error writing ${targetFilePath}, deleting`, e); // delete targetFilePath (taking as nothing happened) fs.unlink(targetFilePath) - .then(() => { reject(error); }) + .then(() => { reject(e); }) .catch((e) => { - debug(`moveAndMaybeCompressFile: error deleting ${targetFilePath}`); + debug(`moveAndMaybeCompressFile: error deleting ${targetFilePath}`, e); reject(e); }); - } else { - debug(`moveAndMaybeCompressFile: finished compressing ${targetFilePath}, deleting ${sourceFilePath}`); - // delete sourceFilePath - fs.unlink(sourceFilePath) - .then(resolve) - .catch((e) => { - debug(`moveAndMaybeCompressFile: error deleting ${sourceFilePath}, truncating instead`, e); - // fallback to truncate - fs.truncate(sourceFilePath) - .then(resolve) - .catch((e) => { - debug(`moveAndMaybeCompressFile: error truncating ${sourceFilePath}`, e); - reject(e); - }); - }); } - }) - .on("error", (e) => { - debug(`moveAndMaybeCompressFile: error writing ${targetFilePath}`, e); - error = e; - reject(e); }); }).catch(() => {}); } else { - debug( - `moveAndMaybeCompressFile: renaming ${sourceFilePath} to ${targetFilePath}` - ); + debug(`moveAndMaybeCompressFile: renaming ${sourceFilePath} to ${targetFilePath}`); try { await fs.move(sourceFilePath, targetFilePath, { overwrite: true }); } catch (e) { diff --git a/test/moveAndMaybeCompressFile-test.js b/test/moveAndMaybeCompressFile-test.js index e436590..f0a57d7 100644 --- a/test/moveAndMaybeCompressFile-test.js +++ b/test/moveAndMaybeCompressFile-test.js @@ -70,43 +70,59 @@ describe('moveAndMaybeCompressFile', () => { const destination = path.join(TEST_DIR, 'moved-test.log.gz'); await fs.outputFile(source, 'This is the test file.'); // simulate another process has already started writing the destination file - await fs.outputFile(destination, 'This is the compressed file.'); + await fs.outputFile(destination, 'Compressed file.'); const options = {compress: true}; await moveAndMaybeCompressFile(source, destination, options); - (await fs.readFile(source, 'utf8')).should.equal('This is the test file.'); - (await fs.readFile(destination, 'utf8')).should.equal('This is the compressed file.'); + (await fs.readFile(source, 'utf8')).should.equal('This is the test file.', 'source file should remain intact'); + (await fs.readFile(destination, 'utf8')).should.equal('Compressed file.', 'destination file should remain'); }); it('should remove destination file if readstream error', async () => { const moveWithMock = proxyquire('../lib/moveAndMaybeCompressFile', { "fs-extra": { - pathExists: () => Promise.resolve(true) + createReadStream: (...args) => { + if (args[0]) { + // replace test.log with a non-existent file to simulate readstream error + args[0] = args[0].replace(new RegExp('test.log' + '$'), 'non-exist.log'); + } + return fs.createReadStream(...args) + } } }); const source = path.join(TEST_DIR, 'test.log'); const destination = path.join(TEST_DIR, 'moved-test.log.gz'); + await fs.outputFile(source, 'This is the test file.'); const options = {compress: true}; await moveWithMock(source, destination, options); - (await fs.pathExists(destination)).should.be.false(); + (await fs.readFile(source, 'utf8')).should.equal('This is the test file.', 'source file should remain intact'); + (await fs.pathExists(destination)).should.be.false('destination file should be removed'); }); - it('should have an empty destination file if readstream error and remove fails', async () => { + it('should have destination file if readstream error and remove fails', async () => { const moveWithMock = proxyquire('../lib/moveAndMaybeCompressFile', { "fs-extra": { - pathExists: () => Promise.resolve(true), + createReadStream: (...args) => { + if (args[0]) { + // replace test.log with a non-existent file to simulate readstream error + args[0] = args[0].replace(new RegExp('test.log' + '$'), 'non-exist.log'); + } + return fs.createReadStream(...args) + }, unlink: () => Promise.reject({ code: 'EBUSY', message: 'all gone wrong'}), } }); const source = path.join(TEST_DIR, 'test.log'); const destination = path.join(TEST_DIR, 'moved-test.log.gz'); + await fs.outputFile(source, 'This is the test file.'); const options = {compress: true}; await moveWithMock(source, destination, options); - (await fs.readFile(destination, 'utf8')).should.equal(''); + (await fs.readFile(source, 'utf8')).should.equal('This is the test file.', 'source file should remain intact'); + (await fs.readFile(destination, 'utf8')).should.equal('', 'destination file should remain'); }); it('should use copy+truncate if source file is locked (windows)', async () => { @@ -184,7 +200,7 @@ describe('moveAndMaybeCompressFile', () => { contents.should.equal('This is the test file.'); // won't delete or truncate the source - (await fs.readFile(source, 'utf8')).should.equal('This is the test file.'); + (await fs.readFile(source, 'utf8')).should.equal('This is the test file.', 'source file should remain intact'); }); it('should compress the source file at the new destination with 0o744 rights', async () => {