Skip to content

Commit

Permalink
Merge c903f15 into e126c25
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed May 28, 2020
2 parents e126c25 + c903f15 commit b258cf5
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 50 deletions.
10 changes: 6 additions & 4 deletions src/classes/queue-scheduler.ts
Expand Up @@ -48,10 +48,11 @@ export class QueueScheduler extends QueueBase {

const key = this.keys.delay;
const opts = this.opts as QueueSchedulerOptions;
const delaySet = await this.updateDelaySet(Date.now());

const [nextTimestamp] = delaySet;
let streamLastId = delaySet[1] || '0-0';
const [nextTimestamp, streamId = '0-0'] = await this.updateDelaySet(
Date.now(),
);
let streamLastId = streamId;

if (nextTimestamp) {
this.nextTimestamp = nextTimestamp / 4096;
Expand All @@ -64,6 +65,7 @@ export class QueueScheduler extends QueueBase {
// Listen to the delay event stream from lastDelayStreamTimestamp
// Can we use XGROUPS to reduce redundancy?
const nextDelay = this.nextTimestamp - Date.now();

const blockTime = Math.round(
Math.min(opts.stalledInterval, Math.max(nextDelay, 0)),
);
Expand Down Expand Up @@ -104,7 +106,7 @@ export class QueueScheduler extends QueueBase {
if (delay <= 0) {
const [nextTimestamp, id] = await this.updateDelaySet(now);
if (nextTimestamp) {
this.nextTimestamp = nextTimestamp / 4096;
this.nextTimestamp = nextTimestamp;
streamLastId = id;
} else {
this.nextTimestamp = Number.MAX_VALUE;
Expand Down
3 changes: 2 additions & 1 deletion src/commands/updateDelaySet-7.lua
Expand Up @@ -65,7 +65,8 @@ end
local nextTimestamp = rcall("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")[2]
local id
if (nextTimestamp ~= nil) then
id = rcall("XADD", KEYS[7], "*", "nextTimestamp", nextTimestamp / 0x1000)
nextTimestamp = nextTimestamp / 0x1000
id = rcall("XADD", KEYS[7], "*", "nextTimestamp", nextTimestamp)
end

return {nextTimestamp, id}
91 changes: 46 additions & 45 deletions src/test/test_delay.ts
Expand Up @@ -37,7 +37,6 @@ describe('Delayed jobs', function() {
let publishHappened = false;

queueEvents.on('delayed', () => {
console.log('delayed!');
publishHappened = true;
});

Expand Down Expand Up @@ -71,6 +70,7 @@ describe('Delayed jobs', function() {
});

it('should process delayed jobs in correct order', async function() {
this.timeout(20000);
let order = 0;
const queueScheduler = new QueueScheduler(queueName);
await queueScheduler.waitUntilReady();
Expand Down Expand Up @@ -113,52 +113,53 @@ describe('Delayed jobs', function() {
await queueScheduler.close();
});

/*
it('should process delayed jobs in correct order even in case of restart', function(done) {
this.timeout(15000);
var QUEUE_NAME = 'delayed queue multiple' + uuid();
var order = 1;
queue = new Queue(QUEUE_NAME);
var fn = function(job, jobDone) {
expect(order).to.be.equal(job.data.order);
jobDone();
if (order === 4) {
queue.close().then(done, done);
}
order++;
};
Bluebird.join(
queue.add({ order: 2 }, { delay: 300 }),
queue.add({ order: 4 }, { delay: 500 }),
queue.add({ order: 1 }, { delay: 200 }),
queue.add({ order: 3 }, { delay: 400 }),
)
.then(function() {
//
// Start processing so that jobs get into the delay set.
//
queue.process(fn);
return Bluebird.delay(20);
})
.then(function() {
//We simulate a restart
// console.log('RESTART');
// return queue.close().then(function () {
// console.log('CLOSED');
// return Promise.delay(100).then(function () {
// queue = new Queue(QUEUE_NAME);
// queue.process(fn);
// });
// });
it('should process delayed jobs in correct order even in case of restart', async function() {
this.timeout(5000);

let worker: Worker;
const queueName = 'delayed queue multiple' + v4();
let order = 1;

let secondQueueScheduler: QueueScheduler;
const firstQueueScheduler = new QueueScheduler(queueName);
await firstQueueScheduler.waitUntilReady();

queue = new Queue(queueName);

const processing = new Promise((resolve, reject) => {
worker = new Worker(queueName, async (job: Job) => {
try {
expect(order).to.be.equal(job.data.order);

if (order === 1) {
await firstQueueScheduler.close();
secondQueueScheduler = new QueueScheduler(queueName);
await secondQueueScheduler.waitUntilReady();
}

if (order === 4) {
resolve();
}
order++;
} catch (err) {
reject(err);
}
});
});

await Promise.all([
queue.add('test', { order: 2 }, { delay: 500 }),
queue.add('test', { order: 4 }, { delay: 1500 }),
queue.add('test', { order: 1 }, { delay: 200 }),
queue.add('test', { order: 3 }, { delay: 800 }),
]);

await processing;

await queue.close();
worker && (await worker.close());
secondQueueScheduler && (await secondQueueScheduler.close());
});
*/

it('should process delayed jobs with exact same timestamps in correct order (FIFO)', async function() {
let order = 1;
Expand Down

0 comments on commit b258cf5

Please sign in to comment.