Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Use '+' as single level wildcard to make it consistent with mqtt ascoltatore #62

Merged
merged 17 commits into from Jun 28, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Expand Up @@ -10,6 +10,8 @@ before_install:
- sudo make install
- sudo /sbin/ldconfig
- cd ..
before_script:
- (cd node_modules/mosca/node_modules && rm -rf ascoltatori && ln -s ../../.. ascoltatori)
language: node_js
node_js:
- 0.8
Expand Down
8 changes: 4 additions & 4 deletions lib/amqp_ascoltatore.js
Expand Up @@ -96,7 +96,7 @@ AMQPAscoltatore.prototype._startConn = function() {
}, function(message, headers, deliveryInfo) {
that._queue.shift();

var topic = deliveryInfo.routingKey.replace(".", "/");
var topic = deliveryInfo.routingKey.replace(/\./g, "/");
debug("new message received from queue on topic " + topic);

that._ascoltatore.publish(topic, message.data.toString());
Expand Down Expand Up @@ -134,7 +134,7 @@ AMQPAscoltatore.prototype.subscribe = function subscribe(topic, callback, done)
}, 5);
});

this._queue.bind(this._exchange, topic.replace("/", ".").replace("*", "#"));
this._queue.bind(this._exchange, topic.replace(/\//g, ".").replace(/\*/g, "#").replace(/\+/g, '*'));
} else {
util.defer(done);
}
Expand All @@ -149,7 +149,7 @@ AMQPAscoltatore.prototype.publish = function publish(topic, message, done) {

debug("new message published to " + topic);

this._exchange.publish(topic.replace("/", "."), String(message));
this._exchange.publish(topic.replace(/\//g, "."), String(message));
util.defer(done);
};

Expand All @@ -164,7 +164,7 @@ AMQPAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, do
debug("queue unbound to topic " + topic);
util.defer(done);
});
this._queue.unbind(this._exchange, topic.replace("/", ".").replace("*", "#"));
this._queue.unbind(this._exchange, topic.replace(/\//g, ".").replace(/\*/g, "#").replace(/\+/g, '*'));
} else {
util.defer(done);
}
Expand Down
6 changes: 3 additions & 3 deletions lib/ascoltatori.js
Expand Up @@ -66,7 +66,7 @@ module.exports.build = function build(opts, done) {
result = null;

Klass = (typeof opts.type === 'function') ? opts.type :
(classes[opts.type] || module.exports.MemoryAscoltatore);
(classes[opts.type] || module.exports.TrieAscoltatore);

result = new Klass(opts, module.exports);

Expand Down Expand Up @@ -112,11 +112,11 @@ module.exports.use = function use(ascoltatore) {
};

/**
* The default global Ascoltatore is a MemoryAscoltatore.
* The default global Ascoltatore is a TrieAscoltatore.
*
* @api public
*/
module.exports.use(new module.exports.MemoryAscoltatore());
module.exports.use(new module.exports.TrieAscoltatore());

/**
* These are just utilities
Expand Down
80 changes: 79 additions & 1 deletion lib/behave_like_an_ascoltatore.js
Expand Up @@ -71,13 +71,91 @@ module.exports = function() {
this.instance.pub("hello", "world", done);
});

it("should support wildcards", function(done) {
it("should support multi-level wildcard at start of topic", function(done) {
var that = this;
that.instance.sub("*/hello", wrap(done), function() {
that.instance.pub("42/there/hello");
});
});

it("should support multi-level wildcard in middle of topic", function(done) {
var that = this;
that.instance.sub("hello/*/end", wrap(done), function() {
that.instance.pub("hello/there/42/end");
});
});

it("should support multi-level wildcard at end of topic", function(done) {
var that = this;
that.instance.sub("hello/*", wrap(done), function() {
that.instance.pub("hello/there/42");
});
});

it("should support single-level wildcard at start of topic", function(done) {
var that = this;
that.instance.sub("+/hello", wrap(done), function() {
that.instance.pub("42/hello");
});
});

it("should support single-level wildcard in middle of topic", function(done) {
var that = this;
that.instance.sub("hello/+/end", wrap(done), function() {
that.instance.pub("hello/42/end");
});
});

it("should support single-level wildcard at end of topic", function(done) {
var that = this;
that.instance.sub("hello/+", wrap(done), function() {
that.instance.pub("hello/42");
});
});

it("should support both wildcards in topic", function(done) {
var that = this;
that.instance.sub("hello/+/there/*/end", wrap(done), function() {
that.instance.pub("hello/foo/there/bar/42/end");
});
});

it("should not match multiple levels with single wildcard", function(done) {
var that = this,
callback = null;

callback = function(topic) {
expect(topic).to.equal("hello/42/there");
done();
};

that.instance.sub("hello/+/there", callback, function () {
that.instance.pub("hello/42/43/there");
that.instance.pub("hello/42/there");
});
});

it("should unsubscribe from wildcard topics independently", function(done) {
var that = this,
callback1 = null,
callback2 = null;

callback1 = function(topic) {
expect(topic).to.equal("hello/42/there");
done();
};

callback2 = function () { };

that.instance.sub("hello/*/there", callback2, function () {
that.instance.sub("hello/+/there", callback1, function () {
that.instance.unsub("hello/*/there", callback2, function () {
that.instance.pub("hello/42/there");
});
});
});
});

