Skip to content

Commit

Permalink
added throttle work and completed work, work timeout, cleaned-up code…
Browse files Browse the repository at this point in the history
…, improved performance and updated tests to use Async BDD
  • Loading branch information
Roger Castillo committed May 14, 2012
1 parent 6560f7c commit 11c6a68
Show file tree
Hide file tree
Showing 15 changed files with 464 additions and 651 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
@@ -0,0 +1,3 @@
language: node_js
node_js:
- 0.6
32 changes: 16 additions & 16 deletions lib/channels.js
Expand Up @@ -15,24 +15,24 @@ exports.CLOSE_MSG = CLOSE_MSG;
*/
function createObservableChannel(ch, redisOpts) {
// subscribtion have to create their own
// redis connection as the connection can
// only be used for pub/sub when 'SUBSCRIBE'
// is invoked
var client = redisOpts ?
redis.createClient(redisOpts.port, redisOpts.host, redisOpts):
redis.createClient();
// redis connection as the connection can
// only be used for pub/sub when 'SUBSCRIBE'
// is invoked
var client = redisOpts ?
redis.createClient(redisOpts.port, redisOpts.host, redisOpts):
redis.createClient();

return Rx.Observable.Create(function(obs){
client.on("message", function(channel, msg) {
if (msg == CLOSE_MSG){
obs.OnCompleted();
} else {
obs.OnNext(msg);
}
});
client.subscribe(ch);
return function(){client.end();};
return Rx.Observable.Create(function(obs){
client.on("message", function(channel, msg) {
if (msg == CLOSE_MSG){
obs.OnCompleted();
} else {
obs.OnNext(msg);
}
});
client.subscribe(ch);
return function(){client.end();};
});
}

exports.createObservableChannel = createObservableChannel;
Expand Down
125 changes: 80 additions & 45 deletions lib/qrx.js
Expand Up @@ -9,6 +9,24 @@ var WorkItem = require('./workitem').WorkItem,
newRedisClient = require('./redisutils').newRedisClient,
ns = require('./messaging').ns;

/**
* Wraps a node style callback in a timeout timer
* @param {function} callback
* @param {Integer} delay
*/
function callbackWithTimeout(callback, delay) {
// light the fuse
var timeoutId = setTimeout(function(){
timedCallback('timeout', null);
}, delay);

var timedCallback = function(err, result) {
// cut the red wire
clearTimeout(timeoutId);
callback(err, result);
};
return timedCallback;
}

/**
* Creates a 'complete' callback function for completed work
Expand All @@ -31,29 +49,21 @@ function completeCallbackFn(queue, workItem, scheduleNextWorkFn){
* @param {Object} [redisOpts] redis connection options
* @param {Integer} throttle max amount of in-flight work to any worker
*/
function WorkQueueRx(qname, redisOpts, throttle) {
function WorkQueueRx(qname, redisOpts, throttle, completedThrottle, workTimeout) {

// two clients are required to avoid blocking conditions
var client = newRedisClient(redisOpts);

this.throttle = throttle || 1; // default only 1 in flight at any given time
// max time for each work item to complete
this.workTimeout = workTimeout;

// count of work in flight
this.workInFlight = 0;
// default only 1 in flight at any given time per worker
this.throttle = throttle || 1;

this.getWorkInFlight = function(){
return this.workInFlight;
}

this.incWorkInFlight = function(){
this.workInFlight++;
}

this.decWorkInFlight = function(){
this.workInFlight--;
}

this.completedWorkItems = {};
// ToDo: This assumes 1 subscriber for completed work
this.completedInFlight = 0;
this.completedThrottle = completedThrottle || 1;


// master-side
Expand All @@ -79,16 +89,20 @@ function WorkQueueRx(qname, redisOpts, throttle) {
/**
* Adds work to queue for workers
* @param {Object} work
* @param {function} callback
*/
this.enqueue = function(work) {
this.enqueue = function(work, callback) {
// push onto the queue
// work contains by name, where completed work should
var workItem = new WorkItem(work, this.completedQueueName);
pending.enqueue(workItem, function(err, workItem){
var defaultCallback = function(err, workItem){
if (err){
throw err;
}
});
};

callback = callback || defaultCallback;
var workItem = new WorkItem(work, this.completedQueueName);
pending.enqueue(workItem, callback);
};

/**
Expand All @@ -115,26 +129,32 @@ function WorkQueueRx(qname, redisOpts, throttle) {
*/
this.completedObservable = function() {
var self = this;
var completed = new WorkQueue(this.completedQueueName, redisOpts);
return Rx.Observable.Create(function(obs){
var completedDrain = completed.blockingDrainRx(0,0);
var heartBeat = Rx.Observable.Interval(1000);
var subs = heartBeat.Merge(completedDrain).Subscribe(function(r){
if (!(r instanceof Object)){
// beat
if (self.queueStopped) {
// signal completed if stopped
obs.OnCompleted();
}
} else {
obs.OnNext(r);
var rc = newRedisClient(redisOpts);
var pending = new WorkQueue(this.completedQueueName, redisOpts);

return Rx.Observable.Create(function(obs) {
function getNextCompleted() {
while(self.completedInFlight < self.completedThrottle){
self.completedInFlight++
pending.blockingDequeue(rc, 0, function(err, completedWorkItem){
if (!err){
obs.OnNext(completedWorkItem);
self.completedInFlight--;
if (!self.queueStopped){
Rx.Observable.Start(getNextCompleted);
} else {
obs.OnCompleted();
}
} else {
obs.OnError(err);
}
});
}
},
function(err){
obs.OnError(err);
});
return function(){subs.Dispose();};
}
Rx.Observable.Start(getNextCompleted);
return function(){rc.quit();};
});

};

/////////////////
Expand All @@ -146,15 +166,24 @@ function WorkQueueRx(qname, redisOpts, throttle) {
* with every Rx subscription creating a new reactive work scream
* @return {Rx.Observable} stream of work to do
*/
this.workObservable = function(){
this.workObservable = function() {
// immediately 'complete' any workers who subscribe
// to a stopped queue
if (this.queueStopped) {
return Rx.Observable.Empty();
}
var self = this;
var rc = newRedisClient(redisOpts);
rc.incr(this.workerCount);
return Rx.Observable.Create(function(obs){

// each work subscription keeps track of how much work
// is in flight
var workInFlight = 0;
return Rx.Observable.Create(function(obs) {
function getNextWork(){
// this will set n callbacks where n == throttle
while (self.getWorkInFlight() < self.throttle){
self.incWorkInFlight();
while (workInFlight < self.throttle){
workInFlight++;
pending.blockingDequeue(rc, 0, function(err, workItem){
if (workItem.work != STOP_MESSAGE) {
// add it to the working set
Expand All @@ -163,8 +192,12 @@ function WorkQueueRx(qname, redisOpts, throttle) {
// dispatch to the worker
// workers get only the information they need
var workObj = {work: workItem.work,
callback:completeCallbackFn(self, workItem,
function(){Rx.Observable.Start(getNextWork);})};
callback:callbackWithTimeout(completeCallbackFn(self,
workItem,
function() {
workInFlight--;
Rx.Observable.Start(getNextWork);}
), self.workTimeout)};
// catch any work exception
try {
obs.OnNext(workObj);
Expand Down Expand Up @@ -204,13 +237,15 @@ function WorkQueueRx(qname, redisOpts, throttle) {
// deliever the complted work item
client.lpush(workItem.completedWorkQueue, JSON.stringify(completedWorkItem), function(err, length){
if (!err) {
self.decWorkInFlight();
scheduleNextWorkFn();
}
});
} else {
// redis err
// throw
if (!err){
//console.warn('warn: Multiple calls to mark completed');
}
}
});
};
Expand Down
61 changes: 0 additions & 61 deletions lib/workqueue.js
Expand Up @@ -131,9 +131,6 @@ function WorkQueue(qname, redisOpts) {
client.del(this.q, callback);
}




/**
* Returns an observable to the head of a q or Empty
* @return {Rx.Observable} of WorkItem(s)
Expand All @@ -156,64 +153,6 @@ function WorkQueue(qname, redisOpts) {
});
}

/**
* Returns an observable of the head blocking the
* connection until a head is available or timeout
* @param {redis.RedisClient} client
* @param {Integer} [timeout] in ms, infinite if not spec'd
*/
this.blockingDequeueRx = function(timeout){
var self = this;
var rc = newRedisClient(redisOpts);
// create a new connection to block for the
// the result
return Rx.Observable.Create(function(obs){
self.blockingDequeue(rc, timeout || 0, function(err, result){
if (!err && result){
obs.OnNext(result);
obs.OnCompleted();
} else {
obs.OnError(err);
}
});
return function(){
// kill the redis connection, which breaks the block
rc.quit();
};
});
}

/**
* Infinite drain of WorkQueue, which only completes, when timeout is
* exceeeded
* blocking WorkQueue connection
* @param {Integer} [timeout] in ms for block or null if infinite
* @param {Integer} [throttle] in ms to space dequeues over time
* return {Rx.Observable}
*/
this.blockingDrainRx = function(timeout, throttle) {
// create new connection bound into the Observable
var self = this;
var rc = newRedisClient(redisOpts);
return Rx.Observable.Create(function(obs){
var intervalSubs = null;
intervalSubs = Rx.Observable.Interval(throttle || 0).Subscribe(function(_){
self.blockingDequeue(rc, timeout || 0, function(err, result){
if (result){
obs.OnNext(result);
} else if (!err) {
obs.OnCompleted();
} else {
obs.OnError(err);
}
});
});
return function(){
rc.quit();
intervalSubs.Dispose();
};
});
}

}

Expand Down
2 changes: 1 addition & 1 deletion lib/workset.js
Expand Up @@ -40,7 +40,7 @@ function WorkSet(setName, redisOpts) {
/**
* @return {number} number of members
*/
this.setCount = function(callback){
this.size = function(callback){
client.hlen(s,callback)
}

Expand Down
12 changes: 11 additions & 1 deletion package.json
Expand Up @@ -18,7 +18,17 @@
"underscore": "~1.3.1"
},
"devDependencies": {
"expresso": "~0.9.2"
"expresso": "~0.9.2",
"mocha": "git://github.com/domenic/mocha.git#promises",
"chai": "0.5.3",
"chai-as-promised": "2.2.0",
"sinon-chai": "1.3.1",
"sinon": "1.3.4",
"q": "0.8.4"

},
"scripts": {
"test": "./node_modules/mocha/bin/mocha $(find test -type f)"
},
"engines": {
"node": ">= 0.4.11-pre"
Expand Down
29 changes: 29 additions & 0 deletions test/channel-spec.js
@@ -0,0 +1,29 @@
var expect = require('chai').expect,
sinon = require('sinon'),
Q = require('q'),
Rx = require('rx').Rx,
uuid = require('node-uuid');

var Channel = require('../lib/channels.js').Channel;


describe('channels', function(){
it('should be able to send messages and receive them by subscription', function(){
var testChannel = new Channel(uuid.v1());
var deferredMessage = Q.defer();
var deferredClose = Q.defer();
testChannel.asObservable().Subscribe(function(message) {
expect(message).to.equal('foo');
testChannel.close();
deferredMessage.resolve(message);
},
function(exn){},
function(){
deferredClose.resolve(true);
});
return Q.all([Q.node(testChannel.sendMessage, testChannel, 'foo')(),
deferredMessage.promise,
deferredClose.promise]);
})
});

0 comments on commit 11c6a68

Please sign in to comment.