Skip to content

Commit

Permalink
Add tests for bulk creating tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
mugli committed Oct 10, 2019
1 parent f76e1a7 commit dd54f41
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 3 deletions.
29 changes: 29 additions & 0 deletions __tests__/01_producer.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const IORedis = require('ioredis');
const { Producer } = require('../lib/producer');
const { defaultOptions: defaults } = require('../lib/defaults');

describe('Producer', () => {
let redis;
Expand Down Expand Up @@ -28,6 +29,34 @@ describe('Producer', () => {
expect(id).toBeDefined();
});

test('should bulk create task', async () => {
const tasks = Array.from({ length: 1000 }).map((_, i) => ({ data: i }));

const qname = 'test-queue-bulk';
const bulkProducer = new Producer(qname, {});
const ids = await bulkProducer.bulkAddTasks(tasks, 200);
await bulkProducer.disconnect();

const waitingCount = await redis.xlen(`${defaults.NAMESPACE}:queue:${qname}`);

expect(waitingCount).toBe(tasks.length);
expect(ids.length).toBe(tasks.length);
});

test('should deduplicate when bulk creating task', async () => {
const tasks = Array.from({ length: 1000 }).map((_, i) => ({ data: i, dedupKey: '__' }));

const qname = 'test-queue-bulk-dedup';
const bulkProducer = new Producer(qname, {});
const ids = await bulkProducer.bulkAddTasks(tasks, 200);
await bulkProducer.disconnect();

const waitingCount = await redis.xlen(`${defaults.NAMESPACE}:queue:${qname}`);

expect(waitingCount).toBe(1);
expect(ids.length).toBe(1);
});

test('should discard task if dedupKey is present and duplicate', async () => {
const id1 = await producer.addTask('test', 'test');
expect(id1).toBeDefined();
Expand Down
16 changes: 13 additions & 3 deletions src/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class Producer {
this._isInitialized = true;
}

async addTask(data = null, dedupKey: string | null = null) {
async addTask(data = null, dedupKey: string | null = null): Promise<string | null> {
if (!this._redis) {
this._connect();
}
Expand All @@ -63,21 +63,31 @@ export class Producer {
return retval;
}

async bulkAddTasks(tasks: Task[], chunkSize: number = 100): Promise<void> {
_flatDeep(arr: any[]): any[] {
return arr.reduce((acc, val) => acc.concat(Array.isArray(val) ? this._flatDeep(val) : val), []);
}

async bulkAddTasks(tasks: Task[], chunkSize: number = 100): Promise<string[]> {
if (!this._redis) {
this._connect();
}

const chunks = lodash.chunk(tasks, chunkSize);
let result = [];
for (const c of chunks) {
const pipeline = this._redis!.pipeline();

for (const t of c) {
pipeline.enqueue(this.qname, this._DEDUPSET, JSON.stringify(t.data), t.dedupKey, 0);
}

await pipeline.exec();
const retval = await pipeline.exec();
result.push(retval);
}

result = this._flatDeep(result).filter(i => !!i);

return result;
}

_connect() {
Expand Down

0 comments on commit dd54f41

Please sign in to comment.