Skip to content

Commit

Permalink
fix: sMsgCache.add(), call add() & unlink() from Link#_postSend()
Browse files Browse the repository at this point in the history
  • Loading branch information
networkimprov committed Mar 27, 2012
1 parent 59d66d9 commit ae783b6
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions mqlib.js
Expand Up @@ -457,7 +457,7 @@ var sMsgCache = {
for (var a in aWait)
return aRead(a);
} else {
sMsgCache.put(iId, data);
sMsgCache._put(iId, data);
for (var a in aWait)
aWait[a](data);
}
Expand All @@ -467,13 +467,16 @@ var sMsgCache = {
aRead(iNode);
} ,

put: function(iId, iMsg) {
add: function(iId, iMsg) {
if (iId in this.cache) throw new Error('item already in cache');
this.cache[iId] = { count:0, msg:null };
this._put(iId, iMsg);
} ,

_put: function(iId, iMsg) {
if (iMsg.length > sMsgCacheMax/10)
return;
if (iId in this.cache)
this.cache[iId].msg = iMsg;
else
this.cache[iId] = { count:0, msg:iMsg };
this.cache[iId].msg = iMsg;
this.list.append(this.cache[iId]);
this.size += iMsg.length;
if (this.size > sMsgCacheMax)
Expand Down Expand Up @@ -737,7 +740,7 @@ Link.prototype = {
fs.fsync(fd, function(err) {
fs.close(fd, noop);
if (err) return that._ackFail(iReq.id, err, iOp);
sMsgCache.put(aId, aMsg);
sMsgCache.add(aId, aMsg);
var aTo = {}, aToCount = 0;
for (var aUid in iReq.to) {
++aToCount;
Expand Down Expand Up @@ -765,8 +768,10 @@ Link.prototype = {
++aToCount;
queueItem(aN, aId, aTo[aN], fToCb);
}
if (aToCount === 0)
if (aToCount === 0) {
sMsgCache.unlink(aId);
fToCb();
}
function fToCb() {
if (--aToCount > 0)
return;
Expand Down

0 comments on commit ae783b6

Please sign in to comment.