Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Pass options to ascoltatores #55

Merged
merged 4 commits into from
May 22, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
86 changes: 84 additions & 2 deletions lib/abstract_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ var EventEmitter = require('events').EventEmitter;
*/
function AbstractAscoltatore() {
EventEmitter.call(this);

this._set_subscribe();
this._set_publish();

this._ready = false;
this._closed = false;
Expand Down Expand Up @@ -67,14 +70,55 @@ AbstractAscoltatore.prototype._raiseIfClosed = function raiseIfClosed() {
* });
*
* @param {String} topic the topic to subscribe to
* @param {Object} options (optional) Metadata associated with the subscription (e.g. qos). If you only specify 3 parameters to your method then you won't be passed this parameter.
* @param {Function} callback the callback that will be called when a new message is published.
* @param {Function} done the callback that will be called when the subscribe is completed
* @api public
*/
AbstractAscoltatore.prototype.subscribe = function(topic, callback, done) {
AbstractAscoltatore.prototype.subscribe = function(topic, options, callback, done) {
throw new Error("Subclass to implement");
};

AbstractAscoltatore.prototype._set_subscribe = function()
{
var proto = Object.getPrototypeOf(this),
f = proto.subscribe,
call_subscribe,
subscribe;

if (!f) { return; }

if (f.length === 4) {
call_subscribe = f;
} else {
call_subscribe = function (topic, options, callback, done) {
return f.call(this, topic, callback, done);
};
}

subscribe = function (topic, options, callback, done) {
if ((typeof options === 'function') ||
(callback && (typeof callback !== 'function')) ||
(done && (typeof done !== 'function'))) {
done = callback;
callback = options;
options = undefined;
}

return call_subscribe.call(this, topic, options, callback, done);
};

Object.defineProperty(this, "subscribe", {
get: function() { return subscribe; }
});

if (proto.sub === proto.subscribe) {
Object.defineProperty(this, "sub", {
get: function() { return subscribe; }
});
}
};

/**
* This method allow publishing of messages to topics.
*
Expand All @@ -86,13 +130,51 @@ AbstractAscoltatore.prototype.subscribe = function(topic, callback, done) {
*
* @param {String} topic the topic to publish to
* @param {Object} payload the callback that will be called when a new message is published.
* @param {Object} options (optional) Metadata associated with the message (e.g. qos, messageId). If you only specify 3 parameters to your method then you won't be passed this parameter.
* @param {Function} done the callback that will be called after the message has been published.
* @api public
*/
AbstractAscoltatore.prototype.publish = function(topic, payload, done) {
AbstractAscoltatore.prototype.publish = function(topic, payload, options, done) {
throw new Error("Subclass to implement");
};

AbstractAscoltatore.prototype._set_publish = function()
{
var proto = Object.getPrototypeOf(this),
f = proto.publish,
call_publish, publish;

if (!f) { return; }

if (f.length === 4) {
call_publish = f;
} else {
call_publish = function (topic, payload, options, done) {
return f.call(this, topic, payload, done);
};
}

publish = function (topic, payload, options, done) {
if ((typeof options === 'function') ||
(done && (typeof done !== 'function'))) {
done = options;
options = undefined;
}

return call_publish.call(this, topic, payload, options, done);
};

Object.defineProperty(this, "publish", {
get: function() { return publish; }
});

if (proto.pub === proto.publish) {
Object.defineProperty(this, "pub", {
get: function() { return publish; }
});
}
};

/**
* This method provides the inverse of subscribe.
*
Expand Down
19 changes: 11 additions & 8 deletions lib/decorator_ascoltatore.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"use strict";

var async = require("async");
var AbstractAscoltatore = require("./abstract_ascoltatore");

/**
* Decorates totally another ascoltatore, doing nothing
Expand All @@ -12,6 +13,8 @@ var async = require("async");
*/
function DecoratorAscoltatore(decorated) {
this._ascoltatore = decorated;
AbstractAscoltatore.prototype._set_subscribe.call(this);
AbstractAscoltatore.prototype._set_publish.call(this);
}

/**
Expand Down Expand Up @@ -61,7 +64,7 @@ DecoratorAscoltatore.prototype.once = function(event, callback) {
this._ascoltatore.once(event, callback);
};

DecoratorAscoltatore.prototype.subscribe = function(topic, callback, done) {
DecoratorAscoltatore.prototype.subscribe = function(topic, options, callback, done) {
var that = this;
async.waterfall([

Expand All @@ -76,7 +79,7 @@ DecoratorAscoltatore.prototype.subscribe = function(topic, callback, done) {
},

function(newTopic, newCallback, cb) {
that._ascoltatore.subscribe(newTopic, newCallback, cb);
that._ascoltatore.subscribe(newTopic, options, newCallback, cb);
}
], done);
};
Expand All @@ -101,7 +104,7 @@ DecoratorAscoltatore.prototype.unsubscribe = function(topic, callback, done) {
], done);
};

DecoratorAscoltatore.prototype.publish = function(topic, payload, done) {
DecoratorAscoltatore.prototype.publish = function(topic, payload, options, done) {
var that = this;
async.waterfall([

Expand All @@ -116,7 +119,7 @@ DecoratorAscoltatore.prototype.publish = function(topic, payload, done) {
},

function(newTopic, newPayload, cb) {
that._ascoltatore.publish(newTopic, newPayload, cb);
that._ascoltatore.publish(newTopic, newPayload, options, cb);
}
], done);
};
Expand All @@ -133,12 +136,12 @@ DecoratorAscoltatore.prototype.unsub = function(topic, callback, done) {
this.unsubscribe(topic, callback, done);
};

DecoratorAscoltatore.prototype.sub = function(topic, callback, done) {
this.subscribe(topic, callback, done);
DecoratorAscoltatore.prototype.sub = function(topic, options, callback, done) {
this.subscribe(topic, options, callback, done);
};

DecoratorAscoltatore.prototype.pub = function(topic, payload, done) {
this.publish(topic, payload, done);
DecoratorAscoltatore.prototype.pub = function(topic, payload, options, done) {
this.publish(topic, payload, options, done);
};

module.exports = DecoratorAscoltatore;
4 changes: 2 additions & 2 deletions lib/memory_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ MemoryAscoltatore.prototype.subscribe = function subscribe(topic, callback, done
defer(done);
};

MemoryAscoltatore.prototype.publish = function publish(topic, message, done) {
MemoryAscoltatore.prototype.publish = function publish(topic, message, options, done) {
this._raiseIfClosed();
debug("new message published to " + topic);

Expand All @@ -75,7 +75,7 @@ MemoryAscoltatore.prototype.publish = function publish(topic, message, done) {
this.emit("newTopic", topic);
debug("new topic: " + topic);
}
this._event.emit(topic, topic, message);
this._event.emit(topic, topic, message, options);

defer(done);
};
Expand Down
16 changes: 9 additions & 7 deletions lib/mqtt_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ function MQTTAscoltatore(opts) {

this._opts = opts || {};
this._opts.keepalive = this._opts.keepalive || 3000;
this._opts.qos = this._opts.qos || 0;
this._opts.mqtt = this._opts.mqtt || require("mqtt");

this._subs_counter = new SubsCounter();
Expand Down Expand Up @@ -54,7 +55,8 @@ MQTTAscoltatore.prototype._startConn = function() {
if (this._client === undefined) {
settings = {
keepalive: that._opts.keepalive,
clientId: util.buildIdentifier()
clientId: util.buildIdentifier(),
qos: that._opts.qos
};

debug("connecting..");
Expand All @@ -64,12 +66,12 @@ MQTTAscoltatore.prototype._startConn = function() {
debug("connected");
that.emit("ready");
});
this._client.on("message", function(topic, payload) {
this._client.on("message", function(topic, payload, packet) {
debug("received new packet on topic " + topic);
// we need to skip out this callback, so we do not
// break the client when an exception occurs
util.defer(function() {
that._ascoltatore.publish(topic, payload);
that._ascoltatore.publish(topic, payload, packet);
});
});
this._client.on('error', function(e) {
Expand All @@ -82,14 +84,14 @@ MQTTAscoltatore.prototype._startConn = function() {
return this._client;
};

MQTTAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) {
MQTTAscoltatore.prototype.subscribe = function subscribe(topic, options, callback, done) {
this._raiseIfClosed();

if (!this._subs_counter.include(topic)) {
debug("registering new subscriber for topic " + topic);

var opts = {
qos: 1
qos: (options && (options.qos !== undefined)) ? options.qos : 1
};
this._client.subscribe(topic.replace("*", "#"), opts, function() {
debug("registered new subscriber for topic " + topic);
Expand All @@ -103,11 +105,11 @@ MQTTAscoltatore.prototype.subscribe = function subscribe(topic, callback, done)
this._ascoltatore.subscribe(topic, callback);
};

MQTTAscoltatore.prototype.publish = function publish(topic, message, done) {
MQTTAscoltatore.prototype.publish = function publish(topic, message, options, done) {
this._raiseIfClosed();

this._client.publish(topic, String(message), {
qos: 1
qos: (options && (options.qos !== undefined)) ? options.qos : 1
}, function() {
debug("new message published to " + topic);
wrap(done)();
Expand Down
11 changes: 9 additions & 2 deletions test/ascoltatori_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,23 @@ describe("ascoltatori", function() {
expect(ascoltatori.use(ascoltatore)).to.have.be.equal(ascoltatori);
});

var make_fake_ascoltatore = function() {
var r = {};
r.publish = r.subscribe = r.unsubscribe = r.close = r.on =
r.removeListener = r.registerDomain = function () {};
return r;
};

it("should delegate to the use ascoltatore for 'pub'", function() {
var ascoltatore = new ascoltatori.MemoryAscoltatore();
var ascoltatore = make_fake_ascoltatore();
var spy = this.sandbox.spy(ascoltatore, "publish");
ascoltatori.use(ascoltatore);
ascoltatori.publish("hello");
expect(spy).to.have.been.calledWith("hello");
});

it("should delegate to _global for 'subscribe'", function() {
var ascoltatore = new ascoltatori.MemoryAscoltatore();
var ascoltatore = make_fake_ascoltatore();
var spy = this.sandbox.spy(ascoltatore, "subscribe");
var func = function(argument) {};
ascoltatori.use(ascoltatore);
Expand Down
12 changes: 12 additions & 0 deletions test/memory_ascoltatore_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,16 @@ describe("ascoltatori.MemoryAscoltatore", function() {
this.instance = new ascoltatori.MemoryAscoltatore();
this.instance.on("ready", done);
});

it("should publish with options", function(done) {
var that = this;
that.instance.subscribe("hello/*", function(topic, value, options) {
expect(value).to.equal("42");
expect(options.qos).to.equal(1);
expect(options.messageId).to.equal(5);
done();
}, function() {
that.instance.publish("hello/123", "42", { qos: 1, messageId: 5 });
});
});
});