Permalink
Browse files

Merge branch 'release'

  • Loading branch information...
2 parents 143b0cb + c91c25c commit f394dfb3cd051d5a625aa4b0df4a3579a9681284 @mbostock mbostock committed Oct 5, 2011
Showing with 54 additions and 27 deletions.
  1. +1 −1 lib/cube/index.js
  2. +36 −12 lib/cube/server/event.js
  3. +9 −9 lib/cube/server/metric.js
  4. +7 −4 lib/cube/server/server.js
  5. +1 −1 package.json
View
@@ -1,4 +1,4 @@
-exports.version = "0.0.4";
+exports.version = "0.0.5";
exports.emitter = require("./server/emitter");
exports.server = require("./server/server");
exports.collector = require("./server/collector");
View
@@ -4,6 +4,7 @@
// TODO fix race condition between cache invalidation and metric computation
var util = require("util"),
+ mongodb = require("mongodb"),
parser = require("./event-expression"),
tiers = require("./tiers"),
types = require("./types");
@@ -71,11 +72,13 @@ exports.putter = function(db) {
};
exports.getter = function(db) {
- var collection = types(db);
+ var collection = types(db),
+ streamDelay = 5000;
- return function(request, callback) {
- var start = new Date(request.start),
- stop = new Date(request.stop);
+ function getter(request, callback) {
+ var stream = !("stop" in request),
+ start = new Date(request.start),
+ stop = stream ? new Date(Date.now() - streamDelay) : new Date(request.stop);
// Validate the dates.
if (isNaN(start)) return util.log("invalid start: " + request.start);
@@ -98,15 +101,36 @@ exports.getter = function(db) {
expression.fields(fields);
// Query for the desired events.
- collection(expression.type).events.find(filter, fields, event_options, function(error, cursor) {
- if (error) return util.log(error);
- cursor.each(function(error, event) {
- if (error) return util.log(error);
- if (event) callback({
- time: event.t,
- data: event.d
+ function query() {
+ collection(expression.type).events.find(filter, fields, event_options, function(error, cursor) {
+ if (error) throw error;
+ cursor.each(function(error, event) {
+ if (callback.closed) return cursor.close();
+ if (error) throw error;
+ if (event) callback({
+ time: event.t,
+ data: event.d
+ });
});
});
- });
+ }
+
+ query();
+
+ // While streaming, periodically poll for new results.
+ if (stream) {
+ stream = setInterval(function() {
+ if (callback.closed) return clearInterval(stream);
+ filter.t.$gte = stop;
+ filter.t.$lt = stop = new Date(Date.now() - streamDelay);
+ query();
+ }, streamDelay);
+ }
+ }
+
+ getter.close = function(callback) {
+ callback.closed = true;
};
+
+ return getter;
};
View
@@ -80,10 +80,10 @@ exports.getter = function(db) {
// Immediately report back whatever we have. If any values are missing,
// merge them into contiguous intervals and asynchronously compute them.
function foundMetrics(error, cursor) {
- if (error) return util.log(error);
+ if (error) throw error;
var time = start;
cursor.each(function(error, row) {
- if (error) return util.log(error);
+ if (error) throw error;
if (row) {
callback(row.t, row.v, row.g);
if (time < row.t) compute(time, row.t);
@@ -100,15 +100,15 @@ exports.getter = function(db) {
filter.t.$gte = start;
filter.t.$lt = next;
type.events.find(filter, fields, group_options, function(error, cursor) {
- if (error) return util.log(error);
+ if (error) throw error;
var k0, values;
cursor.nextObject(function(error, row) {
- if (error) return util.log(error);
+ if (error) throw error;
if (!row) return;
k0 = group.value(row);
values = [map(row)];
cursor.each(function(error, row) {
- if (error) return util.log(error);
+ if (error) throw error;
if (row) {
var k1 = group.value(row);
if (k0 != k1) {
@@ -133,10 +133,10 @@ exports.getter = function(db) {
filter.t.$gte = start;
filter.t.$lt = next;
type.events.find(filter, fields, event_options, function(error, cursor) {
- if (error) return util.log(error);
+ if (error) throw error;
var groups = {};
cursor.each(function(error, row) {
- if (error) return util.log(error);
+ if (error) throw error;
if (!row) {
for (var key in groups) saveGroup(start, reduce(groups[key]), key);
@@ -181,10 +181,10 @@ exports.getter = function(db) {
filter.t.$gte = start;
filter.t.$lt = stop;
type.events.find(filter, fields, event_options, function(error, cursor) {
- if (error) return util.log(error);
+ if (error) throw error;
var time = start, values = [];
cursor.each(function(error, row) {
- if (error) return util.log(error);
+ if (error) throw error;
if (row) {
var then = tier.floor(row.t);
if (time < then) {
View
@@ -7,7 +7,7 @@ var util = require("util"),
// Don't crash on errors.
process.on("uncaughtException", function(error) {
- if (error.code !== "EPIPE") util.log(error.stack);
+ util.log(error.stack);
});
// And then this happened:
@@ -52,7 +52,8 @@ module.exports = function(options) {
// Register secondary WebSocket listener.
secondary.on("connection", function(connection) {
- connection.remoteAddress = connection._socket.remoteAddress;
+ connection.socket = connection._socket;
+ connection.remoteAddress = connection.socket.remoteAddress;
connection.sendUTF = connection.send;
connect(connection, connection._req);
});
@@ -65,7 +66,9 @@ module.exports = function(options) {
if ((e = endpoints.ws[i]).match(request.url)) {
function callback(response) {
- connection.sendUTF(JSON.stringify(response));
+ if (connection.socket.writable) {
+ connection.sendUTF(JSON.stringify(response));
+ }
}
callback.id = ++id;
@@ -82,7 +85,7 @@ module.exports = function(options) {
// closed very shortly after it is opened. So we do an additional
// check using an interval to verify that the socket is still open.
var interval = setInterval(function() {
- if (!connection._socket.writable) {
+ if (!connection.socket.writable) {
interval = clearInterval(interval);
connection.close();
}
View
@@ -1,6 +1,6 @@
{
"name": "cube",
- "version": "0.0.4",
+ "version": "0.0.5",
"description": "A system for time series visualization using MongoDB, Node and D3.",
"keywords": ["time series", "visualization"],
"homepage": "http://square.github.com/cube/",

0 comments on commit f394dfb

Please sign in to comment.