-
Notifications
You must be signed in to change notification settings - Fork 3
/
message-queue.js
49 lines (47 loc) · 1.61 KB
/
message-queue.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
 * Creates a message queue: an observable built with `Rx.Observable.Create`
 * that re-broadcasts every pushed message to all currently attached observers.
 *
 * The returned observable carries two extra methods:
 *   - `push(message)`    delivers `message` to every attached observer via
 *                        `OnNext` and returns the queue.
 *   - `plug(observable)` registers `observable` as a source whose values are
 *                        forwarded through `push`, and returns the queue.
 *
 * Plugged sources are subscribed lazily: they are started when the first
 * observer attaches (or immediately on `plug` if observers already exist) and
 * disposed when the last observer detaches.
 *
 * @returns {Object} the observable returned by `Rx.Observable.Create`,
 *   extended with `push` and `plug`.
 */
function MessageQueue() {
  // Bookkeeping for plugged sources and for the observers currently attached.
  var subscriptions = [];
  var observers = [];

  /**
   * Removes the first occurrence of `x` from `xs` in place.
   * An absent item is left alone; without the guard `splice(-1, 1)` would
   * silently drop the last (unrelated) element.
   *
   * @param {Array} xs array to mutate
   * @param {*} x item to remove
   * @returns {boolean} true when an element was removed
   */
  function remove(xs, x) {
    var index = xs.indexOf(x);
    if (index === -1) {
      return false;
    }
    xs.splice(index, 1);
    return true;
  }

  /**
   * Registers `observable` as a source of the queue and returns a handle with
   * `start`/`stop`. The source is started right away when observers exist.
   *
   * @param {Object} observable source exposing `Subscribe(onNext, callback)`
   * @returns {{start: Function, stop: Function}} the registered handle
   */
  function Subscription(observable) {
    var disposable;

    // Unregisters this source from the queue.
    function cancel() {
      remove(subscriptions, subscription);
    }

    function push(message) {
      messageQueue.push(message);
    }

    function start() {
      // NOTE(review): `cancel` is passed as the second Subscribe callback;
      // which notification that slot represents (error vs. completion)
      // depends on the Rx version in use - confirm.
      disposable = observable.Subscribe(push, cancel);
    }

    function stop() {
      if (disposable) {
        disposable.Dispose();
        // Forget the handle so a repeated stop() cannot dispose it twice.
        disposable = undefined;
      }
    }

    var subscription = { start: start, stop: stop };
    subscriptions.push(subscription);
    if (observers.length > 0) {
      start();
    }
    return subscription;
  }

  var messageQueue = Rx.Observable.Create(function (observer) {
    observers.push(observer);
    if (observers.length === 1) {
      // Iterate over a snapshot: a source may call `cancel` synchronously
      // from within Subscribe, which mutates `subscriptions` and would make
      // forEach skip the following source.
      subscriptions.slice().forEach(function (subscription) {
        subscription.start();
      });
    }
    return function () {
      // Disposing the same observer more than once is a no-op.
      if (!remove(observers, observer)) {
        return;
      }
      if (observers.length === 0) {
        subscriptions.slice().forEach(function (subscription) {
          subscription.stop();
        });
      }
    };
  });

  messageQueue.push = function (message) {
    // Snapshot the list so observers that attach or detach while being
    // notified do not disturb this delivery round.
    observers.slice().forEach(function (observer) {
      observer.OnNext(message);
    });
    return messageQueue;
  };

  messageQueue.plug = function (observable) {
    Subscription(observable);
    return messageQueue;
  };

  return messageQueue;
}