diff --git a/__tests__/01_producer.js b/__tests__/01_producer.js index 2143277..6c0794e 100644 --- a/__tests__/01_producer.js +++ b/__tests__/01_producer.js @@ -1,5 +1,6 @@ const IORedis = require('ioredis'); const { Producer } = require('../lib/producer'); +const { defaultOptions: defaults } = require('../lib/defaults'); describe('Producer', () => { let redis; @@ -28,6 +29,34 @@ describe('Producer', () => { expect(id).toBeDefined(); }); + test('should bulk create task', async () => { + const tasks = Array.from({ length: 1000 }).map((_, i) => ({ data: i })); + + const qname = 'test-queue-bulk'; + const bulkProducer = new Producer(qname, {}); + const ids = await bulkProducer.bulkAddTasks(tasks, 200); + await bulkProducer.disconnect(); + + const waitingCount = await redis.xlen(`${defaults.NAMESPACE}:queue:${qname}`); + + expect(waitingCount).toBe(tasks.length); + expect(ids.length).toBe(tasks.length); + }); + + test('should deduplicate when bulk creating task', async () => { + const tasks = Array.from({ length: 1000 }).map((_, i) => ({ data: i, dedupKey: '__' })); + + const qname = 'test-queue-bulk-dedup'; + const bulkProducer = new Producer(qname, {}); + const ids = await bulkProducer.bulkAddTasks(tasks, 200); + await bulkProducer.disconnect(); + + const waitingCount = await redis.xlen(`${defaults.NAMESPACE}:queue:${qname}`); + + expect(waitingCount).toBe(1); + expect(ids.length).toBe(1); + }); + test('should discard task if dedupKey is present and duplicate', async () => { const id1 = await producer.addTask('test', 'test'); expect(id1).toBeDefined(); diff --git a/src/producer.ts b/src/producer.ts index b363e17..453a90c 100644 --- a/src/producer.ts +++ b/src/producer.ts @@ -51,7 +51,7 @@ export class Producer { this._isInitialized = true; } - async addTask(data = null, dedupKey: string | null = null) { + async addTask(data = null, dedupKey: string | null = null): Promise { if (!this._redis) { this._connect(); } @@ -63,12 +63,17 @@ export class Producer { return retval; } - async bulkAddTasks(tasks: Task[], chunkSize: number = 100): Promise { + _flatDeep(arr: any[]): any[] { + return arr.reduce((acc, val) => acc.concat(Array.isArray(val) ? this._flatDeep(val) : val), []); + } + + async bulkAddTasks(tasks: Task[], chunkSize: number = 100): Promise { if (!this._redis) { this._connect(); } const chunks = lodash.chunk(tasks, chunkSize); + let result = []; for (const c of chunks) { const pipeline = this._redis!.pipeline(); @@ -76,8 +81,13 @@ export class Producer { pipeline.enqueue(this.qname, this._DEDUPSET, JSON.stringify(t.data), t.dedupKey, 0); } - await pipeline.exec(); + const retval = await pipeline.exec(); + result.push(retval); } + + result = this._flatDeep(result).filter(i => !!i); + + return result; } _connect() {