Skip to content
Permalink
Browse files

[api] implement recombine option

  • Loading branch information...
jcrugzz committed Jun 12, 2013
1 parent a09715e commit 7178134d059e3a5704866f49866f2f7ee1729e64
Showing with 14 additions and 6 deletions.
  1. +14 −6 lib/godot/reactor/by.js
@@ -14,20 +14,24 @@ var util = require('util'),
// ### function By (keys, reactor)
// #### @key {string|Array} Particular key to listen for a change
// #### @reactor {godot.reactor().type()} Reactor or reactor chain to be created
// #### @options {Object} options object
// #### @recombine {Boolean} Recombines the data from the split streams
// Constructor function for the by stream to trigger the creation of a new set
// of streams based on a key change.
//
var By = module.exports = function (keys, reactor) {
var By = module.exports = function (keys, reactor, options) {
ReadWriteStream.call(this);

if ((typeof keys !== 'string' && !Array.isArray(keys))
|| !(reactor instanceof Reactor)) {
throw new Error('This reactor takes key(s) and godot.reactor() as an argument');
}

this.keys = !Array.isArray(keys) ? [keys] : keys;
this.reactor = reactor;
this.streams = this.keys.reduce(function (all, key) {
this.keys = !Array.isArray(keys) ? [keys] : keys;
this.options = options || {};
this.reactor = reactor;
this.recombine = this.options.recombine || false;
this.streams = this.keys.reduce(function (all, key) {
all[key] = {};
return all;
}, {});
@@ -55,8 +59,12 @@ By.prototype.write = function (data) {
var source = new FilterStream(key, value);
self.streams[key][value] = self.reactor.createStream(source);
self.pipe(source);
if (self.recombine) {
self.streams[key][value].on('data', self.emit.bind(self, 'data'))
}
}
});

this.emit('data', data);
if(!this.recombine) {
this.emit('data', data);
}
};

0 comments on commit 7178134

Please sign in to comment.
You can’t perform that action at this time.