Permalink
Browse files

Add UDP support for posting events.

This commit also changes the emitter client so that you can easily switch
between websockets and UDP as desired. UDP is faster, but does not detect when
events are not sent successfully.
  • Loading branch information...
1 parent 46e887f commit 1918af1cecd5b53c824ebd6f98a4578aecd8f3f6 @mbostock mbostock committed Apr 22, 2012
View
@@ -5,5 +5,6 @@ module.exports = {
"mongo-database": "cube_development",
"mongo-username": null,
"mongo-password": null,
- "http-port": 1080
+ "http-port": 1080,
+ "udp-port": 1180
};
@@ -1,8 +1,7 @@
module.exports = {
// The collector to send events to.
- "http-host": "127.0.0.1",
- "http-port": 1080,
+ "collector": "ws://127.0.0.1:1080",
// The offset and duration to backfill, in milliseconds.
// For example, if the offset is minus four hours, then the first event that
@@ -4,8 +4,8 @@ var util = require("util"),
cube = require("../../"), // replace with require("cube")
options = require("./random-config");
-util.log("starting websocket client");
-var emitter = cube.emitter().open(options["http-host"], options["http-port"]);
+util.log("starting emitter");
+var emitter = cube.emitter(options["collector"]);
var start = Date.now() + options["offset"],
stop = start + options["duration"],
@@ -26,5 +26,5 @@ while (start < stop) {
}
util.log("sent " + count + " events");
-util.log("stopping websocket client");
+util.log("stopping emitter");
emitter.close();
View
@@ -21,6 +21,9 @@ exports.register = function(db, endpoints) {
endpoint("POST", "/1.0/event/put", poster),
endpoint("POST", "/collectd", require("./collectd").putter(putter))
);
+
+ //
+ endpoints.udp = putter;
};
function post(putter) {
View
@@ -0,0 +1,35 @@
+var util = require("util"),
+ dgram = require("dgram");
+
+module.exports = function(protocol, host, port) {
+ var emitter = {},
+ queue = [],
+ udp = dgram.createSocket("udp4"),
+ closing;
+
+ if (protocol != "udp:") throw new Error("invalid UDP protocol");
+
+ function send() {
+ var event = queue.pop();
+ if (!event) return;
+ var buffer = new Buffer(JSON.stringify(event));
+ udp.send(buffer, 0, buffer.length, port, host, function(error) {
+ if (error) console.warn(error);
+ if (queue.length) process.nextTick(send);
+ else if (closing) udp.close();
+ });
+ }
+
+ emitter.send = function(event) {
+ if (!closing && queue.push(event) == 1) process.nextTick(send);
+ return emitter;
+ };
+
+ emitter.close = function() {
+ if (queue.length) closing = 1;
+ else udp.close();
+ return emitter;
+ };
+
+ return emitter;
+};
View
@@ -0,0 +1,86 @@
+var util = require("util"),
+ websocket = require("websocket");
+
+module.exports = function(protocol, host, port) {
+ var emitter = {},
+ queue = [],
+ url = protocol + "//" + host + ":" + port + "/1.0/event/put",
+ socket,
+ timeout,
+ closing;
+
+ function close() {
+ if (socket) {
+ util.log("closing socket");
+ socket.removeListener("error", reopen);
+ socket.removeListener("close", reopen);
+ socket.close();
+ socket = null;
+ }
+ }
+
+ function closeWhenDone() {
+ closing = true;
+ if (socket) {
+ if (!socket.bytesWaitingToFlush) close();
+ else setTimeout(closeWhenDone, 1000);
+ }
+ }
+
+ function open() {
+ timeout = 0;
+ close();
+ util.log("opening socket: " + url);
+ var client = new websocket.client();
+ client.on("connect", function(connection) {
+ socket = connection;
+ socket.on("message", log);
+ socket.on("error", reopen);
+ socket.on("close", reopen);
+ flush();
+ if (closing) closeWhenDone();
+ });
+ client.on("connectFailed", reopen);
+ client.on("error", reopen);
+ client.connect(url);
+ }
+
+ function reopen() {
+ if (!timeout && !closing) {
+ util.log("reopening soon");
+ timeout = setTimeout(open, 1000);
+ }
+ }
+
+ function flush() {
+ var event;
+ while (event = queue.pop()) {
+ try {
+ socket.sendUTF(JSON.stringify(event));
+ } catch (e) {
+ util.log(e.stack);
+ reopen();
+ return queue.push(event);
+ }
+ }
+ }
+
+ function log(message) {
+ util.log(message.utf8Data);
+ }
+
+ emitter.send = function(event) {
+ queue.push(event);
+ if (socket) flush();
+ return emitter;
+ };
+
+ emitter.close = function() {
+ closeWhenDone();
+ return emitter;
+ };
+
+ open();
+
+ return emitter;
+};
View
@@ -1,90 +1,14 @@
var util = require("util"),
- websocket = require("websocket");
-
-module.exports = function() {
- var emitter = {},
- queue = [],
- url,
- socket,
- timeout,
- closing;
-
- function close() {
- if (socket) {
- util.log("closing socket");
- socket.removeListener("error", reopen);
- socket.removeListener("close", reopen);
- socket.close();
- socket = null;
- }
- }
-
- function closeWhenDone() {
- closing = true;
- if (socket) {
- if (!socket.bytesWaitingToFlush) close();
- else setTimeout(closeWhenDone, 1000);
- }
- }
-
- function open() {
- timeout = 0;
- close();
- util.log("opening socket: " + url);
- var client = new websocket.client();
- client.on("connect", function(connection) {
- socket = connection;
- socket.on("message", log);
- socket.on("error", reopen);
- socket.on("close", reopen);
- flush();
- if (closing) closeWhenDone();
- });
- client.on("connectFailed", reopen);
- client.on("error", reopen);
- client.connect(url);
- }
-
- function reopen() {
- if (!timeout && !closing) {
- util.log("reopening soon");
- timeout = setTimeout(open, 1000);
- }
- }
-
- function flush() {
- var event;
- while (event = queue.pop()) {
- try {
- socket.sendUTF(JSON.stringify(event));
- } catch (e) {
- util.log(e.stack);
- reopen();
- return queue.push(event);
- }
- }
- }
-
- function log(message) {
- util.log(message.utf8Data);
- }
-
- emitter.open = function(host, port) {
- url = "ws://" + host + ":" + port + "/1.0/event/put";
- open();
- return emitter;
- };
-
- emitter.send = function(event) {
- queue.push(event);
- if (socket) flush();
- return emitter;
- };
-
- emitter.close = function() {
- closeWhenDone();
- return emitter;
- };
-
- return emitter;
+ url = require("url"),
+ udp = require("./emitter-udp"),
+ ws = require("./emitter-ws");
+
+module.exports = function(u) {
+ var emitter;
+ u = url.parse(u);
+ switch (u.protocol) {
+ case "udp:": emitter = udp; break;
+ case "ws:": case "wss:": emitter = ws; break;
+ }
+ return emitter(u.protocol, u.hostname, u.port);
};
View
@@ -1,6 +1,7 @@
var util = require("util"),
url = require("url"),
http = require("http"),
+ dgram = require("dgram"),
websocket = require("websocket"),
websprocket = require("websocket-server"),
static = require("node-static"),
@@ -157,8 +158,20 @@ module.exports = function(options) {
meta = require("./event").putter(db);
util.log("starting http server on port " + options["http-port"]);
primary.listen(options["http-port"]);
+ if (endpoints.udp) {
+ util.log("starting udp server on port " + options["udp-port"]);
+ var udp = dgram.createSocket("udp4");
+ udp.on("message", function(message) {
+ endpoints.udp(JSON.parse(message.toString("utf8")), ignore);
+ });
+ udp.bind(options["udp-port"]);
+ }
}
};
return server;
};
+
+function ignore() {
+ // Responses for UDP are ignored; there's nowhere for them to go!
+}

0 comments on commit 1918af1

Please sign in to comment.