From 3e97fa936d8fd307d9c685a34bfc44f7bf715383 Mon Sep 17 00:00:00 2001 From: Mehdi Hasan Date: Wed, 16 Oct 2019 15:08:39 +0600 Subject: [PATCH] Add tests for result/failed/dead list trimming --- __tests__/01_producer.js | 10 - __tests__/02_consumer_cleanup.js | 15 -- __tests__/03_consumer_unit.js | 15 -- __tests__/05_list_trimming_and_order.js | 316 ++++++++++++++++++++++++ src/consumer-unit.ts | 28 ++- src/producer.ts | 3 +- 6 files changed, 334 insertions(+), 53 deletions(-) create mode 100644 __tests__/05_list_trimming_and_order.js diff --git a/__tests__/01_producer.js b/__tests__/01_producer.js index 6c0794e..4768981 100644 --- a/__tests__/01_producer.js +++ b/__tests__/01_producer.js @@ -14,16 +14,6 @@ describe('Producer', () => { producer = new Producer('test-queue', {}); }); - afterAll(async () => { - // try { - // await producer.disconnect(); - // await redis.flushall(); - // await redis.disconnect(); - // } catch (e) { - // console.error(e); - // } - }); - test('should create task', async () => { const id = await producer.addTask('test'); expect(id).toBeDefined(); diff --git a/__tests__/02_consumer_cleanup.js b/__tests__/02_consumer_cleanup.js index 559c822..7eb2e64 100644 --- a/__tests__/02_consumer_cleanup.js +++ b/__tests__/02_consumer_cleanup.js @@ -17,21 +17,6 @@ describe('Consumer Unit - Cleanup', () => { await redis.flushall(); }); - afterAll(async () => { - // try { - // for (const producer of producers) { - // await producer.disconnect(); - // } - // for (const consumer of consumers) { - // await consumer._disconnect(); - // } - // await redis.flushall(); - // await redis.disconnect(); - // } catch (e) { - // console.error(e); - // } - }); - test('claim ownership of unfinished tasks from inactive consumers', async () => { const qname = `queue-test-${shortid.generate()}`; diff --git a/__tests__/03_consumer_unit.js b/__tests__/03_consumer_unit.js index 07c99dd..a5a1bf3 100644 --- a/__tests__/03_consumer_unit.js +++ b/__tests__/03_consumer_unit.js @@ -18,21 +18,6 @@ describe('Consumer Unit', () => { await redis.flushall(); }); - afterAll(async () => { - // try { - // for (const producer of producers) { - // await producer.disconnect(); - // } - // for (const consumer of consumers) { - // await consumer._disconnect(); - // } - // await redis.flushall(); - // await redis.disconnect(); - // } catch (e) { - // console.error(e); - // } - }); - test('should process task', async done => { const taskData = `test-${shortid.generate()}`; diff --git a/__tests__/05_list_trimming_and_order.js b/__tests__/05_list_trimming_and_order.js new file mode 100644 index 0000000..060348d --- /dev/null +++ b/__tests__/05_list_trimming_and_order.js @@ -0,0 +1,316 @@ +const IORedis = require('ioredis'); + +// const { defaultOptions: defaults } = require('../lib/defaults'); +const { delay } = require('../lib/common'); + +describe('Result/Failed/Dead lists', () => { + let redis; + let maxIndividualQueueResultSize; + let maxGlobalListSize; + let defaults; + let Producer; + let Consumer; + + beforeEach(async () => { + // TODO: Since we are calling flushall, + // ensure redis in test env can be used with a separate config + await redis.flushall(); + }); + + beforeAll(async () => { + redis = new IORedis(); + + jest.mock('../lib/defaults', () => { + const original = jest.requireActual('../lib/defaults'); + original.defaultOptions.queueOptions.maxIndividualQueueResultSize = 20; + original.defaultOptions.queueOptions.maxGlobalListSize = 30; + + return original; + }); + + /* eslint-disable */ + defaults = 
require('../lib/defaults').defaultOptions; + Producer = require('../lib/index').Producer; + Consumer = require('../lib/index').Consumer; + + maxIndividualQueueResultSize = defaults.queueOptions.maxIndividualQueueResultSize; + maxGlobalListSize = defaults.queueOptions.maxGlobalListSize; + /* eslint-enable */ + }); + + afterAll(async () => { + jest.resetModules(); + }); + + test('should insert to result list in order', async () => { + const tasks = Array.from({ length: 10 }).map((_, i) => ({ data: i })); + const qname = 'result-order-test'; + + const producer = new Producer(qname); + await producer.bulkAddTasks(tasks); + const consumer = new Consumer(qname, data => data); + consumer.start(); + await delay(300); // TODO: Hack Remove this delay once queue supports events + + const globalResult = await redis.zrange(defaults.RESULTLIST, 0, -1); + const queueResult = await redis.zrange(`${defaults.RESULTLIST}:${qname}`, 0, -1); + + expect(globalResult.length).toBe(queueResult.length); + expect(globalResult.length).toBe(tasks.length); + + for (let i = 0; i < tasks.length; i++) { + const t = tasks[i]; + const g = JSON.parse(globalResult[i]); + const q = JSON.parse(queueResult[i]); + + expect(t.data).toBe(g.data); + expect(t.data).toBe(g.result); + + expect(t.data).toBe(q.data); + expect(t.data).toBe(q.result); + } + await producer.disconnect(); + }); + + test('should insert to failed list in order', async () => { + const tasks = Array.from({ length: 10 }).map((_, i) => ({ data: i })); + const qname = 'failed-order-test'; + + const producer = new Producer(qname); + await producer.bulkAddTasks(tasks); + const consumer = new Consumer(qname, () => { + throw new Error(); + }); + consumer.start(); + await delay(300); // TODO: Hack Remove this delay once queue supports events + + const globalFailed = await redis.zrange(defaults.FAILEDLIST, 0, -1); + const queueFailed = await redis.zrange(`${defaults.FAILEDLIST}:${qname}`, 0, -1); + + expect(globalFailed.length).toBe(queueFailed.length); + expect(globalFailed.length).toBe(tasks.length); + + for (let i = 0; i < tasks.length; i++) { + const t = tasks[i]; + const g = JSON.parse(globalFailed[i]); + const q = JSON.parse(queueFailed[i]); + + expect(t.data).toBe(g.data); + expect(t.data).toBe(q.data); + } + await producer.disconnect(); + }); + + test('should insert to dead list in order', async () => { + const tasks = Array.from({ length: 10 }).map((_, i) => ({ data: i })); + const qname = 'dead-order-test'; + + const producer = new Producer(qname); + await producer.bulkAddTasks(tasks); + const consumer = new Consumer(qname, () => { + throw new Error(); + }); + consumer.start(); + await delay(300); // TODO: Hack Remove this delay once queue supports events + + const globalDead = await redis.zrange(defaults.DEADLIST, 0, -1); + const queueDead = await redis.zrange(`${defaults.DEADLIST}:${qname}`, 0, -1); + + expect(globalDead.length).toBe(queueDead.length); + expect(globalDead.length).toBe(tasks.length); + + for (let i = 0; i < tasks.length; i++) { + const t = tasks[i]; + const g = JSON.parse(globalDead[i]); + const q = JSON.parse(queueDead[i]); + + expect(t.data).toBe(g.data); + expect(t.data).toBe(q.data); + } + await producer.disconnect(); + }); + + test('should trim from the beginning of the result list when limit exceeds', async () => { + const overflow = 10; + const tasks = Array.from({ length: maxGlobalListSize + overflow }).map((_, i) => ({ data: i })); + const qname = 'result-trim-test'; + + const producer = new Producer(qname); + await 
producer.bulkAddTasks(tasks.slice(0, maxIndividualQueueResultSize)); + const consumer = new Consumer(qname, data => data); + consumer.start(); + await delay(200); // TODO: Hack Remove this delay once queue supports events + + let glen = await redis.zcard(defaults.RESULTLIST); + let qlen = await redis.zcard(`${defaults.RESULTLIST}:${qname}`); + + expect(qlen).toBe(maxIndividualQueueResultSize); + expect(glen).toBe(maxIndividualQueueResultSize); + + const gfirst = JSON.parse(await redis.zrange(defaults.RESULTLIST, 0, 0)); + const qfirst = JSON.parse(await redis.zrange(`${defaults.RESULTLIST}:${qname}`, 0, 0)); + + expect(gfirst.data).toBe(qfirst.data); + + let glast = JSON.parse(await redis.zrange(defaults.RESULTLIST, -1, -1)); + let qlast = JSON.parse(await redis.zrange(`${defaults.RESULTLIST}:${qname}`, -1, -1)); + + expect(glast.data).toBe(qlast.data); + + // Add more of the tasks + await producer.bulkAddTasks(tasks.slice(maxIndividualQueueResultSize - 1, maxGlobalListSize)); + await delay(200); // TODO: Hack Remove this delay once queue supports events + + glen = await redis.zcard(defaults.RESULTLIST); + qlen = await redis.zcard(`${defaults.RESULTLIST}:${qname}`); + + expect(glen).toBe(maxGlobalListSize); + expect(qlen).toBe(maxIndividualQueueResultSize); + + glast = JSON.parse(await redis.zrange(defaults.RESULTLIST, -1, -1)); + qlast = JSON.parse(await redis.zrange(`${defaults.RESULTLIST}:${qname}`, -1, -1)); + + expect(glast.data).toBe(qlast.data); + + // Add rest of the tasks + await producer.bulkAddTasks(tasks.slice(maxGlobalListSize - 1)); + await delay(200); // TODO: Hack Remove this delay once queue supports events + + glen = await redis.zcard(defaults.RESULTLIST); + qlen = await redis.zcard(`${defaults.RESULTLIST}:${qname}`); + + expect(glen).toBe(maxGlobalListSize); + expect(qlen).toBe(maxIndividualQueueResultSize); + + glast = JSON.parse(await redis.zrange(defaults.RESULTLIST, -1, -1)); + qlast = JSON.parse(await redis.zrange(`${defaults.RESULTLIST}:${qname}`, -1, -1)); + + expect(glast.data).toBe(qlast.data); + + await producer.disconnect(); + }); + + test('should trim from the beginning of the failed list when limit exceeds', async () => { + const overflow = 10; + const tasks = Array.from({ length: maxGlobalListSize + overflow }).map((_, i) => ({ data: i })); + const qname = 'failed-trim-test'; + + const producer = new Producer(qname); + await producer.bulkAddTasks(tasks.slice(0, maxIndividualQueueResultSize)); + const consumer = new Consumer(qname, () => { + throw new Error(); + }); + consumer.start(); + await delay(200); // TODO: Hack Remove this delay once queue supports events + + let glen = await redis.zcard(defaults.FAILEDLIST); + let qlen = await redis.zcard(`${defaults.FAILEDLIST}:${qname}`); + + expect(qlen).toBe(maxIndividualQueueResultSize); + expect(glen).toBe(maxIndividualQueueResultSize); + + const gfirst = JSON.parse(await redis.zrange(defaults.FAILEDLIST, 0, 0)); + const qfirst = JSON.parse(await redis.zrange(`${defaults.FAILEDLIST}:${qname}`, 0, 0)); + + expect(gfirst.data).toBe(qfirst.data); + + let glast = JSON.parse(await redis.zrange(defaults.FAILEDLIST, -1, -1)); + let qlast = JSON.parse(await redis.zrange(`${defaults.FAILEDLIST}:${qname}`, -1, -1)); + + expect(glast.data).toBe(qlast.data); + + // Add more of the tasks + await producer.bulkAddTasks(tasks.slice(maxIndividualQueueResultSize - 1, maxGlobalListSize)); + await delay(200); // TODO: Hack Remove this delay once queue supports events + + glen = await redis.zcard(defaults.FAILEDLIST); + qlen = 
await redis.zcard(`${defaults.FAILEDLIST}:${qname}`); + + expect(glen).toBe(maxGlobalListSize); + expect(qlen).toBe(maxIndividualQueueResultSize); + + glast = JSON.parse(await redis.zrange(defaults.FAILEDLIST, -1, -1)); + qlast = JSON.parse(await redis.zrange(`${defaults.FAILEDLIST}:${qname}`, -1, -1)); + + expect(glast.data).toBe(qlast.data); + + // Add rest of the tasks + await producer.bulkAddTasks(tasks.slice(maxGlobalListSize - 1)); + await delay(200); // TODO: Hack Remove this delay once queue supports events + + glen = await redis.zcard(defaults.FAILEDLIST); + qlen = await redis.zcard(`${defaults.FAILEDLIST}:${qname}`); + + expect(glen).toBe(maxGlobalListSize); + expect(qlen).toBe(maxIndividualQueueResultSize); + + glast = JSON.parse(await redis.zrange(defaults.FAILEDLIST, -1, -1)); + qlast = JSON.parse(await redis.zrange(`${defaults.FAILEDLIST}:${qname}`, -1, -1)); + + expect(glast.data).toBe(qlast.data); + + await producer.disconnect(); + }); + + test('should trim from the beginning of the dead list when limit exceeds', async () => { + const overflow = 10; + const tasks = Array.from({ length: maxGlobalListSize + overflow }).map((_, i) => ({ data: i })); + const qname = 'dead-trim-test'; + + const producer = new Producer(qname); + await producer.bulkAddTasks(tasks.slice(0, maxIndividualQueueResultSize)); + const consumer = new Consumer(qname, () => { + throw new Error(); + }); + consumer.start(); + await delay(200); // TODO: Hack Remove this delay once queue supports events + + let glen = await redis.zcard(defaults.DEADLIST); + let qlen = await redis.zcard(`${defaults.DEADLIST}:${qname}`); + + expect(qlen).toBe(maxIndividualQueueResultSize); + expect(glen).toBe(maxIndividualQueueResultSize); + + const gfirst = JSON.parse(await redis.zrange(defaults.DEADLIST, 0, 0)); + const qfirst = JSON.parse(await redis.zrange(`${defaults.DEADLIST}:${qname}`, 0, 0)); + + expect(gfirst.data).toBe(qfirst.data); + + let glast = JSON.parse(await redis.zrange(defaults.DEADLIST, -1, -1)); + let qlast = JSON.parse(await redis.zrange(`${defaults.DEADLIST}:${qname}`, -1, -1)); + + expect(glast.data).toBe(qlast.data); + + // Add more of the tasks + await producer.bulkAddTasks(tasks.slice(maxIndividualQueueResultSize - 1, maxGlobalListSize)); + await delay(200); // TODO: Hack Remove this delay once queue supports events + + glen = await redis.zcard(defaults.DEADLIST); + qlen = await redis.zcard(`${defaults.DEADLIST}:${qname}`); + + expect(glen).toBe(maxGlobalListSize); + expect(qlen).toBe(maxIndividualQueueResultSize); + + glast = JSON.parse(await redis.zrange(defaults.DEADLIST, -1, -1)); + qlast = JSON.parse(await redis.zrange(`${defaults.DEADLIST}:${qname}`, -1, -1)); + + expect(glast.data).toBe(qlast.data); + + // Add rest of the tasks + await producer.bulkAddTasks(tasks.slice(maxGlobalListSize - 1)); + await delay(200); // TODO: Hack Remove this delay once queue supports events + + glen = await redis.zcard(defaults.DEADLIST); + qlen = await redis.zcard(`${defaults.DEADLIST}:${qname}`); + + expect(glen).toBe(maxGlobalListSize); + expect(qlen).toBe(maxIndividualQueueResultSize); + + glast = JSON.parse(await redis.zrange(defaults.DEADLIST, -1, -1)); + qlast = JSON.parse(await redis.zrange(`${defaults.DEADLIST}:${qname}`, -1, -1)); + + expect(glast.data).toBe(qlast.data); + + await producer.disconnect(); + }); +}); diff --git a/src/consumer-unit.ts b/src/consumer-unit.ts index aaf8d97..d5b1955 100644 --- a/src/consumer-unit.ts +++ b/src/consumer-unit.ts @@ -359,19 +359,20 @@ export class ConsumerUnit { }); 
// Add to success list + const timestamp = new Date().getTime().toString(); await this._redis .pipeline() .dequeue(this._QNAME, this._DEDUPSET, this._GRPNAME, task.id, task.dedupKey) // Remove from queue - .zadd(defaultOptions.RESULTLIST, new Date().getTime().toString(), resultVal) - .zremrangebyrank(defaultOptions.RESULTLIST, 0, defaultOptions.queueOptions.maxGlobalListSize! * -1) + .zadd(defaultOptions.RESULTLIST, timestamp, resultVal) + .zremrangebyrank(defaultOptions.RESULTLIST, 0, (defaultOptions.queueOptions.maxGlobalListSize! + 1) * -1) - .zadd(`${defaultOptions.RESULTLIST}:${this.qname}`, new Date().getTime().toString(), resultVal) + .zadd(`${defaultOptions.RESULTLIST}:${this.qname}`, timestamp, resultVal) .zremrangebyrank( `${defaultOptions.RESULTLIST}:${this.qname}`, 0, - defaultOptions.queueOptions.maxIndividualQueueResultSize! * -1 + (defaultOptions.queueOptions.maxIndividualQueueResultSize! + 1) * -1 ) .exec(); @@ -406,19 +407,21 @@ export class ConsumerUnit { .exec(); } else { // Move to deadlist + const timestamp = new Date().getTime().toString(); + await this._redis .pipeline() .dequeue(this._QNAME, this._DEDUPSET, this._GRPNAME, task.id, task.dedupKey) // Remove from queue - .zadd(defaultOptions.DEADLIST, new Date().getTime().toString(), info) - .zremrangebyrank(defaultOptions.DEADLIST, 0, defaultOptions.queueOptions.maxGlobalListSize! * -1) + .zadd(defaultOptions.DEADLIST, timestamp, info) + .zremrangebyrank(defaultOptions.DEADLIST, 0, (defaultOptions.queueOptions.maxGlobalListSize! + 1) * -1) - .zadd(`${defaultOptions.DEADLIST}:${this.qname}`, new Date().getTime().toString(), info) + .zadd(`${defaultOptions.DEADLIST}:${this.qname}`, timestamp, info) .zremrangebyrank( `${defaultOptions.DEADLIST}:${this.qname}`, 0, - defaultOptions.queueOptions.maxIndividualQueueResultSize! * -1 + (defaultOptions.queueOptions.maxIndividualQueueResultSize! + 1) * -1 ) .hincrby(defaultOptions.STAT, 'dead', 1) @@ -428,17 +431,18 @@ export class ConsumerUnit { } // Add to failed list in all cases + const timestamp = new Date().getTime().toString(); await this._redis .pipeline() - .zadd(defaultOptions.FAILEDLIST, new Date().getTime().toString(), info) - .zremrangebyrank(defaultOptions.FAILEDLIST, 0, defaultOptions.queueOptions.maxGlobalListSize! * -1) + .zadd(defaultOptions.FAILEDLIST, timestamp, info) + .zremrangebyrank(defaultOptions.FAILEDLIST, 0, (defaultOptions.queueOptions.maxGlobalListSize! + 1) * -1) - .zadd(`${defaultOptions.FAILEDLIST}:${this.qname}`, new Date().getTime().toString(), info) + .zadd(`${defaultOptions.FAILEDLIST}:${this.qname}`, timestamp, info) .zremrangebyrank( `${defaultOptions.FAILEDLIST}:${this.qname}`, 0, - defaultOptions.queueOptions.maxIndividualQueueResultSize! * -1 + (defaultOptions.queueOptions.maxIndividualQueueResultSize! + 1) * -1 ) .hincrby(defaultOptions.STAT, 'failed', 1) diff --git a/src/producer.ts b/src/producer.ts index af6fe02..948725a 100644 --- a/src/producer.ts +++ b/src/producer.ts @@ -78,7 +78,8 @@ export class Producer { const pipeline = this._redis!.pipeline(); for (const t of c) { - pipeline.enqueue(this.qname, this._DEDUPSET, JSON.stringify(t.data || null), t.dedupKey || null, 0); + const { data = null, dedupKey = null } = t; + pipeline.enqueue(this.qname, this._DEDUPSET, JSON.stringify(data), dedupKey, 0); } const retval = await pipeline.exec();
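
Why the zremrangebyrank bounds changed: ZREMRANGEBYRANK key 0 -(N+1) removes every member except the newest N, whereas the previous bound of -N left only N-1 behind, so each list was being trimmed one entry short of its configured limit. Below is a minimal sketch of the capped-insert pattern the trimming relies on, assuming an ioredis client; the function name and the key/limit values are illustrative only, not part of the patch.

const IORedis = require('ioredis');

// Add `member` to a sorted set and keep only the newest `maxSize` entries.
// Scores are insertion timestamps, so rank order tracks insertion order.
async function addCapped(redis, key, member, maxSize) {
  const timestamp = Date.now().toString();
  return redis
    .pipeline()
    .zadd(key, timestamp, member)
    // Drop ranks 0 .. -(maxSize + 1): everything except the last `maxSize` members.
    .zremrangebyrank(key, 0, (maxSize + 1) * -1)
    .exec();
}

// Usage sketch with a hypothetical key name and a limit of 30 entries.
const redis = new IORedis();
addCapped(redis, 'example:resultlist', JSON.stringify({ data: 1 }), 30).then(() => redis.disconnect());

Reusing a single timestamp per pipeline, as the consumer-unit changes now do, also keeps the global and per-queue entries for the same task on identical scores instead of two slightly different Date.now() readings.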