Permalink
Browse files

Allow event stream delay to be overridden.

By default there is a five-second delay when event streaming, to give events
time to arrive before they are skipped by listeners. You can now change the
stream delay as part of the /event/get request. This is useful if there is a lag
for incoming events (say because of polling a primary data source), or if you
want a time-shifted stream of old events (say, those from yesterday).
  • Loading branch information...
1 parent 614573e commit e51cd376c36325aacd83c7b4e42894b1be80a429 @mbostock mbostock committed Apr 15, 2012
Showing with 86 additions and 14 deletions.
  1. +32 −0 examples/event-stream/event-get.html
  2. +38 −0 examples/event-stream/event-put.html
  3. +16 −14 lib/cube/server/event.js
@@ -0,0 +1,32 @@
+<!DOCTYPE html>
+<meta charset="utf-8">
+<h1>Streaming Events - Get</h1>
+
+<p>This page streams events from Cube's evaluator (the /event/get endpoint) and logs them to the JavaScript console. If you open <a href="event-put.html" target="_blank">event-put.html</a> in a new window, you'll start receiving events.
+
+<script>
+
+var socket = new WebSocket("ws://localhost:1081/1.0/event/get");
+
+socket.onopen = function() {
+ console.log("connected!");
+ socket.send(JSON.stringify({
+ expression: "test(index)",
+ start: new Date()
+ }));
+};
+
+socket.onmessage = function(message) {
+ var event = JSON.parse(message.data);
+ console.log("received", event.data.index);
+};
+
+socket.onclose = function() {
+ console.log("closed");
+};
+
+socket.onerror = function(error) {
+ console.log("error", error);
+};
+
+</script>
@@ -0,0 +1,38 @@
+<!DOCTYPE html>
+<meta charset="utf-8">
+<h1>Streaming Events - Put</h1>
+
+<p>This page streams events to Cube's collector (the /event/put endpoint) and logs them to the JavaScript console. If you open <a href="event-get.html" target="_blank">event-get.html</a> in a new window, you can verify that Cube is receiving the events.
+
+<script>
+
+var index = 0;
+
+var socket = new WebSocket("ws://localhost:1080/1.0/event/put");
+
+socket.onopen = function() {
+ console.log("connected!");
+ setInterval(function() {
+ socket.send(JSON.stringify({
+ type: "test",
+ time: new Date(),
+ data: {index: ++index}
+ }));
+ console.log("sent", index);
+ }, 1000);
+};
+
+socket.onmessage = function(message) {
+ var event = JSON.parse(message.data);
+ console.log("received", event);
+};
+
+socket.onclose = function() {
+ console.log("closed");
+};
+
+socket.onerror = function(error) {
+ console.log("error", error);
+};
+
+</script>
View
@@ -13,6 +13,12 @@ var type_re = /^[a-z][a-zA-Z0-9_]+$/,
event_options = {sort: {t: -1}, batchSize: 1000},
metric_options = {capped: true, size: 1e7, autoIndexId: true};
+// When streaming events, we should allow a delay for events to arrive, or else
+// we risk skipping events that arrive after their event.time. This delay can be
+// customized by specifying a `delay` property as part of the request.
+var streamDelayDefault = 5000,
+ streamInterval = 1000;
+
exports.putter = function(db) {
var collection = types(db),
knownByType = {},
@@ -91,13 +97,13 @@ exports.putter = function(db) {
};
exports.getter = function(db) {
- var collection = types(db),
- streamDelay = 5000;
+ var collection = types(db);
function getter(request, callback) {
var stream = !("stop" in request),
+ delay = "delay" in request ? +request.delay : streamDelayDefault,
start = new Date(request.start),
- stop = stream ? new Date(Date.now() - streamDelay) : new Date(request.stop);
+ stop = stream ? new Date(Date.now() - delay) : new Date(request.stop);
// Validate the dates.
if (isNaN(start)) return callback({error: "invalid start"}), -1;
@@ -134,24 +140,20 @@ exports.getter = function(db) {
else handle(error);
// A null event indicates that there are no more results. For
- // streaming queries, we don't report this sentinel value.
+ // streaming queries, we don't report this sentinel value, and instead
+ // set a timeout to issue another query for more recent results.
if (event) callback({time: event.t, data: event.d});
else if (!stream) callback(null);
+ else {
+ filter.t.$gte = stop;
+ filter.t.$lt = stop = new Date(Date.now() - delay);
+ setTimeout(query, streamInterval);
+ }
});
});
}
query();
-
- // While streaming, periodically poll for new results.
- if (stream) {
- stream = setInterval(function() {
- if (callback.closed) return clearInterval(stream);
- filter.t.$gte = stop;
- filter.t.$lt = stop = new Date(Date.now() - streamDelay);
- query();
- }, streamDelay);
- }
}
getter.close = function(callback) {

0 comments on commit e51cd37

Please sign in to comment.