Skip to content

Commit

Permalink
Add example for adding tasks in bulk
Browse files Browse the repository at this point in the history
  • Loading branch information
mugli committed Oct 11, 2019
1 parent 3bf9e42 commit aa2cf65
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 2 deletions.
Empty file.
11 changes: 11 additions & 0 deletions examples/bulk-add-task/consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
const { Consumer } = require('orkid');

async function worker(data, metadata) {
console.log(`Processing task from Queue: ${metadata.qname}. Task ID: ${metadata.id}. Data:`, data);
const result = Math.random();
console.log(`Task ${metadata.id} done!`);
return result;
}

const consumer = new Consumer('bulk-add-example-queue', worker, { consumerOptions: { concurrencyPerInstance: 50 } });
consumer.start();
10 changes: 10 additions & 0 deletions examples/bulk-add-task/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"name": "orkid-example",
"version": "1.0.0",
"keywords": [],
"private": true,
"license": "MIT",
"dependencies": {
"orkid": "*"
}
}
21 changes: 21 additions & 0 deletions examples/bulk-add-task/producer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
const { Producer } = require('orkid');

const producer = new Producer('bulk-add-example-queue');

async function addTasks() {
// Each task in the array is a object with a mandatory data property
// [{ data: 0}, { data: 1}, { data: 2}, ... { data: 999}]
//
// For de-duplication, you can optionally set dedupKey property too, like:
// [{ data: 0, dedupKey: "unique"}, { data: 1, dedupKey: "unique"}...]
const tasks = Array.from({ length: 1000 }).map((_, i) => ({ data: i }));

await producer.bulkAddTasks(tasks);
}

addTasks()
.then(() => {
console.log('Done');
process.exit();
})
.catch(e => console.error(e));
4 changes: 4 additions & 0 deletions examples/failure-timeout-retry/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ const { Consumer } = require('orkid');

const delay = time => new Promise(res => setTimeout(() => res(), time));

// Note: Setting workerFnTimeoutMs in consumerOptions is totally optional.
// The default behavior is to let workers run for as long as they need.

async function worker(data, metadata) {
console.log(`Processing task from Queue: ${metadata.qname}. Task ID: ${metadata.id}. Data:`, data);

Expand All @@ -28,4 +31,5 @@ async function worker(data, metadata) {
const consumer = new Consumer('retriable-queue', worker, {
consumerOptions: { concurrencyPerInstance: 1, maxRetry: 1, workerFnTimeoutMs: 3000 }
});

consumer.start();
4 changes: 2 additions & 2 deletions src/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export interface ProducerOptions {

export interface Task {
data: any;
dedupKey: string | null;
dedupKey?: string | null;
}

export class Producer {
Expand Down Expand Up @@ -78,7 +78,7 @@ export class Producer {
const pipeline = this._redis!.pipeline();

for (const t of c) {
pipeline.enqueue(this.qname, this._DEDUPSET, JSON.stringify(t.data), t.dedupKey, 0);
pipeline.enqueue(this.qname, this._DEDUPSET, JSON.stringify(t.data || null), t.dedupKey || null, 0);
}

const retval = await pipeline.exec();
Expand Down

0 comments on commit aa2cf65

Please sign in to comment.