Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
Merge branch 'feat/amqplib' of https://github.com/phucpnt/ascoltatori
Browse files Browse the repository at this point in the history
…into phucpnt-feat/amqplib
  • Loading branch information
mcollina committed Sep 13, 2016
2 parents 6631638 + eb13909 commit 30644f0
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 0 deletions.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,22 @@ ascoltatori.build(settings, function (err, ascoltatore) {
});
```

Use with [amqplib](https://www.npmjs.com/package/amqplib)

```javascript
var ascoltatori = require('ascoltatori');
var settings = {
type: 'amqplib',
json: false,
amqp: require('amqplib/callback_api'),
exchange: 'ascolatore5672'
};

ascoltatori.build(settings, function (err, ascoltatore) {
// ...
});
```

### ZeroMQ

```javascript
Expand Down
222 changes: 222 additions & 0 deletions lib/amqplib_ascoltatore.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
"use strict";

var util = require("./util");
var wrap = util.wrap;
var defer = util.defer;
var TrieAscoltatore = require("./trie_ascoltatore");
var AbstractAscoltatore = require('./abstract_ascoltatore');
var steed = require("steed")();
var SubsCounter = require("./subs_counter");
var debug = require("debug")("ascoltatori:amqplib");

/**
* The AMQPAscoltatore is a class that inherits from AbstractAscoltatore.
* It is backed by node-amqp.
* It creates or use an exchange with the given name, using a "topic" topology.
* It creates a single amqp queue for this process, in order to keep
* the overhead low.
*
* It accepts these options:
* - `client`, which is passed through to the amq.createConnection method;
* - `exchange`, the exchange name;
* - `amqp`, the amqp module (it will automatically be required if not present);
*
* @param {Object} opts The options for creating this ascoltatore.
* @api public
*/

function AMQPLibAscoltatore(opts) {
AbstractAscoltatore.call(this, opts, {
separator: '.',
wildcardOne: '*',
wildcardSome: '#'
});

this._opts = opts || {};
this._opts.amqp = this._opts.amqp || require("amqplib/callback_api");
this._ascoltatore = new TrieAscoltatore(opts);

this._subs_counter = new SubsCounter();
this._startConn();
}

/**
* The client connection decends from AbstractAscoltatore.
*
* @api private
*/
AMQPLibAscoltatore.prototype = Object.create(AbstractAscoltatore.prototype);

/**
* Starts a new connection to an AMQP server.
* Do nothing if it is already started.
*
* @api private
*/
AMQPLibAscoltatore.prototype._startConn = function () {
var conn = null,
channel = null,
that = this;

if (this._client_conn === undefined) {

var url = this._opts.url || 'amqp://127.0.0.1:5672';

var socketOptions = this._opts.socketOptions || {};

debug("connecting to " + this._opts.url);

steed.series([
function (callback) {
that._opts.amqp.connect(url, socketOptions, function (err, conn) {
that._client_conn = conn;
conn.on("error", function (error) {
if (typeof error === 'string') {
error = (new Error(error));
}

that.emit("error", error);
});
callback();
});
},

function (callback) {
debug('connected');
that._client_conn.createChannel(function(err, channel){
that._channel = channel;
that._channel.prefetch(42); // magic number?
callback();
});
},

function(callback){
debug('channel created');
that._queue = util.buildIdentifier();
that._channel.assertQueue(that._queue, null, wrap(callback));
},

function (callback){
debug('queue created');
that._channel.assertExchange(that._opts.exchange, 'topic', {}, wrap(callback));
},

function (callback){
debug('exchange existed');
that._channel.consume(that._queue, function(msg){
that._channel.ack(msg);
var topic = that._recvTopic(msg.fields.routingKey);
debug("new message received from queue on topic " + topic);
that._ascoltatore.publish(topic, msg.content.toString());
}, null, wrap(callback));
},

function (callback) {
debug("subscribed to queue");
that.emit("ready");
callback();
}
]);
}
return this._client_conn;
};

AMQPLibAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) {
this._raiseIfClosed();

this._ascoltatore.subscribe(topic, callback);

if (!this._subs_counter.include(topic)) {
debug("binding queue to topic " + topic);

this._channel.bindQueue(this._queue, this._opts.exchange, this._subTopic(topic), {}, function(err, ok){
debug("queue bound to topic " + topic);
defer(done);
});
} else {
defer(done);
}

this._subs_counter.add(topic);

debug("registered new subscriber for topic " + topic);
};

