From f10fe55e2e341ac2d151b99d26f47a52e9de9f2e Mon Sep 17 00:00:00 2001 From: Tiago Costa Date: Wed, 19 Dec 2018 11:49:45 +0000 Subject: [PATCH] fix: memory leaks, worker and main process lifecycles (#51) --- package-lock.json | 94 ++++++++++++++++++++++++++++++++++++++++- src/WorkerPool.js | 76 ++++++++++++++++++++++++++++++--- src/readBuffer.js | 24 +++-------- src/worker.js | 94 +++++++++++++++++++++++++++++++++++------ test/readBuffer.test.js | 14 +++--- test/workerPool.test.js | 7 +-- 6 files changed, 264 insertions(+), 45 deletions(-) diff --git a/package-lock.json b/package-lock.json index 5862be8..a66ec9f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -4205,6 +4205,14 @@ "null-check": "^1.0.0" } }, + "fs-minipass": { + "version": "1.2.5", + "resolved": "https://registry.npmjs.org/fs-minipass/-/fs-minipass-1.2.5.tgz", + "integrity": "sha512-JhBl0skXjUPCFH7x6x61gQxrKyXsxB5gcgePLZCwfyCGGsTISMoIeObbrvVeP6Xmyaudw4TT43qV2Gz+iyd2oQ==", + "requires": { + "minipass": "^2.2.1" + } + }, "fs-readdir-recursive": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/fs-readdir-recursive/-/fs-readdir-recursive-1.0.0.tgz", @@ -4492,6 +4500,9 @@ "integrity": "sha512-O+v1r9yN4tOsvl90p5HAP4AEqbYhx4036AGMm075fH9F8Qwi3oJ+v4u50FkT/KkvywNGtwkk0zRI+8eYm1X/xg==", "requires": { "chownr": "^1.0.1", + "fs-minipass": "^1.2.5", + "minipass": "^2.2.4", + "minizlib": "^1.1.0", "mkdirp": "^0.5.0", "safe-buffer": "^5.1.1", "yallist": "^3.0.2" @@ -8037,6 +8048,35 @@ "is-plain-obj": "^1.1.0" } }, + "minipass": { + "version": "2.3.5", + "resolved": "https://registry.npmjs.org/minipass/-/minipass-2.3.5.tgz", + "integrity": "sha512-Gi1W4k059gyRbyVUZQ4mEqLm0YIUiGYfvxhF6SIlk3ui1WVxMTGfGdQ2SInh3PDrRTVvPKgULkpJtT4RH10+VA==", + "requires": { + "safe-buffer": "^5.1.2", + "yallist": "^3.0.0" + }, + "dependencies": { + "safe-buffer": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==" + }, + "yallist": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/yallist/-/yallist-3.0.3.tgz", + "integrity": "sha512-S+Zk8DEWE6oKpV+vI3qWkaK+jSbIK86pCwe2IF/xwIpQ8jEuxpw9NyaGjmp9+BoJv5FV2piqCDcoCtStppiq2A==" + } + } + }, + "minizlib": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/minizlib/-/minizlib-1.2.1.tgz", + "integrity": "sha512-7+4oTUOWKg7AuL3vloEWekXY2/D20cevzsrNT2kGWm+39J9hGTCBv8VI5Pm5lXZ/o3/mdR4f8rflAPhnQb8mPA==", + "requires": { + "minipass": "^2.2.1" + } + }, "mississippi": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/mississippi/-/mississippi-3.0.0.tgz", @@ -8362,11 +8402,56 @@ "detect-libc": "^1.0.2", "mkdirp": "^0.5.1", "needle": "^2.2.0", + "nopt": "^4.0.1", "npm-packlist": "^1.1.6", "npmlog": "^4.0.2", "rc": "^1.1.7", "rimraf": "^2.6.1", - "semver": "^5.3.0" + "semver": "^5.3.0", + "tar": "^4" + }, + "dependencies": { + "nopt": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/nopt/-/nopt-4.0.1.tgz", + "integrity": "sha1-0NRoWv1UFRk8jHUFYC0NF81kR00=", + "dev": true, + "optional": true, + "requires": { + "abbrev": "1", + "osenv": "^0.1.4" + } + }, + "safe-buffer": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==", + "dev": true, + "optional": true + }, + "tar": { + "version": "4.4.8", + "resolved": "https://registry.npmjs.org/tar/-/tar-4.4.8.tgz", + "integrity": "sha512-LzHF64s5chPQQS0IYBn9IN5h3i98c12bo4NCO7e0sGM2llXQ3p2FGC5sdENN4cTW48O915Sh+x+EXx7XW96xYQ==", + "dev": true, + "optional": true, + "requires": { + "chownr": "^1.1.1", + "fs-minipass": "^1.2.5", + "minipass": "^2.3.4", + "minizlib": "^1.1.1", + "mkdirp": "^0.5.0", + "safe-buffer": "^5.1.2", + "yallist": "^3.0.2" + } + }, + "yallist": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/yallist/-/yallist-3.0.3.tgz", + "integrity": "sha512-S+Zk8DEWE6oKpV+vI3qWkaK+jSbIK86pCwe2IF/xwIpQ8jEuxpw9NyaGjmp9+BoJv5FV2piqCDcoCtStppiq2A==", + "dev": true, + "optional": true + } } }, "node-sass": { @@ -9766,6 +9851,7 @@ "dev": true, "requires": { "is-number": "^4.0.0", + "kind-of": "^6.0.0", "math-random": "^1.0.1" }, "dependencies": { @@ -9774,6 +9860,12 @@ "resolved": "https://registry.npmjs.org/is-number/-/is-number-4.0.0.tgz", "integrity": "sha512-rSklcAIlf1OmFdyAqbnWTLVelsQ58uvZ66S/ZyawjWqIviTWCjg2PzVGw8WUA+nNuPTqb4wgA+NszrJ+08LlgQ==", "dev": true + }, + "kind-of": { + "version": "6.0.2", + "resolved": "https://registry.npmjs.org/kind-of/-/kind-of-6.0.2.tgz", + "integrity": "sha512-s5kLOcnH0XqDO+FvuaLX8DDjZ18CGFk7VygH40QoKPUQhW4e2rvM0rwUq0t8IQDOwYSeLK01U90OjzBTme2QqA==", + "dev": true } } }, diff --git a/src/WorkerPool.js b/src/WorkerPool.js index a578b95..c462b5d 100644 --- a/src/WorkerPool.js +++ b/src/WorkerPool.js @@ -12,6 +12,7 @@ let workerId = 0; class PoolWorker { constructor(options, onJobDone) { + this.disposed = false; this.nextJobId = 0; this.jobs = Object.create(null); this.activeJobs = 0; @@ -19,9 +20,12 @@ class PoolWorker { this.id = workerId; workerId += 1; this.worker = childProcess.spawn(process.execPath, [].concat(options.nodeArgs || []).concat(workerPath, options.parallelJobs), { - stdio: ['ignore', 1, 2, 'pipe', 'pipe'], + detached: true, + stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe'], }); + this.worker.unref(); + // This prevents a problem where the worker stdio can be undefined // when the kernel hits the limit of open files. // More info can be found on: https://github.com/webpack-contrib/thread-loader/issues/2 @@ -33,9 +37,42 @@ class PoolWorker { const [, , , readPipe, writePipe] = this.worker.stdio; this.readPipe = readPipe; this.writePipe = writePipe; + this.listenStdOutAndErrFromWorker(this.worker.stdout, this.worker.stderr); this.readNextMessage(); } + listenStdOutAndErrFromWorker(workerStdout, workerStderr) { + if (workerStdout) { + workerStdout.on('data', this.writeToStdout); + } + + if (workerStderr) { + workerStderr.on('data', this.writeToStderr); + } + } + + ignoreStdOutAndErrFromWorker(workerStdout, workerStderr) { + if (workerStdout) { + workerStdout.removeListener('data', this.writeToStdout); + } + + if (workerStderr) { + workerStderr.removeListener('data', this.writeToStderr); + } + } + + writeToStdout(data) { + if (!this.disposed) { + process.stdout.write(data); + } + } + + writeToStderr(data) { + if (!this.disposed) { + process.stderr.write(data); + } + } + run(data, callback) { const jobId = this.nextJobId; this.nextJobId += 1; @@ -64,9 +101,7 @@ class PoolWorker { } writeEnd() { - const lengthBuffer = new Buffer(4); - lengthBuffer.writeInt32BE(0, 0); - this.writePipe.write(lengthBuffer); + this.writePipe.write(Buffer.alloc(0)); } readNextMessage() { @@ -78,6 +113,7 @@ class PoolWorker { } this.state = 'length read'; const length = lengthBuffer.readInt32BE(0); + this.state = 'read message'; this.readBuffer(length, (messageError, messageBuffer) => { if (messageError) { @@ -201,7 +237,11 @@ class PoolWorker { } dispose() { - this.writeEnd(); + if (!this.disposed) { + this.disposed = true; + this.ignoreStdOutAndErrFromWorker(this.worker.stdout, this.worker.stderr); + this.writeEnd(); + } } } @@ -216,6 +256,32 @@ export default class WorkerPool { this.activeJobs = 0; this.timeout = null; this.poolQueue = asyncQueue(this.distributeJob.bind(this), options.poolParallelJobs); + this.terminated = false; + + this.setupLifeCycle(); + } + + terminate() { + if (!this.terminated) { + this.terminated = true; + + this.poolQueue.kill(); + this.disposeWorkers(); + } + } + + setupLifeCycle() { + process.on('SIGTERM', () => { + this.terminate(); + }); + + process.on('SIGINT', () => { + this.terminate(); + }); + + process.on('exit', () => { + this.terminate(); + }); } run(data, callback) { diff --git a/src/readBuffer.js b/src/readBuffer.js index 0285e57..9dec250 100644 --- a/src/readBuffer.js +++ b/src/readBuffer.js @@ -1,16 +1,14 @@ export default function readBuffer(pipe, length, callback) { if (length === 0) { - callback(null, new Buffer(0)); + callback(null, Buffer.alloc(0)); return; } - let terminated = false; + let remainingLength = length; const buffers = []; + const readChunk = () => { const onChunk = (arg) => { - if (terminated) { - return; - } let chunk = arg; let overflow; if (chunk.length > remainingLength) { @@ -22,26 +20,18 @@ export default function readBuffer(pipe, length, callback) { } buffers.push(chunk); if (remainingLength === 0) { - pipe.pause(); pipe.removeListener('data', onChunk); + pipe.pause(); + if (overflow) { pipe.unshift(overflow); } - terminated = true; + callback(null, Buffer.concat(buffers, length)); } }; - const onEnd = () => { - if (terminated) { - return; - } - terminated = true; - const err = new Error(`Stream ended ${remainingLength.toString()} bytes prematurely`); - err.name = 'EarlyEOFError'; - callback(err); - }; + pipe.on('data', onChunk); - pipe.on('end', onEnd); pipe.resume(); }; readChunk(); diff --git a/src/worker.js b/src/worker.js index ca67025..352aaa9 100644 --- a/src/worker.js +++ b/src/worker.js @@ -9,14 +9,66 @@ import readBuffer from './readBuffer'; const writePipe = fs.createWriteStream(null, { fd: 3 }); const readPipe = fs.createReadStream(null, { fd: 4 }); -writePipe.on('error', console.error.bind(console)); -readPipe.on('error', console.error.bind(console)); +writePipe.on('finish', onTerminateWrite); +readPipe.on('end', onTerminateRead); +writePipe.on('close', onTerminateWrite); +readPipe.on('close', onTerminateRead); + +readPipe.on('error', onError); +writePipe.on('error', onError); const PARALLEL_JOBS = +process.argv[2]; +let terminated = false; let nextQuestionId = 0; const callbackMap = Object.create(null); +function onError(error) { + console.error(error); +} + +function onTerminateRead() { + terminateRead(); +} + +function onTerminateWrite() { + terminateWrite(); +} + +function writePipeWrite(...args) { + if (!terminated) { + writePipe.write(...args); + } +} + +function writePipeCork() { + if (!terminated) { + writePipe.cork(); + } +} + +function writePipeUncork() { + if (!terminated) { + writePipe.uncork(); + } +} + +function terminateRead() { + terminated = true; + this.writePipe.write(Buffer.alloc(0)); + readPipe.removeAllListeners(); +} + +function terminateWrite() { + terminated = true; + writePipe.removeAllListeners(); +} + +function terminate() { + terminateRead(); + terminateWrite(); +} + function toErrorObj(err) { return { message: err.message, @@ -35,13 +87,17 @@ function toNativeError(obj) { } function writeJson(data) { - writePipe.cork(); - process.nextTick(() => writePipe.uncork()); - const lengthBuffer = new Buffer(4); - const messageBuffer = new Buffer(JSON.stringify(data), 'utf-8'); + writePipeCork(); + process.nextTick(() => { + writePipeUncork(); + }); + + const lengthBuffer = Buffer.alloc(4); + const messageBuffer = Buffer.from(JSON.stringify(data), 'utf-8'); lengthBuffer.writeInt32BE(messageBuffer.length, 0); - writePipe.write(lengthBuffer); - writePipe.write(messageBuffer); + + writePipeWrite(lengthBuffer); + writePipeWrite(messageBuffer); } const queue = asyncQueue(({ id, data }, taskCallback) => { @@ -135,7 +191,7 @@ const queue = asyncQueue(({ id, data }, taskCallback) => { data: buffersToSend.map(buffer => buffer.length), }); buffersToSend.forEach((buffer) => { - writePipe.write(buffer); + writePipeWrite(buffer); }); setImmediate(taskCallback); }); @@ -149,6 +205,13 @@ const queue = asyncQueue(({ id, data }, taskCallback) => { } }, PARALLEL_JOBS); +function dispose() { + terminate(); + + queue.kill(); + process.exit(0); +} + function onMessage(message) { try { const { type, id } = message; @@ -191,19 +254,26 @@ function readNextMessage() { console.error(`Failed to communicate with main process (read length) ${lengthReadError}`); return; } - const length = lengthBuffer.readInt32BE(0); + + const length = lengthBuffer.length && lengthBuffer.readInt32BE(0); + if (length === 0) { - // worker should exit - process.exit(0); + // worker should dispose and exit + dispose(); return; } readBuffer(readPipe, length, (messageError, messageBuffer) => { + if (terminated) { + return; + } + if (messageError) { console.error(`Failed to communicate with main process (read message) ${messageError}`); return; } const messageString = messageBuffer.toString('utf-8'); const message = JSON.parse(messageString); + onMessage(message); setImmediate(() => readNextMessage()); }); diff --git a/test/readBuffer.test.js b/test/readBuffer.test.js index 65715a6..0c7a321 100644 --- a/test/readBuffer.test.js +++ b/test/readBuffer.test.js @@ -24,8 +24,8 @@ test('data is read', (done) => { readBuffer.default(mockEventStream, 8, cb); }); -test('EOF returned for early quit', (done) => { - expect.assertions(2); +test('no data is read when early quit but no error is thrown', (done) => { + expect.assertions(1); let eventCount = 0; function read() { eventCount += 1; @@ -38,10 +38,10 @@ test('EOF returned for early quit', (done) => { objectMode: true, read, }); - function cb(err) { - expect(err.name).toBe('EarlyEOFError'); - expect(err.message).toBe('Stream ended 3 bytes prematurely'); - done(); - } + + const cb = jest.fn(); readBuffer.default(mockEventStream, 8, cb); + + expect(cb).not.toHaveBeenCalled(); + done(); }); diff --git a/test/workerPool.test.js b/test/workerPool.test.js index 6f3ae2b..f13aae2 100644 --- a/test/workerPool.test.js +++ b/test/workerPool.test.js @@ -5,15 +5,15 @@ import WorkerPool from '../src/WorkerPool'; jest.mock('child_process', () => { return { spawn: jest.fn(() => { - return {}; + return { + unref: jest.fn(), + }; }), }; }); describe('workerPool', () => { it('should throw an error when worker.stdio is undefined', () => { - childProcess.spawn.mockImplementationOnce(() => { return {}; }); - const workerPool = new WorkerPool({}); expect(() => workerPool.createWorker()).toThrowErrorMatchingSnapshot(); expect(() => workerPool.createWorker()).toThrowError('Please verify if you hit the OS open files limit'); @@ -23,6 +23,7 @@ describe('workerPool', () => { childProcess.spawn.mockImplementationOnce(() => { return { stdio: new Array(5).fill(new stream.PassThrough()), + unref: jest.fn(), }; });