Skip to content

Commit

Permalink
[api test] Properly pipe to unique pipe-chains in .by().
Browse files Browse the repository at this point in the history
  • Loading branch information
indexzero committed Mar 9, 2013
1 parent a3367ea commit 8c58441
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 19 deletions.
35 changes: 35 additions & 0 deletions lib/godot/common/filter-stream.js
@@ -0,0 +1,35 @@
/*
* filter-stream.js: Simple readable and writable stream that filters key by value.
*
* (C) 2012, Nodejitsu Inc.
*
*/

var utile = require('utile'),
ReadWriteStream = require('./read-write-stream')

//
// ### function ReadWriteStream ()
// A aimple readable and writable stream that filters key by value.
//
var FilterStream = module.exports = function FilterStream(key, value) {
this.key = key;
this.value = value

ReadWriteStream.call(this);
};

//
// Inherit from `ReadWriteStream`.
//
utile.inherits(FilterStream, ReadWriteStream);

//
// ### function write (data)
// Emits the "data" event with the pass-thru `data`.
//
FilterStream.prototype.write = function (data) {
if (data[this.key] === this.value) {
this.emit('data', data);
}
};
42 changes: 23 additions & 19 deletions lib/godot/reactor/by.js
Expand Up @@ -6,28 +6,31 @@
*/

var util = require('util'),
FilterStream = require('../common/filter-stream'),
ReadWriteStream = require('../common/read-write-stream'),
Reactor = require('./reactor');

//
// ### function By (key, reactor)
// #### @key {string} Particular key to listen for a change
// ### function By (keys, reactor)
// #### @key {string|Array} Particular key to listen for a change
// #### @reactor {godot.reactor().type()} Reactor or reactor chain to be created
// Constructor function for the by stream to trigger the creation of a new set
// of streams based on a key change.
//
var By = module.exports = function (key, reactor) {
var By = module.exports = function (keys, reactor) {
ReadWriteStream.call(this);

if (typeof key !== 'string'
if ((typeof keys !== 'string' && !Array.isArray(keys))
|| !(reactor instanceof Reactor)) {
throw new Error('This reactor takes a key and godot.reactor() as an argument');
throw new Error('This reactor takes key(s) and godot.reactor() as an argument');
}

this.last;
this.key = key;
this.keys = !Array.isArray(keys) ? [keys] : keys;
this.reactor = reactor;

this.streams = this.keys.reduce(function (all, key) {
all[key] = {};
return all;
}, {});
};

//
Expand All @@ -38,21 +41,22 @@ util.inherits(By, ReadWriteStream);
//
// ### function write (data)
// #### @data {Object} JSON to filter
// Watches for changes to the key value
// and pipes it to a new reactor or reactor chain
// if it does.
// Creates a new pipe-chain for `this.reactor`
// for any unique values for all of `this.keys`
// and pipes `data` to them.
//
By.prototype.write = function (data) {
var self = this;

if (typeof this.last !== 'undefined'
&& this.last !== data[this.key]) {

var reactor = new Reactor();
var stream = reactor.createStream.call(this.reactor);
this.pipe(stream);
this.keys.forEach(function (key) {
var value = data[key];

}
if (!self.streams[key][value]) {
var source = new FilterStream(key, value);
self.streams[key][value] = self.reactor.createStream(source);
self.pipe(source);
}
});

this.last = data[this.key];
this.emit('data', data);
};
14 changes: 14 additions & 0 deletions test/fixtures/by.json
@@ -0,0 +1,14 @@
[
{
"service": "charlie/app/health/heartbeat",
"ttl": 50
},
{
"service": "charlie/app/health/memory",
"ttl": 50
},
{
"service": "charlie/app/health/heartbeat",
"ttl": 50
}
]
34 changes: 34 additions & 0 deletions test/reactor/by-test.js
@@ -0,0 +1,34 @@
/*
* change-test.js: Tests for the Change reactor stream.
*
* (C) 2012, Nodejitsu Inc.
*
*/

var assert = require('assert'),
vows = require('vows'),
godot = require('../../lib/godot'),
macros = require('../macros').reactor;

var count = 0;

vows.describe('godot/reactor/by').addBatch({
"Godot by": {
"service": macros.shouldEmitDataSync(
godot.reactor()
.by(
'service',
godot.reactor().map(function (data) {
count++;
return data;
})
),
'by',
3
)
}
}).addBatch({
"Should emit pipe the events to the correct pipe-chains": function () {
assert.equal(count, 3);
}
}).export(module);

0 comments on commit 8c58441

Please sign in to comment.