Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

add: single-node message support

  • Loading branch information...
commit 81aff1423d2090b0b3f239fc6bf10cb103b6ea3a 1 parent b023b2d
@networkimprov authored
Showing with 61 additions and 21 deletions.
  1. +61 −21 mqlib.js
View
82 mqlib.js
@@ -8,10 +8,12 @@ var sTempDir;
var sMsgCacheMax = 100000;
var sQuietHoursMax = 28;
var sQuietCleanPeriod = 20*1000;
+var sBytesPerMs = 1000/8;
var sQueues = {}; // array objects indexed by nodeid
var sPending = {}; // pending message counts indexed by uid
var sActive = {}; // connections indexed by nodeid
+var sSingleNodeMsg = {}; // queues sending a single-node message, indexed by message id
var sShutdown = false;
module.exports.init = function(iMainDir, iRegSvc) {
@@ -122,28 +124,40 @@ var sLock = {
}
};
+function _root(iId) { return iId.slice(-3) === 'one' ? iId.slice(0,-3) : iId }
+function _isSingle(iId) { return iId.slice(-3) === 'one' }
+
function _sendNext(iNode) {
if (!(iNode in sActive) || sQueues[iNode].length === 0)
return;
- ++sQueues[iNode].tries;
var aN = sQueues[iNode].next;
if (!sQueues[iNode][aN]) sys.debug(sys.inspect(sQueues[iNode])+' queue '+iNode+' n '+aN+' len '+sQueues[iNode].length);
var aLn = sActive[iNode];
var aId = sQueues[iNode][aN];
- sMsgCache.get(iNode, aId, function(msg) {
+ sMsgCache.get(iNode, _root(aId), function(msg) {
if (!msg && sQueues[iNode][aN] === aId) throw new Error('null msg for '+iNode+' '+aId);
- if (sActive[iNode] === aLn && sQueues[iNode][aN] === aId)
- sActive[iNode].conn.write(1, 'binary', msg, function(type) {
- if (sActive[iNode] === aLn && sQueues[iNode][aN] === aId)
- sQueues[iNode].timer = setTimeout(_sendNext, 10*1000, iNode);
- });
+ if (sActive[iNode] !== aLn || sQueues[iNode][aN] !== aId)
+ return;
+ if (_isSingle(aId) && sQueues[iNode].tries === 0) {
+ if (msg.length === 0)
+ return process.nextTick(function() { deQueueItem(iNode, _root(aId)) });
+ if (_root(aId)+'_'+sQueues[iNode].uid in sSingleNodeMsg)
+ return sQueues[iNode].timer = setTimeout(_sendNext, msg.length/sBytesPerMs+10, iNode);
+ sSingleNodeMsg[_root(aId)+'_'+sQueues[iNode].uid] = iNode;
+ }
+ ++sQueues[iNode].tries;
+ sActive[iNode].conn.write(1, 'binary', msg, function(type) {
+ if (sActive[iNode] !== aLn || sQueues[iNode][aN] !== aId)
+ return;
+ sQueues[iNode].timer = setTimeout(_sendNext, 10*1000, iNode);
+ });
});
}
function _newQueue(iUid, ioArray, iPrior) {
ioArray.sort();
for (var a=0; a < ioArray.length; ++a)
- sMsgCache.link(ioArray[a]);
+ sMsgCache.link(_root(ioArray[a]));
ioArray.timer = null;
ioArray.tries = 0;
ioArray.next = 0;
@@ -164,7 +178,7 @@ function _deleteQueue(iNode) {
});
for (var a=0; a < sQueues[iNode].length; ++a)
if (sQueues[iNode][a])
- sMsgCache.unlink(sQueues[iNode][a]);
+ sMsgCache.unlink(_root(sQueues[iNode][a]));
if (--sPending[sQueues[iNode].uid].q === 0)
delete sPending[sQueues[iNode].uid];
delete sQueues[iNode];
@@ -207,6 +221,11 @@ function startQueue(iNode, iUid, iQuiet) {
}
function stopQueue(iNode) {
+ var aId = sQueues[iNode].length ? sQueues[iNode][sQueues[iNode].next] : '';
+ if (_isSingle(aId) && sSingleNodeMsg[_root(aId)+'_'+sQueues[iNode].uid] === iNode) {
+ sQueue[iNode].tries = 0;
+ delete sSingleNodeMsg[_root(aId)+'_'+sQueues[iNode].uid];
+ }
if (sQueues[iNode].timer) {
clearTimeout(sQueues[iNode].timer);
sQueues[iNode].timer = null;
@@ -214,18 +233,19 @@ function stopQueue(iNode) {
sQueues[iNode].quiet = sQuiet.append(iNode);
}
-function queueItem(iNode, iId, iCallback) {
- if (!sLock.write(iNode, function(){queueItem(iNode, iId, iCallback)}))
+function queueItem(iNode, iId, iToType, iCallback) {
+ if (!sLock.write(iNode, function(){queueItem(iNode, iId, iToType, iCallback)}))
return;
fs.mkdir(getSub(iNode), 0700, function(errSub) {
if (errSub && errSub.errno !== process.EEXIST) throw errSub;
fs.mkdir(getPath(iNode), 0700, function(errNode) {
if (errNode && errNode.errno !== process.EEXIST) throw errNode;
- fs.link(sTempDir+iId, getPath(iNode)+'/'+iId, function(err) {
+ var aQid = iId + (iToType === 0 ? 'one' : '');
+ fs.link(sTempDir+iId, getPath(iNode)+'/'+aQid, function(err) {
if (err) throw err;
if (sQueues[iNode] && 'tries' in sQueues[iNode]) {
sMsgCache.link(iId);
- sQueues[iNode].push(iId);
+ sQueues[iNode].push(aQid);
if (sQueues[iNode].pending) {
delete sQueues[iNode].pending[iId];
for (var any in sQueues[iNode].pending) break;
@@ -254,9 +274,22 @@ function queueItem(iNode, iId, iCallback) {
}
function deQueueItem(iNode, iId) {
- if (!sQueues[iNode] || !sQueues[iNode].length || sQueues[iNode][sQueues[iNode].next] !== iId)
+ if (!sQueues[iNode] || !sQueues[iNode].length || _root(sQueues[iNode][sQueues[iNode].next]) !== iId)
return;
- fs.unlink(getPath(iNode)+'/'+iId, noop);
+ var aPath = getPath(iNode)+'/'+sQueues[iNode][sQueues[iNode].next];
+ if (_isSingle(sQueues[iNode][sQueues[iNode].next]) && sSingleNodeMsg[iId+'_'+sQueues[iNode].uid] === iNode) {
+ sMsgCache.zero(iId);
+ delete sSingleNodeMsg[iId+'_'+sQueues[iNode].uid];
+ fs.open(aPath, 'w', 0600, function(err, fd) {
+ if (err) throw err;
+ fs.close(fd, function(err) {
+ if (err) throw err;
+ fs.unlink(aPath, noop);
+ });
+ });
+ } else {
+ fs.unlink(aPath, noop);
+ }
sMsgCache.unlink(iId);
sQueues[iNode][sQueues[iNode].next] = null;
sQueues[iNode].tries = 0;
@@ -447,6 +480,13 @@ var sMsgCache = {
process.nextTick(function() { sMsgCache.clean() });
} ,
+ zero: function(iId) {
+ if (!this.cache[iId].msg)
+ return;
+ this.size -= this.cache[iId].msg.length;
+ this.cache[iId].msg = new Buffer(0);
+ } ,
+
link: function(iId) {
if (iId in this.cache)
++this.cache[iId].count;
@@ -650,7 +690,7 @@ Link.prototype = {
return aId +'-'+ ++this.sLastSubId;
this.sLastId = aId;
this.sLastSubId = 1000;
- return aId.toString();
+ return aId+'-0';
} ,
handle_post: function(iReq, iBuf) {
@@ -664,7 +704,7 @@ Link.prototype = {
sRegSvc.listLookup(a, that.uid, aSend);
}
if (aCbCount === 0)
- that._postSend(iReq, iBuf);
+ that._postSend(iReq, iBuf, null);
function aSend(err, list, members) {
if (err) {
@@ -681,7 +721,7 @@ Link.prototype = {
if (aCbErr)
that._ackFail(iReq.id, aCbErr);
else
- that._postSend(iReq, iBuf);
+ that._postSend(iReq, iBuf, null);
}
} ,
@@ -697,7 +737,7 @@ Link.prototype = {
fs.close(fd, noop);
if (err) return that._ackFail(iReq.id, err, iOp);
sMsgCache.put(aId, aMsg);
- var aTo = {}, aToCount = 1;
+ var aTo = {}, aToCount = 1, aAckErr;
for (var aUid in iReq.to) {
++aToCount;
addPending(aUid, aId);
@@ -709,12 +749,12 @@ Link.prototype = {
if (err) throw err;
for (var aN in list)
if (uid in iReq.to || uid+aN !== that.node)
- aTo[uid+aN] = list[aN];
+ aTo[uid+aN] = uid in iReq.to ? iReq.to[uid] : 1;
if (--aToCount > 0)
return;
for (var aN in aTo) {
++aToCount;
- queueItem(aN, aId, fToCb);
+ queueItem(aN, aId, aTo[aN], fToCb);
}
if (aToCount === 0)
fToCb();
Please sign in to comment.
Something went wrong with that request. Please try again.