Permalink
Browse files

multiple nodes per user

  • Loading branch information...
Liam
Liam committed Aug 9, 2011
1 parent a43a3f2 commit 08c5a68688d58ff1e376fa67a582737ca2bedeb9
Showing with 83 additions and 63 deletions.
  1. +6 −4 Readme
  2. +4 −4 mqclient.js
  3. +41 −33 mqlib.js
  4. +29 −19 mqserver.js
  5. +3 −3 mqtest.js
View
10 Readme
@@ -6,12 +6,13 @@ License http://www.gnu.org/licenses/agpl-3.0.html
https://github.com/networkimprov/websocket.MQ
-A reliable message queue service for websocket-connected clients.
+A reliable message queue service for socket-connected clients.
The initial goal is to enable messaging among Node.js instances on the public net.
Clients' connections are assumed to be potentially unreliable.
As with email, each message is posted with a list of recipients.
A user may have multiple aliases; a single user may be sent a short message via an alias.
+A user may have multiple client "nodes"; messages are queued for all nodes.
Distribution lists are supported.
On disk, messages are stored in separate files, within transient directories for each recipient.
@@ -23,14 +24,15 @@ Components
wsstream - websocket stream module
Dependencies
- Node.js 0.2.x https://github.com/ry/node
+ Node.js 0.2.x https://github.com/joyent/node
Usage
$ node mqserver.js & # run server in background
$ node mqtest.js # run test client; stop with ctl-C
$ node mqserver.js stop # stop server
Message format
+ messages are delimited with websocket-style frames
f0f {"op":"type" ...}data
1-4 hex digits, padded with 0-3 spaces, giving the size of the JSON metadata object
JSON metadata
@@ -42,8 +44,8 @@ Message op types
Todo
Implement user registration with email confirmation
- SQLite-based user database
- Browser-compatible client library
+ Redis(?) user database
+ Manage inactive nodes/users
SSL
Email gateway with SMTP client and LMTP server
View
@@ -106,13 +106,13 @@ MqClient.prototype = {
return this.socket.readable && this.socket.writable;
} ,
- register: function(iNode, iPassword, iAliases) {
- var aMsg = packMsg({op:'register', nodeid:iNode, password:iPassword, aliases:iAliases});
+ register: function(iUid, iNewNode, iPrevNode, iAliases) {
+ var aMsg = packMsg({op:'register', userId:iUid, newNode:iNewNode, prevNode:iPrevNode, aliases:iAliases});
this.ws.write(1, 'binary', aMsg);
} ,
- login: function(iNode, iPass) {
- var aMsg = packMsg({op:'login', nodeid:iNode, password:iPass});
+ login: function(iUid, iNode) {
+ var aMsg = packMsg({op:'login', userId:iUid, nodeId:iNode});
this.ws.write(1, 'binary', aMsg);
} ,
View
@@ -391,14 +391,15 @@ function Link(iConn) {
that.timeout();
}, 4000, this);
this.conn = iConn;
+ this.uid = null;
this.node = null;
}
Link.prototype = {
params: {
- register: { nodeid:'string', password:'string', aliases:'string' },
- login: { nodeid:'string', password:'string' },
+ register: { userId:'string', newNode:'string', prevNode:'string', aliases:'string' },
+ login: { userId:'string', nodeId:'string' },
listEdit: { id:'string', to:'string', type:'string', member:'string' },
//listRenew:{ id:'string', to:'string', list:'object' },
post: { id:'string', to:'object' },
@@ -451,7 +452,7 @@ Link.prototype = {
handle_register: function(iReq) {
var that = this;
- sRegSvc[this.node ? 'reregister' : 'register'](iReq.nodeid, iReq.password, iReq.aliases, function(err, aliases) {
+ sRegSvc[this.node ? 'reregister' : 'register'](iReq.userId, iReq.newNode, iReq.prevNode, iReq.aliases, function(err, aliases) {
if (!that.conn)
return;
if (err) {
@@ -464,36 +465,32 @@ Link.prototype = {
handle_login: function(iReq) {
var that = this;
- sRegSvc.verify(iReq.nodeid, iReq.password, function(err, ok) {
+ sRegSvc.verify(iReq.userId, iReq.nodeId, function(err, ok) {
if (!that.conn)
return;
- if (err || !ok) {
- that.conn.write(1, 'binary', makeMsg({op:'quit', info:'invalid login'}));
+ var aNode = iReq.userId+iReq.nodeId;
+ if (err || !ok) var aErr = 'invalid login';
+ else if (aNode in sActive) var aErr = 'node already active';
+ else if (sShutdown) var aErr = 'shutdown';
+ if (aErr) {
+ that.conn.write(1, 'binary', makeMsg({op:'quit', info:aErr}));
that.conn.close();
return;
}
- that._activate(iReq.nodeid, 'ok login');
+ clearTimeout(that.loginTimer);
+ that.loginTimer = null;
+ that.uid = iReq.userId;
+ that.node = aNode;
+ sActive[aNode] = that;
+ that.conn.write(1, 'binary', makeMsg({op:'info', info:'ok login'}));
+ startQueue(aNode);
});
} ,
- _activate: function(iNode, iAck) {
- if (iNode in sActive || sShutdown) {
- this.conn.write(1, 'binary', makeMsg({op:'quit', info:(sShutdown ? 'shutdown' : 'login already active')}));
- this.conn.close();
- return;
- }
- clearTimeout(this.loginTimer);
- this.loginTimer = null;
- this.node = iNode;
- sActive[iNode] = this;
- this.conn.write(1, 'binary', makeMsg({op:'info', info:iAck}));
- startQueue(iNode);
- } ,
-
handle_listEdit: function(iReq, iBuf) {
switch (iReq.type) {
- case 'add': sRegSvc.listAdd(iReq.to, this.node, iReq.member, aComplete); break;
- case 'remove': sRegSvc.listRemove(iReq.to, this.node, iReq.member, aComplete); break;
+ case 'add': sRegSvc.listAdd(iReq.to, this.uid, iReq.member, aComplete); break;
+ case 'remove': sRegSvc.listRemove(iReq.to, this.uid, iReq.member, aComplete); break;
default: aComplete(new Error('invalid listEdit type: '+iReq.type));
}
var that = this;
@@ -543,7 +540,7 @@ Link.prototype = {
if (iReq.to[a] === NaN || iReq.to[a] < 2 || iReq.to[a] > 3)
continue;
++aCbCount;
- sRegSvc.listLookup(a, that.node, aSend);
+ sRegSvc.listLookup(a, that.uid, aSend);
}
if (aCbCount === 0)
that._postSend(iReq, iBuf);
@@ -554,7 +551,7 @@ Link.prototype = {
else aCbErr.message += ', '+err.message;
} else {
for (var a in members)
- if (a !== that.node || iReq.to[list] === 3)
+ if (a !== that.uid || iReq.to[list] === 3)
iReq.to[a] = members[a];
}
delete iReq.to[list];
@@ -570,7 +567,7 @@ Link.prototype = {
_postSend: function(iReq, iBuf) {
var that = this;
var aId = this._makeId();
- var aMsg = makeMsg({op:'deliver', id:aId, from:that.node, etc:iReq.etc}, iBuf);
+ var aMsg = makeMsg({op:'deliver', id:aId, from:that.uid, etc:iReq.etc}, iBuf);
fs.open(sTempDir+aId, 'w', 0600, function(err, fd) {
if (err) return that._ackFail(iReq.id, err);
writeAll(fd, aMsg, function(err) { // attempt write to temp
@@ -579,17 +576,28 @@ Link.prototype = {
fs.close(fd);
if (err) return that._ackFail(iReq.id, err);
sMsgCache.put(aId, aMsg);
- var aToCount = 0;
- for (var a in iReq.to) {
+ var aTo = {}, aToCount = 0;
+ for (var aUid in iReq.to) {
++aToCount;
- queueItem(a, aId, aCb);
+ sRegSvc.getNodes(aUid, fUidCb);
}
- function aCb() {
+ function fUidCb(err, uid, list) {
+ if (err) throw err;
+ for (var aN in list)
+ aTo[uid+aN] = list[aN];
if (--aToCount > 0)
return;
- if (that.conn)
- that.conn.write(1, 'binary', makeMsg({op:'ack', type:'ok', id:iReq.id}));
- fs.unlink(sTempDir+aId, noop);
+ for (var aN in aTo) {
+ ++aToCount;
+ queueItem(aN, aId, fToCb);
+ }
+ function fToCb() {
+ if (--aToCount > 0)
+ return;
+ if (that.conn)
+ that.conn.write(1, 'binary', makeMsg({op:'ack', type:'ok', id:iReq.id}));
+ fs.unlink(sTempDir+aId, noop);
+ }
}
});
});
View
@@ -98,15 +98,6 @@ function main(argv) {
});
});
- fs.writeFileSync(sPid, process.pid.toString());
- process.on('SIGINT', function() {
- fs.unlink(sPid, noop);
- aServer.close();
- mq.quit();
- });
-
- aServer.listen(8008);
-
var aHttp = http.createServer(function(req, res) {
if (req.method.toLowerCase() === 'post') {
var aUrl = url.parse(req.url, true);
@@ -135,6 +126,16 @@ function main(argv) {
res.end(aHtml);
}
});
+
+ fs.writeFileSync(sPid, process.pid.toString());
+ process.on('SIGINT', function() {
+ fs.unlink(sPid, noop);
+ aHttp.close();
+ aServer.close();
+ mq.quit();
+ });
+
+ aServer.listen(8008);
aHttp.listen(8080);
}
@@ -150,12 +151,14 @@ function RegDb(iFileName) {
RegDb.prototype = {
- register: function(iUid, iPassword, iAliases, iCallback, iReReg) {
+ register: function(iUid, iNewNode, iPrevNode, iAliases, iCallback, iReReg) {
var aHas = iUid in this.db.uid;
if (!iReReg && aHas || iReReg && !aHas)
var aErr = aHas ? 'user exists' : 'no such user';
- else if (!aHas && !iPassword)
- var aErr = 'password required';
+ else if (!aHas && !iNewNode)
+ var aErr = 'new nodename required';
+ else if (aHas && iNewNode && !(iPrevNode in this.db.uid[iUid].nodes))
+ var aErr = 'prev nodename invalid';
if (aErr) {
process.nextTick(function() { iCallback(new Error(aErr)) });
return;
@@ -164,9 +167,9 @@ RegDb.prototype = {
for (var a=0; a < this.db.uid[iUid].aliases.length; ++a)
delete this.db.alias[this.db.uid[iUid].aliases[a]];
if (!aHas)
- this.db.uid[iUid] = {};
- if (iPassword)
- this.db.uid[iUid].password = iPassword;
+ this.db.uid[iUid] = { nodes:{}, aliases:null };
+ if (iNewNode)
+ this.db.uid[iUid].nodes[iNewNode] = true;
if (iAliases) {
var aAccept = iAliases.split(/\s+/);
for (var a=aAccept.length-1; a >= 0; --a) {
@@ -184,8 +187,8 @@ RegDb.prototype = {
});
} ,
- reregister: function(iUid, iPassword, iAliases, iCallback) {
- this.register(iUid, iPassword, iAliases, iCallback, true);
+ reregister: function(iUid, iNewNode, iPrevNode, iAliases, iCallback) {
+ this.register(iUid, iNewNode, iPrevNode, iAliases, iCallback, true);
} ,
remove: function(iUid) {
@@ -200,10 +203,17 @@ RegDb.prototype = {
return true;
} ,
- verify: function(iUid, iPassword, iCallback) {
+ verify: function(iUid, iNode, iCallback) {
+ var that = this;
+ process.nextTick(function() {
+ iCallback(null, iUid in that.db.uid && iNode in that.db.uid[iUid].nodes);
+ });
+ } ,
+
+ getNodes: function(iUid, iCallback) {
var that = this;
process.nextTick(function() {
- iCallback(null, iUid in that.db.uid && that.db.uid[iUid].password === iPassword);
+ iCallback(that.db.uid[iUid] ? null : new Error('no such uid'), iUid, that.db.uid[iUid] && that.db.uid[iUid].nodes);
});
} ,
View
@@ -28,7 +28,7 @@ function Testconn(iId, iReg) {
this.client.on('registered', function(aliases) {
that.reg = true;
console.log(that.id+' registered '+aliases);
- that.client.login(that.id, 'password');
+ that.client.login(that.id, 'node'+that.id);
});
this.client.on('info', function(msg) {
console.log(that.id+' '+msg);
@@ -94,9 +94,9 @@ function testLink(aC, iState) {
aC.client.connect('localhost', 8008, function() {
aC.open = true;
if (aC.reg)
- aC.client.login(aC.id, 'password');
+ aC.client.login(aC.id, 'node'+aC.id);
else
- aC.client.register(aC.id, 'password', 'alias'+aC.id);
+ aC.client.register(aC.id, 'node'+aC.id, 'prevnode', 'alias'+aC.id);
setTimeout(testLink, (Date.now()%10+1)*1000, aC, iState+1);
});
break;

0 comments on commit 08c5a68

Please sign in to comment.