Skip to content

Commit

Permalink
add consumer error event (#22)
Browse files Browse the repository at this point in the history
* add consumer error event

* fix stalled test
  • Loading branch information
yehiyam committed Mar 10, 2022
1 parent 5d6eb4a commit ba8a845
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
3 changes: 3 additions & 0 deletions lib/consumer/consumer.js
Expand Up @@ -31,6 +31,9 @@ class Consumer extends EventEmitter {
let queue = this._queues.get(options.job.type);
if (!queue) {
queue = new Queue(options.job.type, { ...this._setting, prefix });
queue.on('failed', (job, err) => {
this.emit(Events.FAILED, job, err);
});
queue.process('*', options.job.concurrency, (job, done) => {
if (job.name === BullJob.DEFAULT_JOB_NAME) {
// workaround to detect registering __default__ handler
Expand Down
30 changes: 30 additions & 0 deletions tests/test.js
Expand Up @@ -459,6 +459,36 @@ describe('Test', function () {
await producer.createJob(options);
});
});
describe('Stall limit', function () {
it('should fire FAILED event after stall count has been reached', function (done) {
this.timeout(10000);
const options = {
job: {
type: 'test-job-job-event-failed-stalled',
data: { action: 'bla' }
},
setting: {
redis: redisConfig,
prefix: 'failed-stalled',
settings: {
stalledInterval: 500,
lockRenewTime: 2500,
lockDuration: 500,
maxStalledCount: 0
}
}
}
const producer = new Producer(options);
let consumer = new Consumer(options);
consumer.on('job-failed', (job) => {
expect(job.failedReason).to.eql('job stalled more than allowable limit')
done()
});

consumer.register(options);
producer.createJob(options);
});
});
});
describe('Stress', function () {
describe('CreateJob', function () {
Expand Down

0 comments on commit ba8a845

Please sign in to comment.