diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 8b754e3af2..aac2e01e4a 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -21,6 +21,7 @@ import { QueueSchedulerOptions, RedisClient, WorkerOptions, + KeepJobs, } from '../interfaces'; import { JobState, FinishedTarget, FinishedPropValAttribute } from '../types'; import { ErrorCode } from '../enums'; @@ -229,7 +230,7 @@ export class Scripts { job: Job, val: any, propVal: FinishedPropValAttribute, - shouldRemove: boolean | number, + shouldRemove: boolean | number | KeepJobs, target: FinishedTarget, token: string, fetchNext = true, @@ -248,12 +249,13 @@ export class Scripts { queueKeys.stalled, ]; - let remove; - if (typeof shouldRemove === 'boolean') { - remove = shouldRemove ? '1' : '0'; - } else if (typeof shouldRemove === 'number') { - remove = `${shouldRemove + 1}`; - } + const keepJobs = pack( + typeof shouldRemove === 'object' + ? shouldRemove + : typeof shouldRemove === 'number' + ? { count: shouldRemove } + : { count: shouldRemove ? 0 : -1 }, + ); const args = [ job.id, @@ -261,7 +263,7 @@ export class Scripts { propVal, typeof val === 'undefined' ? 'null' : val, target, - remove, + keepJobs, JSON.stringify({ jobId: job.id, val: val }), !fetchNext || queue.closing || opts.limiter ? 0 : 1, queueKeys[''], @@ -278,21 +280,22 @@ export class Scripts { } private static async moveToFinished< - T = any, - R = any, - N extends string = string, + DataType = any, + ReturnType = any, + NameType extends string = string, >( queue: MinimalQueue, - job: Job, + job: Job, val: any, propVal: FinishedPropValAttribute, - shouldRemove: boolean | number, + shouldRemove: boolean | number | KeepJobs, target: FinishedTarget, token: string, fetchNext: boolean, ): Promise { const client = await queue.client; - const args = this.moveToFinishedArgs( + + const args = this.moveToFinishedArgs( queue, job, val, @@ -359,7 +362,7 @@ export class Scripts { queue: MinimalQueue, job: Job, returnvalue: any, - removeOnComplete: boolean | number, + removeOnComplete: boolean | number | KeepJobs, token: string, fetchNext: boolean, ): Promise { @@ -379,7 +382,7 @@ export class Scripts { queue: MinimalQueue, job: Job, failedReason: string, - removeOnFailed: boolean | number, + removeOnFailed: boolean | number | KeepJobs, token: string, fetchNext = false, retriesExhausted = 0, diff --git a/src/commands/moveToFinished-8.lua b/src/commands/moveToFinished-8.lua index 8ba53f9f60..3bea3ac096 100644 --- a/src/commands/moveToFinished-8.lua +++ b/src/commands/moveToFinished-8.lua @@ -49,36 +49,36 @@ local rcall = redis.call --- @include "includes/removeParentDependencyKey" local jobIdKey = KEYS[3] -if rcall("EXISTS",jobIdKey) == 1 then -- // Make sure job exists +if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists if rcall("SCARD", jobIdKey .. ":dependencies") ~= 0 then -- // Make sure it does not have pending dependencies - return -4 + return -4 end if ARGV[10] ~= "0" then - local lockKey = jobIdKey .. ':lock' - if rcall("GET", lockKey) == ARGV[10] then - rcall("DEL", lockKey) - rcall("SREM", KEYS[8], ARGV[1]) - else - return -2 - end + local lockKey = jobIdKey .. ':lock' + if rcall("GET", lockKey) == ARGV[10] then + rcall("DEL", lockKey) + rcall("SREM", KEYS[8], ARGV[1]) + else + return -2 + end end local jobId = ARGV[1] + local timestamp = ARGV[2] -- Remove from active list (if not active we shall return error) local numRemovedElements = rcall("LREM", KEYS[1], -1, jobId) - if(numRemovedElements < 1) then + if (numRemovedElements < 1) then return -3 end - -- Trim events before emiting them to avoid trimming events emitted in this script local maxEvents = rcall("HGET", KEYS[7], "opts.maxLenEvents") if (maxEvents == false) then - maxEvents = 10000 + maxEvents = 10000 end rcall("XTRIM", KEYS[6], "MAXLEN", "~", maxEvents) @@ -91,38 +91,51 @@ if rcall("EXISTS",jobIdKey) == 1 then -- // Make sure job exists local parentId = ARGV[12] local parentQueueKey = ARGV[13] if parentId == "" and ARGV[14] ~= "" then - parentId = getJobIdFromKey(ARGV[14]) - parentQueueKey = getJobKeyPrefix(ARGV[14], ":" .. parentId) + parentId = getJobIdFromKey(ARGV[14]) + parentQueueKey = getJobKeyPrefix(ARGV[14], ":" .. parentId) end if parentId ~= "" and ARGV[5] == "completed" then - local parentKey = parentQueueKey .. ":" .. parentId + local parentKey = parentQueueKey .. ":" .. parentId local dependenciesSet = parentKey .. ":dependencies" local result = rcall("SREM", dependenciesSet, jobIdKey) if result == 1 then - updateParentDepsIfNeeded(parentKey, parentQueueKey, dependenciesSet, parentId, jobIdKey, ARGV[4]) + updateParentDepsIfNeeded(parentKey, parentQueueKey, dependenciesSet, + parentId, jobIdKey, ARGV[4]) end end -- Remove job? - local removeJobs = tonumber(ARGV[6]) - if removeJobs ~= 1 then + local keepJobs = cmsgpack.unpack(ARGV[6]) + local maxCount = keepJobs['count'] + local maxAge = keepJobs['age'] + if maxCount ~= 0 then + local targetSet = KEYS[2] -- Add to complete/failed set - rcall("ZADD", KEYS[2], ARGV[2], jobId) - rcall("HMSET", jobIdKey, ARGV[3], ARGV[4], "finishedOn", - ARGV[2]) -- "returnvalue" / "failedReason" and "finishedOn" + rcall("ZADD", targetSet, timestamp, jobId) + rcall("HMSET", jobIdKey, ARGV[3], ARGV[4], "finishedOn", timestamp) -- "returnvalue" / "failedReason" and "finishedOn" -- Remove old jobs? - if removeJobs and removeJobs > 1 then - local start = removeJobs - 1 - local jobIds = rcall("ZREVRANGE", KEYS[2], start, -1) - for i, jobId in ipairs(jobIds) do - local jobKey = ARGV[9] .. jobId - removeParentDependencyKey(jobKey) - local jobLogKey = jobKey .. ':logs' - local jobProcessedKey = jobKey .. ':processed' - rcall("DEL", jobKey, jobLogKey, jobProcessedKey) - end - rcall("ZREMRANGEBYRANK", KEYS[2], 0, -removeJobs) + local prefix = ARGV[9] + local function removeJob(jobId) + local jobKey = prefix .. jobId + removeParentDependencyKey(jobKey) + local jobLogKey = jobKey .. ':logs' + local jobProcessedKey = jobKey .. ':processed' + rcall("DEL", jobKey, jobLogKey, jobProcessedKey) + end + + if maxAge ~= nil then + local start = timestamp - maxAge * 1000 + local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf") + for i, jobId in ipairs(jobIds) do removeJob(jobId) end + rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start) + end + + if maxCount ~= nil and maxCount > 0 then + local start = maxCount + local jobIds = rcall("ZREVRANGE", targetSet, start, -1) + for i, jobId in ipairs(jobIds) do removeJob(jobId) end + rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1)) end else local jobLogKey = jobIdKey .. ':logs' @@ -134,9 +147,10 @@ if rcall("EXISTS",jobIdKey) == 1 then -- // Make sure job exists ARGV[4]) if ARGV[5] == "failed" then - if tonumber(ARGV[16]) >= tonumber(ARGV[15]) then - rcall("XADD", KEYS[6], "*", "event", "retries-exhausted", "jobId", jobId, "attemptsMade", ARGV[16]) - end + if tonumber(ARGV[16]) >= tonumber(ARGV[15]) then + rcall("XADD", KEYS[6], "*", "event", "retries-exhausted", "jobId", + jobId, "attemptsMade", ARGV[16]) + end end -- Try to get next job to avoid an extra roundtrip if the queue is not closing, @@ -150,17 +164,17 @@ if rcall("EXISTS",jobIdKey) == 1 then -- // Make sure job exists -- get a lock if ARGV[10] ~= "0" then - rcall("SET", lockKey, ARGV[10], "PX", ARGV[11]) + rcall("SET", lockKey, ARGV[10], "PX", ARGV[11]) end rcall("ZREM", KEYS[5], jobId) -- remove from priority rcall("XADD", KEYS[6], "*", "event", "active", "jobId", jobId, "prev", "waiting") - rcall("HSET", jobKey, "processedOn", ARGV[2]) + rcall("HSET", jobKey, "processedOn", timestamp) return {rcall("HGETALL", jobKey), jobId} -- get job data else - rcall("XADD", KEYS[6], "*", "event", "drained"); + rcall("XADD", KEYS[6], "*", "event", "drained"); end end diff --git a/src/interfaces/index.ts b/src/interfaces/index.ts index f68980b1d2..21704fd877 100644 --- a/src/interfaces/index.ts +++ b/src/interfaces/index.ts @@ -6,6 +6,7 @@ export * from './connection'; export * from './flow-job'; export * from './job-json'; export * from './jobs-options'; +export * from './keep-jobs'; export * from './parent-command'; export * from './parent-message'; export * from './parent'; diff --git a/src/interfaces/jobs-options.ts b/src/interfaces/jobs-options.ts index 9c46bd29ac..a9d014d5f4 100644 --- a/src/interfaces/jobs-options.ts +++ b/src/interfaces/jobs-options.ts @@ -1,5 +1,4 @@ -import { RepeatOptions } from './repeat-options'; -import { BackoffOptions } from './backoff-options'; +import { RepeatOptions, KeepJobs, BackoffOptions } from './'; export interface JobsOptions { /** @@ -70,17 +69,19 @@ export interface JobsOptions { /** * If true, removes the job when it successfully completes * When given an number, it specifies the maximum amount of - * jobs to keep. + * jobs to keep, or you can provide an object specifying max + * age and/or count to keep. * Default behavior is to keep the job in the completed set. */ - removeOnComplete?: boolean | number; + removeOnComplete?: boolean | number | KeepJobs; /** * If true, removes the job when it fails after all attempts. * When given an number, it specifies the maximum amount of - * jobs to keep. + * jobs to keep, or you can provide an object specifying max + * age and/or count to keep. */ - removeOnFail?: boolean | number; + removeOnFail?: boolean | number | KeepJobs; /** * Limits the amount of stack trace lines that will be recorded in the stacktrace. diff --git a/src/interfaces/keep-jobs.ts b/src/interfaces/keep-jobs.ts new file mode 100644 index 0000000000..7c38812dff --- /dev/null +++ b/src/interfaces/keep-jobs.ts @@ -0,0 +1,18 @@ +/** + * KeepJobs + * + * Specify which jobs to keep after finishing. If both age and count are + * specified, then the jobs kept will be the ones that satisfies both + * properties. + */ +export interface KeepJobs { + /** + * Maximum age in seconds for job to be kept. + */ + age?: number; + + /** + * Maximum count of jobs to be kept. + */ + count?: number; +} diff --git a/tests/test_worker.ts b/tests/test_worker.ts index 0f01b67600..8e535afd91 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -11,6 +11,8 @@ import { Worker, QueueScheduler, } from '../src/classes'; +import { KeepJobs, JobsOptions } from '../src/interfaces'; + import { delay, removeAllQueueData } from '../src/utils'; describe('workers', function () { @@ -193,6 +195,81 @@ describe('workers', function () { }); describe('auto job removal', () => { + async function testRemoveOnFinish( + opts: boolean | number | KeepJobs, + expectedCount: number, + fail?: boolean, + ) { + const clock = sinon.useFakeTimers(); + clock.reset(); + + const worker = new Worker( + queueName, + async job => { + await job.log('test log'); + if (fail) { + throw new Error('job failed'); + } + }, + { connection }, + ); + await worker.waitUntilReady(); + + const datas = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]; + + let jobIds; + + const processing = new Promise(resolve => { + worker.on(fail ? 'failed' : 'completed', async job => { + clock.tick(1000); + + if (job.data == 14) { + const counts = await queue.getJobCounts( + fail ? 'failed' : 'completed', + ); + + if (fail) { + expect(counts.failed).to.be.equal(expectedCount); + } else { + expect(counts.completed).to.be.equal(expectedCount); + } + + await Promise.all( + jobIds.map(async (jobId, index) => { + const job = await queue.getJob(jobId); + const logs = await queue.getJobLogs(jobId); + if (index >= datas.length - expectedCount) { + expect(job).to.not.be.equal(undefined); + expect(logs.logs).to.not.be.empty; + } else { + expect(job).to.be.equal(undefined); + expect(logs.logs).to.be.empty; + } + }), + ); + resolve(); + } + }); + }); + + const jobOpts: JobsOptions = {}; + if (fail) { + jobOpts.removeOnFail = opts; + } else { + jobOpts.removeOnComplete = opts; + } + + jobIds = ( + await Promise.all( + datas.map(async data => queue.add('test', data, jobOpts)), + ) + ).map(job => job.id); + + await processing; + clock.restore(); + await worker.close(); + } + it('should remove job after completed if removeOnComplete', async () => { const worker = new Worker( queueName, @@ -273,55 +350,27 @@ describe('workers', function () { it('should keep specified number of jobs after completed with removeOnComplete', async () => { const keepJobs = 3; + await testRemoveOnFinish(keepJobs, keepJobs); + }); - const worker = new Worker( - queueName, - async job => { - await job.log('test log'); - }, - { connection }, - ); - await worker.waitUntilReady(); - - const datas = [0, 1, 2, 3, 4, 5, 6, 7, 8]; - - const jobIds = await Promise.all( - datas.map( - async data => - ( - await queue.add('test', data, { removeOnComplete: keepJobs }) - ).id, - ), - ); - - await new Promise(resolve => { - worker.on('completed', async job => { - if (job.data == 8) { - const counts = await queue.getJobCounts('completed'); - expect(counts.completed).to.be.equal(keepJobs); + it('should keep of jobs newer than specified after completed with removeOnComplete', async () => { + const age = 7; + await testRemoveOnFinish({ age }, age); + }); - await Promise.all( - jobIds.map(async (jobId, index) => { - const job = await queue.getJob(jobId); - const logs = await queue.getJobLogs(jobId); - if (index >= datas.length - keepJobs) { - expect(job).to.not.be.equal(undefined); - expect(logs.logs).to.not.be.empty; - } else { - expect(job).to.be.equal(undefined); - expect(logs.logs).to.be.empty; - } - }), - ); - resolve(); - } - }); - }); + it('should keep of jobs newer than specified and up to a count completed with removeOnComplete', async () => { + const age = 7; + const count = 5; + await testRemoveOnFinish({ age, count }, count); + }); - await worker.close(); + it('should keep of jobs newer than specified and up to a count fail with removeOnFail', async () => { + const age = 7; + const count = 5; + await testRemoveOnFinish({ age, count }, count, true); }); - it('should keep specified number of jobs after completed with global removeOnComplete', async () => { + it('should keep specified number of jobs after completed with default job options removeOnComplete', async () => { const keepJobs = 3; const newQueue = new Queue(queueName, { @@ -376,42 +425,7 @@ describe('workers', function () { }); it('should remove job after failed if removeOnFail', async () => { - const jobError = new Error('Job Failed'); - const worker = new Worker( - queueName, - async job => { - await job.log('test log'); - throw jobError; - }, - { connection }, - ); - await worker.waitUntilReady(); - - const job = await queue.add( - 'test', - { foo: 'bar' }, - { removeOnFail: true }, - ); - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.eql('bar'); - - return new Promise(resolve => { - worker.on('failed', async (job, error) => { - await queue - .getJob(job.id) - .then(currentJob => { - expect(currentJob).to.be.equal(undefined); - return null; - }) - .then(() => { - return queue.getJobCounts('failed').then(counts => { - expect(counts.failed).to.be.equal(0); - resolve(); - }); - }); - expect(error).to.be.equal(jobError); - }); - }); + await testRemoveOnFinish(true, 0, true); }); it('should remove a job after fail if the default job options specify removeOnFail', async () => { @@ -447,44 +461,7 @@ describe('workers', function () { it('should keep specified number of jobs after completed with removeOnFail', async () => { const keepJobs = 3; - - const worker = new Worker(queueName, async job => { - throw Error('error'); - }); - await worker.waitUntilReady(); - - const datas = [0, 1, 2, 3, 4, 5, 6, 7, 8]; - - const jobIds = await Promise.all( - datas.map( - async data => - ( - await queue.add('test', data, { removeOnFail: keepJobs }) - ).id, - ), - ); - - return new Promise(resolve => { - worker.on('failed', async job => { - if (job.data == 8) { - const counts = await queue.getJobCounts('failed'); - expect(counts.failed).to.be.equal(keepJobs); - - await Promise.all( - jobIds.map(async (jobId, index) => { - const job = await queue.getJob(jobId); - if (index >= datas.length - keepJobs) { - expect(job).to.not.be.equal(undefined); - } else { - expect(job).to.be.equal(undefined); - } - }), - ); - await worker.close(); - resolve(); - } - }); - }); + await testRemoveOnFinish(keepJobs, keepJobs, true); }); it('should keep specified number of jobs after completed with global removeOnFail', async () => {