Skip to content

Commit

Permalink
Merge branch 'master' into feat/simplified-lock-extension
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Feb 28, 2023
2 parents feac7b4 + bd233a9 commit 4284eb9
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 8 deletions.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions docs/gitbook/guide/jobs/prioritized.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Jobs can also include a priority option. Using priorities, job's processing orde
Adding prioritized jobs is a slower operation than the other types of jobs, with a complexity O(n) relative to the number of jobs waiting in the Queue.
{% endhint %}

Priorities goes from 1 to MAX\_INT, whereas lower number is always higher priority than higher numbers.
Note that the priorities go from 1 to MAX\_INT, whereas a lower number is always a higher priority than higher numbers.

Jobs without a priority assigned will get the least priority.

Expand All @@ -23,4 +23,4 @@ await myQueue.add('wall', { color: 'blue' }, { priority: 7 });
// finally pink.
```

If several jobs are added with the same priority value, then the jobs within that priority will be processed in LIFO (Last in first out) fashion.
If several jobs are added with the same priority value, then the jobs within that priority will be processed in FIFO (First in first out) fashion.
25 changes: 25 additions & 0 deletions docs/gitbook/guide/queues/auto-removal-of-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ await myQueue.add(
);
```

Or if you want to set it for all your jobs for an specific worker:

```typescript
new Worker('test', async job => {}, {
connection,
removeOnComplete: { count: 1000 },
removeOnFail: { count: 5000 },
});
```

### Keep jobs based on their age

Another possibility is to keep jobs up to a certain age. The "removeOn" option accepts a "[KeepJobs](https://api.docs.bullmq.io/interfaces/KeepJobs.html)" object, that includes an "age" and a "count" fields. The age is used to specify how old jobs to keep (in seconds), and the count can be used to limit the total amount to keep. The count option is useful in cases we get an unexpected amount of jobs in a very short time, in this case we may just want to limit to a certain amount to avoid running out of memory.
Expand All @@ -52,6 +62,21 @@ await myQueue.add(
The auto removal of jobs works lazily. This means that jobs are not removed unless a new job completes or fails, since that is when the auto-removal takes place.
{% endhint %}

Or if you want to set it for all your jobs for an specific worker:

```typescript
new Worker('test', async job => {}, {
connection,
removeOnComplete: {
age: 3600, // keep up to 1 hour
count: 1000, // keep up to 1000 jobs
},
removeOnFail: {
age: 24 * 3600, // keep up to 24 hours
},
});
```

### What about idempotence?

One of the strategies to implement idempotence with BullMQ is to use unique job ids. When you add a job with an id that exists already in the queue, the new job is ignored and a **duplicated** event is triggered. It is important to keep this in mind when activating auto removal of jobs, since a job that has been removed will not be considered part of the queue anymore, and will not affect any future jobs that could have the same Id.
Expand Down
2 changes: 1 addition & 1 deletion src/classes/backoffs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { BackoffOptions, MinimalJob } from '../interfaces';
import { BackoffStrategy } from '../types';

interface BuiltInStrategies {
export interface BuiltInStrategies {
[index: string]: (delay: number) => BackoffStrategy;
}

Expand Down
2 changes: 1 addition & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ export class Scripts {

const keys = this.moveToFinishedKeys;
keys[8] = queueKeys[target];
keys[9] = this.queue.toKey(job.id);
keys[9] = this.queue.toKey(job.id ?? '');
keys[11] = metricsKey;

const keepJobs = this.getKeepJobs(shouldRemove, workerKeepJobs);
Expand Down
2 changes: 1 addition & 1 deletion src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ export class Worker<
async getNextJob(
token: string,
{ block = true }: GetNextJobOptions = {},
): Promise<Job<DataType, ResultType, NameType>> {
): Promise<Job<DataType, ResultType, NameType> | undefined> {
if (this.paused) {
if (block) {
await this.paused;
Expand Down
4 changes: 2 additions & 2 deletions src/commands/script-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ export interface ScriptMetadata {
}

export class ScriptLoaderError extends Error {
public readonly path: string;
/**
* The include stack
*/
Expand Down Expand Up @@ -483,7 +482,7 @@ function splitFilename(filePath: string): {
} {
const longName = path.basename(filePath, '.lua');
const [name, num] = longName.split('-');
const numberOfKeys = num && parseInt(num, 10);
const numberOfKeys = num ? parseInt(num, 10) : undefined;
return { name, numberOfKeys };
}

Expand All @@ -506,6 +505,7 @@ function getPkgJsonDir(): string {
// eslint-disable-next-line no-empty
} catch (e) {}
}
return '';
}

// https://stackoverflow.com/a/66842927
Expand Down
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ export const isRedisVersionLowerThan = (
currentVersion: string,
minimumVersion: string,
): boolean => {
const version = semver.valid(semver.coerce(currentVersion));
const version = semver.valid(semver.coerce(currentVersion)) as string;

return semver.lt(version, minimumVersion);
};
Expand Down

0 comments on commit 4284eb9

Please sign in to comment.