diff --git a/README.md b/README.md index 0392381..7fbebf7 100644 --- a/README.md +++ b/README.md @@ -229,6 +229,22 @@ ascoltatori.build(settings, function (err, ascoltatore) { }); ``` +Use with [amqplib](https://www.npmjs.com/package/amqplib) + +```javascript +var ascoltatori = require('ascoltatori'); +var settings = { + type: 'amqplib', + json: false, + amqp: require('amqplib/callback_api'), + exchange: 'ascolatore5672' +}; + +ascoltatori.build(settings, function (err, ascoltatore) { + // ... +}); +``` + ### ZeroMQ ```javascript diff --git a/lib/amqplib_ascoltatore.js b/lib/amqplib_ascoltatore.js new file mode 100644 index 0000000..664cc6b --- /dev/null +++ b/lib/amqplib_ascoltatore.js @@ -0,0 +1,222 @@ +"use strict"; + +var util = require("./util"); +var wrap = util.wrap; +var defer = util.defer; +var TrieAscoltatore = require("./trie_ascoltatore"); +var AbstractAscoltatore = require('./abstract_ascoltatore'); +var steed = require("steed")(); +var SubsCounter = require("./subs_counter"); +var debug = require("debug")("ascoltatori:amqplib"); + +/** + * The AMQPAscoltatore is a class that inherits from AbstractAscoltatore. + * It is backed by node-amqp. + * It creates or use an exchange with the given name, using a "topic" topology. + * It creates a single amqp queue for this process, in order to keep + * the overhead low. + * + * It accepts these options: + * - `client`, which is passed through to the amq.createConnection method; + * - `exchange`, the exchange name; + * - `amqp`, the amqp module (it will automatically be required if not present); + * + * @param {Object} opts The options for creating this ascoltatore. + * @api public + */ + +function AMQPLibAscoltatore(opts) { + AbstractAscoltatore.call(this, opts, { + separator: '.', + wildcardOne: '*', + wildcardSome: '#' + }); + + this._opts = opts || {}; + this._opts.amqp = this._opts.amqp || require("amqplib/callback_api"); + this._ascoltatore = new TrieAscoltatore(opts); + + this._subs_counter = new SubsCounter(); + this._startConn(); +} + +/** + * The client connection decends from AbstractAscoltatore. + * + * @api private + */ +AMQPLibAscoltatore.prototype = Object.create(AbstractAscoltatore.prototype); + +/** + * Starts a new connection to an AMQP server. + * Do nothing if it is already started. + * + * @api private + */ +AMQPLibAscoltatore.prototype._startConn = function () { + var conn = null, + channel = null, + that = this; + + if (this._client_conn === undefined) { + + var url = this._opts.url || 'amqp://127.0.0.1:5672'; + + var socketOptions = this._opts.socketOptions || {}; + + debug("connecting to " + this._opts.url); + + steed.series([ + function (callback) { + that._opts.amqp.connect(url, socketOptions, function (err, conn) { + that._client_conn = conn; + conn.on("error", function (error) { + if (typeof error === 'string') { + error = (new Error(error)); + } + + that.emit("error", error); + }); + callback(); + }); + }, + + function (callback) { + debug('connected'); + that._client_conn.createChannel(function(err, channel){ + that._channel = channel; + that._channel.prefetch(42); // magic number? + callback(); + }); + }, + + function(callback){ + debug('channel created'); + that._queue = util.buildIdentifier(); + that._channel.assertQueue(that._queue, null, wrap(callback)); + }, + + function (callback){ + debug('queue created'); + that._channel.assertExchange(that._opts.exchange, 'topic', {}, wrap(callback)); + }, + + function (callback){ + debug('exchange existed'); + that._channel.consume(that._queue, function(msg){ + that._channel.ack(msg); + var topic = that._recvTopic(msg.fields.routingKey); + debug("new message received from queue on topic " + topic); + that._ascoltatore.publish(topic, msg.content.toString()); + }, null, wrap(callback)); + }, + + function (callback) { + debug("subscribed to queue"); + that.emit("ready"); + callback(); + } + ]); + } + return this._client_conn; +}; + +AMQPLibAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) { + this._raiseIfClosed(); + + this._ascoltatore.subscribe(topic, callback); + + if (!this._subs_counter.include(topic)) { + debug("binding queue to topic " + topic); + + this._channel.bindQueue(this._queue, this._opts.exchange, this._subTopic(topic), {}, function(err, ok){ + debug("queue bound to topic " + topic); + defer(done); + }); + } else { + defer(done); + } + + this._subs_counter.add(topic); + + debug("registered new subscriber for topic " + topic); +}; + +AMQPLibAscoltatore.prototype.publish = function publish(topic, message, done) { + this._raiseIfClosed(); + + debug("new message published to " + topic); + + this._channel.publish(this._opts.exchange, this._pubTopic(topic), new Buffer(String(message))); + defer(done); +}; + +AMQPLibAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) { + this._raiseIfClosed(); + this._subs_counter.remove(topic); + + debug("deregistered subscriber for topic " + topic); + + this._ascoltatore.unsubscribe(topic, callback); + + if (!this._subs_counter.include(topic)) { + this._channel.unbindQueue(this._queue, this._opts.exchange, this._subTopic(topic), {}, function(err, ok) { + debug("queue unbound to topic " + topic); + defer(done); + }); + } else { + defer(done); + } + + return this; +}; + +AMQPLibAscoltatore.prototype.close = function close(done) { + var that = this; + + if (this._closed) { + wrap(done)(); + return; + } + + if (this._closing) { + this.on("closed", done); + return; + } + + this._closing = true; + + if (this._client_conn !== undefined) { + var doClose = function () { + if (that._closed) { + debug("closing twice, one was an error"); + return; + } + + debug("closed"); + defer(done); + that.emit("closed"); + }; + + this._client_conn.on("close", doClose); + this._channel.deleteQueue(this._queue); + this._channel.close(); + + this._client_conn.close(); + this._client_conn.removeAllListeners("error"); + this._client_conn.on("error", doClose); + + delete this._client_conn; + delete this._channel; + delete this._queue; + } +}; + +util.aliasAscoltatore(AMQPLibAscoltatore.prototype); + +/** + * Exports the AMQPAscoltatore + * + * @api public + */ +module.exports = AMQPLibAscoltatore; diff --git a/lib/ascoltatori.js b/lib/ascoltatori.js index 68edede..0e4df04 100644 --- a/lib/ascoltatori.js +++ b/lib/ascoltatori.js @@ -14,6 +14,7 @@ module.exports.EventEmitter2Ascoltatore = require('./event_emitter2_ascoltatore' module.exports.RedisAscoltatore = require("./redis_ascoltatore"); module.exports.ZeromqAscoltatore = require("./zeromq_ascoltatore"); module.exports.AMQPAscoltatore = require("./amqp_ascoltatore"); +module.exports.AMQPLibAscoltatore = require("./amqplib_ascoltatore"); module.exports.MQTTAscoltatore = require("./mqtt_ascoltatore"); module.exports.PrefixAscoltatore = require("./prefix_acoltatore"); module.exports.MongoAscoltatore = require('./mongo_ascoltatore'); @@ -28,6 +29,7 @@ module.exports.KafkaAscoltatore = require("./kafka_ascoltatore"); */ var classes = { "amqp": module.exports.AMQPAscoltatore, + "amqplib": module.exports.AMQPLibAscoltatore, "trie": module.exports.TrieAscoltatore, "eventemitter2": module.exports.EventEmitter2Ascoltatore, "mqtt": module.exports.MQTTAscoltatore, diff --git a/package.json b/package.json index 103271a..a6ea22d 100644 --- a/package.json +++ b/package.json @@ -69,6 +69,7 @@ "msgpack-lite": "^0.1.20", "zmq": "^2.14.0", "amqp": "~0.2.4", + "amqplib": "~0.4.1", "mqtt": "^1.10.0", "mongodb": "^2.1.18", "kerberos": "~0.0", diff --git a/test/amqplib_ascoltatore_spec.js b/test/amqplib_ascoltatore_spec.js new file mode 100644 index 0000000..1a14356 --- /dev/null +++ b/test/amqplib_ascoltatore_spec.js @@ -0,0 +1,31 @@ +var steed = require('steed')(); + +describeAscoltatore("AMQPLib", function() { + afterEach(function() { + this.instance.close(); + this.instance.on("error", function () { + console.log(arguments); + // we should just close it, + // avoid errors + }); + }); + + it("should sync two instances", function(done) { + var other = new ascoltatori.AMQPLibAscoltatore(this.instance._opts); + var that = this; + steed.series([ + + function(cb) { + other.on("ready", cb); + }, + + function(cb) { + that.instance.subscribe("hello", wrap(done), cb); + }, + + function(cb) { + other.publish("hello", null, cb); + } + ]); + }); +}); diff --git a/test/common.js b/test/common.js index 7574eae..3f4ac72 100644 --- a/test/common.js +++ b/test/common.js @@ -47,6 +47,14 @@ global.AMQPSettings = function() { }; }; +global.AMQPLibSettings = function() { + return { + json: false, + amqp: require("amqplib/callback_api"), + exchange: "ascolatore" + global.nextPort() + }; +}; + global.MQTTSettings = function() { return { json: false,