Skip to content

Commit

Permalink
safe error counting, close #11
Browse files Browse the repository at this point in the history
  • Loading branch information
titarenko committed Feb 16, 2017
1 parent 4c9bbe7 commit a2ac760
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 11 deletions.
2 changes: 1 addition & 1 deletion modules/builder.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ function createRetryBuffer (channel) {
if (--data.ttl) {
channel.sendToQueue('worque-buffer', new Buffer(JSON.stringify(data)), { persistent: true });
} else {
channel.sendToQueue(data.name, new Buffer(JSON.stringify(_.pick(data, ['context', 'content']))), { persistent: true });
channel.sendToQueue(data.name, new Buffer(JSON.stringify(_.pick(data, ['context', 'content', 'failures']))), { persistent: true });
}
channel.ack(message);
});
Expand Down
15 changes: 6 additions & 9 deletions modules/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ Queue.prototype.bind = function (source, pattern, options) {
};

Queue.prototype.publish = function (content, context, options) {
var data = { context: context, content: content };
var data = { context: context, content: content, failures: 0 };
var rawContent = new Buffer(JSON.stringify(data));
return this._exec(function (channel) {
return channel.sendToQueue(this._name, rawContent, options || { persistent: true });
Expand All @@ -64,11 +64,11 @@ Queue.prototype.subscribe = function (handler, options) {
try {
data = JSON.parse(rawContent.toString());
} catch (e) {
self.emit('failure', { error: error });
self.emit('failure', { error: error, name: self._name });
channel.ack(message);
}
self.emit('task', _.extend(data, { name: self._name }));
return Promise.resolve().then(function () {
return Promise.try(function () {
return handler.call(data.context, data.content);
}).tap(function (result) {
self.emit('result', _.extend({ result: result }, data));
Expand Down Expand Up @@ -112,12 +112,9 @@ Queue.prototype.retry = function (getTtl) {
}

return this._exec(function (channel) {
var errorCounter = 0, bufferQueue = this._bufferQueue;
this.on('result', function (ev) {
errorCounter = 0;
});
var bufferQueue = this._bufferQueue;
this.on('failure', function (ev) {
var ttl = getTtl(errorCounter++);
var ttl = getTtl(ev.failures++);
if (!ttl) {
return;
}
Expand All @@ -133,4 +130,4 @@ Queue.prototype.retry = function (getTtl) {
};
};
}
};
};
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "worque",
"version": "0.10.6",
"version": "0.11.0",
"description": "AMQP-based work queue",
"main": "index.js",
"scripts": {
Expand Down

0 comments on commit a2ac760

Please sign in to comment.