Skip to content

Commit

Permalink
Provide a method to shutdown a producer, update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Jan 30, 2018
1 parent d377385 commit 9c2c158
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 14 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
example-dev
node_modules
.idea
.idea
package-lock.json
21 changes: 10 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ so the message is re-queued again and considered to be unacknowledged.
* Producer class constructor `Producer(queueName, config)`:
* `queueName` *(string): Required.* The name of the queue where produced messages are queued.
* `config` *(object): Required.* Configuration object.
* A Producer instance provides 2 methods:
* A Producer instance provides 3 methods:
* `produce(message, cb)`: Produce a message.
* `message` *(mixed): Required.* The actual message, to be consumed by a consumer.
* `cb(err)` *(function): Required.* Callback function. When called without error argument, the message is
Expand All @@ -127,7 +127,10 @@ so the message is re-queued again and considered to be unacknowledged.
* `ttl` *(Integer): Required.* Message TTL in milliseconds.
* `cb(err)` *(function): Required.* Callback function. When called without error argument, the message is
successfully published.

* `shutdown()`: Gracefully shutdown the producer and disconnect from the redis server. This method should be used
only in rare cases where we need to force the producer to terminate its work. Normally a producer should be kept
always online.

#### Producer example

```javascript
Expand All @@ -141,19 +144,15 @@ const Producer = require('redis-smq').Producer;
const producer = new Producer('test_queue', config);

producer.produce({ hello: 'world' }, (err) => {
if (err) throw err;
else {
console.log('Successfully published!');
process.exit(0);
}
if (err) throw err;
console.log('Successfully published!');
producer.shutdown();
});

producer.produceWithTTL({ hello: 'world' }, 60000, (err) => {
if (err) throw err;
else {
console.log('Successfully published!');
process.exit(0);
}
console.log('Successfully published!');
producer.shutdown();
});
```

Expand Down
2 changes: 1 addition & 1 deletion example/test-queue-producer-launch.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ produceNTimes(producer, { hello: 'world' }, 1000000, (err) => {
if (err) throw err;
else {
console.log('Produced successfully!');
process.exit(0);
producer.shutdown();
}
});

9 changes: 9 additions & 0 deletions src/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ class Producer extends EventEmitter {
this[produceMessage](message, ttl, cb);
}

/**
*
* @returns {boolean}
*/
shutdown() {
if (this.stats) this.stats.stop();
this.client.end(true);
this.client = null;
}
}

module.exports = Producer;
8 changes: 7 additions & 1 deletion src/stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ function stats(eventEmitter, config) {
clearInterval(interval);
client.end(true);
client = null;
halt = false;
eventEmitter.emit('stats_halt');
}

Expand Down Expand Up @@ -113,19 +114,24 @@ function stats(eventEmitter, config) {

/**
*
* @returns {boolean}
*/
start() {
halt = false;
if (halt) return false;
client = redisClient.getNewInstance(config);
if (eventEmitter.hasOwnProperty('consumerId')) runConsumerStats();
else runProducerStats();
return true;
},

/**
*
* @returns {boolean}
*/
stop() {
if (halt) return false;
halt = true;
return true;
},
};
}
Expand Down

0 comments on commit 9c2c158

Please sign in to comment.