From 196206cde487fdc9d2afba42f9f456270e7ff155 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 23 May 2016 22:54:26 +0200 Subject: [PATCH 1/4] use steed --- lib/amqp_ascoltatore.js | 18 +++++++++--------- lib/ascoltatori.js | 2 +- lib/behave_like_an_ascoltatore.js | 22 +++++++++++----------- lib/decorator_ascoltatore.js | 8 ++++---- lib/event_emitter2_ascoltatore.js | 9 ++++----- lib/kafka_ascoltatore.js | 16 +++++++--------- lib/mongo_ascoltatore.js | 4 ++-- lib/mqtt_ascoltatore.js | 20 ++++++++++---------- lib/redis_ascoltatore.js | 10 +++++----- lib/trie_ascoltatore.js | 9 ++++----- lib/util.js | 16 +--------------- lib/zeromq_ascoltatore.js | 14 +++++++------- package.json | 4 ++-- test/amqp_ascoltatore_spec.js | 4 +++- test/common.js | 1 - test/kafka_ascoltatore_spec.js | 3 ++- test/mongo_ascoltatore_spec.js | 4 ++-- test/mqtt_ascoltatore_spec.js | 29 +++++++++++++++-------------- test/redis_ascoltatore_spec.js | 3 ++- test/zeromq_ascoltatore_spec.js | 10 ++++++---- 20 files changed, 97 insertions(+), 109 deletions(-) diff --git a/lib/amqp_ascoltatore.js b/lib/amqp_ascoltatore.js index 3f999b7..7465539 100644 --- a/lib/amqp_ascoltatore.js +++ b/lib/amqp_ascoltatore.js @@ -4,7 +4,7 @@ var util = require("./util"); var wrap = util.wrap; var TrieAscoltatore = require("./trie_ascoltatore"); var AbstractAscoltatore = require('./abstract_ascoltatore'); -var async = require("async"); +var steed = require("steed")(); var SubsCounter = require("./subs_counter"); var debug = require("debug")("ascoltatori:amqp"); @@ -71,7 +71,7 @@ AMQPAscoltatore.prototype._startConn = function() { debug("connecting to " + this._opts.client); - async.series([ + steed.series([ function(callback) { that._client_conn.once("ready", wrap(callback)); @@ -106,7 +106,7 @@ AMQPAscoltatore.prototype._startConn = function() { that._ascoltatore.publish(topic, message.data.toString()); }); that._queue.once("basicConsumeOk", function() { - util.defer(callback); + setImmediate(callback); }); }, @@ -134,13 +134,13 @@ AMQPAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) // as advertised setTimeout(function() { debug("queue bound to topic " + topic); - util.defer(done); + setImmediate(done); }, 5); }); this._queue.bind(this._exchange, this._subTopic(topic)); } else { - util.defer(done); + setImmediate(done); } this._subs_counter.add(topic); @@ -154,7 +154,7 @@ AMQPAscoltatore.prototype.publish = function publish(topic, message, done) { debug("new message published to " + topic); this._exchange.publish(this._pubTopic(topic), String(message)); - util.defer(done); + setImmediate(done); }; AMQPAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) { @@ -168,12 +168,12 @@ AMQPAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, do if (!this._subs_counter.include(topic)) { this._queue.once("queueUnbindOk", function() { debug("queue unbound to topic " + topic); - util.defer(done); + setImmediate(done); }); this._queue.unbind(this._exchange, this._subTopic(topic)); } else { - util.defer(done); + setImmediate(done); } return this; @@ -202,7 +202,7 @@ AMQPAscoltatore.prototype.close = function close(done) { } debug("closed"); - util.defer(done); + setImmediate(done); that.emit("closed"); }; diff --git a/lib/ascoltatori.js b/lib/ascoltatori.js index c544796..68edede 100644 --- a/lib/ascoltatori.js +++ b/lib/ascoltatori.js @@ -85,7 +85,7 @@ module.exports.build = function build(opts, done) { } if (done) { - module.exports.util.defer(function() { + setImmediate(function() { result.once("ready", function() { result.removeListener("error", done); done(null, result); diff --git a/lib/behave_like_an_ascoltatore.js b/lib/behave_like_an_ascoltatore.js index aae79e8..be6daf9 100644 --- a/lib/behave_like_an_ascoltatore.js +++ b/lib/behave_like_an_ascoltatore.js @@ -1,7 +1,7 @@ "use strict"; var wrap = require("./util").wrap; -var async = require("async"); +var steed = require("steed")(); /** * This is a shared mocha test for verifying that an @@ -61,7 +61,7 @@ module.exports = function() { var that = this; that.instance.sub("hello", function() {}); setTimeout(function() { - // still, some Ascoltatore are async + // still, some Ascoltatore are steed // and we must give them some time done(); }, 10); @@ -220,7 +220,7 @@ module.exports = function() { funcToRemove = function(topic, value) { throw "that should never run"; }; - async.series([ + steed.series([ function(cb) { that.instance.sub("hello", funcToRemove, cb); @@ -246,7 +246,7 @@ module.exports = function() { funcToRemove = function(topic, value) { throw "that should never run"; }; - async.series([ + steed.series([ function(cb) { that.instance.sub("hello/42", wrap(done), cb); @@ -289,7 +289,7 @@ module.exports = function() { a.push(subscribe); } - async.parallel(a, instance.publish.bind(instance, "hello", null)); + steed.parallel(a, instance.publish.bind(instance, "hello", null)); }); it("should emit the ready event", function(done) { @@ -311,7 +311,7 @@ module.exports = function() { for (i = counter; i > 0; i = i - 1) { a.push(this.instance.on.bind(this.instance, "ready", callback)); } - async.parallel(a); + steed.parallel(a); }); it("should support removing a single listener", function(done) { @@ -321,7 +321,7 @@ module.exports = function() { funcToRemove = function(topic, value) { throw "that should never run"; }; - async.series([ + steed.series([ function(cb) { that.instance.sub("hello", wrap(done), cb); @@ -373,7 +373,7 @@ module.exports = function() { it("should allow the close method to be called twice", function(done) { var that = this; - async.series([ + steed.series([ this.instance.publish.bind(this.instance, "hello", "world"), this.instance.close.bind(this.instance), this.instance.close.bind(this.instance) @@ -417,7 +417,7 @@ module.exports = function() { // the nextTick hack is needed to skip out // of mocha control - async.setImmediate(function() { + steed.setImmediate(function() { that.instance.publish("throw"); }); }); @@ -446,7 +446,7 @@ module.exports = function() { }, cb); }; - async.series([ + steed.series([ sub, sub, function(cb) { @@ -477,7 +477,7 @@ module.exports = function() { }, cb); }; - async.series([ + steed.series([ function (cb) { sub("a/+", cb); }, diff --git a/lib/decorator_ascoltatore.js b/lib/decorator_ascoltatore.js index 2098634..ef99a1c 100644 --- a/lib/decorator_ascoltatore.js +++ b/lib/decorator_ascoltatore.js @@ -1,6 +1,6 @@ "use strict"; -var async = require("async"); +var steed = require("steed")(); var AbstractAscoltatore = require("./abstract_ascoltatore"); /** @@ -72,7 +72,7 @@ DecoratorAscoltatore.prototype.removeListener = function(event, callback) { DecoratorAscoltatore.prototype.subscribe = function(topic, callback, done) { var that = this; - async.waterfall([ + steed.waterfall([ function(cb) { that.wrapTopic(topic, cb); @@ -92,7 +92,7 @@ DecoratorAscoltatore.prototype.subscribe = function(topic, callback, done) { DecoratorAscoltatore.prototype.unsubscribe = function(topic, callback, done) { var that = this; - async.waterfall([ + steed.waterfall([ function(cb) { that.wrapTopic(topic, cb); @@ -112,7 +112,7 @@ DecoratorAscoltatore.prototype.unsubscribe = function(topic, callback, done) { DecoratorAscoltatore.prototype.publish = function(topic, payload, options, done) { var that = this; - async.waterfall([ + steed.waterfall([ function(cb) { that.wrapTopic(topic, cb); diff --git a/lib/event_emitter2_ascoltatore.js b/lib/event_emitter2_ascoltatore.js index 2769a40..ceb5c52 100644 --- a/lib/event_emitter2_ascoltatore.js +++ b/lib/event_emitter2_ascoltatore.js @@ -2,7 +2,6 @@ var AbstractAscoltatore = require("./abstract_ascoltatore"); var util = require("./util"); -var defer = util.defer; var debug = require("debug")("ascoltatori:ee2"); var EventEmitter2 = require("eventemitter2").EventEmitter2; var ascoltatori = require('./ascoltatori'); @@ -43,7 +42,7 @@ EventEmitter2Ascoltatore.prototype.subscribe = function subscribe(topic, callbac debug("registered new subscriber for topic " + topic); this._event.on(this._subTopic(topic).replace(/^\//g, ''), callback); - defer(done); + setImmediate(done); }; EventEmitter2Ascoltatore.prototype.publish = function (topic, message, options, done) { @@ -52,7 +51,7 @@ EventEmitter2Ascoltatore.prototype.publish = function (topic, message, options, this._event.emit(this._pubTopic(topic).replace(/^\//g, ''), topic, message, options); - defer(done); + setImmediate(done); }; EventEmitter2Ascoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) { @@ -62,7 +61,7 @@ EventEmitter2Ascoltatore.prototype.unsubscribe = function unsubscribe(topic, cal this._event.off(this._subTopic(topic).replace(/^\//g, ''), callback); - defer(done); + setImmediate(done); }; EventEmitter2Ascoltatore.prototype.close = function close(done) { @@ -71,7 +70,7 @@ EventEmitter2Ascoltatore.prototype.close = function close(done) { debug("closed"); - defer(done); + setImmediate(done); }; EventEmitter2Ascoltatore.prototype.registerDomain = function(domain) { diff --git a/lib/kafka_ascoltatore.js b/lib/kafka_ascoltatore.js index 174b01d..1f31a0c 100644 --- a/lib/kafka_ascoltatore.js +++ b/lib/kafka_ascoltatore.js @@ -5,9 +5,7 @@ var TrieAscoltatore = require("./trie_ascoltatore"); var AbstractAscoltatore = require('./abstract_ascoltatore'); var debug = require("debug")("ascoltatori:kafka"); var Qlobber = require('qlobber').Qlobber; - var SubsCounter = require("./subs_counter"); -var async = require("async"); /** * KafkaAscoltatore is a class that inherits from AbstractAscoltatore. @@ -194,7 +192,7 @@ KafkaAscoltatore.prototype._startConn = function(cb) { debug("error in client",e); that.emit("error", e); }); - util.defer(newcb); + setImmediate(newcb); }; that._consumerClient.on('ready',function() { that.withTopicOffsetsAdded(that._consumerClient,subscriptions,initConsumer); @@ -228,7 +226,7 @@ KafkaAscoltatore.prototype.subscribe = function subscribe(topic, onMessageReceiv this._ascoltatore.subscribe(topic, onMessageReceived); if(subscribe_topics.length === 0){ - util.defer(done); + setImmediate(done); return; } @@ -256,7 +254,7 @@ KafkaAscoltatore.prototype.addKafkaSubscriptions = function addKafkaSubscription return; } debug("registered new kafka subscriptions", subscribe_topics); - util.defer(done); + setImmediate(done); },true); }); }; @@ -334,7 +332,7 @@ KafkaAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, d var newDone = function() { debug("deregistered subscriber for topic " + subtopic); - util.defer(done); + setImmediate(done); }; this._subs_matcher.remove(subtopic); @@ -401,11 +399,11 @@ KafkaAscoltatore.prototype.createTopics = function createTopics(topics,callback) this._producer.createTopics(topics, false, function (err, data) { if(err){ debug("problem creating topics",err); - util.defer(function(){callback(err);}); + setImmediate(function(){callback(err);}); return; } that._knowntopics = that._knowntopics.concat(topics); - util.defer(callback); + setImmediate(callback); }); }; @@ -432,7 +430,7 @@ KafkaAscoltatore.prototype.close = function close(done) { delete that._consumerClient; delete that._consumer; that.emit("closed"); - util.defer(done); + setImmediate(done); }); }); }else{ diff --git a/lib/mongo_ascoltatore.js b/lib/mongo_ascoltatore.js index d913b9e..2315dbc 100644 --- a/lib/mongo_ascoltatore.js +++ b/lib/mongo_ascoltatore.js @@ -9,7 +9,7 @@ var debug = require("debug")("ascoltatori:mongodb"); var mongo = require('mongodb'); var MongoClient = require('mongodb').MongoClient; var ObjectID = require('mongodb').ObjectID; -var async = require("async"); +var steed = require("steed"); /** * MongoAscoltatore is a class that inherits from AbstractAscoltatore. @@ -331,7 +331,7 @@ MongoAscoltatore.prototype.close = function close(done) { that._closed = true; - async.series([ + steed.series([ function(cb) { if (that._cursor) { that._cursor.close(cb); diff --git a/lib/mqtt_ascoltatore.js b/lib/mqtt_ascoltatore.js index 96b992d..6fa0157 100644 --- a/lib/mqtt_ascoltatore.js +++ b/lib/mqtt_ascoltatore.js @@ -6,7 +6,7 @@ var TrieAscoltatore = require("./trie_ascoltatore"); var AbstractAscoltatore = require('./abstract_ascoltatore'); var debug = require("debug")("ascoltatori:mqtt"); var SubsCounter = require("./subs_counter"); -var async = require("async"); +var steed = require("steed")(); /** * MQTTAscoltatore is a class that inherits from AbstractAscoltatore. @@ -74,7 +74,7 @@ MQTTAscoltatore.prototype._startConn = function() { debug("received new packet on topic " + topic); // we need to skip out this callback, so we do not // break the client when an exception occurs - util.defer(function() { + setImmediate(function() { that._ascoltatore.publish(that._recvTopic(topic), payload, packet); }); }); @@ -90,14 +90,14 @@ MQTTAscoltatore.prototype._startConn = function() { MQTTAscoltatore.prototype.reconnectTopics = function reconnectTopics(cb) { var that = this; - + var subscribedTopics = that._subs_counter.keys(); - + var opts = { qos: 1 }; - - async.each(subscribedTopics, function(topic, callback) { + + steed.each(subscribedTopics, function(topic, callback) { that._client.subscribe(that._subTopic(topic), opts, function() { debug("re-registered subscriber for topic " + topic); callback(); @@ -119,10 +119,10 @@ MQTTAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) }; this._client.subscribe(this._subTopic(topic), opts, function() { debug("registered new subscriber for topic " + topic); - util.defer(done); + setImmediate(done); }); } else { - util.defer(done); + setImmediate(done); } this._subs_counter.add(topic); @@ -147,7 +147,7 @@ MQTTAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, do newDone = function() { debug("deregistered subscriber for topic " + topic); - util.defer(done); + setImmediate(done); }; this._ascoltatore.unsubscribe(topic, callback); @@ -172,7 +172,7 @@ MQTTAscoltatore.prototype.close = function close(done) { that._ascoltatore.close(); delete that._client; that.emit("closed"); - util.defer(done); + setImmediate(done); }); this._client.end(); } else { diff --git a/lib/redis_ascoltatore.js b/lib/redis_ascoltatore.js index d5bd0a7..f00b266 100644 --- a/lib/redis_ascoltatore.js +++ b/lib/redis_ascoltatore.js @@ -121,7 +121,7 @@ RedisAscoltatore.prototype._startSub = function() { handler = function(sub, topic, payload) { topic = topic.toString(); // cast to string in case of Buffer instance (nodejs-redis >=2.0) debug("new message received for topic " + topic); - util.defer(function() { + setImmediate(function() { // we need to skip out this callback, so we do not // break the client when an exception occurs var ascoltatore = that._ascoltatores[sub]; @@ -166,7 +166,7 @@ RedisAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) var newDone = function() { debug("registered new subscriber for topic " + topic); - util.defer(done); + setImmediate(done); }; var subTopic = this._subTopic(topic); @@ -217,7 +217,7 @@ RedisAscoltatore.prototype.publish = function publish(topic, message, options, d this._client.publish(topic, msgpack.encode(payload), function() { debug("new message published to " + topic); - util.defer(done); + setImmediate(done); }); }; @@ -241,7 +241,7 @@ RedisAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, d var newDone = function() { debug("deregistered subscriber for topic " + topic); - util.defer(done); + setImmediate(done); }; if (this._subs_counter.include(subTopic)) { @@ -270,7 +270,7 @@ RedisAscoltatore.prototype.close = function close(done) { newDone = function() { debug("closed"); - util.defer(done); + setImmediate(done); }; if (this._closed) { diff --git a/lib/trie_ascoltatore.js b/lib/trie_ascoltatore.js index 35cdb59..b3ced3f 100644 --- a/lib/trie_ascoltatore.js +++ b/lib/trie_ascoltatore.js @@ -2,7 +2,6 @@ var AbstractAscoltatore = require("./abstract_ascoltatore"); var util = require("./util"); -var defer = util.defer; var debug = require("debug")("ascoltatori:trie"); var Qlobber = require("qlobber").Qlobber; var ascoltatori = require('./ascoltatori'); @@ -41,7 +40,7 @@ TrieAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) debug("registered new subscriber for topic " + topic); this._matcher.add(topic, callback); - defer(done); + setImmediate(done); }; TrieAscoltatore.prototype.publish = function (topic, message, options, done) { @@ -54,7 +53,7 @@ TrieAscoltatore.prototype.publish = function (topic, message, options, done) { cbs[i](topic, message, options); } - defer(done); + setImmediate(done); }; TrieAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) { @@ -64,7 +63,7 @@ TrieAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, do this._matcher.remove(topic, callback); - defer(done); + setImmediate(done); }; TrieAscoltatore.prototype.close = function close(done) { @@ -73,7 +72,7 @@ TrieAscoltatore.prototype.close = function close(done) { debug("closed"); - defer(done); + setImmediate(done); }; TrieAscoltatore.prototype.registerDomain = function(domain) { diff --git a/lib/util.js b/lib/util.js index b785383..8cecd5e 100644 --- a/lib/util.js +++ b/lib/util.js @@ -2,7 +2,7 @@ var util = require('util'); var uuid = require("node-uuid"); -var async = require("async"); +var steed = require("steed"); /** * A function to build an unique identifier. @@ -76,17 +76,3 @@ function wrap(done) { }; } module.exports.wrap = wrap; - -/** - * Defer the execution of the passed function to the - * next tick. The function might be null. - * - * @api public - * @param {Function} done the funcion to be deferred. - */ -function defer(done) { - if (typeof done === 'function') { - async.setImmediate(done); - } -} -module.exports.defer = defer; diff --git a/lib/zeromq_ascoltatore.js b/lib/zeromq_ascoltatore.js index c5f3eaa..e5f61b6 100644 --- a/lib/zeromq_ascoltatore.js +++ b/lib/zeromq_ascoltatore.js @@ -5,7 +5,7 @@ var wrap = util.wrap; var TrieAscoltatore = require("./trie_ascoltatore"); var AbstractAscoltatore = require('./abstract_ascoltatore'); var debug = require("debug")("ascoltatori:zmq"); -var async = require("async"); +var steed = require("steed")(); /** * ZeromqAscoltatore is a class that inherits from AbstractAscoltatore. @@ -164,7 +164,7 @@ ZeromqAscoltatore.prototype.connect = function connect(port, callback) { }; }); - async.parallel(dests, function() { + steed.parallel(dests, function() { setTimeout(function() { wrap(callback)(); }, that._opts.delay); @@ -206,7 +206,7 @@ ZeromqAscoltatore.prototype._connectSub = function(port, callback) { setTimeout(function() { debug("connected and subscribed to " + port); - util.defer(callback); + setImmediate(callback); }, this._opts.delay); return this; @@ -230,21 +230,21 @@ ZeromqAscoltatore.prototype.publish = function publish(topic, message, done) { this._pub_conn.send(toSend); debug("new message published to " + topic); - util.defer(done); // simulate some asynchronicity + setImmediate(done); // simulate some steedhronicity }; ZeromqAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) { this._raiseIfClosed(); debug("deregistered subscriber for topic " + topic); this._ascoltatore.unsubscribe(topic, callback); - util.defer(done); // simulate some asynchronicity + setImmediate(done); // simulate some steedhronicity }; ZeromqAscoltatore.prototype.close = function close(done) { var that = this; if (this._closed) { - util.defer(done); + setImmediate(done); return; } @@ -274,7 +274,7 @@ ZeromqAscoltatore.prototype.close = function close(done) { debug("closed"); that._ascoltatore.close(); that.emit("closed"); - util.defer(done); + setImmediate(done); }, this._opts.delay); }; diff --git a/package.json b/package.json index f6e3f81..fd76c85 100644 --- a/package.json +++ b/package.json @@ -59,10 +59,10 @@ "pre-commit": "^1.1.2" }, "dependencies": { - "async": "^1.5.0", "debug": "^2.2.0", "node-uuid": "~1.4.3", - "qlobber": "~0.7.0" + "qlobber": "~0.7.0", + "steed": "^1.1.3" }, "optionalDependencies": { "ioredis": "^1.15.1", diff --git a/test/amqp_ascoltatore_spec.js b/test/amqp_ascoltatore_spec.js index 7751d53..054e5e6 100644 --- a/test/amqp_ascoltatore_spec.js +++ b/test/amqp_ascoltatore_spec.js @@ -1,3 +1,5 @@ +var steed = require('steed')(); + describeAscoltatore("AMQP", function() { afterEach(function() { this.instance.close(); @@ -11,7 +13,7 @@ describeAscoltatore("AMQP", function() { it("should sync two instances", function(done) { var other = new ascoltatori.AMQPAscoltatore(this.instance._opts); var that = this; - async.series([ + steed.series([ function(cb) { other.on("ready", cb); diff --git a/test/common.js b/test/common.js index 9114988..7574eae 100644 --- a/test/common.js +++ b/test/common.js @@ -3,7 +3,6 @@ global.sinon = require("sinon"); global.chai = require("chai"); global.expect = require("chai").expect; -global.async = require("async"); global.redisSettings = function() { return { diff --git a/test/kafka_ascoltatore_spec.js b/test/kafka_ascoltatore_spec.js index eeb6bb8..2104992 100644 --- a/test/kafka_ascoltatore_spec.js +++ b/test/kafka_ascoltatore_spec.js @@ -1,5 +1,6 @@ var fs = require("fs"); var util = require("../lib/util"); +var steed = require('steed')(); describeAscoltatore("kafka", function() { @@ -11,7 +12,7 @@ describeAscoltatore("kafka", function() { it("should sync two instances", function(done) { var other = new ascoltatori.KafkaAscoltatore(kafkaSettings()); var that = this; - async.series([ + steed.series([ function(cb) { other.on("ready", cb); diff --git a/test/mongo_ascoltatore_spec.js b/test/mongo_ascoltatore_spec.js index 61f49c9..b506727 100644 --- a/test/mongo_ascoltatore_spec.js +++ b/test/mongo_ascoltatore_spec.js @@ -1,6 +1,6 @@ var MongoClient = require('mongodb').MongoClient; -var async = require('async'); +var steed = require('steed')(); describeAscoltatore("mongo", function() { afterEach(function(done) { @@ -51,7 +51,7 @@ describeAscoltatore("mongo", function() { done(); } }, function() { - async.times(max, doPub); + steed.times(max, doPub); }); }); diff --git a/test/mqtt_ascoltatore_spec.js b/test/mqtt_ascoltatore_spec.js index 0d4b0be..88cb464 100644 --- a/test/mqtt_ascoltatore_spec.js +++ b/test/mqtt_ascoltatore_spec.js @@ -1,3 +1,4 @@ +var steed = require('steed')(); describeAscoltatore("MQTT", function() { @@ -11,7 +12,7 @@ describeAscoltatore("MQTT", function() { it("should sync two instances", function(done) { var other = new ascoltatori.MQTTAscoltatore(MQTTSettings()); var that = this; - async.series([ + steed.series([ function(cb) { other.on("ready", cb); @@ -49,7 +50,7 @@ describe("MQTT Reconnect Test", function() { var that = this; var mosca = require("mosca"); var msgReceived = false; - + var moscaOpts = { port: 6884, stats: false, @@ -57,51 +58,51 @@ describe("MQTT Reconnect Test", function() { level: "fatal" } }; - + var clientOpts = { json: false, mqtt: require("mqtt"), host: "127.0.0.1", port: 6884 }; - + var mqttTestServer = new mosca.Server(moscaOpts); var newClient = new ascoltatori.MQTTAscoltatore(clientOpts); - - async.series([ + + steed.series([ function(cb) { - newClient.once('ready',cb); + newClient.once('ready',cb); }, - + function(cb) { // Subscribe to topic for test newClient.subscribe('reconnect/test', function() { newClient.emit('success'); }, cb); }, - + // Stop the MQTT server function(cb) { mqttTestServer.close(cb); }, - + // Start the MQTT server function(cb) { mqttTestServer = new mosca.Server(moscaOpts, cb); }, - + // Setup listener and send message function(cb) { newClient.once('success', function() { msgReceived = true; cb(); }); - + newClient.once('ready', function(){ - newClient.publish('reconnect/test', 'blah'); + newClient.publish('reconnect/test', 'blah'); }); }, - + ], function() { if (msgReceived) { done(); diff --git a/test/redis_ascoltatore_spec.js b/test/redis_ascoltatore_spec.js index 9bf25bd..58b996f 100644 --- a/test/redis_ascoltatore_spec.js +++ b/test/redis_ascoltatore_spec.js @@ -1,5 +1,6 @@ var fs = require("fs"); var Redis = require("ioredis"); +var steed = require("steed")(); describeAscoltatore("redis", function() { @@ -27,7 +28,7 @@ describeAscoltatore("redis", function() { it("should sync two instances", function(done) { var other = new ascoltatori.RedisAscoltatore(redisSettings()); var that = this; - async.series([ + steed.series([ function(cb) { other.on("ready", cb); diff --git a/test/zeromq_ascoltatore_spec.js b/test/zeromq_ascoltatore_spec.js index 2a09fea..c688606 100644 --- a/test/zeromq_ascoltatore_spec.js +++ b/test/zeromq_ascoltatore_spec.js @@ -1,3 +1,5 @@ +var steed = require("steed")(); + describeAscoltatore("zeromq", function() { var toClose = null; @@ -7,15 +9,15 @@ describeAscoltatore("zeromq", function() { }); afterEach(function(done) { - async.each(toClose, function(i, cb) { + steed.each(toClose, function(i, cb) { i.close(cb); - }, async.setImmediate.bind(null, done)); + }, setImmediate.bind(null, done)); }); it("should sync two instances", function(done) { var instance = this.instance; var other = new ascoltatori.ZeromqAscoltatore(zeromqSettings()); - async.series([ + steed.series([ function(cb) { other.on("ready", cb); @@ -49,7 +51,7 @@ describeAscoltatore("zeromq", function() { } }; - async.series([ + steed.series([ function(cb) { other.on("ready", cb); From 642ed6bab34c4a2d9b8ed6c66009dc952d2f65aa Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 24 May 2016 08:15:09 +0200 Subject: [PATCH 2/4] restored defer --- lib/amqp_ascoltatore.js | 15 ++++++----- lib/behave_like_an_ascoltatore.js | 44 ------------------------------- lib/event_emitter2_ascoltatore.js | 9 ++++--- lib/kafka_ascoltatore.js | 15 ++++++----- lib/mqtt_ascoltatore.js | 11 ++++---- lib/redis_ascoltatore.js | 11 ++++---- lib/trie_ascoltatore.js | 9 ++++--- lib/util.js | 7 +++++ lib/zeromq_ascoltatore.js | 11 ++++---- 9 files changed, 51 insertions(+), 81 deletions(-) diff --git a/lib/amqp_ascoltatore.js b/lib/amqp_ascoltatore.js index 7465539..b278c62 100644 --- a/lib/amqp_ascoltatore.js +++ b/lib/amqp_ascoltatore.js @@ -2,6 +2,7 @@ var util = require("./util"); var wrap = util.wrap; +var defer = util.defer; var TrieAscoltatore = require("./trie_ascoltatore"); var AbstractAscoltatore = require('./abstract_ascoltatore'); var steed = require("steed")(); @@ -106,7 +107,7 @@ AMQPAscoltatore.prototype._startConn = function() { that._ascoltatore.publish(topic, message.data.toString()); }); that._queue.once("basicConsumeOk", function() { - setImmediate(callback); + defer(callback); }); }, @@ -134,13 +135,13 @@ AMQPAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) // as advertised setTimeout(function() { debug("queue bound to topic " + topic); - setImmediate(done); + defer(done); }, 5); }); this._queue.bind(this._exchange, this._subTopic(topic)); } else { - setImmediate(done); + defer(done); } this._subs_counter.add(topic); @@ -154,7 +155,7 @@ AMQPAscoltatore.prototype.publish = function publish(topic, message, done) { debug("new message published to " + topic); this._exchange.publish(this._pubTopic(topic), String(message)); - setImmediate(done); + defer(done); }; AMQPAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) { @@ -168,12 +169,12 @@ AMQPAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, do if (!this._subs_counter.include(topic)) { this._queue.once("queueUnbindOk", function() { debug("queue unbound to topic " + topic); - setImmediate(done); + defer(done); }); this._queue.unbind(this._exchange, this._subTopic(topic)); } else { - setImmediate(done); + defer(done); } return this; @@ -202,7 +203,7 @@ AMQPAscoltatore.prototype.close = function close(done) { } debug("closed"); - setImmediate(done); + defer(done); that.emit("closed"); }; diff --git a/lib/behave_like_an_ascoltatore.js b/lib/behave_like_an_ascoltatore.js index be6daf9..39051c1 100644 --- a/lib/behave_like_an_ascoltatore.js +++ b/lib/behave_like_an_ascoltatore.js @@ -380,50 +380,6 @@ module.exports = function() { ], done); }); - // skip the tests, we need to drop these anyway - describe.skip("wrapping uncaughtException", function() { - var uncaughtExceptionHandler, dm; - - beforeEach(function(done) { - dm = domain.create(); - - var listeners = process.listeners("uncaughtException"); - uncaughtExceptionHandler = listeners[listeners.length - 1]; - process.removeListener("uncaughtException", uncaughtExceptionHandler); - done(); - }); - - afterEach(function(done) { - process.on("uncaughtException", uncaughtExceptionHandler); - dm.dispose(); - done(); - }); - - it("should support domains", function(done) { - var that = this; - - dm.on("error", function(err) { - expect(err.message).to.equal("ahaha"); - done(); - }); - - that.instance.registerDomain(dm); - - that.instance.subscribe("throw", function() { - throw new Error("ahaha"); - }, function() { - // we need to properly wait that the subscribe - // has happened correctly - - // the nextTick hack is needed to skip out - // of mocha control - steed.setImmediate(function() { - that.instance.publish("throw"); - }); - }); - }); - }); - it("should not deliver message twice for double subscription", function(done) { var that = this, count = 2, diff --git a/lib/event_emitter2_ascoltatore.js b/lib/event_emitter2_ascoltatore.js index ceb5c52..2769a40 100644 --- a/lib/event_emitter2_ascoltatore.js +++ b/lib/event_emitter2_ascoltatore.js @@ -2,6 +2,7 @@ var AbstractAscoltatore = require("./abstract_ascoltatore"); var util = require("./util"); +var defer = util.defer; var debug = require("debug")("ascoltatori:ee2"); var EventEmitter2 = require("eventemitter2").EventEmitter2; var ascoltatori = require('./ascoltatori'); @@ -42,7 +43,7 @@ EventEmitter2Ascoltatore.prototype.subscribe = function subscribe(topic, callbac debug("registered new subscriber for topic " + topic); this._event.on(this._subTopic(topic).replace(/^\//g, ''), callback); - setImmediate(done); + defer(done); }; EventEmitter2Ascoltatore.prototype.publish = function (topic, message, options, done) { @@ -51,7 +52,7 @@ EventEmitter2Ascoltatore.prototype.publish = function (topic, message, options, this._event.emit(this._pubTopic(topic).replace(/^\//g, ''), topic, message, options); - setImmediate(done); + defer(done); }; EventEmitter2Ascoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) { @@ -61,7 +62,7 @@ EventEmitter2Ascoltatore.prototype.unsubscribe = function unsubscribe(topic, cal this._event.off(this._subTopic(topic).replace(/^\//g, ''), callback); - setImmediate(done); + defer(done); }; EventEmitter2Ascoltatore.prototype.close = function close(done) { @@ -70,7 +71,7 @@ EventEmitter2Ascoltatore.prototype.close = function close(done) { debug("closed"); - setImmediate(done); + defer(done); }; EventEmitter2Ascoltatore.prototype.registerDomain = function(domain) { diff --git a/lib/kafka_ascoltatore.js b/lib/kafka_ascoltatore.js index 1f31a0c..6f49633 100644 --- a/lib/kafka_ascoltatore.js +++ b/lib/kafka_ascoltatore.js @@ -1,6 +1,7 @@ "use strict"; var util = require("./util"); +var defer = util.defer; var TrieAscoltatore = require("./trie_ascoltatore"); var AbstractAscoltatore = require('./abstract_ascoltatore'); var debug = require("debug")("ascoltatori:kafka"); @@ -192,7 +193,7 @@ KafkaAscoltatore.prototype._startConn = function(cb) { debug("error in client",e); that.emit("error", e); }); - setImmediate(newcb); + defer(newcb); }; that._consumerClient.on('ready',function() { that.withTopicOffsetsAdded(that._consumerClient,subscriptions,initConsumer); @@ -226,7 +227,7 @@ KafkaAscoltatore.prototype.subscribe = function subscribe(topic, onMessageReceiv this._ascoltatore.subscribe(topic, onMessageReceived); if(subscribe_topics.length === 0){ - setImmediate(done); + defer(done); return; } @@ -254,7 +255,7 @@ KafkaAscoltatore.prototype.addKafkaSubscriptions = function addKafkaSubscription return; } debug("registered new kafka subscriptions", subscribe_topics); - setImmediate(done); + defer(done); },true); }); }; @@ -332,7 +333,7 @@ KafkaAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, d var newDone = function() { debug("deregistered subscriber for topic " + subtopic); - setImmediate(done); + defer(done); }; this._subs_matcher.remove(subtopic); @@ -399,11 +400,11 @@ KafkaAscoltatore.prototype.createTopics = function createTopics(topics,callback) this._producer.createTopics(topics, false, function (err, data) { if(err){ debug("problem creating topics",err); - setImmediate(function(){callback(err);}); + defer(function(){callback(err);}); return; } that._knowntopics = that._knowntopics.concat(topics); - setImmediate(callback); + defer(callback); }); }; @@ -430,7 +431,7 @@ KafkaAscoltatore.prototype.close = function close(done) { delete that._consumerClient; delete that._consumer; that.emit("closed"); - setImmediate(done); + defer(done); }); }); }else{ diff --git a/lib/mqtt_ascoltatore.js b/lib/mqtt_ascoltatore.js index 6fa0157..aa1d5e5 100644 --- a/lib/mqtt_ascoltatore.js +++ b/lib/mqtt_ascoltatore.js @@ -2,6 +2,7 @@ var util = require("./util"); var wrap = util.wrap; +var defer = util.defer; var TrieAscoltatore = require("./trie_ascoltatore"); var AbstractAscoltatore = require('./abstract_ascoltatore'); var debug = require("debug")("ascoltatori:mqtt"); @@ -74,7 +75,7 @@ MQTTAscoltatore.prototype._startConn = function() { debug("received new packet on topic " + topic); // we need to skip out this callback, so we do not // break the client when an exception occurs - setImmediate(function() { + defer(function() { that._ascoltatore.publish(that._recvTopic(topic), payload, packet); }); }); @@ -119,10 +120,10 @@ MQTTAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) }; this._client.subscribe(this._subTopic(topic), opts, function() { debug("registered new subscriber for topic " + topic); - setImmediate(done); + defer(done); }); } else { - setImmediate(done); + defer(done); } this._subs_counter.add(topic); @@ -147,7 +148,7 @@ MQTTAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, do newDone = function() { debug("deregistered subscriber for topic " + topic); - setImmediate(done); + defer(done); }; this._ascoltatore.unsubscribe(topic, callback); @@ -172,7 +173,7 @@ MQTTAscoltatore.prototype.close = function close(done) { that._ascoltatore.close(); delete that._client; that.emit("closed"); - setImmediate(done); + defer(done); }); this._client.end(); } else { diff --git a/lib/redis_ascoltatore.js b/lib/redis_ascoltatore.js index f00b266..70aa9e9 100644 --- a/lib/redis_ascoltatore.js +++ b/lib/redis_ascoltatore.js @@ -4,6 +4,7 @@ var Redis = require('ioredis'); var msgpack = require('msgpack-lite'); var util = require("./util"); var wrap = util.wrap; +var defer = util.defer; var TrieAscoltatore = require("./trie_ascoltatore"); var AbstractAscoltatore = require('./abstract_ascoltatore'); var SubsCounter = require("./subs_counter"); @@ -121,7 +122,7 @@ RedisAscoltatore.prototype._startSub = function() { handler = function(sub, topic, payload) { topic = topic.toString(); // cast to string in case of Buffer instance (nodejs-redis >=2.0) debug("new message received for topic " + topic); - setImmediate(function() { + defer(function() { // we need to skip out this callback, so we do not // break the client when an exception occurs var ascoltatore = that._ascoltatores[sub]; @@ -166,7 +167,7 @@ RedisAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) var newDone = function() { debug("registered new subscriber for topic " + topic); - setImmediate(done); + defer(done); }; var subTopic = this._subTopic(topic); @@ -217,7 +218,7 @@ RedisAscoltatore.prototype.publish = function publish(topic, message, options, d this._client.publish(topic, msgpack.encode(payload), function() { debug("new message published to " + topic); - setImmediate(done); + defer(done); }); }; @@ -241,7 +242,7 @@ RedisAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, d var newDone = function() { debug("deregistered subscriber for topic " + topic); - setImmediate(done); + defer(done); }; if (this._subs_counter.include(subTopic)) { @@ -270,7 +271,7 @@ RedisAscoltatore.prototype.close = function close(done) { newDone = function() { debug("closed"); - setImmediate(done); + defer(done); }; if (this._closed) { diff --git a/lib/trie_ascoltatore.js b/lib/trie_ascoltatore.js index b3ced3f..35cdb59 100644 --- a/lib/trie_ascoltatore.js +++ b/lib/trie_ascoltatore.js @@ -2,6 +2,7 @@ var AbstractAscoltatore = require("./abstract_ascoltatore"); var util = require("./util"); +var defer = util.defer; var debug = require("debug")("ascoltatori:trie"); var Qlobber = require("qlobber").Qlobber; var ascoltatori = require('./ascoltatori'); @@ -40,7 +41,7 @@ TrieAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) debug("registered new subscriber for topic " + topic); this._matcher.add(topic, callback); - setImmediate(done); + defer(done); }; TrieAscoltatore.prototype.publish = function (topic, message, options, done) { @@ -53,7 +54,7 @@ TrieAscoltatore.prototype.publish = function (topic, message, options, done) { cbs[i](topic, message, options); } - setImmediate(done); + defer(done); }; TrieAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) { @@ -63,7 +64,7 @@ TrieAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, do this._matcher.remove(topic, callback); - setImmediate(done); + defer(done); }; TrieAscoltatore.prototype.close = function close(done) { @@ -72,7 +73,7 @@ TrieAscoltatore.prototype.close = function close(done) { debug("closed"); - setImmediate(done); + defer(done); }; TrieAscoltatore.prototype.registerDomain = function(domain) { diff --git a/lib/util.js b/lib/util.js index 8cecd5e..72da7c0 100644 --- a/lib/util.js +++ b/lib/util.js @@ -76,3 +76,10 @@ function wrap(done) { }; } module.exports.wrap = wrap; + +function defer (done) { + if (typeof done === "function") { + setImmediate(done); + } +} +module.exports.defer = defer; diff --git a/lib/zeromq_ascoltatore.js b/lib/zeromq_ascoltatore.js index e5f61b6..5c12689 100644 --- a/lib/zeromq_ascoltatore.js +++ b/lib/zeromq_ascoltatore.js @@ -2,6 +2,7 @@ var util = require("./util"); var wrap = util.wrap; +var defer = util.defer; var TrieAscoltatore = require("./trie_ascoltatore"); var AbstractAscoltatore = require('./abstract_ascoltatore'); var debug = require("debug")("ascoltatori:zmq"); @@ -206,7 +207,7 @@ ZeromqAscoltatore.prototype._connectSub = function(port, callback) { setTimeout(function() { debug("connected and subscribed to " + port); - setImmediate(callback); + defer(callback); }, this._opts.delay); return this; @@ -230,21 +231,21 @@ ZeromqAscoltatore.prototype.publish = function publish(topic, message, done) { this._pub_conn.send(toSend); debug("new message published to " + topic); - setImmediate(done); // simulate some steedhronicity + defer(done); // simulate some steedhronicity }; ZeromqAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) { this._raiseIfClosed(); debug("deregistered subscriber for topic " + topic); this._ascoltatore.unsubscribe(topic, callback); - setImmediate(done); // simulate some steedhronicity + defer(done); // simulate some steedhronicity }; ZeromqAscoltatore.prototype.close = function close(done) { var that = this; if (this._closed) { - setImmediate(done); + defer(done); return; } @@ -274,7 +275,7 @@ ZeromqAscoltatore.prototype.close = function close(done) { debug("closed"); that._ascoltatore.close(); that.emit("closed"); - setImmediate(done); + defer(done); }, this._opts.delay); }; From e8db0af230b311cc6ffc854ab771d2435ac92ea5 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 24 May 2016 08:47:25 +0200 Subject: [PATCH 3/4] removed domains --- README.md | 28 ---------------------------- lib/abstract_ascoltatore.js | 11 ----------- lib/behave_like_an_ascoltatore.js | 7 +------ lib/decorator_ascoltatore.js | 4 ---- lib/event_emitter2_ascoltatore.js | 10 ---------- lib/filesystem_ascoltatore.js | 17 ----------------- lib/redis_ascoltatore.js | 16 ---------------- lib/trie_ascoltatore.js | 10 ---------- 8 files changed, 1 insertion(+), 102 deletions(-) diff --git a/README.md b/README.md index ce0c724..0392381 100644 --- a/README.md +++ b/README.md @@ -316,34 +316,6 @@ If you publish to a kafka topic that doesn't exist, that topic will be created u If you subscribe to a kafka topic that doesn't exist, that subscription will take affect only when something is published to the kafka topic through this ascoltatori. -## Domain support - -Ascoltatori supports the [node.js domain API](http://nodejs.org/api/domain.html). -Use it calling the `registerDomain` function on your Ascoltatore and it will take -care of routing the exceptions to the given domain. Look at this example: - -```javascript -var ascoltatori = require('ascoltatori'); -var domain = require('domain'); - -var d = domain.create(); -d.on('error', function() { - console.log(arguments); -}); - -ascoltatori.build(function (err, ascoltatore) { - ascoltatore.registerDomain(d); - - ascoltatore.subscribe('hello/*', function() { - throw new Error(); - }); - - ascoltatore.publish('hello/42', 'a message', function() { - console.log('message published'); - }); -}); -``` - ## Debugging diff --git a/lib/abstract_ascoltatore.js b/lib/abstract_ascoltatore.js index 6dadbeb..0b78491 100644 --- a/lib/abstract_ascoltatore.js +++ b/lib/abstract_ascoltatore.js @@ -175,17 +175,6 @@ AbstractAscoltatore.prototype.close = function(done) { throw new Error("Subclass to implement"); }; -/** - * You can register a nodejs domain so that every callback is - * jailed and cannot crash the process. - * - * @param {Domain} domain the node.js error domain to use. - * @api public - */ -AbstractAscoltatore.prototype.registerDomain = function(domain) { - this._ascoltatore.registerDomain(domain); -}; - AbstractAscoltatore.prototype._subTopic = function(topic) { if (this._reInSeparator) { topic = topic.replace(this._reInSeparator, diff --git a/lib/behave_like_an_ascoltatore.js b/lib/behave_like_an_ascoltatore.js index 39051c1..b7851a6 100644 --- a/lib/behave_like_an_ascoltatore.js +++ b/lib/behave_like_an_ascoltatore.js @@ -17,12 +17,7 @@ var steed = require("steed")(); */ module.exports = function() { - var domain, expect; - - // we need to require this here - // so we do not force a dependency on the new - // domain stuff - domain = require("domain"); + var expect; // you MUST depend on chai only if you plan to run // this test diff --git a/lib/decorator_ascoltatore.js b/lib/decorator_ascoltatore.js index ef99a1c..1346e8f 100644 --- a/lib/decorator_ascoltatore.js +++ b/lib/decorator_ascoltatore.js @@ -134,10 +134,6 @@ DecoratorAscoltatore.prototype.close = function(done) { this._ascoltatore.close(done); }; -DecoratorAscoltatore.prototype.registerDomain = function(domain) { - this._ascoltatore.registerDomain(domain); -}; - DecoratorAscoltatore.prototype.unsub = function(topic, callback, done) { this.unsubscribe(topic, callback, done); }; diff --git a/lib/event_emitter2_ascoltatore.js b/lib/event_emitter2_ascoltatore.js index 2769a40..d344feb 100644 --- a/lib/event_emitter2_ascoltatore.js +++ b/lib/event_emitter2_ascoltatore.js @@ -74,16 +74,6 @@ EventEmitter2Ascoltatore.prototype.close = function close(done) { defer(done); }; -EventEmitter2Ascoltatore.prototype.registerDomain = function(domain) { - debug("registered domain"); - - if (!this._publish) { - this._publish = this.publish; - } - - this.publish = domain.bind(this._publish); -}; - util.aliasAscoltatore(EventEmitter2Ascoltatore.prototype); /** diff --git a/lib/filesystem_ascoltatore.js b/lib/filesystem_ascoltatore.js index eda5d4c..bd3bb01 100644 --- a/lib/filesystem_ascoltatore.js +++ b/lib/filesystem_ascoltatore.js @@ -56,10 +56,6 @@ FileSystemAscoltatore.prototype.subscribe = function (topic, callback, done) var f = cb; - if (this._domain) { - f = this._domain.bind(cb); - } - callback[this._dehnd] = callback[this._dehnd] || f; this._fsq.subscribe(this._subTopic(topic), @@ -87,19 +83,6 @@ FileSystemAscoltatore.prototype.close = function (done) this._fsq.stop_watching(done); }; -FileSystemAscoltatore.prototype.registerDomain = function (domain) -{ - debug('registered domain'); - - this._domain = domain; - - var ths = this; - - domain.on('error', function () { - ths._fsq.stop_watching(); - }); -}; - util.aliasAscoltatore(FileSystemAscoltatore.prototype); /** diff --git a/lib/redis_ascoltatore.js b/lib/redis_ascoltatore.js index 70aa9e9..72241c0 100644 --- a/lib/redis_ascoltatore.js +++ b/lib/redis_ascoltatore.js @@ -44,18 +44,6 @@ function RedisAscoltatore(opts) { this._ascoltatores = {}; - this._domain = null; - - var that = this; - this._ascoltatore = { - registerDomain: function (domain) { - that._domain = domain; - for (var subTopic in that._ascoltatores) { - that._ascoltatores[subTopic].registerDomain(domain); - } - } - }; - this._startSub(); this._startPub(); @@ -189,10 +177,6 @@ RedisAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) if (!ascoltatore) { ascoltatore = this._ascoltatores[subTopic] = new TrieAscoltatore(this._opts); - - if (this._domain) { - ascoltatore.registerDomain(this._domain); - } } ascoltatore.subscribe(topic, callback); diff --git a/lib/trie_ascoltatore.js b/lib/trie_ascoltatore.js index 35cdb59..f62fa08 100644 --- a/lib/trie_ascoltatore.js +++ b/lib/trie_ascoltatore.js @@ -76,16 +76,6 @@ TrieAscoltatore.prototype.close = function close(done) { defer(done); }; -TrieAscoltatore.prototype.registerDomain = function(domain) { - debug("registered domain"); - - if (!this._publish) { - this._publish = this.publish; - } - - this.publish = domain.bind(this._publish); -}; - util.aliasAscoltatore(TrieAscoltatore.prototype); /** From 855905ba39b97687f13b336689e5ae5869e89f77 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 26 May 2016 15:48:20 +0200 Subject: [PATCH 4/4] do not run travis on node v5 --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 262f8ea..2b1eb18 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,7 +2,6 @@ language: node_js sudo: false node_js: - "6" - - "5" - "4" - "0.12" script: