-
Notifications
You must be signed in to change notification settings - Fork 3
/
rx-extensions.js
44 lines (40 loc) · 1.13 KB
/
rx-extensions.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
var Rx = require('rx').Rx;
var WorkQueueRx = require('../lib/qrx.js');
/**
 * Installs the ForkMany and Callback combinators on the supplied Rx object.
 *
 * @param {Object} Rx Rx namespace to extend; Rx.Observable and
 *   Rx.Observable.prototype are mutated in place.
 * @return {Object} The same Rx object that was passed in.
 */
function extendRx(Rx){
  /**
   * Forking Rx combinator that projects a reactive stream onto a
   * distributed work queue.
   *
   * A single WorkQueueRx is created per ForkMany call. Each element of the
   * source is enqueued on it and mapped, via SelectMany, to the queue's
   * completedObservable().
   *
   * NOTE(review): every source element subscribes to
   * q.completedObservable() again; whether that duplicates results depends
   * on WorkQueueRx, which is not visible here - confirm.
   *
   * @param {*} options Passed unchanged to the WorkQueueRx constructor.
   * @return {Rx.Observable} The observable produced by SelectMany.
   */
  Rx.Observable.prototype.ForkMany = function(options){
    var q = new WorkQueueRx(options);
    return this.SelectMany(function(r){
      q.enqueue(r);
      return q.completedObservable();
    });
  };

  /**
   * Rx combinator that takes an async function with arbitrary parameters
   * and returns an observable that yields the result of its callback.
   *
   * asyncFn is invoked once per subscription with the extra arguments
   * given here followed by an (err, result) callback. A falsy err emits
   * result and completes; a truthy err is forwarded through OnError.
   *
   * @param {function} asyncFn Async function whose last argument is an
   *   (err, result) callback.
   * @param {...*} args Arguments forwarded to asyncFn ahead of the callback.
   * @return {Rx.Observable} An observable with the results of the callback.
   */
  Rx.Observable.Callback = function(asyncFn){
    // Everything after asyncFn is forwarded to it on each subscription.
    var boundArgs = Array.prototype.slice.call(arguments, 1);
    return this.Create(function(obs){
      var done = function(err, result){
        if (!err){
          obs.OnNext(result);
          obs.OnCompleted();
        } else {
          obs.OnError(err);
        }
      };
      // Build a fresh argument list for every subscription. Appending the
      // callback to the shared array would make it grow on each subscribe,
      // so later subscribers would hand asyncFn an earlier, stale callback.
      asyncFn.apply(null, boundArgs.concat([done]));
      // Disposal is a no-op: the pending asyncFn call is not cancelled.
      return function(){};
    });
  };

  return Rx;
}
// Public entry point: callers pass their own Rx object to extendRx, which
// installs the combinators on it and returns it.
exports.extendRx = extendRx;