Permalink
Browse files

clean: Link#_postSend() take callback; callers handle ack

  • Loading branch information...
1 parent ae783b6 commit d547e86682ddb7af448a0f80152e397ff0bddc15 @networkimprov committed Mar 30, 2012
Showing with 29 additions and 21 deletions.
  1. +29 −21 mqlib.js
View
50 mqlib.js
@@ -540,6 +540,8 @@ Link.prototype = {
ack: { id:'string', type:'string' }
} ,
+ kQueueOp: { register:'registered', listEdit:'listEdited', ping:'deliver', post:'deliver' } ,
+
handleMessage: function(iMsg) {
try {
@@ -587,15 +589,17 @@ Link.prototype = {
handle_register: function(iReq) {
var that = this;
sRegSvc[this.node ? 'reregister' : 'register'](iReq.userId, iReq.newNode, null, iReq.aliases, function(err, aliases) {
- if (!that.conn)
- return;
if (!that.node || err) {
- that.conn.write(1, 'binary', packMsg({op:'registered', etc:aliases, error:err ? err.message : undefined}));
+ if (that.conn)
+ that.conn.write(1, 'binary', packMsg({op:'registered', etc:aliases, error:err ? err.message : undefined}));
return;
}
var aTo = {};
aTo[that.uid] = 1;
- that._postSend({to:aTo, etc:aliases}, null, 'registered');
+ that._postSend({op:'register', to:aTo, etc:aliases}, null, null, function(err, toErr) {
+ if ((err || toErr) && that.conn)
+ that.conn.write(1, 'binary', packMsg({op:'registered', error: err ? err.message : toErr}));
+ });
});
} ,
@@ -668,17 +672,17 @@ Link.prototype = {
if (iBuf) {
var aTo = {};
aTo[iReq.to] = 3;
- that.handle_post({id:iReq.id, to:aTo, etc:iReq.etc}, iBuf);
+ that.handle_post({op:'post', id:iReq.id, to:aTo, etc:iReq.etc}, iBuf);
} else {
if (that.conn)
that.conn.write(1, 'binary', packMsg({op:'ack', type:'ok', id:iReq.id}));
}
}
} ,
- _ackFail: function(iId, iErr, iOp) {
+ _ack: function(iId, iErr, iToErr) {
if (this.conn)
- this.conn.write(1, 'binary', packMsg(iOp ? {op:iOp, error:iErr.message} : {op:'ack', type:'error', error:iErr.message, id:iId}));
+ this.conn.write(1, 'binary', packMsg({op:'ack', id:iId, type: iErr ? 'error' : 'ok', error: iErr ? iErr.message : iToErr}));
} ,
sLastId: 0,
@@ -704,12 +708,12 @@ Link.prototype = {
if (iReq.to[a] === NaN || iReq.to[a] < 2 || iReq.to[a] > 3)
continue;
++aCbCount;
- sRegSvc.listLookup(a, that.uid, aSend);
+ sRegSvc.listLookup(a, that.uid, fSend);
}
if (aCbCount === 0)
- that._postSend(iReq, iBuf, null);
+ that._postSend(iReq, iBuf, null, fAck);
- function aSend(err, list, members) {
+ function fSend(err, list, members) {
if (err) {
if (!aCbErr) aCbErr = '';
aCbErr += (aCbErr && '\n') + err.message;
@@ -722,24 +726,27 @@ Link.prototype = {
iReq.to[that.uid] = 1;
delete iReq.to[list];
if (--aCbCount === 0)
- that._postSend(iReq, iBuf, null, aCbErr);
+ that._postSend(iReq, iBuf, aCbErr, fAck);
+ }
+ function fAck(err, toErr) {
+ that._ack(iReq.id, err, toErr);
}
} ,
- _postSend: function(iReq, iBuf, iOp, iAckErr) {
+ _postSend: function(iReq, iBuf, iAckErr, iCallback) {
var that = this;
for (var any in iReq.to) break;
if (!any && iReq.noNodes)
- return that._ackFail(iReq.id, {message:'msg lacks recipients'}, iOp);
+ return iCallback({message:'msg lacks recipients'});
var aId = this._makeId();
- var aMsg = packMsg({op:iOp || 'deliver', id:aId, from:that.uid, etc:iReq.etc}, iBuf);
+ var aMsg = packMsg({op:that.kQueueOp[iReq.op], id:aId, from:that.uid, etc:iReq.etc}, iBuf);
fs.open(sTempDir+aId, 'w', 0600, function(err, fd) {
- if (err) return that._ackFail(iReq.id, err, iOp);
+ if (err) return iCallback(err);
writeAll(fd, aMsg, function(err) { // attempt write to temp
- if (err) { fs.close(fd, noop); return that._ackFail(iReq.id, err, iOp); }
+ if (err) { fs.close(fd, noop); return iCallback(err); }
fs.fsync(fd, function(err) {
fs.close(fd, noop);
- if (err) return that._ackFail(iReq.id, err, iOp);
+ if (err) return iCallback(err);
sMsgCache.add(aId, aMsg);
var aTo = {}, aToCount = 0;
for (var aUid in iReq.to) {
@@ -779,9 +786,8 @@ Link.prototype = {
delPending(aUid, aId);
if (!iReq.noNodes && !(that.uid in iReq.to))
delPending(that.uid, aId);
- if (that.conn && !iOp)
- that.conn.write(1, 'binary', packMsg({op:'ack', type:'ok', id:iReq.id, error:iAckErr}));
fs.unlink(sTempDir+aId, noop);
+ iCallback(null, iAckErr);
}
}
});
@@ -793,11 +799,13 @@ Link.prototype = {
var that = this;
sRegSvc.lookup(iReq.alias, function(err, node) {
if (err)
- return that._ackFail(iReq.id, err);
+ return that._ack(iReq.id, err);
delete iReq.alias;
iReq.to = {};
iReq.to[node] = 1;
- that._postSend(iReq, iBuf);
+ that._postSend(iReq, iBuf, null, function(err, toErr) {
+ that._ack(iReq.id, err, toErr);
+ });
});
} ,

0 comments on commit d547e86

Please sign in to comment.