Permalink
Browse files

Remove puts and hardcoded queue names

  • Loading branch information...
xaviershay committed Nov 28, 2009
1 parent f067b25 commit 8603b5d50117fec0b3ef450da830ad17848f2054
Showing with 41 additions and 20 deletions.
  1. +7 −4 client.js
  2. +6 −3 lib/amqp/index.js
  3. +28 −13 lib/amqp/queue.js
View
@@ -9,10 +9,13 @@ var conn = AMQP.createConnection({
});
conn.addListener("connect", function() {
- sys.puts("Connected to AMQP server");
- var queue = conn.queue('events');
+ var queue = conn.queue('my-events-receiver');
+
+
+ queue.addListener("connect", function() {
+ queue.bind('events');
+ });
queue.addListener("receive", function(content) {
- sys.puts("RECEIVE");
- sys.puts(content);
+ sys.puts("RECV: " + content);
});
});
View
@@ -24,6 +24,7 @@ exports.Connection = function(options) {
sys.inherits(exports.Connection, process.EventEmitter);
var proto = exports.Connection.prototype;
+
proto.init = function(options) {
var self = this;
var opts = {};
@@ -59,7 +60,7 @@ proto.init = function(options) {
});
});
- conn.addListener("message", function(message) {
+ var handshakeListener = function(message) {
if (message.matchMethod(C.Connection.Start)) {
conn.send(Method.serialize(C.Connection.StartOk, C.Channel.All, {
version: '0.0.1',
@@ -78,10 +79,12 @@ proto.init = function(options) {
} else if (message.matchMethod(C.Channel.OpenOk)) {
self.conn = conn;
self.emit('connect');
+ conn.removeListener(handshakeListener);
}
- });
+ }
+ conn.addListener("message", handshakeListener);
}
proto.queue = function(name) {
- return new Queue.Queue(this.conn, {});
+ return new Queue.Queue(this.conn, {name: name});
}
View
@@ -13,25 +13,25 @@ inherits(exports.Queue, process.EventEmitter);
return (message.method[0] == method[0] && message.method[1] == method[1]);
}
var proto = exports.Queue.prototype;
+
proto.init = function(conn, options) {
var self = this;
- conn.addListener("message", function(message) {
- puts(inspect(message));
+ self.conn = conn;
+ self.options = options;
+
+ var declareListener = function(message) {
if (message.matchMethod(C.Queue.DeclareOk)) {
- conn.sendDebug(Method.serialize(C.Queue.Bind, 1, 1, 'events', 'events', 0, {}));
- } else if (message.matchMethod(C.Queue.BindOk)) {
- conn.sendDebug(Method.serialize(C.Basic.Consume, 1, 1, 'events', 'events', 2));
- } else if (message.matchMethod(C.Basic.ConsumeOk)) {
- puts("CONSUMING");
- } else if (message.matchContentHeader()) {
- puts("HEADER");
+ self.emit("connect");
+ self.conn.removeListener(declareListener);
+ }
+ }
+ conn.addListener("message", declareListener);
+ conn.addListener("message", function(message) {
+ if (message.matchContentHeader()) {
self.contentBuffer = '';
self.contentSize = message.contentHeader.contentSize;
} else if (message.matchContent()) {
- puts("CONTENT");
self.contentBuffer += message.content;
- puts(self.contentSize);
- puts(self.contentBuffer.length);
if (self.contentBuffer.length >= self.contentSize) {
self.emit("receive", self.contentBuffer);
@@ -40,5 +40,20 @@ proto.init = function(conn, options) {
}
}
});
- conn.send(Method.serialize(C.Queue.Declare, 1, 1, 'events', 0, {}));
+ conn.send(Method.serialize(C.Queue.Declare, 1, 1, self.options.name, 0, {}));
+}
+
+proto.bind = function(exchange) {
+ var self = this;
+ var bindListener = function(message) {
+ if (message.matchMethod(C.Queue.BindOk)) {
+ self.conn.send(Method.serialize(C.Basic.Consume, 1, 1, self.options.name, 'tag-1', 2));
+ } else if (message.matchMethod(C.Basic.ConsumeOk)) {
+ // Setup new listeners
+ self.emit("bound")
+ self.conn.removeListener(bindListener);
+ }
+ }
+ self.conn.addListener("message", bindListener);
+ self.conn.send(Method.serialize(C.Queue.Bind, 1, 1, self.options.name, exchange, 0, {}));
}

0 comments on commit 8603b5d

Please sign in to comment.