AMQPLibAscoltatore.prototype.publish = function publish(topic, message, done) {
this._raiseIfClosed();

debug("new message published to " + topic);

this._channel.publish(this._opts.exchange, this._pubTopic(topic), new Buffer(String(message)));
defer(done);
};

AMQPLibAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) {
this._raiseIfClosed();
this._subs_counter.remove(topic);

debug("deregistered subscriber for topic " + topic);

this._ascoltatore.unsubscribe(topic, callback);

if (!this._subs_counter.include(topic)) {
this._channel.unbindQueue(this._queue, this._opts.exchange, this._subTopic(topic), {}, function(err, ok) {
debug("queue unbound to topic " + topic);
defer(done);
});
} else {
defer(done);
}

return this;
};

AMQPLibAscoltatore.prototype.close = function close(done) {
var that = this;

if (this._closed) {
wrap(done)();
return;
}

if (this._closing) {
this.on("closed", done);
return;
}

this._closing = true;

if (this._client_conn !== undefined) {
var doClose = function () {
if (that._closed) {
debug("closing twice, one was an error");
return;
}

debug("closed");
defer(done);
that.emit("closed");
};

this._client_conn.on("close", doClose);
this._channel.deleteQueue(this._queue);
this._channel.close();

this._client_conn.close();
this._client_conn.removeAllListeners("error");
this._client_conn.on("error", doClose);

delete this._client_conn;
delete this._channel;
delete this._queue;
}
};

util.aliasAscoltatore(AMQPLibAscoltatore.prototype);

/**
* Exports the AMQPAscoltatore
*
* @api public
*/
module.exports = AMQPLibAscoltatore;
2 changes: 2 additions & 0 deletions lib/ascoltatori.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module.exports.EventEmitter2Ascoltatore = require('./event_emitter2_ascoltatore'
module.exports.RedisAscoltatore = require("./redis_ascoltatore");
module.exports.ZeromqAscoltatore = require("./zeromq_ascoltatore");
module.exports.AMQPAscoltatore = require("./amqp_ascoltatore");
module.exports.AMQPLibAscoltatore = require("./amqplib_ascoltatore");
module.exports.MQTTAscoltatore = require("./mqtt_ascoltatore");
module.exports.PrefixAscoltatore = require("./prefix_acoltatore");
module.exports.MongoAscoltatore = require('./mongo_ascoltatore');
Expand All @@ -28,6 +29,7 @@ module.exports.KafkaAscoltatore = require("./kafka_ascoltatore");
*/
var classes = {
"amqp": module.exports.AMQPAscoltatore,
"amqplib": module.exports.AMQPLibAscoltatore,
"trie": module.exports.TrieAscoltatore,
"eventemitter2": module.exports.EventEmitter2Ascoltatore,
"mqtt": module.exports.MQTTAscoltatore,
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
"msgpack-lite": "^0.1.20",
"zmq": "^2.14.0",
"amqp": "~0.2.4",
"amqplib": "~0.4.1",
"mqtt": "^1.10.0",
"mongodb": "^2.1.18",
"kerberos": "~0.0",
Expand Down
31 changes: 31 additions & 0 deletions test/amqplib_ascoltatore_spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
var steed = require('steed')();

describeAscoltatore("AMQPLib", function() {
afterEach(function() {
this.instance.close();
this.instance.on("error", function () {
console.log(arguments);
// we should just close it,
// avoid errors
});
});

it("should sync two instances", function(done) {
var other = new ascoltatori.AMQPLibAscoltatore(this.instance._opts);
var that = this;
steed.series([

function(cb) {
other.on("ready", cb);
},

function(cb) {
that.instance.subscribe("hello", wrap(done), cb);
},

function(cb) {
other.publish("hello", null, cb);
}
]);
});
});
8 changes: 8 additions & 0 deletions test/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ global.AMQPSettings = function() {
};
};

global.AMQPLibSettings = function() {
return {
json: false,
amqp: require("amqplib/callback_api"),
exchange: "ascolatore" + global.nextPort()
};
};

global.MQTTSettings = function() {
return {
json: false,
Expand Down

0 comments on commit 30644f0

Please sign in to comment.