Skip to content
This repository has been archived by the owner on Dec 2, 2020. It is now read-only.

Commit

Permalink
Better detection of socket closure.
Browse files Browse the repository at this point in the history
  • Loading branch information
mbostock committed Apr 17, 2012
1 parent 6e9b795 commit 9580318
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 39 deletions.
30 changes: 12 additions & 18 deletions lib/cube/event.js
Expand Up @@ -110,6 +110,7 @@ exports.getter = function(db) {
if (isNaN(stop)) return callback({error: "invalid stop"}), -1; if (isNaN(stop)) return callback({error: "invalid stop"}), -1;


// Parse the expression. // Parse the expression.
var expression;
try { try {
expression = parser.parse(request.expression); expression = parser.parse(request.expression);
} catch (error) { } catch (error) {
Expand Down Expand Up @@ -170,36 +171,29 @@ exports.getter = function(db) {
else { else {
streams = streamsBySource[expression.source] = {time: stop, waiting: [], active: [callback]}; streams = streamsBySource[expression.source] = {time: stop, waiting: [], active: [callback]};
(function poll() { (function poll() {
query(function callback(event) { query(function(event) {


// If there's an event // If there's an event, send it to all active, open clients.
if (event) { if (event) {
var closed = false;

// Send the event to all active, open clients.
streams.active.forEach(function(callback) { streams.active.forEach(function(callback) {
if (!callback.closed) callback(event); if (!callback.closed) callback(event);
else closed = true;
}); });

// Remove any closed callbacks.
// Removal is rare, so we don't want to filter every time.
if (closed) streams.active = streams.active.filter(open);

// If no clients remain, then it's safe to close the callback.
// The query function will then terminate the underlying cursor.
if (!streams.active.length && !streams.waiting.length) {
callback.closed = true;
delete streamsBySource[expression.source];
}
} }


// Otherwise, we've reached the end of a poll, and it's time to // Otherwise, we've reached the end of a poll, and it's time to
// merge the waiting callbacks into the active callbacks. Advance // merge the waiting callbacks into the active callbacks. Advance
// the time range, and set a timeout for the next poll. // the time range, and set a timeout for the next poll.
else { else {
streams.active = streams.active.concat(streams.waiting); streams.active = streams.active.concat(streams.waiting).filter(open);
streams.waiting = []; streams.waiting = [];

// If no clients remain, then it's safe to delete the shared
// stream, and we'll no longer be responsible for polling.
if (!streams.active.length) {
delete streamsBySource[expression.source];
return;
}

filter.t.$gte = streams.time; filter.t.$gte = streams.time;
filter.t.$lt = streams.time = new Date(Date.now() - delay); filter.t.$lt = streams.time = new Date(Date.now() - delay);
setTimeout(poll, streamInterval); setTimeout(poll, streamInterval);
Expand Down
26 changes: 5 additions & 21 deletions lib/cube/server.js
Expand Up @@ -73,31 +73,15 @@ module.exports = function(options) {
if ((e = endpoints.ws[i]).match(request.url)) { if ((e = endpoints.ws[i]).match(request.url)) {


function callback(response) { function callback(response) {
if (connection.socket.writable) { connection.sendUTF(JSON.stringify(response));
connection.sendUTF(JSON.stringify(response));
}
} }


callback.id = ++id; callback.id = ++id;


// Listen for close events. // Listen for socket disconnect.
if (e.dispatch.close) { if (e.dispatch.close) connection.socket.on("end", function() {
connection.on("close", function() { e.dispatch.close(callback);
interval = clearInterval(interval); });
e.dispatch.close(callback);
});

// Unfortunately, it looks like there is a bug in websocket-server (or
// somewhere else) where close events are not emitted if the socket is
// closed very shortly after it is opened. So we do an additional
// check using an interval to verify that the socket is still open.
var interval = setInterval(function() {
if (!connection.socket.writable) {
interval = clearInterval(interval);
connection.close();
}
}, 5000);
}


connection.on("message", function(message) { connection.on("message", function(message) {
e.dispatch(JSON.parse(message.utf8Data || message), callback); e.dispatch(JSON.parse(message.utf8Data || message), callback);
Expand Down

0 comments on commit 9580318

Please sign in to comment.