Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Shared polling for event streams.

If multiple clients are listening to the same event stream, share a polling
loop. This is much more efficient than every client polling for itself!
  • Loading branch information...
commit 6e9b795eb3faa70ad6437a1333cda1b083936dec 1 parent 408a979
@mbostock mbostock authored
View
2  lib/cube/event-expression.js
@@ -142,7 +142,7 @@ module.exports = (function(){
pos = savedPos1;
}
var result2 = result1 !== null
- ? (function(expression) { return expression; })(result1[1])
+ ? (function(expression) { expression.source = input; return expression; })(result1[1])
: null;
if (result2 !== null) {
var result0 = result2;
View
2  lib/cube/event-expression.peg
@@ -73,7 +73,7 @@
}
start
- = _ expression:event_expression _ { return expression; }
+ = _ expression:event_expression _ { expression.source = input; return expression; }
event_expression
= value:event_value_expression filters:(_ "." _ event_filter_expression)*
View
98 lib/cube/event.js
@@ -96,7 +96,8 @@ exports.putter = function(db) {
};
exports.getter = function(db) {
- var collection = types(db);
+ var collection = types(db),
+ streamsBySource = {};
function getter(request, callback) {
var stream = !("stop" in request),
@@ -109,7 +110,6 @@ exports.getter = function(db) {
if (isNaN(stop)) return callback({error: "invalid stop"}), -1;
// Parse the expression.
- var expression;
try {
expression = parser.parse(request.expression);
} catch (error) {
@@ -129,38 +129,88 @@ exports.getter = function(db) {
expression.fields(fields);
// Query for the desired events.
- function query() {
+ function query(callback) {
collection(expression.type).events.find(filter, fields, options, function(error, cursor) {
handle(error);
cursor.each(function(error, event) {
- // If the callback is closed (for example, when the WebSocket is
- // closed, or when the maximum number of events from the GET endpoint
- // is reached), treat this event as the last event. Note that closing
- // the cursor mid-loop causes an error, which we ignore!
- if (callback.closed) {
- cursor.close();
- if (!stream) callback(null);
- return;
- }
+ // If the callback is closed (i.e., if the WebSocket connection was
+ // closed), then abort the query. Note that closing the cursor mid-
+ // loop causes an error, which we subsequently ignore!
+ if (callback.closed) return cursor.close();
handle(error);
- // A null event indicates that there are no more results. For
- // streaming queries, we don't report this sentinel value, and instead
- // set a timeout to issue another query for more recent results.
+ // A null event indicates that there are no more results.
if (event) callback({time: event.t, data: event.d});
- else if (!stream) callback(null);
- else {
- filter.t.$gte = stop;
- filter.t.$lt = stop = new Date(Date.now() - delay);
- setTimeout(query, streamInterval);
- }
+ else callback(null);
});
});
}
- query();
+ // For streaming queries, share streams for efficient polling.
+ if (stream) {
+ var streams = streamsBySource[expression.source];
+
+ // If there is an existing stream to attach to, backfill the initial set
+ // of results to catch the client up to the stream. Add the new callback
+ // to a queue, so that when the shared stream finishes its current poll,
+ // it begins notifying the new client. Note that we don't pass the null
+ // (end terminator) to the callback, because more results are to come!
+ if (streams) {
+ filter.t.$lt = streams.time;
+ streams.waiting.push(callback);
+ query(function(event) { if (event) callback(event); });
+ }
+
+ // Otherwise, we're creating a new stream, so we're responsible for
+ // starting the polling loop. This means notifying active callbacks,
+ // detecting when active callbacks are closed, advancing the time window,
+ // and moving waiting clients to active clients.
+ else {
+ streams = streamsBySource[expression.source] = {time: stop, waiting: [], active: [callback]};
+ (function poll() {
+ query(function callback(event) {
+
+ // If there's an event…
+ if (event) {
+ var closed = false;
+
+ // Send the event to all active, open clients.
+ streams.active.forEach(function(callback) {
+ if (!callback.closed) callback(event);
+ else closed = true;
+ });
+
+ // Remove any closed callbacks.
+ // Removal is rare, so we don't want to filter every time.
+ if (closed) streams.active = streams.active.filter(open);
+
+ // If no clients remain, then it's safe to close the callback.
+ // The query function will then terminate the underlying cursor.
+ if (!streams.active.length && !streams.waiting.length) {
+ callback.closed = true;
+ delete streamsBySource[expression.source];
+ }
+ }
+
+ // Otherwise, we've reached the end of a poll, and it's time to
+ // merge the waiting callbacks into the active callbacks. Advance
+ // the time range, and set a timeout for the next poll.
+ else {
+ streams.active = streams.active.concat(streams.waiting);
+ streams.waiting = [];
+ filter.t.$gte = streams.time;
+ filter.t.$lt = streams.time = new Date(Date.now() - delay);
+ setTimeout(poll, streamInterval);
+ }
+ });
+ })();
+ }
+ }
+
+ // For non-streaming queries, just send the single batch!
+ else query(callback);
}
getter.close = function(callback) {
@@ -173,3 +223,7 @@ exports.getter = function(db) {
function handle(error) {
if (error) throw error;
}
+
+function open(callback) {
+ return !callback.closed;
+}
View
33 test/event-expression-test.js
@@ -20,6 +20,9 @@ suite.addBatch({
var filter = {};
e.filter(filter);
assert.deepEqual(filter, {});
+ },
+ "has the expected source": function(e) {
+ assert.equal(e.source, "test");
}
},
@@ -34,6 +37,9 @@ suite.addBatch({
var filter = {};
e.filter(filter);
assert.deepEqual(filter, {"d.i": {$exists: true}});
+ },
+ "has the expected source": function(e) {
+ assert.equal(e.source, "test(i)");
}
},
@@ -48,6 +54,9 @@ suite.addBatch({
var filter = {};
e.filter(filter);
assert.deepEqual(filter, {"d.i": {$exists: true}, "d.j": {$exists: true}});
+ },
+ "has the expected source": function(e) {
+ assert.equal(e.source, "test(i, j)");
}
},
@@ -62,6 +71,9 @@ suite.addBatch({
var filter = {};
e.filter(filter);
assert.deepEqual(filter, {"d.i": {$gt: 42}});
+ },
+ "has the expected source": function(e) {
+ assert.equal(e.source, "test(i).gt(i, 42)");
}
},
@@ -85,6 +97,9 @@ suite.addBatch({
var filter = {};
e.filter(filter);
assert.deepEqual(filter, {"d.i": {$gt: 42, $lte: 52}});
+ },
+ "has the expected source": function(e) {
+ assert.equal(e.source, "test.gt(i, 42).le(i, 52)");
}
},
@@ -94,6 +109,9 @@ suite.addBatch({
var filter = {};
e.filter(filter);
assert.deepEqual(filter, {"d.i": 52});
+ },
+ "has the expected source": function(e) {
+ assert.equal(e.source, "test.gt(i, 42).eq(i, 52)");
}
},
@@ -103,6 +121,9 @@ suite.addBatch({
var filter = {};
e.filter(filter);
assert.deepEqual(filter, {"d.i": 52});
+ },
+ "has the expected source": function(e) {
+ assert.equal(e.source, "test.eq(i, 52).gt(i, 42)");
}
},
@@ -117,6 +138,9 @@ suite.addBatch({
var filter = {};
e.filter(filter);
assert.deepEqual(filter, {"d.i.j": {$exists: true}});
+ },
+ "has the expected source": function(e) {
+ assert.equal(e.source, "test(i.j)");
}
},
@@ -131,6 +155,9 @@ suite.addBatch({
var filter = {};
e.filter(filter);
assert.deepEqual(filter, {"d.i": {$exists: true}});
+ },
+ "has the expected source": function(e) {
+ assert.equal(e.source, "test(i[0])");
}
},
@@ -145,6 +172,9 @@ suite.addBatch({
var filter = {};
e.filter(filter);
assert.deepEqual(filter, {"d.i.j.k": {$exists: true}});
+ },
+ "has the expected source": function(e) {
+ assert.equal(e.source, "test(i.j[0].k)");
}
},
@@ -159,6 +189,9 @@ suite.addBatch({
var filter = {};
e.filter(filter);
assert.deepEqual(filter, {"d.i.j.0.k": {$gt: 42}});
+ },
+ "has the expected source": function(e) {
+ assert.equal(e.source, "test.gt(i.j[0].k, 42)");
}
},
Please sign in to comment.
Something went wrong with that request. Please try again.