forked from mrauhu/ddp-server
-
Notifications
You must be signed in to change notification settings - Fork 0
/
crossbar.js
167 lines (152 loc) · 6.5 KB
/
crossbar.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// A "crossbar" is a class that provides structured notification registration.
// See _match for the definition of how a notification matches a trigger.
// All notifications and triggers must have a string key named 'collection'.
DDPServer._Crossbar = function (options) {
var self = this;
options = options || {};
self.nextId = 1;
// map from collection name (string) -> listener id -> object. each object has
// keys 'trigger', 'callback'. As a hack, the empty string means "no
// collection".
self.listenersByCollection = {};
self.listenersByCollectionCount = {};
self.factPackage = options.factPackage || "livedata";
self.factName = options.factName || null;
};
_.extend(DDPServer._Crossbar.prototype, {
// msg is a trigger or a notification
_collectionForMessage: function (msg) {
var self = this;
if (! _.has(msg, 'collection')) {
return '';
} else if (typeof(msg.collection) === 'string') {
if (msg.collection === '')
throw Error("Message has empty collection!");
return msg.collection;
} else {
throw Error("Message has non-string collection!");
}
},
// Listen for notification that match 'trigger'. A notification
// matches if it has the key-value pairs in trigger as a
// subset. When a notification matches, call 'callback', passing
// the actual notification.
//
// Returns a listen handle, which is an object with a method
// stop(). Call stop() to stop listening.
//
// XXX It should be legal to call fire() from inside a listen()
// callback?
listen: function (trigger, callback) {
var self = this;
var id = self.nextId++;
var collection = self._collectionForMessage(trigger);
var record = {trigger: EJSON.clone(trigger), callback: callback};
if (! _.has(self.listenersByCollection, collection)) {
self.listenersByCollection[collection] = {};
self.listenersByCollectionCount[collection] = 0;
}
self.listenersByCollection[collection][id] = record;
self.listenersByCollectionCount[collection]++;
if (self.factName && Package['facts-base']) {
Package['facts-base'].Facts.incrementServerFact(
self.factPackage, self.factName, 1);
}
return {
stop: function () {
if (self.factName && Package['facts-base']) {
Package['facts-base'].Facts.incrementServerFact(
self.factPackage, self.factName, -1);
}
delete self.listenersByCollection[collection][id];
self.listenersByCollectionCount[collection]--;
if (self.listenersByCollectionCount[collection] === 0) {
delete self.listenersByCollection[collection];
delete self.listenersByCollectionCount[collection];
}
}
};
},
// Fire the provided 'notification' (an object whose attribute
// values are all JSON-compatibile) -- inform all matching listeners
// (registered with listen()).
//
// If fire() is called inside a write fence, then each of the
// listener callbacks will be called inside the write fence as well.
//
// The listeners may be invoked in parallel, rather than serially.
fire: function (notification) {
var self = this;
var collection = self._collectionForMessage(notification);
if (! _.has(self.listenersByCollection, collection)) {
return;
}
var listenersForCollection = self.listenersByCollection[collection];
var callbackIds = [];
_.each(listenersForCollection, function (l, id) {
if (self._matches(notification, l.trigger)) {
callbackIds.push(id);
}
});
// Listener callbacks can yield, so we need to first find all the ones that
// match in a single iteration over self.listenersByCollection (which can't
// be mutated during this iteration), and then invoke the matching
// callbacks, checking before each call to ensure they haven't stopped.
// Note that we don't have to check that
// self.listenersByCollection[collection] still === listenersForCollection,
// because the only way that stops being true is if listenersForCollection
// first gets reduced down to the empty object (and then never gets
// increased again).
_.each(callbackIds, function (id) {
if (_.has(listenersForCollection, id)) {
listenersForCollection[id].callback(notification);
}
});
},
// A notification matches a trigger if all keys that exist in both are equal.
//
// Examples:
// N:{collection: "C"} matches T:{collection: "C"}
// (a non-targeted write to a collection matches a
// non-targeted query)
// N:{collection: "C", id: "X"} matches T:{collection: "C"}
// (a targeted write to a collection matches a non-targeted query)
// N:{collection: "C"} matches T:{collection: "C", id: "X"}
// (a non-targeted write to a collection matches a
// targeted query)
// N:{collection: "C", id: "X"} matches T:{collection: "C", id: "X"}
// (a targeted write to a collection matches a targeted query targeted
// at the same document)
// N:{collection: "C", id: "X"} does not match T:{collection: "C", id: "Y"}
// (a targeted write to a collection does not match a targeted query
// targeted at a different document)
_matches: function (notification, trigger) {
// Most notifications that use the crossbar have a string `collection` and
// maybe an `id` that is a string or ObjectID. We're already dividing up
// triggers by collection, but let's fast-track "nope, different ID" (and
// avoid the overly generic EJSON.equals). This makes a noticeable
// performance difference; see https://github.com/meteor/meteor/pull/3697
if (typeof(notification.id) === 'string' &&
typeof(trigger.id) === 'string' &&
notification.id !== trigger.id) {
return false;
}
if (notification.id instanceof MongoID.ObjectID &&
trigger.id instanceof MongoID.ObjectID &&
! notification.id.equals(trigger.id)) {
return false;
}
return _.all(trigger, function (triggerValue, key) {
return !_.has(notification, key) ||
EJSON.equals(triggerValue, notification[key]);
});
}
});
// The "invalidation crossbar" is a specific instance used by the DDP server to
// implement write fence notifications. Listener callbacks on this crossbar
// should call beginWrite on the current write fence before they return, if they
// want to delay the write fence from firing (ie, the DDP method-data-updated
// message from being sent).
DDPServer._InvalidationCrossbar = new DDPServer._Crossbar({
factName: "invalidation-crossbar-listeners"
});