From b68c845c77de6b2973ec31d2f22958ab60ad87aa Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Tue, 3 Mar 2020 22:01:46 +0100 Subject: [PATCH 1/2] fix(worker): return this.closing when calling close --- src/classes/worker.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/classes/worker.ts b/src/classes/worker.ts index a97729c412..3f836de327 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -389,5 +389,6 @@ export class Worker extends QueueBase { resolve(); }); } + return this.closing; } } From c903f1552ba9c89f08fdc3f6bd8232218c0d60f6 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Fri, 29 May 2020 00:01:18 +0200 Subject: [PATCH 2/2] fix(scheduler): divide timestamp by 4096 in update set fixes #168 --- src/classes/queue-scheduler.ts | 10 ++-- src/commands/updateDelaySet-7.lua | 3 +- src/test/test_delay.ts | 91 ++++++++++++++++--------------- 3 files changed, 54 insertions(+), 50 deletions(-) diff --git a/src/classes/queue-scheduler.ts b/src/classes/queue-scheduler.ts index b94441f33f..7459fa7ec7 100644 --- a/src/classes/queue-scheduler.ts +++ b/src/classes/queue-scheduler.ts @@ -48,10 +48,11 @@ export class QueueScheduler extends QueueBase { const key = this.keys.delay; const opts = this.opts as QueueSchedulerOptions; - const delaySet = await this.updateDelaySet(Date.now()); - const [nextTimestamp] = delaySet; - let streamLastId = delaySet[1] || '0-0'; + const [nextTimestamp, streamId = '0-0'] = await this.updateDelaySet( + Date.now(), + ); + let streamLastId = streamId; if (nextTimestamp) { this.nextTimestamp = nextTimestamp; @@ -64,6 +65,7 @@ export class QueueScheduler extends QueueBase { // Listen to the delay event stream from lastDelayStreamTimestamp // Can we use XGROUPS to reduce redundancy? const nextDelay = this.nextTimestamp - Date.now(); + const blockTime = Math.round( Math.min(opts.stalledInterval, Math.max(nextDelay, 0)), ); @@ -104,7 +106,7 @@ export class QueueScheduler extends QueueBase { if (delay <= 0) { const [nextTimestamp, id] = await this.updateDelaySet(now); if (nextTimestamp) { - this.nextTimestamp = nextTimestamp / 4096; + this.nextTimestamp = nextTimestamp; streamLastId = id; } else { this.nextTimestamp = Number.MAX_VALUE; diff --git a/src/commands/updateDelaySet-7.lua b/src/commands/updateDelaySet-7.lua index c849963a75..df2a00e20d 100644 --- a/src/commands/updateDelaySet-7.lua +++ b/src/commands/updateDelaySet-7.lua @@ -65,7 +65,8 @@ end local nextTimestamp = rcall("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")[2] local id if (nextTimestamp ~= nil) then - id = rcall("XADD", KEYS[7], "*", "nextTimestamp", nextTimestamp / 0x1000) + nextTimestamp = nextTimestamp / 0x1000 + id = rcall("XADD", KEYS[7], "*", "nextTimestamp", nextTimestamp) end return {nextTimestamp, id} diff --git a/src/test/test_delay.ts b/src/test/test_delay.ts index 3ef41746ad..1dbf7b2b45 100644 --- a/src/test/test_delay.ts +++ b/src/test/test_delay.ts @@ -37,7 +37,6 @@ describe('Delayed jobs', function() { let publishHappened = false; queueEvents.on('delayed', () => { - console.log('delayed!'); publishHappened = true; }); @@ -71,6 +70,7 @@ describe('Delayed jobs', function() { }); it('should process delayed jobs in correct order', async function() { + this.timeout(20000); let order = 0; const queueScheduler = new QueueScheduler(queueName); await queueScheduler.waitUntilReady(); @@ -113,52 +113,53 @@ describe('Delayed jobs', function() { await queueScheduler.close(); }); - /* - it('should process delayed jobs in correct order even in case of restart', function(done) { - this.timeout(15000); - - var QUEUE_NAME = 'delayed queue multiple' + uuid(); - var order = 1; - - queue = new Queue(QUEUE_NAME); - - var fn = function(job, jobDone) { - expect(order).to.be.equal(job.data.order); - jobDone(); - - if (order === 4) { - queue.close().then(done, done); - } - - order++; - }; - - Bluebird.join( - queue.add({ order: 2 }, { delay: 300 }), - queue.add({ order: 4 }, { delay: 500 }), - queue.add({ order: 1 }, { delay: 200 }), - queue.add({ order: 3 }, { delay: 400 }), - ) - .then(function() { - // - // Start processing so that jobs get into the delay set. - // - queue.process(fn); - return Bluebird.delay(20); - }) - .then(function() { - //We simulate a restart - // console.log('RESTART'); - // return queue.close().then(function () { - // console.log('CLOSED'); - // return Promise.delay(100).then(function () { - // queue = new Queue(QUEUE_NAME); - // queue.process(fn); - // }); - // }); + it('should process delayed jobs in correct order even in case of restart', async function() { + this.timeout(5000); + + let worker: Worker; + const queueName = 'delayed queue multiple' + v4(); + let order = 1; + + let secondQueueScheduler: QueueScheduler; + const firstQueueScheduler = new QueueScheduler(queueName); + await firstQueueScheduler.waitUntilReady(); + + queue = new Queue(queueName); + + const processing = new Promise((resolve, reject) => { + worker = new Worker(queueName, async (job: Job) => { + try { + expect(order).to.be.equal(job.data.order); + + if (order === 1) { + await firstQueueScheduler.close(); + secondQueueScheduler = new QueueScheduler(queueName); + await secondQueueScheduler.waitUntilReady(); + } + + if (order === 4) { + resolve(); + } + order++; + } catch (err) { + reject(err); + } }); + }); + + await Promise.all([ + queue.add('test', { order: 2 }, { delay: 500 }), + queue.add('test', { order: 4 }, { delay: 1500 }), + queue.add('test', { order: 1 }, { delay: 200 }), + queue.add('test', { order: 3 }, { delay: 800 }), + ]); + + await processing; + + await queue.close(); + worker && (await worker.close()); + secondQueueScheduler && (await secondQueueScheduler.close()); }); -*/ it('should process delayed jobs with exact same timestamps in correct order (FIFO)', async function() { let order = 1;