Permalink
Browse files

Generate constants from AMQP XML spec

  • Loading branch information...
1 parent 8603b5d commit b25afcbf6dfd2cf9e0627e856908b36a45dcb4f0 @xaviershay committed Nov 28, 2009
Showing with 12,241 additions and 38 deletions.
  1. +1 −0 .gitignore
  2. +9 −3 README
  3. +3 −0 Rakefile
  4. +4 −26 lib/amqp/constants.js
  5. +3 −3 lib/amqp/index.js
  6. +22 −1 lib/amqp/method.js
  7. +3 −3 lib/amqp/queue.js
  8. +5 −2 lib/amqp/serialization.js
  9. +255 −0 tools/qparser.rb
  10. +3,908 −0 tools/xml/amqp-0.8.xml
  11. +2,843 −0 tools/xml/amqp-0.9.1.xml
  12. +5,185 −0 tools/xml/amqp-0.9.xml
View
@@ -0,0 +1 @@
+lib/amqp/constants-generated.js
View
12 README
@@ -1,10 +1,16 @@
NodeJS and RabbitMQ can be best friends
-This is totally not complete at all - you can't even subscribe or publish yet so it's a bit useless currently.
+This is totally not complete at all - you can subscribe to a queue,
+but you can't publish or interact with exchanges yet.
See client.js for an example
+Part of this code is generated from the AMQP spec. Before using,
+you must run:
+
+ rake generate_from_xml
+
Major TODOs:
-- Subscribe to a queue
+- Helper interfaces for receiving messages
- Publish to an exchange
-- Generate constants.js from amqp xml spec
+- Test suite
View
@@ -0,0 +1,3 @@
+task :generate_from_xml do
+ `tools/qparser.rb tools/xml/amqp-0.9.1.xml`
+end
View
@@ -1,27 +1,5 @@
-exports.Connection = {
- Start: [10, 10],
- StartOk: [10, 11, 'table', 'shortstr', 'table', 'shortstr'],
- Tune: [10, 30],
- TuneOk: [10, 31, 'short', 'long', 'short'],
- Open: [10, 40, 'shortstr', 'shortstr', 'octet'], // TODO: Last is actually a bit
- OpenOk: [10, 41]
-}
+process.mixin(exports, require('./constants-generated'))
-exports.Channel = {
- All: 0,
- Open: [20, 10, 'shortstr'],
- OpenOk: [20, 11, 'longstr']
-}
-
-exports.Queue = {
- Declare: [50, 10, 'short', 'shortstr', 'octet', 'table'], // TODO: 3rd is actually 5 bits
- DeclareOk: [50, 11, 'shortstr', 'long', 'long'],
- Bind: [50, 20, 'short', 'shortstr', 'shortstr', 'short', 'table'],
- BindOk: [50, 21]
-}
-
-exports.Basic = {
- Consume: [60, 20, 'short', 'shortstr', 'shortstr', 'octet'],
- ConsumeOk: [60, 21],
- Deliver: [60, 60]
-}
+exports.Channel.All = 0;
+exports.Connection.Open[4] = 'shortstr' // xml-spec says "bit", PDF/rabbit say shortstr
+exports.Basic.Consume.pop() // rabbit doesn't like the table bit on the end
View
@@ -4,6 +4,7 @@ var Frame = require("./frame");
var Method = require("./method");
var Queue = require('./queue');
var C = require('./constants');
+var S11n = require('./serialization');
exports.createConnection = function(opts) {
return new exports.Connection(opts);
@@ -37,7 +38,6 @@ proto.init = function(options) {
var conn = tcp.createConnection(opts.port, opts.host);
conn.sendDebug = function(data) {
- sys.puts("SEND: " + sys.inspect(data));
conn.send(data);
}
conn.addListener("connect", function() {
@@ -68,12 +68,12 @@ proto.init = function(options) {
information: 'no',
product: 'node-amqp' },
'AMQPLAIN',
- {LOGIN: opts.login, PASSWORD: opts.password},
+ S11n.format({LOGIN: opts.login, PASSWORD: opts.password}, 'tableNoHeader'),
'en_US'
));
} else if (message.matchMethod(C.Connection.Tune)) {
conn.send(Method.serialize(C.Connection.TuneOk, C.Channel.All, 0, 131072, 0));
- conn.send(Method.serialize(C.Connection.Open, C.Channel.All, opts.vhost, '', 0));
+ conn.send(Method.serialize(C.Connection.Open, C.Channel.All, opts.vhost, '', ''));
} else if (message.matchMethod(C.Connection.OpenOk)) {
conn.send(Method.serialize(C.Channel.Open, 1, ''));
} else if (message.matchMethod(C.Channel.OpenOk)) {
View
@@ -3,19 +3,40 @@ S11n = require("./serialization");
exports.serialize = function() {
var method = arguments[0];
var channel = arguments[1];
+ var bitBuffer = 0;
+ var bitCount = 0;
+
+
+ var flushBits = function() {
+ if (bitCount > 0) {
+ payload += S11n.format(bitBuffer, 'octet');
+ bitBuffer = 0;
+ bitCount = 0;
+ }
+ }
payload = '';
payload += S11n.format(method[0], 'short');
payload += S11n.format(method[1], 'short');
for(var i = 2; i < arguments.length; i++) {
- payload += S11n.format(arguments[i], method[i]);
+ if (method[i] == 'bit') {
+ bitBuffer += (1 << bitCount) * arguments[i];
+ bitCount += 1
+ if (bitCount == 8)
+ flushBits();
+ } else {
+ flushBits();
+ payload += S11n.format(arguments[i], method[i]);
+ }
}
+ flushBits();
// TODO: Move to Frame.serialize
message = S11n.format(1, 'octet');
message += S11n.format(channel, 'short');
message += S11n.format(payload.length, 'long');
message += payload;
message += String.fromCharCode(206);
+
return message;
}
View
@@ -40,20 +40,20 @@ proto.init = function(conn, options) {
}
}
});
- conn.send(Method.serialize(C.Queue.Declare, 1, 1, self.options.name, 0, {}));
+ conn.send(Method.serialize(C.Queue.Declare, 1, 1, self.options.name, false, false, false, false, false, {}));
}
proto.bind = function(exchange) {
var self = this;
var bindListener = function(message) {
if (message.matchMethod(C.Queue.BindOk)) {
- self.conn.send(Method.serialize(C.Basic.Consume, 1, 1, self.options.name, 'tag-1', 2));
+ self.conn.send(Method.serialize(C.Basic.Consume, 1, 1, self.options.name, 'tag-1', false, true, false, false));
} else if (message.matchMethod(C.Basic.ConsumeOk)) {
// Setup new listeners
self.emit("bound")
self.conn.removeListener(bindListener);
}
}
self.conn.addListener("message", bindListener);
- self.conn.send(Method.serialize(C.Queue.Bind, 1, 1, self.options.name, exchange, 0, {}));
+ self.conn.send(Method.serialize(C.Queue.Bind, 1, 1, self.options.name, exchange, '', false, {}));
}
@@ -32,14 +32,17 @@ exports.format = function(value, f) {
buffer += x;
return buffer;
}
- formatters['table'] = function(x) {
+ formatters['tableNoHeader'] = function(x) {
var payload = '';
for (key in x) {
payload += formatters.shortstr(key);
payload += 'S';
payload += formatters.longstr(x[key]);
}
- return formatters.longstr(payload);
+ return payload;
+ }
+ formatters['table'] = function(x) {
+ return formatters.longstr(formatters.tableNoHeader(x));
}
return formatters[f](value);
Oops, something went wrong.

0 comments on commit b25afcb

Please sign in to comment.