Skip to content

Commit

Permalink
add: single-node message support
Browse files Browse the repository at this point in the history
  • Loading branch information
networkimprov committed Mar 22, 2012
1 parent b023b2d commit 81aff14
Showing 1 changed file with 61 additions and 21 deletions.
82 changes: 61 additions & 21 deletions mqlib.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ var sTempDir;
var sMsgCacheMax = 100000;
var sQuietHoursMax = 28;
var sQuietCleanPeriod = 20*1000;
var sBytesPerMs = 1000/8;

var sQueues = {}; // array objects indexed by nodeid
var sPending = {}; // pending message counts indexed by uid
var sActive = {}; // connections indexed by nodeid
var sSingleNodeMsg = {}; // queues sending a single-node message, indexed by message id
var sShutdown = false;

module.exports.init = function(iMainDir, iRegSvc) {
Expand Down Expand Up @@ -122,28 +124,40 @@ var sLock = {
}
};

function _root(iId) { return iId.slice(-3) === 'one' ? iId.slice(0,-3) : iId }
function _isSingle(iId) { return iId.slice(-3) === 'one' }

function _sendNext(iNode) {
if (!(iNode in sActive) || sQueues[iNode].length === 0)
return;
++sQueues[iNode].tries;
var aN = sQueues[iNode].next;
if (!sQueues[iNode][aN]) sys.debug(sys.inspect(sQueues[iNode])+' queue '+iNode+' n '+aN+' len '+sQueues[iNode].length);
var aLn = sActive[iNode];
var aId = sQueues[iNode][aN];
sMsgCache.get(iNode, aId, function(msg) {
sMsgCache.get(iNode, _root(aId), function(msg) {
if (!msg && sQueues[iNode][aN] === aId) throw new Error('null msg for '+iNode+' '+aId);
if (sActive[iNode] === aLn && sQueues[iNode][aN] === aId)
sActive[iNode].conn.write(1, 'binary', msg, function(type) {
if (sActive[iNode] === aLn && sQueues[iNode][aN] === aId)
sQueues[iNode].timer = setTimeout(_sendNext, 10*1000, iNode);
});
if (sActive[iNode] !== aLn || sQueues[iNode][aN] !== aId)
return;
if (_isSingle(aId) && sQueues[iNode].tries === 0) {
if (msg.length === 0)
return process.nextTick(function() { deQueueItem(iNode, _root(aId)) });
if (_root(aId)+'_'+sQueues[iNode].uid in sSingleNodeMsg)
return sQueues[iNode].timer = setTimeout(_sendNext, msg.length/sBytesPerMs+10, iNode);
sSingleNodeMsg[_root(aId)+'_'+sQueues[iNode].uid] = iNode;
}
++sQueues[iNode].tries;
sActive[iNode].conn.write(1, 'binary', msg, function(type) {
if (sActive[iNode] !== aLn || sQueues[iNode][aN] !== aId)
return;
sQueues[iNode].timer = setTimeout(_sendNext, 10*1000, iNode);
});
});
}

function _newQueue(iUid, ioArray, iPrior) {
ioArray.sort();
for (var a=0; a < ioArray.length; ++a)
sMsgCache.link(ioArray[a]);
sMsgCache.link(_root(ioArray[a]));
ioArray.timer = null;
ioArray.tries = 0;
ioArray.next = 0;
Expand All @@ -164,7 +178,7 @@ function _deleteQueue(iNode) {
});
for (var a=0; a < sQueues[iNode].length; ++a)
if (sQueues[iNode][a])
sMsgCache.unlink(sQueues[iNode][a]);
sMsgCache.unlink(_root(sQueues[iNode][a]));
if (--sPending[sQueues[iNode].uid].q === 0)
delete sPending[sQueues[iNode].uid];
delete sQueues[iNode];
Expand Down Expand Up @@ -207,25 +221,31 @@ function startQueue(iNode, iUid, iQuiet) {
}

function stopQueue(iNode) {
var aId = sQueues[iNode].length ? sQueues[iNode][sQueues[iNode].next] : '';
if (_isSingle(aId) && sSingleNodeMsg[_root(aId)+'_'+sQueues[iNode].uid] === iNode) {
sQueue[iNode].tries = 0;
delete sSingleNodeMsg[_root(aId)+'_'+sQueues[iNode].uid];
}
if (sQueues[iNode].timer) {
clearTimeout(sQueues[iNode].timer);
sQueues[iNode].timer = null;
}
sQueues[iNode].quiet = sQuiet.append(iNode);
}

function queueItem(iNode, iId, iCallback) {
if (!sLock.write(iNode, function(){queueItem(iNode, iId, iCallback)}))
function queueItem(iNode, iId, iToType, iCallback) {
if (!sLock.write(iNode, function(){queueItem(iNode, iId, iToType, iCallback)}))
return;
fs.mkdir(getSub(iNode), 0700, function(errSub) {
if (errSub && errSub.errno !== process.EEXIST) throw errSub;
fs.mkdir(getPath(iNode), 0700, function(errNode) {
if (errNode && errNode.errno !== process.EEXIST) throw errNode;
fs.link(sTempDir+iId, getPath(iNode)+'/'+iId, function(err) {
var aQid = iId + (iToType === 0 ? 'one' : '');
fs.link(sTempDir+iId, getPath(iNode)+'/'+aQid, function(err) {
if (err) throw err;
if (sQueues[iNode] && 'tries' in sQueues[iNode]) {
sMsgCache.link(iId);
sQueues[iNode].push(iId);
sQueues[iNode].push(aQid);
if (sQueues[iNode].pending) {
delete sQueues[iNode].pending[iId];
for (var any in sQueues[iNode].pending) break;
Expand Down Expand Up @@ -254,9 +274,22 @@ function queueItem(iNode, iId, iCallback) {
}

function deQueueItem(iNode, iId) {
if (!sQueues[iNode] || !sQueues[iNode].length || sQueues[iNode][sQueues[iNode].next] !== iId)
if (!sQueues[iNode] || !sQueues[iNode].length || _root(sQueues[iNode][sQueues[iNode].next]) !== iId)
return;
fs.unlink(getPath(iNode)+'/'+iId, noop);
var aPath = getPath(iNode)+'/'+sQueues[iNode][sQueues[iNode].next];
if (_isSingle(sQueues[iNode][sQueues[iNode].next]) && sSingleNodeMsg[iId+'_'+sQueues[iNode].uid] === iNode) {
sMsgCache.zero(iId);
delete sSingleNodeMsg[iId+'_'+sQueues[iNode].uid];
fs.open(aPath, 'w', 0600, function(err, fd) {
if (err) throw err;
fs.close(fd, function(err) {
if (err) throw err;
fs.unlink(aPath, noop);
});
});
} else {
fs.unlink(aPath, noop);
}
sMsgCache.unlink(iId);
sQueues[iNode][sQueues[iNode].next] = null;
sQueues[iNode].tries = 0;
Expand Down Expand Up @@ -447,6 +480,13 @@ var sMsgCache = {
process.nextTick(function() { sMsgCache.clean() });
} ,

zero: function(iId) {
if (!this.cache[iId].msg)
return;
this.size -= this.cache[iId].msg.length;
this.cache[iId].msg = new Buffer(0);
} ,

link: function(iId) {
if (iId in this.cache)
++this.cache[iId].count;
Expand Down Expand Up @@ -650,7 +690,7 @@ Link.prototype = {
return aId +'-'+ ++this.sLastSubId;
this.sLastId = aId;
this.sLastSubId = 1000;
return aId.toString();
return aId+'-0';
} ,

handle_post: function(iReq, iBuf) {
Expand All @@ -664,7 +704,7 @@ Link.prototype = {
sRegSvc.listLookup(a, that.uid, aSend);
}
if (aCbCount === 0)
that._postSend(iReq, iBuf);
that._postSend(iReq, iBuf, null);

function aSend(err, list, members) {
if (err) {
Expand All @@ -681,7 +721,7 @@ Link.prototype = {
if (aCbErr)
that._ackFail(iReq.id, aCbErr);
else
that._postSend(iReq, iBuf);
that._postSend(iReq, iBuf, null);
}
} ,

Expand All @@ -697,7 +737,7 @@ Link.prototype = {
fs.close(fd, noop);
if (err) return that._ackFail(iReq.id, err, iOp);
sMsgCache.put(aId, aMsg);
var aTo = {}, aToCount = 1;
var aTo = {}, aToCount = 1, aAckErr;
for (var aUid in iReq.to) {
++aToCount;
addPending(aUid, aId);
Expand All @@ -709,12 +749,12 @@ Link.prototype = {
if (err) throw err;
for (var aN in list)
if (uid in iReq.to || uid+aN !== that.node)
aTo[uid+aN] = list[aN];
aTo[uid+aN] = uid in iReq.to ? iReq.to[uid] : 1;
if (--aToCount > 0)
return;
for (var aN in aTo) {
++aToCount;
queueItem(aN, aId, fToCb);
queueItem(aN, aId, aTo[aN], fToCb);
}
if (aToCount === 0)
fToCb();
Expand Down

0 comments on commit 81aff14

Please sign in to comment.