Skip to content

Commit

Permalink
feat(worker): add remove on complete and fail options (#1703)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Feb 25, 2023
1 parent b58c6f3 commit cf13494
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 10 deletions.
26 changes: 19 additions & 7 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,16 @@ export class Scripts {
job: MinimalJob<T, R, N>,
val: any,
propVal: FinishedPropValAttribute,
shouldRemove: boolean | number | KeepJobs,
shouldRemove: undefined | boolean | number | KeepJobs,
target: FinishedStatus,
token: string,
timestamp: number,
fetchNext = true,
): (string | number | boolean | Buffer)[] {
const queueKeys = this.queue.keys;
const opts: WorkerOptions = <WorkerOptions>this.queue.opts;
const workerKeepJobs =
target === 'completed' ? opts.removeOnComplete : opts.removeOnFail;

const metricsKey = this.queue.toKey(`metrics:${target}`);

Expand All @@ -252,12 +254,7 @@ export class Scripts {
metricsKey,
];

const keepJobs =
typeof shouldRemove === 'object'
? shouldRemove
: typeof shouldRemove === 'number'
? { count: shouldRemove }
: { count: shouldRemove ? 0 : -1 };
const keepJobs = this.getKeepJobs(shouldRemove, workerKeepJobs);

const args = [
job.id,
Expand Down Expand Up @@ -285,6 +282,21 @@ export class Scripts {
return keys.concat(args);
}

protected getKeepJobs(
shouldRemove: undefined | boolean | number | KeepJobs,
workerKeepJobs: undefined | KeepJobs,
) {
if (typeof shouldRemove === 'undefined') {
return workerKeepJobs || { count: shouldRemove ? 0 : -1 };
}

return typeof shouldRemove === 'object'
? shouldRemove
: typeof shouldRemove === 'number'
? { count: shouldRemove }
: { count: shouldRemove ? 0 : -1 };
}

protected async moveToFinished<
DataType = any,
ReturnType = any,
Expand Down
5 changes: 3 additions & 2 deletions src/interfaces/base-job-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export interface DefaultJobOptions {
* If true, removes the job when it successfully completes
* When given a number, it specifies the maximum amount of
* jobs to keep, or you can provide an object specifying max
* age and/or count to keep.
* age and/or count to keep. It overrides whatever setting is used in the worker.
* Default behavior is to keep the job in the completed set.
*/
removeOnComplete?: boolean | number | KeepJobs;
Expand All @@ -53,7 +53,8 @@ export interface DefaultJobOptions {
* If true, removes the job when it fails after all attempts.
* When given a number, it specifies the maximum amount of
* jobs to keep, or you can provide an object specifying max
* age and/or count to keep.
* age and/or count to keep. It overrides whatever setting is used in the worker.
* Default behavior is to keep the job in the failed set.
*/
removeOnFail?: boolean | number | KeepJobs;

Expand Down
19 changes: 18 additions & 1 deletion src/interfaces/worker-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { AdvancedOptions } from './advanced-options';
import { QueueBaseOptions } from './queue-options';
import { RateLimiterOptions } from './rate-limiter-options';
import { MetricsOptions } from './metrics-options';
import { KeepJobs } from './keep-jobs';

/**
* An async function that receives `Job`s and handles them.
Expand All @@ -19,6 +20,7 @@ export interface WorkerOptions extends QueueBaseOptions {
* @default true
*/
autorun?: boolean;

/**
* Amount of jobs that a single worker is allowed to work on
* in parallel.
Expand All @@ -27,6 +29,7 @@ export interface WorkerOptions extends QueueBaseOptions {
* @see {@link https://docs.bullmq.io/guide/workers/concurrency}
*/
concurrency?: number;

/**
* Enable rate limiter
* @see {@link https://docs.bullmq.io/guide/rate-limiting}
Expand Down Expand Up @@ -55,6 +58,20 @@ export interface WorkerOptions extends QueueBaseOptions {
*/
stalledInterval?: number;

/**
* You can provide an object specifying max
* age and/or count to keep.
* Default behavior is to keep the job in the completed set.
*/
removeOnComplete?: KeepJobs;

/**
* You can provide an object specifying max
* age and/or count to keep.
* Default behavior is to keep the job in the failed set.
*/
removeOnFail?: KeepJobs;

/**
* Skip stalled check for this worker. Note that other workers could still
* perform stalled checkd and move jobs back to wait for jobs being processed
Expand Down Expand Up @@ -101,7 +118,7 @@ export interface WorkerOptions extends QueueBaseOptions {
/**
* More advanced options.
*/
settings?: AdvancedOptions; // FIXME only backoffStrategies is used
settings?: AdvancedOptions;
}

export interface GetNextJobOptions {
Expand Down
88 changes: 88 additions & 0 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,75 @@ describe('workers', function () {
await worker.close();
}

async function testWorkerRemoveOnFinish(
opts: KeepJobs,
expectedCount: number,
fail?: boolean,
) {
const clock = sinon.useFakeTimers();
clock.reset();

const worker = new Worker(
queueName,
async job => {
await job.log('test log');
if (fail) {
throw new Error('job failed');
}
},
{
connection,
...(fail ? { removeOnFail: opts } : { removeOnComplete: opts }),
},
);
await worker.waitUntilReady();

const datas = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14];

let jobIds;

const processing = new Promise<void>(resolve => {
worker.on(fail ? 'failed' : 'completed', async job => {
clock.tick(1000);

if (job.data == 14) {
const counts = await queue.getJobCounts(
fail ? 'failed' : 'completed',
);

if (fail) {
expect(counts.failed).to.be.equal(expectedCount);
} else {
expect(counts.completed).to.be.equal(expectedCount);
}

await Promise.all(
jobIds.map(async (jobId, index) => {
const job = await queue.getJob(jobId);
const logs = await queue.getJobLogs(jobId);
if (index >= datas.length - expectedCount) {
expect(job).to.not.be.equal(undefined);
expect(logs.logs).to.not.be.empty;
} else {
expect(job).to.be.equal(undefined);
expect(logs.logs).to.be.empty;
}
}),
);
resolve();
}
});
});

jobIds = (
await Promise.all(datas.map(async data => queue.add('test', data)))
).map(job => job.id);

await processing;
clock.restore();
await worker.close();
}

it('should remove job after completed if removeOnComplete', async () => {
const worker = new Worker(
queueName,
Expand Down Expand Up @@ -483,6 +552,25 @@ describe('workers', function () {
});
});
});

describe('when worker has removeOnFinish options', () => {
it('should keep of jobs newer than specified after completed with removeOnComplete', async () => {
const age = 7;
await testWorkerRemoveOnFinish({ age }, age);
});

it('should keep of jobs newer than specified and up to a count completed with removeOnComplete', async () => {
const age = 7;
const count = 5;
await testWorkerRemoveOnFinish({ age, count }, count);
});

it('should keep of jobs newer than specified and up to a count fail with removeOnFail', async () => {
const age = 7;
const count = 5;
await testWorkerRemoveOnFinish({ age, count }, count, true);
});
});
});

it('process a lifo queue', async function () {
Expand Down

0 comments on commit cf13494

Please sign in to comment.