
disable worker task queueing and address a probable memory leak
ansuz committed May 5, 2020
1 parent 6f1b04c commit 9f1f01f
Showing 1 changed file with 18 additions and 4 deletions.
22 changes: 18 additions & 4 deletions lib/workers/index.js
@@ -8,7 +8,7 @@ const Workers = module.exports;
 const PID = process.pid;
 
 const DB_PATH = 'lib/workers/db-worker';
-const MAX_JOBS = 8;
+const MAX_JOBS = 16;
 
 Workers.initialize = function (Env, config, _cb) {
     var cb = Util.once(Util.mkAsync(_cb));
@@ -35,6 +35,11 @@ Workers.initialize = function (Env, config, _cb) {
         return response.expected(id)? guid(): id;
     };
 
+    const countWorkerTasks = function (/* index */) {
+        return 0; // XXX this disables all queueing until it can be proven correct
+        //return Object.keys(workers[index].tasks || {}).length;
+    };
+
     var workerOffset = -1;
     var queue = [];
     var getAvailableWorkerIndex = function () {
@@ -67,7 +72,8 @@ Workers.initialize = function (Env, config, _cb) {
                 but this is a relatively easy way to make sure it's always up to date.
                 We'll see how it performs in practice before optimizing.
             */
-            if (workers[temp] && Object.keys(workers[temp].tasks || {}).length < MAX_JOBS) {
+
+            if (workers[temp] && countWorkerTasks(temp) <= MAX_JOBS) {
                 return temp;
             }
         }
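Note on the effect of this hunk: because countWorkerTasks is stubbed to return 0, the MAX_JOBS check above always passes for any live worker, so jobs go to the first live worker after the previous offset and the queue is effectively never used. The following stand-alone sketch (not the repository's code; workers, pickWorker, and the offset update are illustrative assumptions) shows that behaviour:

// Illustrative sketch only: round-robin selection with the stubbed task counter.
var MAX_JOBS = 16;
var workers = [{ tasks: {} }, { tasks: {} }, { tasks: {} }];
var workerOffset = -1;

var countWorkerTasks = function (/* index */) {
    // stubbed to 0, as in the commit: every live worker always looks idle,
    // so the MAX_JOBS check below can never push work onto the queue
    return 0;
};

var pickWorker = function () {
    for (var i = 0; i < workers.length; i++) {
        var temp = (workerOffset + i + 1) % workers.length;
        if (workers[temp] && countWorkerTasks(temp) <= MAX_JOBS) {
            workerOffset = temp;
            return temp; // first live worker after the previous offset wins
        }
    }
    return -1; // no live workers: callers would queue or fail
};

console.log(pickWorker(), pickWorker(), pickWorker(), pickWorker()); // 0 1 2 0

Re-enabling the commented-out count would restore the per-worker cap, at which point busy workers are skipped and callers fall back to the queue.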
@@ -96,15 +102,23 @@ Workers.initialize = function (Env, config, _cb) {
             return;
         }
 
-        var cb = Util.once(Util.mkAsync(_cb));
-
         const txid = guid();
         msg.txid = txid;
         msg.pid = PID;
 
         // track which worker is doing which jobs
         state.tasks[txid] = msg;
 
+        var cb = Util.once(Util.mkAsync(Util.both(_cb, function (err /*, value */) {
+            if (err !== 'TIMEOUT') { return; }
+            // in the event of a timeout the user will receive an error
+            // but the state used to resend a query in the event of a worker crash
+            // won't be cleared. This also leaks a slot that could be used to keep
+            // an upper bound on the amount of parallelism for any given worker.
+            // if you run out of slots then the worker locks up.
+            delete state.tasks[txid];
+        })));
+
         response.expect(txid, cb, 180000);
         state.worker.send(msg);
     };
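Note on this hunk, which addresses the probable leak named in the commit message: previously, when a query timed out, its entry in state.tasks (kept so the query can be resent if a worker crashes) was never removed, so each timeout permanently consumed one of the worker's MAX_JOBS slots until the worker appeared fully busy. The fix composes the caller's callback with a cleanup function. The sketch below is a stand-alone illustration of that composition pattern, not CryptPad's code: once and both are simplified stand-ins for the Util.once and Util.both helpers, and state, txid, and userCallback are invented for the example.

// Illustrative sketch only: releasing the tracked task slot on timeout.
var once = function (f) {
    var called = false;
    return function () {
        if (called) { return; }
        called = true;
        return f.apply(null, arguments);
    };
};
var both = function (f, g) {
    return function () {
        f.apply(null, arguments);
        g.apply(null, arguments);
    };
};

var state = { tasks: {} };
var txid = 'abc123';
state.tasks[txid] = { command: 'EXAMPLE' };

var userCallback = function (err) {
    console.log('caller sees:', err);
};

var cb = once(both(userCallback, function (err /*, value */) {
    if (err !== 'TIMEOUT') { return; }
    // without this, a timed-out job would stay in state.tasks forever,
    // holding one of the worker's MAX_JOBS slots
    delete state.tasks[txid];
}));

cb('TIMEOUT');
console.log('tasks left:', Object.keys(state.tasks).length); // 0

Running the sketch prints "caller sees: TIMEOUT" followed by "tasks left: 0": the caller still receives the timeout error while the tracking slot is released.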
