Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

added rabbitmq specific confirm ability to exchanges and emits ack fr…

…om a published task object
  • Loading branch information...
commit 14e25019d55c1664dc95e3ceaf0e5a928ac67a8d 1 parent 76cab75
@barshow barshow authored
Showing with 3,367 additions and 2 deletions.
  1. +3,320 −0 amqp-0-9-1-rabbit.xml
  2. +1 −1  amqp-definitions-0-9-1.js
  3. +37 −1 amqp.js
  4. +9 −0 promise.js
View
3,320 amqp-0-9-1-rabbit.xml
3,320 additions, 0 deletions not shown
View
2  amqp-definitions-0-9-1.js
@@ -1,2 +1,2 @@
exports.constants = [[1,"frameMethod"],[2,"frameHeader"],[3,"frameBody"],[8,"frameHeartbeat"],[200,"replySuccess"],[206,"frameEnd"],[311,"contentTooLarge"],[313,"noConsumers"],[320,"connectionForced"],[402,"invalidPath"],[403,"accessRefused"],[404,"notFound"],[405,"resourceLocked"],[406,"preconditionFailed"],[501,"frameError"],[502,"syntaxError"],[503,"commandInvalid"],[504,"channelError"],[505,"unexpectedFrame"],[506,"resourceError"],[530,"notAllowed"],[540,"notImplemented"],[541,"internalError"],[4096,"frameMinSize"]];
-exports.classes = [{"name":"connection","index":10,"fields":[],"methods":[{"name":"start","index":10,"fields":[{"name":"versionMajor","domain":"octet"},{"name":"versionMinor","domain":"octet"},{"name":"serverProperties","domain":"table"},{"name":"mechanisms","domain":"longstr"},{"name":"locales","domain":"longstr"}]},{"name":"startOk","index":11,"fields":[{"name":"clientProperties","domain":"table"},{"name":"mechanism","domain":"shortstr"},{"name":"response","domain":"longstr"},{"name":"locale","domain":"shortstr"}]},{"name":"secure","index":20,"fields":[{"name":"challenge","domain":"longstr"}]},{"name":"secureOk","index":21,"fields":[{"name":"response","domain":"longstr"}]},{"name":"tune","index":30,"fields":[{"name":"channelMax","domain":"short"},{"name":"frameMax","domain":"long"},{"name":"heartbeat","domain":"short"}]},{"name":"tuneOk","index":31,"fields":[{"name":"channelMax","domain":"short"},{"name":"frameMax","domain":"long"},{"name":"heartbeat","domain":"short"}]},{"name":"open","index":40,"fields":[{"name":"virtualHost","domain":"shortstr"},{"name":"reserved1","domain":"shortstr"},{"name":"reserved2","domain":"bit"}]},{"name":"openOk","index":41,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"close","index":50,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"classId","domain":"short"},{"name":"methodId","domain":"short"}]},{"name":"closeOk","index":51,"fields":[]}]},{"name":"channel","index":20,"fields":[],"methods":[{"name":"open","index":10,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"openOk","index":11,"fields":[{"name":"reserved1","domain":"longstr"}]},{"name":"flow","index":20,"fields":[{"name":"active","domain":"bit"}]},{"name":"flowOk","index":21,"fields":[{"name":"active","domain":"bit"}]},{"name":"close","index":40,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"classId","domain":"short"},{"name":"methodId","domain":"short"}]},{"name":"closeOk","index":41,"fields":[]}]},{"name":"exchange","index":40,"fields":[],"methods":[{"name":"declare","index":10,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"type","domain":"shortstr"},{"name":"passive","domain":"bit"},{"name":"durable","domain":"bit"},{"name":"autoDelete","domain":"bit"},{"name":"reserved2","domain":"bit"},{"name":"reserved3","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"declareOk","index":11,"fields":[]},{"name":"delete","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"ifUnused","domain":"bit"},{"name":"noWait","domain":"bit"}]},{"name":"deleteOk","index":21,"fields":[]}]},{"name":"queue","index":50,"fields":[],"methods":[{"name":"declare","index":10,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"passive","domain":"bit"},{"name":"durable","domain":"bit"},{"name":"exclusive","domain":"bit"},{"name":"autoDelete","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"declareOk","index":11,"fields":[{"name":"queue","domain":"shortstr"},{"name":"messageCount","domain":"long"},{"name":"consumerCount","domain":"long"}]},{"name":"bind","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"bindOk","index":21,"fields":[]},{"name":"unbind","index":50,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"arguments","domain":"table"}]},{"name":"unbindOk","index":51,"fields":[]},{"name":"purge","index":30,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"noWait","domain":"bit"}]},{"name":"purgeOk","index":31,"fields":[{"name":"messageCount","domain":"long"}]},{"name":"delete","index":40,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"ifUnused","domain":"bit"},{"name":"ifEmpty","domain":"bit"},{"name":"noWait","domain":"bit"}]},{"name":"deleteOk","index":41,"fields":[{"name":"messageCount","domain":"long"}]}]},{"name":"basic","index":60,"fields":[{"name":"contentType","domain":"shortstr"},{"name":"contentEncoding","domain":"shortstr"},{"name":"headers","domain":"table"},{"name":"deliveryMode","domain":"octet"},{"name":"priority","domain":"octet"},{"name":"correlationId","domain":"shortstr"},{"name":"replyTo","domain":"shortstr"},{"name":"expiration","domain":"shortstr"},{"name":"messageId","domain":"shortstr"},{"name":"timestamp","domain":"timestamp"},{"name":"type","domain":"shortstr"},{"name":"userId","domain":"shortstr"},{"name":"appId","domain":"shortstr"},{"name":"reserved","domain":"shortstr"}],"methods":[{"name":"qos","index":10,"fields":[{"name":"prefetchSize","domain":"long"},{"name":"prefetchCount","domain":"short"},{"name":"global","domain":"bit"}]},{"name":"qosOk","index":11,"fields":[]},{"name":"consume","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"consumerTag","domain":"shortstr"},{"name":"noLocal","domain":"bit"},{"name":"noAck","domain":"bit"},{"name":"exclusive","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"consumeOk","index":21,"fields":[{"name":"consumerTag","domain":"shortstr"}]},{"name":"cancel","index":30,"fields":[{"name":"consumerTag","domain":"shortstr"},{"name":"noWait","domain":"bit"}]},{"name":"cancelOk","index":31,"fields":[{"name":"consumerTag","domain":"shortstr"}]},{"name":"publish","index":40,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"mandatory","domain":"bit"},{"name":"immediate","domain":"bit"}]},{"name":"return","index":50,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"}]},{"name":"deliver","index":60,"fields":[{"name":"consumerTag","domain":"shortstr"},{"name":"deliveryTag","domain":"longlong"},{"name":"redelivered","domain":"bit"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"}]},{"name":"get","index":70,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"noAck","domain":"bit"}]},{"name":"getOk","index":71,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"redelivered","domain":"bit"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"messageCount","domain":"long"}]},{"name":"getEmpty","index":72,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"ack","index":80,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"multiple","domain":"bit"}]},{"name":"reject","index":90,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"requeue","domain":"bit"}]},{"name":"recoverAsync","index":100,"fields":[{"name":"requeue","domain":"bit"}]},{"name":"recover","index":110,"fields":[{"name":"requeue","domain":"bit"}]},{"name":"recoverOk","index":111,"fields":[]}]},{"name":"tx","index":90,"fields":[],"methods":[{"name":"select","index":10,"fields":[]},{"name":"selectOk","index":11,"fields":[]},{"name":"commit","index":20,"fields":[]},{"name":"commitOk","index":21,"fields":[]},{"name":"rollback","index":30,"fields":[]},{"name":"rollbackOk","index":31,"fields":[]}]}];
+exports.classes = [ { "name":"connection", "index":10, "fields":[ ], "methods":[ { "name":"start", "index":10, "fields":[ { "name":"versionMajor", "domain":"octet" }, { "name":"versionMinor", "domain":"octet" }, { "name":"serverProperties", "domain":"table" }, { "name":"mechanisms", "domain":"longstr" }, { "name":"locales", "domain":"longstr" } ] }, { "name":"startOk", "index":11, "fields":[ { "name":"clientProperties", "domain":"table" }, { "name":"mechanism", "domain":"shortstr" }, { "name":"response", "domain":"longstr" }, { "name":"locale", "domain":"shortstr" } ] }, { "name":"secure", "index":20, "fields":[ { "name":"challenge", "domain":"longstr" } ] }, { "name":"secureOk", "index":21, "fields":[ { "name":"response", "domain":"longstr" } ] }, { "name":"tune", "index":30, "fields":[ { "name":"channelMax", "domain":"short" }, { "name":"frameMax", "domain":"long" }, { "name":"heartbeat", "domain":"short" } ] }, { "name":"tuneOk", "index":31, "fields":[ { "name":"channelMax", "domain":"short" }, { "name":"frameMax", "domain":"long" }, { "name":"heartbeat", "domain":"short" } ] }, { "name":"open", "index":40, "fields":[ { "name":"virtualHost", "domain":"shortstr" }, { "name":"reserved1", "domain":"shortstr" }, { "name":"reserved2", "domain":"bit" } ] }, { "name":"openOk", "index":41, "fields":[ { "name":"reserved1", "domain":"shortstr" } ] }, { "name":"close", "index":50, "fields":[ { "name":"replyCode", "domain":"short" }, { "name":"replyText", "domain":"shortstr" }, { "name":"classId", "domain":"short" }, { "name":"methodId", "domain":"short" } ] }, { "name":"closeOk", "index":51, "fields":[ ] } ] }, { "name":"channel", "index":20, "fields":[ ], "methods":[ { "name":"open", "index":10, "fields":[ { "name":"reserved1", "domain":"shortstr" } ] }, { "name":"openOk", "index":11, "fields":[ { "name":"reserved1", "domain":"longstr" } ] }, { "name":"flow", "index":20, "fields":[ { "name":"active", "domain":"bit" } ] }, { "name":"flowOk", "index":21, "fields":[ { "name":"active", "domain":"bit" } ] }, { "name":"close", "index":40, "fields":[ { "name":"replyCode", "domain":"short" }, { "name":"replyText", "domain":"shortstr" }, { "name":"classId", "domain":"short" }, { "name":"methodId", "domain":"short" } ] }, { "name":"closeOk", "index":41, "fields":[ ] } ] }, { "name":"exchange", "index":40, "fields":[ ], "methods":[ { "name":"declare", "index":10, "fields":[ { "name":"reserved1", "domain":"short" }, { "name":"exchange", "domain":"shortstr" }, { "name":"type", "domain":"shortstr" }, { "name":"passive", "domain":"bit" }, { "name":"durable", "domain":"bit" }, { "name":"autoDelete", "domain":"bit" }, { "name":"reserved2", "domain":"bit" }, { "name":"reserved3", "domain":"bit" }, { "name":"noWait", "domain":"bit" }, { "name":"arguments", "domain":"table" } ] }, { "name":"declareOk", "index":11, "fields":[ ] }, { "name":"delete", "index":20, "fields":[ { "name":"reserved1", "domain":"short" }, { "name":"exchange", "domain":"shortstr" }, { "name":"ifUnused", "domain":"bit" }, { "name":"noWait", "domain":"bit" } ] }, { "name":"deleteOk", "index":21, "fields":[ ] } ] }, { "name":"queue", "index":50, "fields":[ ], "methods":[ { "name":"declare", "index":10, "fields":[ { "name":"reserved1", "domain":"short" }, { "name":"queue", "domain":"shortstr" }, { "name":"passive", "domain":"bit" }, { "name":"durable", "domain":"bit" }, { "name":"exclusive", "domain":"bit" }, { "name":"autoDelete", "domain":"bit" }, { "name":"noWait", "domain":"bit" }, { "name":"arguments", "domain":"table" } ] }, { "name":"declareOk", "index":11, "fields":[ { "name":"queue", "domain":"shortstr" }, { "name":"messageCount", "domain":"long" }, { "name":"consumerCount", "domain":"long" } ] }, { "name":"bind", "index":20, "fields":[ { "name":"reserved1", "domain":"short" }, { "name":"queue", "domain":"shortstr" }, { "name":"exchange", "domain":"shortstr" }, { "name":"routingKey", "domain":"shortstr" }, { "name":"noWait", "domain":"bit" }, { "name":"arguments", "domain":"table" } ] }, { "name":"bindOk", "index":21, "fields":[ ] }, { "name":"unbind", "index":50, "fields":[ { "name":"reserved1", "domain":"short" }, { "name":"queue", "domain":"shortstr" }, { "name":"exchange", "domain":"shortstr" }, { "name":"routingKey", "domain":"shortstr" }, { "name":"arguments", "domain":"table" } ] }, { "name":"unbindOk", "index":51, "fields":[ ] }, { "name":"purge", "index":30, "fields":[ { "name":"reserved1", "domain":"short" }, { "name":"queue", "domain":"shortstr" }, { "name":"noWait", "domain":"bit" } ] }, { "name":"purgeOk", "index":31, "fields":[ { "name":"messageCount", "domain":"long" } ] }, { "name":"delete", "index":40, "fields":[ { "name":"reserved1", "domain":"short" }, { "name":"queue", "domain":"shortstr" }, { "name":"ifUnused", "domain":"bit" }, { "name":"ifEmpty", "domain":"bit" }, { "name":"noWait", "domain":"bit" } ] }, { "name":"deleteOk", "index":41, "fields":[ { "name":"messageCount", "domain":"long" } ] } ] }, { "name":"basic", "index":60, "fields":[ { "name":"contentType", "domain":"shortstr" }, { "name":"contentEncoding", "domain":"shortstr" }, { "name":"headers", "domain":"table" }, { "name":"deliveryMode", "domain":"octet" }, { "name":"priority", "domain":"octet" }, { "name":"correlationId", "domain":"shortstr" }, { "name":"replyTo", "domain":"shortstr" }, { "name":"expiration", "domain":"shortstr" }, { "name":"messageId", "domain":"shortstr" }, { "name":"timestamp", "domain":"timestamp" }, { "name":"type", "domain":"shortstr" }, { "name":"userId", "domain":"shortstr" }, { "name":"appId", "domain":"shortstr" }, { "name":"reserved", "domain":"shortstr" } ], "methods":[ { "name":"qos", "index":10, "fields":[ { "name":"prefetchSize", "domain":"long" }, { "name":"prefetchCount", "domain":"short" }, { "name":"global", "domain":"bit" } ] }, { "name":"qosOk", "index":11, "fields":[ ] }, { "name":"consume", "index":20, "fields":[ { "name":"reserved1", "domain":"short" }, { "name":"queue", "domain":"shortstr" }, { "name":"consumerTag", "domain":"shortstr" }, { "name":"noLocal", "domain":"bit" }, { "name":"noAck", "domain":"bit" }, { "name":"exclusive", "domain":"bit" }, { "name":"noWait", "domain":"bit" }, { "name":"arguments", "domain":"table" } ] }, { "name":"consumeOk", "index":21, "fields":[ { "name":"consumerTag", "domain":"shortstr" } ] }, { "name":"cancel", "index":30, "fields":[ { "name":"consumerTag", "domain":"shortstr" }, { "name":"noWait", "domain":"bit" } ] }, { "name":"cancelOk", "index":31, "fields":[ { "name":"consumerTag", "domain":"shortstr" } ] }, { "name":"publish", "index":40, "fields":[ { "name":"reserved1", "domain":"short" }, { "name":"exchange", "domain":"shortstr" }, { "name":"routingKey", "domain":"shortstr" }, { "name":"mandatory", "domain":"bit" }, { "name":"immediate", "domain":"bit" } ] }, { "name":"return", "index":50, "fields":[ { "name":"replyCode", "domain":"short" }, { "name":"replyText", "domain":"shortstr" }, { "name":"exchange", "domain":"shortstr" }, { "name":"routingKey", "domain":"shortstr" } ] }, { "name":"deliver", "index":60, "fields":[ { "name":"consumerTag", "domain":"shortstr" }, { "name":"deliveryTag", "domain":"longlong" }, { "name":"redelivered", "domain":"bit" }, { "name":"exchange", "domain":"shortstr" }, { "name":"routingKey", "domain":"shortstr" } ] }, { "name":"get", "index":70, "fields":[ { "name":"reserved1", "domain":"short" }, { "name":"queue", "domain":"shortstr" }, { "name":"noAck", "domain":"bit" } ] }, { "name":"getOk", "index":71, "fields":[ { "name":"deliveryTag", "domain":"longlong" }, { "name":"redelivered", "domain":"bit" }, { "name":"exchange", "domain":"shortstr" }, { "name":"routingKey", "domain":"shortstr" }, { "name":"messageCount", "domain":"long" } ] }, { "name":"getEmpty", "index":72, "fields":[ { "name":"reserved1", "domain":"shortstr" } ] }, { "name":"ack", "index":80, "fields":[ { "name":"deliveryTag", "domain":"longlong" }, { "name":"multiple", "domain":"bit" } ] }, { "name":"reject", "index":90, "fields":[ { "name":"deliveryTag", "domain":"longlong" }, { "name":"requeue", "domain":"bit" } ] }, { "name":"recoverAsync", "index":100, "fields":[ { "name":"requeue", "domain":"bit" } ] }, { "name":"recover", "index":110, "fields":[ { "name":"requeue", "domain":"bit" } ] }, { "name":"recoverOk", "index":111, "fields":[ ] } ] }, { "name":"tx", "index":90, "fields":[ ], "methods":[ { "name":"select", "index":10, "fields":[ ] }, { "name":"selectOk", "index":11, "fields":[ ] }, { "name":"commit", "index":20, "fields":[ ] }, { "name":"commitOk", "index":21, "fields":[ ] }, { "name":"rollback", "index":30, "fields":[ ] }, { "name":"rollbackOk", "index":31, "fields":[ ] } ] }, { "name":"confirm", "index":85, "fields":[ ], "methods":[ { "name":"select", "index":10, "fields":[ { "name":"noWait", "domain":"bit" } ] }, { "name":"selectOk", "index":11, "fields":[ ] } ] }];
View
38 amqp.js
@@ -1733,6 +1733,11 @@ Queue.prototype._onMethod = function (channel, method, args) {
case methods.basicQosOk:
break;
+ case methods.confirmSelectOk:
+ this._sequence = 1;
+ this.confirm = true;
+ break;
+
case methods.channelClose:
this.state = "closed";
this.connection.queueClosed(this.name);
@@ -1788,6 +1793,9 @@ function Exchange (connection, channel, name, options, openCallback) {
this.binds = 0; // keep track of queues bound
this.options = options || { autoDelete: true};
this._openCallback = openCallback;
+
+ this._sequence = null;
+ this._unAcked = {};
}
util.inherits(Exchange, Channel);
@@ -1835,6 +1843,12 @@ Exchange.prototype._onMethod = function (channel, method, args) {
this._openCallback(this);
this._openCallback = null;
}
+
+ if (this.options.confirm){
+ this.connection._sendMethod(channel, methods.confirmSelect,
+ { noWait: false });
+ }
+
break;
case methods.channelClose:
@@ -1851,6 +1865,20 @@ Exchange.prototype._onMethod = function (channel, method, args) {
this.emit('close');
break;
+ case methods.confirmSelectOk:
+ this._sequence = 1;
+ break;
+
+ case methods.basicAck:
+ this.emit('basic-ack', args);
+
+ if(this._unAcked[args.deliveryTag]){
+ this._unAcked[args.deliveryTag].emitAck()
+ delete this._unAcked[args.deliveryTag]
+ }
+
+ break;
+
case methods.basicReturn:
this.emit('basic-return', args);
break;
@@ -1892,7 +1920,7 @@ Exchange.prototype.publish = function (routingKey, data, options) {
options.immediate = options.immediate ? true : false;
options.reserved1 = 0;
- return this._taskPush(null, function () {
+ task = this._taskPush(null, function () {
self.connection._sendMethod(self.channel, methods.basicPublish, options);
// This interface is probably not appropriate for streaming large files.
// (Of course it's arguable about whether AMQP is the appropriate
@@ -1903,6 +1931,14 @@ Exchange.prototype.publish = function (routingKey, data, options) {
// If you need to stream something large, chunk it yourself.
self.connection._sendBody(self.channel, data, options);
});
+
+ if (self.options.confirm){
+ task.sequence = self._sequence
+ self._unAcked[self._sequence] = task
+ self._sequence++
+ }
+
+ return task
};
// do any necessary cleanups eg. after queue destruction
View
9 promise.js
@@ -5,6 +5,7 @@ exports.Promise = function () {
events.EventEmitter.call(this);
this._blocking = false;
this.hasFired = false;
+ this.hasAcked = false;
this._values = undefined;
};
inherits(exports.Promise, events.EventEmitter);
@@ -49,6 +50,14 @@ exports.Promise.prototype.emitSuccess = function() {
this.emit.apply(this, ['success'].concat(this._values));
};
+exports.Promise.prototype.emitAck = function() {
+ if (this.hasAcked) return;
+ this.hasAcked = 'true';
+
+ this._values = Array.prototype.slice.call(arguments);
+ this.emit.apply(this, ['ack'].concat(this._values));
+};
+
exports.Promise.prototype.emitError = function() {
if (this.hasFired) return;
this.hasFired = 'error';
Please sign in to comment.
Something went wrong with that request. Please try again.