Skip to content

Commit

Permalink
Avoid PRE_SLEEP unless idle; queue extra jobs
Browse files Browse the repository at this point in the history
Closes iarna#20
  • Loading branch information
streamcode9 authored and nponeccop committed May 15, 2016
1 parent e4f64d2 commit 193696c
Showing 1 changed file with 31 additions and 6 deletions.
37 changes: 31 additions & 6 deletions worker.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"use strict";
var assert = require('assert');
var packet = require('gearman-packet');
var toBuffer = packet.Emitter.prototype.toBuffer;
var stream = require('readable-stream');
Expand All @@ -14,13 +15,17 @@ exports.__construct = function (init) {
this._activeJobs = {};
this._activeJobsCount = 0;

this._queue = [];
this._grabbingJob = 0;

this._clientId = null;

if (!this.options.maxJobs) {
this.options.maxJobs = 1;
}
if (!this.options.maxQueued) {
this.options.maxQueued = 1;
}
this.on('connect', function (self, conn) {
conn.socket.handleNoJob(function(data) {
self._grabbingJob --;
Expand All @@ -29,15 +34,19 @@ exports.__construct = function (init) {

conn.socket.handleNoOp(function(data) {
conn.socket.wakeup();
while (self.options.maxJobs > (self._activeJobsCount+self._grabbingJob)) {
while ((self.options.maxJobs+self.options.maxQueued) > (self._activeJobsCount+self._grabbingJob+self._queue.length)) {
self._grabbingJob ++;
conn.socket.grabJob();
}
});
if (! self._workersCount) return;
conn.socket.handleJobAssign(function(job) {
self._grabbingJob --;
self.dispatchWorker(job,conn.socket);
assert(self._queue.length < self.options.maxQueued);
self._queue.push(job);
if (self._activeJobsCount < self.options.maxJobs) {
self.dispatchWorker(conn.socket);
}
});
if (self._clientId) {
conn.socket.setClientId(self._clientId);
Expand Down Expand Up @@ -97,13 +106,11 @@ Worker.askForWork = function () {
Worker.startWork = function (jobid) {
this._activeJobs[jobid] = true;
++ this._activeJobsCount;
this.askForWork();
}

Worker.endWork = function (jobid) {
delete this._activeJobs[jobid];
-- this._activeJobsCount;
this.askForWork();
}

Worker.isRegistered = function (func) {
Expand Down Expand Up @@ -148,7 +155,12 @@ Worker.registerWorkerStream = function (func, options, handler) {
this.getConnectedServers().forEach(function(conn) {
conn.socket.handleJobAssign(function(job) {
self._grabbingJob --;
self.dispatchWorker(job,conn.socket);
assert(self._queue.length < self.options.maxQueued);
self._queue.push(job);
if (self._activeJobsCount < self.options.maxJobs)
{
self.dispatchWorker(conn.socket);
}
});
});
}
Expand Down Expand Up @@ -189,8 +201,9 @@ Worker.forgetAllWorkers = function () {
});
}

Worker.dispatchWorker = function (job,socket) {
Worker.dispatchWorker = function (socket) {
var self = this;
var job = self._queue.pop();
var jobid = job.args.job;
var worker = this._worker(job.args.function);
if (!worker.handler) throw Error('Assigned job for worker we no longer have');
Expand All @@ -214,6 +227,12 @@ Worker.dispatchWorker = function (job,socket) {
task.writer.once('end', function () {
self.endWork(jobid);
if (socket.connected) {
if (self._queue.length > 0) {
self.dispatchWorker(socket);
assert(self._activeJobsCount == self.options.maxJobs);
}
socket.grabJob();
self._grabbingJob ++;
socket.workComplete(jobid,task.lastChunk);
}
});
Expand All @@ -232,6 +251,12 @@ Worker.dispatchWorker = function (job,socket) {
self.endWork(jobid);
if (socket.connected) {
if (task.lastChunk) addToBuffer(task.lastChunk);
if (self._queue.length > 0) {
self.dispatchWorker(socket);
assert(self._activeJobsCount == self.options.maxJobs);
}
socket.grabJob();
self._grabbingJob ++;
socket.workComplete(jobid,buffer);
}
});
Expand Down

0 comments on commit 193696c

Please sign in to comment.