Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'websocket-draft-10'

  • Loading branch information...
commit 143b0cb539ed1cca0250cb7acc6265080efc61a2 2 parents 333c32b + 77ef72c
@mbostock mbostock authored
View
2  lib/cube/index.js
@@ -1,4 +1,4 @@
-exports.version = "0.0.3";
+exports.version = "0.0.4";
exports.emitter = require("./server/emitter");
exports.server = require("./server/server");
exports.collector = require("./server/collector");
View
14 lib/cube/server/emitter.js
@@ -1,5 +1,5 @@
var util = require("util"),
- WebSocket = require("websocket-client").WebSocket;
+ websocket = require("websocket");
module.exports = function() {
var emitter = {},
@@ -11,7 +11,6 @@ module.exports = function() {
function close() {
if (socket) {
util.log("closing socket");
- socket.onclose = null;
socket.close();
socket = null;
}
@@ -21,9 +20,10 @@ module.exports = function() {
timeout = 0;
close();
util.log("opening socket: " + url);
- socket = new WebSocket(url);
- socket.onopen = flush;
- socket.onclose = reopen;
+ var client = new websocket.client();
+ client.on("connect", function(connection) { socket = connection; flush(); });
+ client.on("connectFailed", reopen);
+ client.connect(url);
}
function reopen() {
@@ -37,7 +37,7 @@ module.exports = function() {
var event;
while (event = queue.pop()) {
try {
- socket.send(JSON.stringify(event));
+ socket.sendUTF(JSON.stringify(event));
} catch (e) {
util.log(e.stack);
reopen();
@@ -54,7 +54,7 @@ module.exports = function() {
emitter.send = function(event) {
queue.push(event);
- if (socket.readyState === socket.OPEN) flush();
+ if (socket) flush();
return emitter;
};
View
67 lib/cube/server/server.js
@@ -1,6 +1,8 @@
var util = require("util"),
url = require("url"),
- websocket = require("websocket-server"),
+ http = require("http"),
+ websocket = require("websocket"),
+ websprocket = require("websocket-server"),
mongodb = require("mongodb");
// Don't crash on errors.
@@ -8,26 +10,65 @@ process.on("uncaughtException", function(error) {
if (error.code !== "EPIPE") util.log(error.stack);
});
+// And then this happened:
+websprocket.Connection = require("../../../node_modules/websocket-server/lib/ws/connection");
+
+// Configuration for WebSocket requests.
+var wsOptions = {
+ maxReceivedFrameSize: 0x10000,
+ maxReceivedMessageSize: 0x100000,
+ fragmentOutgoingMessages: true,
+ fragmentationThreshold: 0x4000,
+ keepalive: true,
+ keepaliveInterval: 20000,
+ assembleFragments: true,
+ disableNagleAlgorithm: true,
+ closeTimeout: 5000
+};
+
module.exports = function(options) {
var server = {},
- socket = websocket.createServer(),
+ primary = http.createServer(),
+ secondary = websprocket.createServer(),
endpoints = {ws: [], http: []},
mongo = new mongodb.Server(options["mongo-host"], options["mongo-port"]),
- db = new mongodb.Db(options["mongo-database"], mongo);
+ db = new mongodb.Db(options["mongo-database"], mongo),
+ id = 0;
+
+ secondary.server = primary;
+
+ // Register primary WebSocket listener with fallback.
+ primary.on("upgrade", function(request, socket, head) {
+ if ("sec-websocket-version" in request.headers) {
+ request = new websocket.request(socket, request, wsOptions);
+ request.readHandshake();
+ connect(request.accept(request.requestedProtocols[0], request.origin), request.httpRequest);
+ } else if (request.method === "GET"
+ && /^websocket$/i.test(request.headers.upgrade)
+ && /^upgrade$/i.test(request.headers.connection)) {
+ new websprocket.Connection(secondary.manager, secondary.options, request, socket, head);
+ }
+ });
- // Register WebSocket listener.
- socket.on("connection", function(connection) {
- util.log(connection._socket.remoteAddress + " " + connection._req.url);
+ // Register secondary WebSocket listener.
+ secondary.on("connection", function(connection) {
+ connection.remoteAddress = connection._socket.remoteAddress;
+ connection.sendUTF = connection.send;
+ connect(connection, connection._req);
+ });
+
+ function connect(connection, request) {
+ util.log(connection.remoteAddress + " " + request.url);
// Forward messages to the appropriate endpoint, or close the connection.
for (var i = -1, n = endpoints.ws.length, e; ++i < n;) {
- if ((e = endpoints.ws[i]).match(connection._req.url)) {
+ if ((e = endpoints.ws[i]).match(request.url)) {
function callback(response) {
- connection.send(JSON.stringify(response));
+ connection.sendUTF(JSON.stringify(response));
}
- callback.id = connection.id;
+ callback.id = ++id;
// Listen for close events.
if (e.dispatch.close) {
@@ -49,16 +90,16 @@ module.exports = function(options) {
}
return connection.on("message", function(request) {
- e.dispatch(JSON.parse(request), callback);
+ e.dispatch(JSON.parse(request.utf8Data || request), callback);
});
}
}
connection.close();
- });
+ }
// Register HTTP listener.
- socket.on("request", function(request, response) {
+ primary.on("request", function(request, response) {
var u = url.parse(request.url);
util.log(request.connection.remoteAddress + " " + u.pathname);
@@ -83,7 +124,7 @@ module.exports = function(options) {
// Start the server!
util.log("starting http server on port " + options["http-port"]);
- socket.listen(options["http-port"]);
+ primary.listen(options["http-port"]);
};
return server;
View
4 package.json
@@ -1,6 +1,6 @@
{
"name": "cube",
- "version": "0.0.3",
+ "version": "0.0.4",
"description": "A system for time series visualization using MongoDB, Node and D3.",
"keywords": ["time series", "visualization"],
"homepage": "http://square.github.com/cube/",
@@ -12,7 +12,7 @@
"mongodb": "0.9.6-15",
"pegjs": "0.6.2",
"vows": "0.5.11",
- "websocket-client": "1.0.0",
+ "websocket": "0.0.16",
"websocket-server": "1.4.04"
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.