-
Notifications
You must be signed in to change notification settings - Fork 37
/
reactor.js
81 lines (71 loc) · 2.17 KB
/
reactor.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/*
* reactor.js: Reactor object responsible for creating pipe chains of streams for reacting to events.
*
* (C) 2012, Charlie Robbins, Jarrett Cruger, and the Contributors.
*
*/
var EventEmitter = require('events').EventEmitter,
util = require('util'),
uuid = require('node-uuid'),
ReadWriteStream = require('../common/read-write-stream');
//
// ### function Reactor ()
// #### @options {string|Object} Options for this reactor
// Constructor function for the Reactor object responsible for creating
// pipe chains of streams for reacting to events.
//
var Reactor = module.exports = function Reactor(options) {
EventEmitter.call(this);
options = options || {};
if (typeof options === 'string') {
options = { name: options };
}
this.reactors = [];
this.id = uuid.v4();
this.name = options.name;
};
//
// Inherit from event emitter for error propagation
//
util.inherits(Reactor, EventEmitter);
//
// ### function createStream ()
// Instantiates a new and unique pipe-chain for the reactors
// associated with this instance.
//
Reactor.prototype.createStream = function (source) {
var self = this;
source = source || new ReadWriteStream();
//
// Helper function which wraps the `Stream` constructor
// function and applys the arguments passed to the target
// `reactor.Reactor.prototype[method]` liberally. This
// allows for variable arguments when constructing
// reactor chains. e.g.:
//
// reactor()
// .where('service', '*/health/heartbeat')
// OR
//
// reactor()
// .where({
// 'state': 'critical',
// 'service': '*/health/heartbeat'
// })
//
function wrapStream(Stream, args) {
var wrap = {};
wrap.__proto__ = Stream.prototype;
Stream.apply(wrap, args);
wrap.name = self.name;
wrap.id = self.id;
return wrap;
}
source.on('error', this.emit.bind(this, 'error'));
return this.reactors.reduce(function (last, nextOptions) {
var stream = wrapStream(nextOptions.Factory, nextOptions.args || []);
stream.on('error', self.emit.bind(self, 'error'));
stream.on('reactor:error', self.emit.bind(self, 'reactor:error'));
return last.pipe(stream);
}, source);
};