Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

clean: give member functions fully-specified names; correct name pref…

…ixes; remove Link#timeout()
  • Loading branch information...
commit f0f0ef02f74a4ec75d22ef817809f643767dc68d 1 parent e5c2c20
Liam authored
Showing with 139 additions and 148 deletions.
  1. +30 −33 mqclient.js
  2. +89 −91 mqlib.js
  3. +20 −24 mqserver.js
63 mqclient.js
View
@@ -83,10 +83,10 @@ function MqClient() {
this.ws.on('data', function(frame, buf) {
try {
var aReq = unpackMsg(buf);
- if (typeof aReq.op !== 'string' || typeof that.sParams[aReq.op] === 'undefined')
+ if (typeof aReq.op !== 'string' || typeof that.kParams[aReq.op] === 'undefined')
throw new Error('invalid request op: '+aReq.op);
- for (var a in that.sParams[aReq.op])
- if (typeof aReq[a] !== that.sParams[aReq.op][a])
+ for (var a in that.kParams[aReq.op])
+ if (typeof aReq[a] !== that.kParams[aReq.op][a])
throw new Error(aReq.op+' request missing param '+a);
} catch (err) {
that.event_error(err);
@@ -113,9 +113,7 @@ function MqClient() {
MqClient.packMsg = packMsg;
MqClient.unpackMsg = unpackMsg;
-MqClient.prototype = {
-
- sParams: {
+ MqClient.prototype.kParams = {
registered: { },
added: { },
listEdited: { id:'string', from:'string', etc:'object' },
@@ -123,71 +121,70 @@ MqClient.prototype = {
ack: { id:'string', type:'string' },
info: { info:'string' },
quit: { info:'string' }
- } ,
+ };
- event_error: function(msg) { throw new Error(msg) } ,
+ MqClient.prototype.event_error = function(msg) { throw new Error(msg) };
- connect: function(iHost, iPort, iCallback) {
+ MqClient.prototype.connect = function(iHost, iPort, iCallback) {
this.host = iHost;
this.port = iPort || 8008;
this.on('connect', iCallback);
this.socket.connect(this.port, this.host);
- } ,
+ };
- close: function() {
+ MqClient.prototype.close = function() {
this.ws.end();
//this.socket.destroy();
- } ,
+ };
- isOpen: function() {
+ MqClient.prototype.isOpen = function() {
return this.socket.readable && this.socket.writable;
- } ,
+ };
- register: function(iUid, iNewNode, iAliases) {
+ MqClient.prototype.register = function(iUid, iNewNode, iAliases) {
var aMsg = packMsg({op:'register', userId:iUid, newNode:iNewNode, aliases:iAliases});
this.ws.write(1, 'binary', aMsg);
- } ,
+ };
- addNode: function(iUid, iNewNode, iPrevNode) {
+ MqClient.prototype.addNode = function(iUid, iNewNode, iPrevNode) {
var aMsg = packMsg({op:'addNode', userId:iUid, newNode:iNewNode, prevNode:iPrevNode});
this.ws.write(1, 'binary', aMsg);
- } ,
+ };
- login: function(iUid, iNode) {
+ MqClient.prototype.login = function(iUid, iNode) {
var aMsg = packMsg({op:'login', userId:iUid, nodeId:iNode});
this.ws.write(1, 'binary', aMsg);
- } ,
+ };
- listEdit: function(iTo, iType, iMember, iId, iEtc) {
+ MqClient.prototype.listEdit = function(iTo, iType, iMember, iId, iEtc) {
var aMsg = packMsg({op:'listEdit', to:iTo, type:iType, member:iMember, id:iId, etc:iEtc}, null);
this.ws.write(1, 'binary', aMsg);
- } ,
+ };
- post: function(iToList, iMsg, iId, iEtc) {
+ MqClient.prototype.post = function(iToList, iMsg, iId, iEtc) {
var aMsg = packMsg({op:'post', to:iToList, id:iId, etc:iEtc}, iMsg);
this.ws.write(1, 'binary', aMsg);
- } ,
+ };
- ping: function(iAlias, iId, iEtc) {
+ MqClient.prototype.ping = function(iAlias, iId, iEtc) {
var aMsg = packMsg({op:'ping', alias:iAlias, id:iId, etc:iEtc});
this.ws.write(1, 'binary', aMsg);
- } ,
+ };
- ack: function(iId, iType) {
+ MqClient.prototype.ack = function(iId, iType) {
var aMsg = packMsg({op:'ack', id:iId, type:iType});
this.ws.write(1, 'binary', aMsg);
- } ,
+ };
- send: function(iPackedMsg) {
+ MqClient.prototype.send = function(iPackedMsg) {
this.ws.write(1, 'binary', iPackedMsg);
- } ,
+ };
- on: function(iEvt, iFn) {
+ MqClient.prototype.on = function(iEvt, iFn) {
if (typeof iEvt !== 'string')
throw new Error('not a string '+iEvt);
if (typeof iFn !== 'function')
throw new Error('not a function '+iFn);
this['event_'+iEvt] = iFn;
- }
-};
+ };
180 mqlib.js
View
@@ -16,6 +16,7 @@ var sActive = {}; // connections indexed by nodeid
var sSingleNodeMsg = {}; // queues sending a single-node message, indexed by message id
var sShutdown = false;
+
module.exports.init = function(iMainDir, iRegSvc) {
sMainDir = iMainDir+'/';
sTempDir = sMainDir+'temp/';
@@ -44,6 +45,7 @@ module.exports.Link = Link;
module.exports.packMsg = packMsg; // for in-process testing
+
function storeFile(iPath, iBuf, iCallback) {
fs.open(iPath, 'w', 0600, function(err, fd) {
if (err)
@@ -101,13 +103,15 @@ function getSub(iNode) {
function noop(err) { if (err) throw err; }
+
var sLock = {
- rsrc: {},
+ rsrc: {}
+};
- read: function(iId, iFn) { return this._lock(iId, iFn, 'read', 'write') } ,
- write: function(iId, iFn) { return this._lock(iId, iFn, 'write', 'read') } ,
+ sLock.read = function(iId, iFn) { return this._lock(iId, iFn, 'read', 'write') };
+ sLock.write = function(iId, iFn) { return this._lock(iId, iFn, 'write', 'read') };
- _lock: function(iId, iFn, iA, iB) {
+ sLock._lock = function(iId, iFn, iA, iB) {
if (!this.rsrc[iId])
this.rsrc[iId] = {};
if (!this.rsrc[iId][iB]) {
@@ -120,9 +124,9 @@ var sLock = {
this.rsrc[iId].queue = [];
this.rsrc[iId].queue.push(iFn);
return false;
- } ,
+ };
- free: function(iId) {
+ sLock.free = function(iId) {
var aType = this.rsrc[iId].read ? 'read' : 'write';
if (--this.rsrc[iId][aType] > 0)
return;
@@ -133,8 +137,8 @@ var sLock = {
} else {
delete this.rsrc[iId];
}
- }
-};
+ };
+
function _root(iId) { return iId.slice(-3) === 'one' ? iId.slice(0,-3) : iId }
function _isSingle(iId) { return iId.slice(-3) === 'one' }
@@ -181,7 +185,7 @@ function _newQueue(iUid, ioArray, iPrior) {
return ioArray;
}
-function _deleteQueue(iNode) {
+function deleteQueue(iNode) {
if (sQueues[iNode].timer)
throw new Error('delete of active queue');
if (sQueues[iNode].length === 0)
@@ -376,17 +380,16 @@ function LList() {
this.tail = null;
}
-LList.prototype = {
- append: function(iObj) {
+ LList.prototype.append = function(iObj) {
iObj._prev = this.tail;
iObj._next = null;
if (this.tail)
this.tail = this.tail._next = iObj;
else
this.head = this.tail = iObj;
- } ,
+ };
- remove: function(iItem) {
+ LList.prototype.remove = function(iItem) {
if (iItem._prev)
iItem._prev._next = iItem._next;
if (iItem._next)
@@ -397,52 +400,53 @@ LList.prototype = {
this.tail = iItem._prev;
delete iItem._prev;
delete iItem._next;
- }
-}
+ };
-// Linked list of inactive queues
-var sQuiet = {
+
+var sQuiet = { // Linked list of inactive queues
list: new LList(),
- timer: null,
+ timer: null
+};
- append: function(iNode) {
+ sQuiet.append = function(iNode) {
var aI = { node:iNode, lastOn:Date.now() };
this.list.append(aI);
if (!this.timer)
this.timer = setTimeout(function(){sQuiet._clean()}, sQuietCleanPeriod);
return aI;
- } ,
+ };
- remove: function(iItem) {
+ sQuiet.remove = function(iItem) {
this.list.remove(iItem);
if (!this.list.head) {
clearTimeout(this.timer);
this.timer = null;
}
- } ,
+ };
- stopClean: function() {
+ sQuiet.stopClean = function() {
if (this.timer)
clearTimeout(this.timer);
this.timer = true;
- } ,
+ };
- _clean: function() {
+ sQuiet._clean = function() {
var aCutoff = Date.now() - 15*1000; /// sQuietHoursMax * 60*60*1000;
while (this.list.head && this.list.head.lastOn < aCutoff) {
- _deleteQueue(this.list.head.node);
+ deleteQueue(this.list.head.node);
this.list.remove(this.list.head);
}
this.timer = this.list.head ? setTimeout(function(){sQuiet._clean()}, sQuietCleanPeriod) : null;
- }
-}; // sQuiet
+ };
+
var sMsgCache = {
cache: {}, // indexed by file id
list: new LList(), // ordered by add order
- size: 0,
+ size: 0
+};
- get: function(iNode, iId, iCallback) {
+ sMsgCache.get = function(iNode, iId, iCallback) {
if (this.cache[iId].msg) {
var aMsg = this.cache[iId].msg;
process.nextTick(function() { iCallback(aMsg) });
@@ -455,7 +459,8 @@ var sMsgCache = {
this.cache[iId].wait = {};
this.cache[iId].wait[iNode] = iCallback;
var aWait = this.cache[iId].wait;
- function aRead(queue) {
+ fRead(iNode);
+ function fRead(queue) {
fs.readFile(getPath(queue)+'/'+iId, function(err, data) {
if (err && err.errno !== process.ENOENT) throw err;
if (!(iId in sMsgCache.cache)) {
@@ -467,7 +472,7 @@ var sMsgCache = {
aWait[queue](null);
delete aWait[queue];
for (var a in aWait)
- return aRead(a);
+ return fRead(a);
} else {
sMsgCache._put(iId, data);
for (var a in aWait)
@@ -476,16 +481,15 @@ var sMsgCache = {
delete sMsgCache.cache[iId].wait;
});
}
- aRead(iNode);
- } ,
+ };
- add: function(iId, iMsg) {
+ sMsgCache.add = function(iId, iMsg) {
if (iId in this.cache) throw new Error('item already in cache');
this.cache[iId] = { count:0, msg:null };
this._put(iId, iMsg);
- } ,
+ };
- _put: function(iId, iMsg) {
+ sMsgCache._put = function(iId, iMsg) {
if (iMsg.length > sMsgCacheMax/10)
return;
this.cache[iId].msg = iMsg;
@@ -493,23 +497,23 @@ var sMsgCache = {
this.size += iMsg.length;
if (this.size > sMsgCacheMax)
process.nextTick(function() { sMsgCache.clean() });
- } ,
+ };
- zero: function(iId) {
+ sMsgCache.zero = function(iId) {
if (!this.cache[iId].msg)
return;
this.size -= this.cache[iId].msg.length;
this.cache[iId].msg = new Buffer(0);
- } ,
+ };
- link: function(iId) {
+ sMsgCache.link = function(iId) {
if (iId in this.cache)
++this.cache[iId].count;
else
this.cache[iId] = { count:1, msg:null };
- } ,
+ };
- unlink: function(iId) {
+ sMsgCache.unlink = function(iId) {
if (--this.cache[iId].count > 0)
return;
if (this.cache[iId].msg) {
@@ -517,31 +521,30 @@ var sMsgCache = {
this.list.remove(this.cache[iId]);
}
delete this.cache[iId];
- } ,
+ };
- clean: function() {
+ sMsgCache.clean = function() {
while (this.list.head && this.size > sMsgCacheMax) {
this.size -= this.list.head.msg.length;
this.list.head.msg = null;
this.list.remove(this.list.head);
}
- }
-};
+ };
+
// Connection handler
function Link(iConn) {
- this.loginTimer = setTimeout(function(that) {
- that.loginTimer = null;
- that.timeout();
- }, 6000, this);
this.conn = iConn;
this.uid = null;
this.node = null;
+ this.loginTimer = setTimeout(function(that) {
+ that.loginTimer = null;
+ that.conn.write(1, 'binary', packMsg({op:'quit', info:'close timeout'}));
+ that.conn.close();
+ }, 6000, this);
}
-Link.prototype = {
-
- params: {
+ Link.prototype.kParams = {
register: { userId:'string', newNode:'string', aliases:'string' },
addNode: { userId:'string', newNode:'string', prevNode:'string' },
login: { userId:'string', nodeId:'string' },
@@ -550,11 +553,11 @@ Link.prototype = {
post: { id:'string', to:'object' },
ping: { id:'string', alias:'string' },
ack: { id:'string', type:'string' }
- } ,
+ };
- kQueueOp: { register:'registered', listEdit:'listEdited', ping:'deliver', post:'deliver' } ,
+ Link.prototype.kQueueOp = { register:'registered', listEdit:'listEdited', ping:'deliver', post:'deliver' };
- handleMessage: function(iMsg) {
+ Link.prototype.handleMessage = function(iMsg) {
try {
if (!this.conn)
@@ -566,14 +569,14 @@ Link.prototype = {
var aReq = JSON.parse(iMsg.toString('ascii', 4, aJsEnd));
- if (typeof aReq.op !== 'string' || typeof this.params[aReq.op] === 'undefined')
+ if (typeof aReq.op !== 'string' || typeof this.kParams[aReq.op] === 'undefined')
throw 'invalid request op';
if (!this.node && aReq.op !== 'register' && aReq.op !== 'login' && aReq.op !== 'addNode')
throw 'illegal op on unauthenticated socket';
- for (var a in this.params[aReq.op])
- if (typeof aReq[a] !== this.params[aReq.op][a])
+ for (var a in this.kParams[aReq.op])
+ if (typeof aReq[a] !== this.kParams[aReq.op][a])
throw aReq.op+' request missing param '+a;
if (aReq.op !== 'listEdit' && aReq.op !== 'post' && aReq.op !== 'ping' && iMsg.length > aJsEnd)
@@ -590,14 +593,9 @@ Link.prototype = {
this.conn.write(1, 'binary', packMsg({op:'quit', info:err.message || err}));
this.conn.close();
}
- } ,
-
- timeout: function() {
- this.conn.write(1, 'binary', packMsg({op:'quit', info:'close timeout'}));
- this.conn.close();
- } ,
+ };
- handle_register: function(iReq) {
+ Link.prototype.handle_register = function(iReq) {
var that = this;
sRegSvc[this.node ? 'reregister' : 'register'](iReq.userId, iReq.newNode, null, iReq.aliases, function(err, aliases) {
if (!that.node || err) {
@@ -612,9 +610,9 @@ Link.prototype = {
that.conn.write(1, 'binary', packMsg({op:'registered', error: err ? err.message : toErr}));
});
});
- } ,
+ };
- handle_addNode: function(iReq) {
+ Link.prototype.handle_addNode = function(iReq) {
var that = this;
if (!iReq.newNode) {
that.conn.write(1, 'binary', packMsg({op:'added', error:'new nodename required'}));
@@ -646,9 +644,9 @@ Link.prototype = {
}
});
}
- } ,
+ };
- handle_login: function(iReq) {
+ Link.prototype.handle_login = function(iReq) {
var that = this;
clearTimeout(that.loginTimer);
that.loginTimer = null;
@@ -668,9 +666,9 @@ Link.prototype = {
that.conn.write(1, 'binary', packMsg({op:'info', info:'ok login'}));
startQueue(that.node, that.uid);
});
- } ,
+ };
- handle_listEdit: function(iReq, iBuf) {
+ Link.prototype.handle_listEdit = function(iReq, iBuf) {
var that = this;
var aListMsg = {list:iReq.to, op:iReq.type, date:(new Date).toISOString()};
switch (iReq.type) {
@@ -706,16 +704,17 @@ Link.prototype = {
aTo[iReq.to] = 3;
that.handle_post({op:'listEdit', id:iReq.id, to:aTo, etc:aListMsg}, null);
}
- } ,
+ };
- _ack: function(iId, iErr, iToErr) {
+ Link.prototype._ack = function(iId, iErr, iToErr) {
if (this.conn)
this.conn.write(1, 'binary', packMsg({op:'ack', id:iId, type: iErr ? 'error' : 'ok', error: iErr ? iErr.message : iToErr}));
- } ,
+ };
- sLastId: 0,
- sLastSubId: 1000,
- _makeId: function() {
+ Link.prototype.sLastId = 0;
+ Link.prototype.sLastSubId = 1000;
+
+ Link.prototype._makeId = function() {
var aId = Date.now();
if (aId < this.sLastId)
console.log('system clock went backwards by '+(this.sLastId-aId)+' ms');
@@ -726,9 +725,9 @@ Link.prototype = {
this.sLastId = aId;
this.sLastSubId = 1000;
return aId+'-0';
- } ,
+ };
- handle_post: function(iReq, iBuf) {
+ Link.prototype.handle_post = function(iReq, iBuf) {
var that = this;
var aPoCount = 0, aLsCount = 0, aLsErr, aPoErr;
for (var a in iReq.to) {
@@ -775,9 +774,9 @@ Link.prototype = {
if (--aPoCount === 0)
that._ack(iReq.id, aPoErr && aPoErr.message && aPoErr, aPoErr && aPoErr.message ? undefined : aPoErr);
}
- } ,
+ };
- _postSend: function(iReq, iBuf, iAckErr, iCallback) {
+ Link.prototype._postSend = function(iReq, iBuf, iAckErr, iCallback) {
var that = this;
for (var any in iReq.to) break;
if (!any && iReq.noNodes)
@@ -830,9 +829,9 @@ Link.prototype = {
}
}
});
- } ,
+ };
- handle_ping: function(iReq, iBuf) {
+ Link.prototype.handle_ping = function(iReq, iBuf) {
var that = this;
sRegSvc.lookup(iReq.alias, function(err, node) {
if (err)
@@ -844,14 +843,14 @@ Link.prototype = {
that._ack(iReq.id, err, toErr);
});
});
- } ,
+ };
- handle_ack: function(iReq) {
+ Link.prototype.handle_ack = function(iReq) {
if (iReq.type === 'ok')
deQueueItem(this.node, iReq.id);
- } ,
+ };
- finalize: function() {
+ Link.prototype.finalize = function() {
if (!this.conn) {
console.log('finalize called on finalized Link');
return;
@@ -863,7 +862,6 @@ Link.prototype = {
if (this.loginTimer)
clearTimeout(this.loginTimer);
this.conn = null;
- }
-};
+ };
44 mqserver.js
View
@@ -149,9 +149,7 @@ function RegDb(iFileName) {
this.db = aData ? JSON.parse(aData) : { uid:{}, alias:{}, list:{} };
}
-RegDb.prototype = {
-
- register: function(iUid, iNewNode, iPrevNode, iAliases, iCallback, iReReg) {
+ RegDb.prototype.register = function(iUid, iNewNode, iPrevNode, iAliases, iCallback, iReReg) {
var aHas = iUid in this.db.uid;
if (!iReReg && aHas || iReReg && !aHas)
var aErr = aHas ? 'user exists' : 'no such user';
@@ -204,13 +202,13 @@ RegDb.prototype = {
process.nextTick(function() {
iCallback(null, aAccept && aAccept.join(' '), iNewNode ? that.db.uid[iUid].nodes[iNewNode] : undefined);
});
- } ,
+ };
- reregister: function(iUid, iNewNode, iPrevNode, iAliases, iCallback) {
+ RegDb.prototype.reregister = function(iUid, iNewNode, iPrevNode, iAliases, iCallback) {
this.register(iUid, iNewNode, iPrevNode, iAliases, iCallback, true);
- } ,
+ };
- remove: function(iUid) {
+ RegDb.prototype.remove = function(iUid) {
if (!iUid || !this.db.uid[iUid])
return false;
for (var a=0; a < this.db.uid[iUid].aliases.length; ++a)
@@ -220,18 +218,18 @@ RegDb.prototype = {
delete this.db.uid[iUid];
fs.writeFileSync(this.file, JSON.stringify(this.db), 'ascii');
return true;
- } ,
+ };
- verify: function(iUid, iNode, iCallback) {
+ RegDb.prototype.verify = function(iUid, iNode, iCallback) {
var aErr = !(iUid in this.db.uid) ? new Error('userId unknown')
: !(iNode in this.db.uid[iUid].nodes) ? new Error('nodeId unknown') : null;
var aOffset = aErr ? undefined : this.db.uid[iUid].nodes[iNode];
process.nextTick(function() {
iCallback(aErr, aOffset);
});
- } ,
+ };
- getNodes: function(iUid, iCallback) {
+ RegDb.prototype.getNodes = function(iUid, iCallback) {
var aErr = this.db.uid[iUid] ? null : new Error('userId unknown');
if (this.db.uid[iUid]) {
var aList = {};
@@ -241,22 +239,22 @@ RegDb.prototype = {
process.nextTick(function() {
iCallback(aErr, iUid, aList);
});
- } ,
+ };
- lookup: function(iAlias, iCallback) {
+ RegDb.prototype.lookup = function(iAlias, iCallback) {
var that = this;
process.nextTick(function() {
var aEr = iAlias in that.db.alias ? null : new Error('alias not defined');
iCallback(aEr, that.db.alias[iAlias]);
});
- } ,
+ };
- listInvite: function(iName, iBy, iAlias, iCallback) { this._listMod('in', iName, iBy, iAlias, iCallback); } ,
- listAdd: function(iName, iBy, iMember, iCallback) { this._listMod('ad', iName, iBy, iMember, iCallback); } ,
- listRemove: function(iName, iBy, iMember, iCallback) { this._listMod('rm', iName, iBy, iMember, iCallback); } ,
- //listRenew: function(iName, iBy, iMember, iCallback) { this._listMod('nw', iName, iBy, iMember, iCallback); } ,
+ RegDb.prototype.listInvite = function(iName, iBy, iAlias, iCallback) { this._listMod('in', iName, iBy, iAlias, iCallback) };
+ RegDb.prototype.listAdd = function(iName, iBy, iMember, iCallback) { this._listMod('ad', iName, iBy, iMember, iCallback) };
+ RegDb.prototype.listRemove = function(iName, iBy, iMember, iCallback) { this._listMod('rm', iName, iBy, iMember, iCallback) };
+ //RegDb.prototype.listRenew = function(iName, iBy, iMember, iCallback) { this._listMod('nw', iName, iBy, iMember, iCallback) };
- _listMod: function(iOp, iName, iBy, iMember, iCallback) {
+ RegDb.prototype._listMod = function(iOp, iName, iBy, iMember, iCallback) {
var aHasB, aHasM, aHasL = iName in this.db.list;
switch (iOp) {
case 'rm':
@@ -313,9 +311,9 @@ RegDb.prototype = {
}
fs.writeFileSync(this.file, JSON.stringify(this.db), 'ascii');
process.nextTick(function() { iCallback(null, aReturn) });
- } ,
+ };
- listLookup: function(iName, iBy, iCallback) {
+ RegDb.prototype.listLookup = function(iName, iBy, iCallback) {
var aHasL = iName in this.db.list;
var aHasB = aHasL && iBy in this.db.list[iName];
if (!aHasL || !aHasB) {
@@ -331,9 +329,7 @@ RegDb.prototype = {
process.nextTick(function() {
iCallback(null, iName, aList);
});
- }
-
-}
+ };
main(process.argv);
Please sign in to comment.
Something went wrong with that request. Please try again.