Skip to content

Commit

Permalink
feat(job): add deduplication logic (#2796)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Oct 7, 2024
1 parent 6babb9e commit 0a4982d
Show file tree
Hide file tree
Showing 18 changed files with 384 additions and 121 deletions.
2 changes: 1 addition & 1 deletion docs/gitbook/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
- [LIFO](guide/jobs/lifo.md)
- [Job Ids](guide/jobs/job-ids.md)
- [Job Data](guide/jobs/job-data.md)
- [Debouncing](guide/jobs/debouncing.md)
- [Deduplication](guide/jobs/deduplication.md)
- [Delayed](guide/jobs/delayed.md)
- [Repeatable](guide/jobs/repeatable.md)
- [Prioritized](guide/jobs/prioritized.md)
Expand Down
66 changes: 0 additions & 66 deletions docs/gitbook/guide/jobs/debouncing.md

This file was deleted.

66 changes: 66 additions & 0 deletions docs/gitbook/guide/jobs/deduplication.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Deduplication

Deduplication in BullMQ is a process where job execution is delayed and deduplicated based on specific identifiers. It ensures that within a specified period, or until a specific job is completed or failed, no new jobs with the same identifier will be added to the queue. Instead, these attempts will trigger a deduplicated event.

## Simple Mode

The Simple Mode takes a different approach by extending the deduplication duration until the job's completion or failure. This means as long as the job remains in an incomplete state (neither succeeded nor failed), any subsequent job with the same deduplication ID will be ignored.

```typescript
// Add a job that will be deduplicated as this record is not finished (completed or failed).
await myQueue.add(
'house',
{ color: 'white' },
{ deduplication: { id: 'customValue' } },
);
```

While this job is not moved to completed or failed state, next jobs added with same **deduplication id** will be ignored and a _deduplicated_ event will be triggered by our QueueEvent class.

This mode is particularly useful for jobs that have a long running time or those that must not be duplicated until they are resolved, such as processing a file upload or performing a critical update that should not be repeated if the initial attempt is still in progress.

## Throttle Mode

In the Throttle Mode, deduplication works by assigning a delay (Time to Live, TTL) to a job upon its creation. If a similar job (identified by a unique deduplication ID) is added during this delay period, it is ignored. This prevents the queue from being overwhelmed with multiple instances of the same task, thus optimizing the processing time and resource utilization.

```typescript
import { Queue } from 'bullmq';

const myQueue = new Queue('Paint');

// Add a job that will be deduplicated for 5 seconds.
await myQueue.add(
'house',
{ color: 'white' },
{ deduplication: { id: 'customValue', ttl: 5000 } },
);
```

In this example, after adding the house painting job with the deduplicated parameters (id and ttl), any subsequent job with the same deduplication ID customValue added within 5 seconds will be ignored. This is useful for scenarios where rapid, repetitive requests are made, such as multiple users or processes attempting to trigger the same job.

Note that you must provide a deduplication id that should represent your job. You can hash your entire job data or a subset of attributes for creating this identifier.

{% hint style="warning" %}
Any manual deletion will disable the deduplication. For example, when calling _job.remove_ method.
{% endhint %}

## Get Deduplication Job Id

If you need to know which is the job id that started the deduplicated state. You can call **getDeduplicationJobId** method.

```typescript
const jobId = await myQueue.getDeduplicationJobId('customValue');
```

## Remove Deduplication Key

If you need to stop deduplication before ttl finishes or before finishing a job. You can call **removeDeduplicationKey** method.

```typescript
await myQueue.removeDeduplicationKey('customValue');
```

## Read more:

- 💡 [Add Job API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#add)
- 💡 [Remove Deduplication Key API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#removeDeduplicationKey)
12 changes: 11 additions & 1 deletion src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ import type { QueueEvents } from './queue-events';
const logger = debuglog('bull');

const optsDecodeMap = {
de: 'debounce',
de: 'deduplication',
fpof: 'failParentOnFailure',
idof: 'ignoreDependencyOnFailure',
kl: 'keepLogs',
rdof: 'removeDependencyOnFailure',
};

const optsEncodeMap = invertObject(optsDecodeMap);
optsEncodeMap.debounce = 'de';

export const PRIORITY_LIMIT = 2 ** 21;

Expand Down Expand Up @@ -140,9 +141,15 @@ export class Job<

/**
* Debounce identifier.
* @deprecated use deduplicationId
*/
debounceId?: string;

/**
* Deduplication identifier.
*/
deduplicationId?: string;

/**
* Base repeat job key.
*/
Expand Down Expand Up @@ -207,6 +214,7 @@ export class Job<
: undefined;

this.debounceId = opts.debounce ? opts.debounce.id : undefined;
this.deduplicationId = opts.deduplication ? opts.deduplication.id : this.debounceId;

this.toKey = queue.toKey.bind(queue);
this.setScripts();
Expand Down Expand Up @@ -333,6 +341,7 @@ export class Job<

if (json.deid) {
job.debounceId = json.deid;
job.deduplicationId = json.deid;
}

job.failedReason = json.failedReason;
Expand Down Expand Up @@ -459,6 +468,7 @@ export class Job<
failedReason: JSON.stringify(this.failedReason),
stacktrace: JSON.stringify(this.stacktrace),
debounceId: this.debounceId,
deduplicationId: this.deduplicationId,
repeatJobKey: this.repeatJobKey,
returnvalue: JSON.stringify(this.returnvalue),
};
Expand Down
8 changes: 8 additions & 0 deletions src/classes/queue-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,19 @@ export interface QueueEventsListener extends IoredisListener {

/**
* Listen to 'debounced' event.
* @deprecated use deduplicated event
*
* This event is triggered when a job is debounced because debounceId still existed.
*/
debounced: (args: { jobId: string; debounceId: string }, id: string) => void;

/**
* Listen to 'deduplicated' event.
*
* This event is triggered when a job is deduplicated because deduplicatedId still existed.
*/
deduplicated: (args: { jobId: string; deduplicationId: string }, id: string) => void;

/**
* Listen to 'delayed' event.
*
Expand Down
12 changes: 12 additions & 0 deletions src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ export class QueueGetters<

/**
* Get jobId that starts debounced state.
* @deprecated use getDeduplicationJobId method
*
* @param id - debounce identifier
*/
Expand All @@ -124,6 +125,17 @@ export class QueueGetters<
return client.get(`${this.keys.de}:${id}`);
}

/**
* Get jobId from deduplicated state.
*
* @param id - deduplication identifier
*/
async getDeduplicationJobId(id: string): Promise<string | null> {
const client = await this.client;

return client.get(`${this.keys.de}:${id}`);
}

/**
* Job counts by type
*
Expand Down
2 changes: 1 addition & 1 deletion src/classes/queue-keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class QueueKeys {
'events',
'pc', // priority counter key
'marker', // marker key
'de', // debounce key
'de', // deduplication key
].forEach(key => {
keys[key] = this.toKey(name, key);
});
Expand Down
12 changes: 12 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ export class Queue<

/**
* Removes a debounce key.
* @deprecated use removeDeduplicationKey
*
* @param id - identifier
*/
Expand All @@ -479,6 +480,17 @@ export class Queue<
return client.del(`${this.keys.de}:${id}`);
}

/**
* Removes a deduplication key.
*
* @param id - identifier
*/
async removeDeduplicationKey(id: string): Promise<number> {
const client = await this.client;

return client.del(`${this.keys.de}:${id}`);
}

/**
* Removes a repeatable job by its key. Note that the key is the one used
* to store the repeatable job metadata and not one of the job iterations
Expand Down
2 changes: 1 addition & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ export class Scripts {
parentOpts.parentDependenciesKey || null,
parent,
job.repeatJobKey,
job.debounceId ? `${queueKeys.de}:${job.debounceId}` : null,
job.deduplicationId ? `${queueKeys.de}:${job.deduplicationId}` : null,
];

let encodedOpts;
Expand Down
14 changes: 7 additions & 7 deletions src/commands/addDelayedJob-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
[7] parent dependencies key.
[8] parent? {id, queueKey}
[9] repeat job key
[10] debounce key
[10] deduplication key
ARGV[2] Json stringified job data
ARGV[3] msgpacked options
Expand All @@ -52,12 +52,12 @@ local data = ARGV[2]
local parentKey = args[5]
local parent = args[8]
local repeatJobKey = args[9]
local debounceKey = args[10]
local deduplicationKey = args[10]
local parentData

-- Includes
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/debounceJob"
--- @include "includes/deduplicateJob"
--- @include "includes/getDelayedScore"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/handleDuplicatedJob"
Expand Down Expand Up @@ -90,10 +90,10 @@ else
end
end

local debouncedJobId = debounceJob(args[1], opts['de'],
jobId, debounceKey, eventsKey, maxEvents)
if debouncedJobId then
return debouncedJobId
local deduplicationJobId = deduplicateJob(args[1], opts['de'],
jobId, deduplicationKey, eventsKey, maxEvents)
if deduplicationJobId then
return deduplicationJobId
end

-- Store the job.
Expand Down
14 changes: 7 additions & 7 deletions src/commands/addParentJob-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
[7] parent dependencies key.
[8] parent? {id, queueKey}
[9] repeat job key
[10] debounce key
[10] deduplication key
ARGV[2] Json stringified job data
ARGV[3] msgpacked options
Expand All @@ -47,11 +47,11 @@ local opts = cmsgpack.unpack(ARGV[3])
local parentKey = args[5]
local parent = args[8]
local repeatJobKey = args[9]
local debounceKey = args[10]
local deduplicationKey = args[10]
local parentData

-- Includes
--- @include "includes/debounceJob"
--- @include "includes/deduplicateJob"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/handleDuplicatedJob"
--- @include "includes/storeJob"
Expand Down Expand Up @@ -81,10 +81,10 @@ else
end
end

local debouncedJobId = debounceJob(args[1], opts['de'],
jobId, debounceKey, eventsKey, maxEvents)
if debouncedJobId then
return debouncedJobId
local deduplicationJobId = deduplicateJob(args[1], opts['de'],
jobId, deduplicationKey, eventsKey, maxEvents)
if deduplicationJobId then
return deduplicationJobId
end

-- Store the job.
Expand Down
Loading

0 comments on commit 0a4982d

Please sign in to comment.