Permalink
Browse files

Abort /event/get when connection is closed.

We now detect when the web socket is closed, and abort the query. This commit
also changes all errors to be re-thrown and logged centrally, rather than
duplicating logging code.
  • Loading branch information...
1 parent eaacccc commit 18d2ef356f9989206e6561bd450a4695fd8f1d5c @mbostock mbostock committed Oct 3, 2011
Showing with 24 additions and 14 deletions.
  1. +11 −3 lib/cube/server/event.js
  2. +9 −9 lib/cube/server/metric.js
  3. +4 −2 lib/cube/server/server.js
View
14 lib/cube/server/event.js
@@ -4,6 +4,7 @@
// TODO fix race condition between cache invalidation and metric computation
var util = require("util"),
+ mongodb = require("mongodb"),
parser = require("./event-expression"),
tiers = require("./tiers"),
types = require("./types");
@@ -73,7 +74,7 @@ exports.putter = function(db) {
exports.getter = function(db) {
var collection = types(db);
- return function(request, callback) {
+ function getter(request, callback) {
var start = new Date(request.start),
stop = new Date(request.stop);
@@ -99,14 +100,21 @@ exports.getter = function(db) {
// Query for the desired events.
collection(expression.type).events.find(filter, fields, event_options, function(error, cursor) {
- if (error) return util.log(error);
+ if (error) throw error;
cursor.each(function(error, event) {
- if (error) return util.log(error);
+ if (callback.closed) return cursor.close();
+ if (error) throw error;
if (event) callback({
time: event.t,
data: event.d
});
});
});
+ }
+
+ getter.close = function(callback) {
+ callback.closed = true;
};
+
+ return getter;
};
View
18 lib/cube/server/metric.js
@@ -80,10 +80,10 @@ exports.getter = function(db) {
// Immediately report back whatever we have. If any values are missing,
// merge them into contiguous intervals and asynchronously compute them.
function foundMetrics(error, cursor) {
- if (error) return util.log(error);
+ if (error) throw error;
var time = start;
cursor.each(function(error, row) {
- if (error) return util.log(error);
+ if (error) throw error;
if (row) {
callback(row.t, row.v, row.g);
if (time < row.t) compute(time, row.t);
@@ -100,15 +100,15 @@ exports.getter = function(db) {
filter.t.$gte = start;
filter.t.$lt = next;
type.events.find(filter, fields, group_options, function(error, cursor) {
- if (error) return util.log(error);
+ if (error) throw error;
var k0, values;
cursor.nextObject(function(error, row) {
- if (error) return util.log(error);
+ if (error) throw error;
if (!row) return;
k0 = group.value(row);
values = [map(row)];
cursor.each(function(error, row) {
- if (error) return util.log(error);
+ if (error) throw error;
if (row) {
var k1 = group.value(row);
if (k0 != k1) {
@@ -133,10 +133,10 @@ exports.getter = function(db) {
filter.t.$gte = start;
filter.t.$lt = next;
type.events.find(filter, fields, event_options, function(error, cursor) {
- if (error) return util.log(error);
+ if (error) throw error;
var groups = {};
cursor.each(function(error, row) {
- if (error) return util.log(error);
+ if (error) throw error;
if (!row) {
for (var key in groups) saveGroup(start, reduce(groups[key]), key);
@@ -181,10 +181,10 @@ exports.getter = function(db) {
filter.t.$gte = start;
filter.t.$lt = stop;
type.events.find(filter, fields, event_options, function(error, cursor) {
- if (error) return util.log(error);
+ if (error) throw error;
var time = start, values = [];
cursor.each(function(error, row) {
- if (error) return util.log(error);
+ if (error) throw error;
if (row) {
var then = tier.floor(row.t);
if (time < then) {
View
6 lib/cube/server/server.js
@@ -7,7 +7,7 @@ var util = require("util"),
// Don't crash on errors.
process.on("uncaughtException", function(error) {
- if (error.code !== "EPIPE") util.log(error.stack);
+ util.log(error.stack);
});
// And then this happened:
@@ -66,7 +66,9 @@ module.exports = function(options) {
if ((e = endpoints.ws[i]).match(request.url)) {
function callback(response) {
- connection.sendUTF(JSON.stringify(response));
+ if (connection.socket.writable) {
+ connection.sendUTF(JSON.stringify(response));
+ }
}
callback.id = ++id;

0 comments on commit 18d2ef3

Please sign in to comment.