it("should call each matching callback", function(done) {
var that = this,
callback = null,
Expand Down
6 changes: 3 additions & 3 deletions lib/memory_ascoltatore.js
Expand Up @@ -35,14 +35,14 @@ function MemoryAscoltatore() {
MemoryAscoltatore.prototype = Object.create(AbstractAscoltatore.prototype);

function containsWildcard(topic) {
return topic.indexOf("*") >= 0;
return (topic.indexOf("*") >= 0) || (topic.indexOf("+") >= 0);
}

MemoryAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) {
this._raiseIfClosed();
debug("registered new subscriber for topic " + topic);
if (containsWildcard(topic)) {
var regexp = new RegExp(topic.replace("*", ".+")),
var regexp = new RegExp(topic.replace(/\*/g, ".*?").replace(/\+/g, "[^/]+?")),
that = this,
handler = null;

Expand Down Expand Up @@ -89,7 +89,7 @@ MemoryAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback,
regexp = null;
if (containsWildcard(topic)) {
this.removeListener("newTopic", callback._ascoltatori_global_handler);
regexp = new RegExp(topic.replace("*", ".+"));
regexp = new RegExp(topic.replace(/\*/g, ".*?").replace(/\+/g, "[^/]+?"));
this._set.forEach(function(e) {
if (e.match(regexp)) {
that.unsub(e, callback);
Expand Down
4 changes: 2 additions & 2 deletions lib/mqtt_ascoltatore.js
Expand Up @@ -93,7 +93,7 @@ MQTTAscoltatore.prototype.subscribe = function subscribe(topic, callback, done)
var opts = {
qos: 1
};
this._client.subscribe(topic.replace("*", "#"), opts, function() {
this._client.subscribe(topic.replace(/\*/g, "#"), opts, function() {
debug("registered new subscriber for topic " + topic);
util.defer(done);
});
Expand Down Expand Up @@ -135,7 +135,7 @@ MQTTAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, do
}

debug("deregistering subscriber for topic " + topic);
this._client.unsubscribe(topic.replace("*", "#"), newDone);
this._client.unsubscribe(topic.replace(/\*/g, "#"), newDone);
};

MQTTAscoltatore.prototype.close = function close(done) {
Expand Down
31 changes: 21 additions & 10 deletions lib/redis_ascoltatore.js
Expand Up @@ -134,7 +134,7 @@ RedisAscoltatore.prototype._updateReady = function updateReady(key) {
};

function containsWildcard(topic) {
return topic.indexOf("*") >= 0;
return (topic.indexOf("*") >= 0) || (topic.indexOf("+") >= 0);
}

RedisAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) {
Expand All @@ -145,13 +145,17 @@ RedisAscoltatore.prototype.subscribe = function subscribe(topic, callback, done)
util.defer(done);
};

var subTopic = topic;

if (containsWildcard(topic)) {
this._sub.psubscribe(topic, newDone);
subTopic = topic.replace(/\+/g, "*");
this._sub.psubscribe(subTopic, newDone);
} else {
this._sub.subscribe(topic, newDone);
this._sub.subscribe(subTopic, newDone);
}

this._subs_counter.add(topic);
this._subs_counter.add(subTopic);

this._ascoltatore.subscribe(topic, callback);
};

Expand All @@ -170,24 +174,31 @@ RedisAscoltatore.prototype.publish = function publish(topic, message, done) {

RedisAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) {
this._raiseIfClosed();
this._subs_counter.remove(topic);

var isWildcard = containsWildcard(topic),
subTopic = topic;

if (isWildcard) {
subTopic = topic.replace(/\+/g, "*");
}

this._subs_counter.remove(subTopic);
this._ascoltatore.unsubscribe(topic, callback);

var newDone = function() {
debug("deregistered subscriber for topic " + topic);
util.defer(done);
};


if (this._subs_counter.include(topic)) {
if (this._subs_counter.include(subTopic)) {
newDone();
return this;
}

if (containsWildcard(topic)) {
this._sub.punsubscribe(topic, newDone);
if (isWildcard) {
this._sub.punsubscribe(subTopic, newDone);
} else {
this._sub.unsubscribe(topic, newDone);
this._sub.unsubscribe(subTopic, newDone);
}

return this;
Expand Down
2 changes: 1 addition & 1 deletion lib/trie_ascoltatore.js
Expand Up @@ -17,7 +17,7 @@ function TrieAscoltatore(settings) {

this._matcher = new Qlobber({
separator: '/',
wildcard_one: '?',
wildcard_one: '+',
wildcard_some: '*'
});

Expand Down
2 changes: 1 addition & 1 deletion test/ascoltatori_spec.js
Expand Up @@ -114,7 +114,7 @@ describe("ascoltatori", function() {
json: false
});
toClose.push(a);
expect(a).to.be.instanceOf(ascoltatori.MemoryAscoltatore);
expect(a).to.be.instanceOf(ascoltatori.TrieAscoltatore);
});

it("should create a new AbstractAscoltatore using function", function() {
Expand Down