Permalink
Browse files

begin implementing bosh_server + bosh-test

  • Loading branch information...
1 parent c5da434 commit ba177a8577752b01de9edf3c5b3cb8f2450fc562 @astro astro committed Mar 25, 2012
Showing with 238 additions and 13 deletions.
  1. +1 −0 lib/node-xmpp.js
  2. +18 −12 lib/xmpp/bosh.js
  3. +144 −0 lib/xmpp/bosh_server.js
  4. +4 −1 lib/xmpp/c2s.js
  5. +71 −0 test/bosh-test.js
View
@@ -18,3 +18,4 @@ exports.Message = Stanza.Message;
exports.Presence = Stanza.Presence;
exports.Iq = Stanza.Iq;
exports.Router = Router.Router;
+exports.BOSHServer = require('./xmpp/bosh_server').BOSHServer;
View
@@ -5,7 +5,7 @@ if (process.title === 'browser')
request = require('browser-request');
else {
var requestPath = 'request';
- require(requestPath);
+ request = require(requestPath);
}
var ltx = require('ltx');
@@ -126,22 +126,28 @@ BOSHConnection.prototype.request = function(attrs, children, cb) {
body: boshEl.toString()
});
+ exports.parseBody(req, function(e, bodyEl) {
+ if (e)
+ console.error(e.stack || e);
+ else
+ process.nextTick(function() {
+ that.mayRequest();
+ });
+
+ that.currentRequests--;
+ cb(e, bodyEl);
+ });
+};
+
+exports.parseBody = function(stream, cb) {
var parser = new ltx.Parser();
- req.on('data', function(data) {
- console.log("<<",data.toString());
+ stream.on('data', function(data) {
parser.write(data);
});
- req.on('end', function() {
- that.currentRequests--;
+ stream.on('end', function() {
parser.end();
- process.nextTick(function() {
- that.mayRequest();
- });
});
- req.on('error', function(e) {
- // TODO
- console.log("req error", e);
- that.currentRequests--;
+ stream.on('error', function(e) {
cb(e);
});
parser.on('tree', function(bodyEl) {
View
@@ -0,0 +1,144 @@
+var EventEmitter = require('events').EventEmitter;
+var util = require('util');
+var ltx = require('ltx');
+var BOSH = require('./bosh');
+
+function BOSHServer() {
+ this.sessions = {};
+}
+
+util.inherits(BOSHServer, EventEmitter);
+exports.BOSHServer = BOSHServer;
+
+/**
+ * *YOU* need to check the path before passing to this function.
+ */
+BOSHServer.prototype.handleHTTP = function(req, res) {
+ var that = this;
+
+ if (req.method === 'POST') {
+ BOSH.parseBody(req, function(error, bodyEl) {
+ if (error || !bodyEl) {
+ res.writeHead(400, { 'Content-Type': "text/plain" });
+ res.end(error.message || error.stack || "Error");
+ return;
+ }
+
+ var session;
+ if (bodyEl.attrs.sid) {
+ session = that.sessions[bodyEl.attrs.sid];
+ if (session) {
+ session.handleHTTP({ req: req, res: res, bodyEl: bodyEl });
+ } else {
+ res.writeHead(404, { 'Content-Type': "text/plain" });
+ res.end("BOSH session not found");
+ }
+ } else {
+ /* No sid: create session */
+ do {
+ session = new BOSHServerSession({ req: req, res: res, bodyEl: bodyEl });
+ } while(that.sessions.hasOwnProperty(session.sid));
+ that.sessions[session.sid] = session;
+ /* Hook for destruction */
+ session.on('close', function() {
+ delete that.sessions[session.sid];
+ });
+ that.emit('connect', session);
+ }
+ });
+ } else if (false && req.method === 'PROPFIND') {
+ /* TODO */
+ } else {
+ res.writeHead(400);
+ res.end();
+ }
+};
+
+function generateSid() {
+ var sid = "";
+ for(var i = 0; i < 32; i++) {
+ sid += String.fromCharCode(48 + Math.floor(Math.random() * 10));
+ }
+ return sid;
+}
+
+function BOSHServerSession(opts) {
+ this.sid = generateSid();
+ this.nextRid = parseInt(opts.bodyEl.attrs.rid, 10);
+ this.inQueue = {};
+ this.outQueue = [];
+ this.stanzaQueue = [];
+
+ this.respond(opts.res, { sid: this.sid });
+
+ // Let someone hook to 'connect' event first
+ var that = this;
+ process.nextTick(function() {
+ console.log("BOSH streamStart", opts.bodyEl.attrs);
+ that.emit('streamStart', opts.bodyEl.attrs);
+ });
+}
+util.inherits(BOSHServerSession, EventEmitter);
+
+BOSHServerSession.prototype.handleHTTP = function(opts) {
+ if (this.inQueue.hasOwnProperty(opts.bodyEl.attrs.rid))
+ throw 'TODO';
+
+ this.inQueue[opts.bodyEl.attrs.rid] = opts;
+ this.workInQueue();
+};
+
+BOSHServerSession.prototype.workInQueue = function() {
+ if (!this.inQueue.hasOwnProperty(this.nextRid))
+ // Still waiting for next rid request
+ return;
+
+ var that = this;
+ var opts = this.inQueue[this.nextRid];
+ delete this.inQueue[this.nextRid];
+ this.nextRid++;
+
+ opts.bodyEl.children.forEach(function(stanza) {
+ that.emit('stanza', stanza);
+ });
+
+ this.outQueue.push(opts);
+
+ process.nextTick(function() {
+ that.workOutQueue();
+ that.workInQueue();
+ });
+};
+
+BOSHServerSession.prototype.workOutQueue = function() {
+ if (this.stanzaQueue.length < 1 || this.outQueue.length < 1)
+ return;
+
+ var stanzas = this.stanzaQueue;
+ this.stanzaQueue = [];
+ var opts = this.outQueue.shift();
+
+ this.respond(opts.res, {}, stanzas);
+};
+
+BOSHServerSession.prototype.send = function(stanza) {
+ console.log("Q", stanza.root().toString());
+ this.stanzaQueue.push(stanza.root());
+
+ var that = this;
+ process.nextTick(function() {
+ that.workOutQueue();
+ });
+};
+
+BOSHServerSession.prototype.respond = function(res, attrs, children) {
+ res.writeHead(200, { 'Content-Type': "application/xml; charset=utf-8" });
+ var bodyEl = new ltx.Element('body', attrs);
+ if (children)
+ children.forEach(bodyEl.cnode.bind(bodyEl));
+ console.log(">> ", bodyEl.toString());
+ bodyEl.write(function(s) {
+ res.write(s);
+ });
+ res.end();
+};
View
@@ -49,6 +49,8 @@ C2SServer.prototype.acceptConnection = function(socket) {
};
+// TODO: must accept a Connection or BOSHSession instead of socket;
+// remove inheritance here too.
function C2SStream(socket, server) {
var self = this;
this.authenticated = false;
@@ -74,6 +76,7 @@ function C2SStream(socket, server) {
return self;
};
util.inherits(C2SStream, Connection.Connection);
+exports.C2SStream = C2SStream;
C2SStream.prototype.startStream = function(streamAttrs) {
var attrs = {};
@@ -94,7 +97,7 @@ C2SStream.prototype.startStream = function(streamAttrs) {
attrs.id = this.streamId;
- attrs.from = this.server.options.domain;
+ attrs.from = this.domain;
var el = new ltx.Element('stream:stream', attrs);
// make it non-empty to cut the closing tag
View
@@ -0,0 +1,71 @@
+var vows = require('vows'),
+assert = require('assert'),
+http = require('http'),
+xmpp = require('./../lib/xmpp');
+C2SStream = require('./../lib/xmpp/c2s').C2SStream;
+
+const BOSH_PORT = 45580;
+
+vows.describe('BOSH client/server').addBatch({
+ 'client': {
+ topic: function() {
+ var that = this;
+ this.sv = new xmpp.BOSHServer();
+ http.createServer(function(req, res) {
+ that.sv.handleHTTP(req, res);
+ }).listen(BOSH_PORT);
+ this.sv.on('connect', function(svcl) {
+ that.svcl = svcl;
+ that.c2s = new C2SStream(svcl);
+ that.c2s.on('authenticate', function(opts, cb) {
+ cb();
+ });
+ });
+ this.cl = new xmpp.Client({
+ jid: 'test@example.com',
+ password: 'test',
+ boshURL: "http://localhost:" + BOSH_PORT
+ });
+ var cb = this.callback;
+ this.cl.on('online', function() {
+ cb();
+ });
+ },
+ "logged in": function() {},
+ 'can send stanzas': {
+ topic: function() {
+ var cb = this.callback;
+ this.svcl.once('stanza', function(stanza) {
+ cb(null, stanza);
+ });
+ this.cl.send(new xmpp.Message({ to: "foo@bar.org" }).
+ c('body').t("Hello"));
+ },
+ "received proper message": function(stanza) {
+ assert.ok(stanza.is('message'), "Message stanza");
+ assert.equal(stanza.attrs.to, "foo@bar.org");
+ assert.equal(stanza.getChildText('body'), "Hello");
+ }
+ },
+ 'can receive stanzas': {
+ topic: function() {
+ var cb = this.callback;
+ this.cl.once('stanza', function(stanza) {
+ cb(null, stanza);
+ });
+ this.svcl.send(new xmpp.Message({ to: "bar@bar.org" }).
+ c('body').t("Hello back"));
+ },
+ "received proper message": function(stanza) {
+ assert.ok(stanza.is('message'), "Message stanza");
+ assert.equal(stanza.attrs.to, "bar@bar.org");
+ assert.equal(stanza.getChildText('body'), "Hello back");
+ }
+ }
+ },
+
+ 'client fails login': "pending",
+
+ 'auto reconnect': "pending"
+
+}).export(module);

0 comments on commit ba177a8

Please sign in to comment.