Skip to content

Commit

Permalink
updated doc and examples
Browse files Browse the repository at this point in the history
  • Loading branch information
Roger Castillo committed May 16, 2012
1 parent 7ad0fc8 commit 7549931
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 28 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ npm install qrx

```javascript
// create a new queue with well known name
wq = new WorkQueueRx('test-wq');
wq = new WorkQueueRx();
// clear any pending work (optional)
wq.clear();

Expand All @@ -72,7 +72,7 @@ wq.completedObservable().Subscribe(function(completedWork){
*(From: /test/qrx-test.js)*

```javascript
var wqMaster = new WorkQueueRx('clean-test2');
var wqMaster = new WorkQueueRx({qname: 'clean-test2'});

var WORK_COUNT = 500;

Expand All @@ -83,13 +83,13 @@ for(var i=0; i < WORK_COUNT; i++){

// two slaves serving 1 master
var workReceived = 0;
var slave1 = new WorkQueueRx('clean-test2');
var slave1 = new WorkQueueRx({qname: 'clean-test2'});
slave1.workObservable().Subscribe(function(workObj){
workReceived++;
workObj.callback(null, workObj.work + 3);
});

var slave2 = new WorkQueueRx('clean-test2');
var slave2 = new WorkQueueRx({qname: 'clean-test2'});
slave2.workObservable().Subscribe(function(workObj){
workReceived++;
workObj.callback(null, workObj.work + 3);
Expand All @@ -112,12 +112,12 @@ wqMaster.completedObservable().Subscribe(function(workItem){

Rx.Observable.FromArray([1,2,3])
// ForkMany usage
.ForkMany('test-q')
.ForkMany({qname: 'test-q'})
.Subscribe(function(result){
console.log(result);
});

var worker = new WorkQueueRx('test-q');
var worker = new WorkQueueRx({qname: 'test-q'});
worker.workObservable().Subscribe(function(workItem){
workItem.callback(null, workItem.work + 1);
});
Expand Down
6 changes: 3 additions & 3 deletions examples/fork-many.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@

var forkmany = require('../lib/rx-extensions');
var WorkQueueRx = require('../lib/qrx').WorkQueueRx;
var Qrx = require('../lib/qrx');

// add the forkmany combinator to Rx
var Rx = forkmany.extendRx(require('rx').Rx);

Rx.Observable.FromArray([1,2,3])
// ForkMany usage
.ForkMany('test-q')
.ForkMany({qname:'test-q'})
.Subscribe(function(result){
console.log(result);
});

var worker = new WorkQueueRx('test-q');
var worker = new Qrx({qname:'test-q'});
worker.workObservable().Subscribe(function(workItem){
workItem.callback(null, workItem.work + 1);
});
8 changes: 4 additions & 4 deletions examples/hello-qrx.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ var redis = require('redis'),
uuid = require('node-uuid');


var WorkQueueRx = require('../lib/qrx').WorkQueueRx;
Qrx = require('../lib/qrx');

// create a new queue with well known name
wq = new WorkQueueRx('test-wq');
// clear any pending work (optional)
wq.clear();
wq = new Qrx();

wq.enqueue('one');
wq.enqueue('two');



// subscribe for work
wq.workObservable().Subscribe(function(workObj){
// worker receives two an object with work
Expand Down
24 changes: 16 additions & 8 deletions examples/qrx-cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ var numCPUs = require('os').cpus().length;


var forkmany = require('../lib/rx-extensions');
var WorkQueueRx = require('../lib/qrx').WorkQueueRx;
var Qrx = require('../lib/qrx');



var workQueue = new WorkQueueRx('clustered-q');
var workQueue = new Qrx({qname:'test-cluster'});

var workCount = 100;

if (cluster.isMaster) {
// Fork workers.
Expand All @@ -17,25 +19,31 @@ if (cluster.isMaster) {
cluster.fork();
}
} else {
// Worker processes each have a queue
workQueue.workObservable().Subscribe(function(workItem){
workItem.callback(null, workItem.work + 10);
})
// Worker processes each have a queue
workQueue.workObservable().Subscribe(function(workItem){
workItem.callback(null, workItem.work + 10);
})
}


if (cluster.isMaster){
// enqueue some work for the cluster

var completedCount = 0;
workQueue.completedObservable().Subscribe(function(workItem){
console.log(workItem.completedWork);
completedCount++;

if (completedCount == workCount) {
workQueue.stop();
}
}, function(exn){
console.log('work exception:', exn);
},
function(){
console.log('Work Completed with with:', numCPUs, 'processors')
});

for (var i=0; i < 20; i++) {
for (var i=0; i < workCount; i++) {
workQueue.enqueue(i);
}
}
9 changes: 6 additions & 3 deletions lib/qrx.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ Qrx.prototype.initialize = function (options) {
self.statsChannel.send(self.stats);
});



}

/**
Expand Down Expand Up @@ -198,7 +200,7 @@ Qrx.prototype.workObservable = function() {
var self = this;
var rc = newRedisClient(this.redisOptions);
rc.incr(this.workerCount);

// each work subscription keeps track of how much work
// is in flight
var workInFlight = 0;
Expand All @@ -207,10 +209,11 @@ Qrx.prototype.workObservable = function() {
// this will set n callbacks where n == throttle
while (workInFlight < self.workThrottle){
workInFlight++;
self.pending.blockingDequeue(rc, 0, function(err, workItem){
self.pending.blockingDequeue(rc, 0, function(err, workItem) {

if (workItem.work != this.STOP_MESSAGE) {
// add it to the working set
self.workingSet.addToSet(workItem, function(err, result){
self.workingSet.addToSet(workItem, function(err, result) {
if (!err){
// dispatch to the worker
// workers get only the information they need
Expand Down
6 changes: 3 additions & 3 deletions lib/rx-extensions.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
var Rx = require('rx').Rx;
var WorkQueueRx = require('../lib/qrx.js').WorkQueueRx;
var WorkQueueRx = require('../lib/qrx.js');


/**
Expand All @@ -8,8 +8,8 @@ var WorkQueueRx = require('../lib/qrx.js').WorkQueueRx;
*/
function extendRx(Rx){

Rx.Observable.prototype.ForkMany = function(qname, redisOpts){
var q = new WorkQueueRx(qname, redisOpts)
Rx.Observable.prototype.ForkMany = function(options){
var q = new WorkQueueRx(options)
return this.SelectMany(function(r){
q.enqueue(r);
return q.completedObservable();
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "qrx",
"description": "A light-weight distributed queue based on redis and RxJS.",
"author": "Roger H. Castillo <roger.castillo@loku.com> (http://tech.loku.com)",
"version": "0.2.6",
"version": "0.2.7",
"repository": {
"type": "git",
"url": "git://github.com/loku/qrx.git"
Expand Down

0 comments on commit 7549931

Please sign in to comment.