Skip to content

Commit

Permalink
Merge 036b154 into 30a9bc6
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Jan 26, 2022
2 parents 30a9bc6 + 036b154 commit 7911ebd
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 178 deletions.
35 changes: 19 additions & 16 deletions src/classes/scripts.ts
Expand Up @@ -21,6 +21,7 @@ import {
QueueSchedulerOptions,
RedisClient,
WorkerOptions,
KeepJobs,
} from '../interfaces';
import { JobState, FinishedTarget, FinishedPropValAttribute } from '../types';
import { ErrorCode } from '../enums';
Expand Down Expand Up @@ -229,7 +230,7 @@ export class Scripts {
job: Job<T, R, N>,
val: any,
propVal: FinishedPropValAttribute,
shouldRemove: boolean | number,
shouldRemove: boolean | number | KeepJobs,
target: FinishedTarget,
token: string,
fetchNext = true,
Expand All @@ -248,20 +249,21 @@ export class Scripts {
queueKeys.stalled,
];

let remove;
if (typeof shouldRemove === 'boolean') {
remove = shouldRemove ? '1' : '0';
} else if (typeof shouldRemove === 'number') {
remove = `${shouldRemove + 1}`;
}
const keepJobs = pack(
typeof shouldRemove === 'object'
? shouldRemove
: typeof shouldRemove === 'number'
? { count: shouldRemove }
: { count: shouldRemove ? 0 : -1 },
);

const args = [
job.id,
Date.now(),
propVal,
typeof val === 'undefined' ? 'null' : val,
target,
remove,
keepJobs,
JSON.stringify({ jobId: job.id, val: val }),
!fetchNext || queue.closing || opts.limiter ? 0 : 1,
queueKeys[''],
Expand All @@ -278,21 +280,22 @@ export class Scripts {
}

private static async moveToFinished<
T = any,
R = any,
N extends string = string,
DataType = any,
ReturnType = any,
NameType extends string = string,
>(
queue: MinimalQueue,
job: Job<T, R, N>,
job: Job<DataType, ReturnType, NameType>,
val: any,
propVal: FinishedPropValAttribute,
shouldRemove: boolean | number,
shouldRemove: boolean | number | KeepJobs,
target: FinishedTarget,
token: string,
fetchNext: boolean,
): Promise<JobData | []> {
const client = await queue.client;
const args = this.moveToFinishedArgs<T, R, N>(

const args = this.moveToFinishedArgs<DataType, ReturnType, NameType>(
queue,
job,
val,
Expand Down Expand Up @@ -359,7 +362,7 @@ export class Scripts {
queue: MinimalQueue,
job: Job<T, R, N>,
returnvalue: any,
removeOnComplete: boolean | number,
removeOnComplete: boolean | number | KeepJobs,
token: string,
fetchNext: boolean,
): Promise<JobData | []> {
Expand All @@ -379,7 +382,7 @@ export class Scripts {
queue: MinimalQueue,
job: Job<T, R, N>,
failedReason: string,
removeOnFailed: boolean | number,
removeOnFailed: boolean | number | KeepJobs,
token: string,
fetchNext = false,
retriesExhausted = 0,
Expand Down
90 changes: 52 additions & 38 deletions src/commands/moveToFinished-8.lua
Expand Up @@ -49,36 +49,36 @@ local rcall = redis.call
--- @include "includes/removeParentDependencyKey"

local jobIdKey = KEYS[3]
if rcall("EXISTS",jobIdKey) == 1 then -- // Make sure job exists
if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists

if rcall("SCARD", jobIdKey .. ":dependencies") ~= 0 then -- // Make sure it does not have pending dependencies
return -4
return -4
end

if ARGV[10] ~= "0" then
local lockKey = jobIdKey .. ':lock'
if rcall("GET", lockKey) == ARGV[10] then
rcall("DEL", lockKey)
rcall("SREM", KEYS[8], ARGV[1])
else
return -2
end
local lockKey = jobIdKey .. ':lock'
if rcall("GET", lockKey) == ARGV[10] then
rcall("DEL", lockKey)
rcall("SREM", KEYS[8], ARGV[1])
else
return -2
end
end

local jobId = ARGV[1]
local timestamp = ARGV[2]

-- Remove from active list (if not active we shall return error)
local numRemovedElements = rcall("LREM", KEYS[1], -1, jobId)

if(numRemovedElements < 1) then
if (numRemovedElements < 1) then
return -3
end


-- Trim events before emiting them to avoid trimming events emitted in this script
local maxEvents = rcall("HGET", KEYS[7], "opts.maxLenEvents")
if (maxEvents == false) then
maxEvents = 10000
maxEvents = 10000
end
rcall("XTRIM", KEYS[6], "MAXLEN", "~", maxEvents)

Expand All @@ -91,38 +91,51 @@ if rcall("EXISTS",jobIdKey) == 1 then -- // Make sure job exists
local parentId = ARGV[12]
local parentQueueKey = ARGV[13]
if parentId == "" and ARGV[14] ~= "" then
parentId = getJobIdFromKey(ARGV[14])
parentQueueKey = getJobKeyPrefix(ARGV[14], ":" .. parentId)
parentId = getJobIdFromKey(ARGV[14])
parentQueueKey = getJobKeyPrefix(ARGV[14], ":" .. parentId)
end
if parentId ~= "" and ARGV[5] == "completed" then
local parentKey = parentQueueKey .. ":" .. parentId
local parentKey = parentQueueKey .. ":" .. parentId
local dependenciesSet = parentKey .. ":dependencies"
local result = rcall("SREM", dependenciesSet, jobIdKey)
if result == 1 then
updateParentDepsIfNeeded(parentKey, parentQueueKey, dependenciesSet, parentId, jobIdKey, ARGV[4])
updateParentDepsIfNeeded(parentKey, parentQueueKey, dependenciesSet,
parentId, jobIdKey, ARGV[4])
end
end

-- Remove job?
local removeJobs = tonumber(ARGV[6])
if removeJobs ~= 1 then
local keepJobs = cmsgpack.unpack(ARGV[6])
local maxCount = keepJobs['count']
local maxAge = keepJobs['age']
if maxCount ~= 0 then
local targetSet = KEYS[2]
-- Add to complete/failed set
rcall("ZADD", KEYS[2], ARGV[2], jobId)
rcall("HMSET", jobIdKey, ARGV[3], ARGV[4], "finishedOn",
ARGV[2]) -- "returnvalue" / "failedReason" and "finishedOn"
rcall("ZADD", targetSet, timestamp, jobId)
rcall("HMSET", jobIdKey, ARGV[3], ARGV[4], "finishedOn", timestamp) -- "returnvalue" / "failedReason" and "finishedOn"

-- Remove old jobs?
if removeJobs and removeJobs > 1 then
local start = removeJobs - 1
local jobIds = rcall("ZREVRANGE", KEYS[2], start, -1)
for i, jobId in ipairs(jobIds) do
local jobKey = ARGV[9] .. jobId
removeParentDependencyKey(jobKey)
local jobLogKey = jobKey .. ':logs'
local jobProcessedKey = jobKey .. ':processed'
rcall("DEL", jobKey, jobLogKey, jobProcessedKey)
end
rcall("ZREMRANGEBYRANK", KEYS[2], 0, -removeJobs)
local prefix = ARGV[9]
local function removeJob(jobId)
local jobKey = prefix .. jobId
removeParentDependencyKey(jobKey)
local jobLogKey = jobKey .. ':logs'
local jobProcessedKey = jobKey .. ':processed'
rcall("DEL", jobKey, jobLogKey, jobProcessedKey)
end

if maxAge ~= nil then
local start = timestamp - maxAge * 1000
local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf")
for i, jobId in ipairs(jobIds) do removeJob(jobId) end
rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start)
end

if maxCount ~= nil and maxCount > 0 then
local start = maxCount
local jobIds = rcall("ZREVRANGE", targetSet, start, -1)
for i, jobId in ipairs(jobIds) do removeJob(jobId) end
rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1))
end
else
local jobLogKey = jobIdKey .. ':logs'
Expand All @@ -134,9 +147,10 @@ if rcall("EXISTS",jobIdKey) == 1 then -- // Make sure job exists
ARGV[4])

if ARGV[5] == "failed" then
if tonumber(ARGV[16]) >= tonumber(ARGV[15]) then
rcall("XADD", KEYS[6], "*", "event", "retries-exhausted", "jobId", jobId, "attemptsMade", ARGV[16])
end
if tonumber(ARGV[16]) >= tonumber(ARGV[15]) then
rcall("XADD", KEYS[6], "*", "event", "retries-exhausted", "jobId",
jobId, "attemptsMade", ARGV[16])
end
end

-- Try to get next job to avoid an extra roundtrip if the queue is not closing,
Expand All @@ -150,17 +164,17 @@ if rcall("EXISTS",jobIdKey) == 1 then -- // Make sure job exists

-- get a lock
if ARGV[10] ~= "0" then
rcall("SET", lockKey, ARGV[10], "PX", ARGV[11])
rcall("SET", lockKey, ARGV[10], "PX", ARGV[11])
end

rcall("ZREM", KEYS[5], jobId) -- remove from priority
rcall("XADD", KEYS[6], "*", "event", "active", "jobId", jobId,
"prev", "waiting")
rcall("HSET", jobKey, "processedOn", ARGV[2])
rcall("HSET", jobKey, "processedOn", timestamp)

return {rcall("HGETALL", jobKey), jobId} -- get job data
else
rcall("XADD", KEYS[6], "*", "event", "drained");
rcall("XADD", KEYS[6], "*", "event", "drained");
end
end

Expand Down
1 change: 1 addition & 0 deletions src/interfaces/index.ts
Expand Up @@ -6,6 +6,7 @@ export * from './connection';
export * from './flow-job';
export * from './job-json';
export * from './jobs-options';
export * from './keep-jobs';
export * from './parent-command';
export * from './parent-message';
export * from './parent';
Expand Down
13 changes: 7 additions & 6 deletions src/interfaces/jobs-options.ts
@@ -1,5 +1,4 @@
import { RepeatOptions } from './repeat-options';
import { BackoffOptions } from './backoff-options';
import { RepeatOptions, KeepJobs, BackoffOptions } from './';

export interface JobsOptions {
/**
Expand Down Expand Up @@ -70,17 +69,19 @@ export interface JobsOptions {
/**
* If true, removes the job when it successfully completes
* When given an number, it specifies the maximum amount of
* jobs to keep.
* jobs to keep, or you can provide an object specifying max
* age and/or count to keep.
* Default behavior is to keep the job in the completed set.
*/
removeOnComplete?: boolean | number;
removeOnComplete?: boolean | number | KeepJobs;

/**
* If true, removes the job when it fails after all attempts.
* When given an number, it specifies the maximum amount of
* jobs to keep.
* jobs to keep, or you can provide an object specifying max
* age and/or count to keep.
*/
removeOnFail?: boolean | number;
removeOnFail?: boolean | number | KeepJobs;

/**
* Limits the amount of stack trace lines that will be recorded in the stacktrace.
Expand Down
18 changes: 18 additions & 0 deletions src/interfaces/keep-jobs.ts
@@ -0,0 +1,18 @@
/**
* KeepJobs
*
* Specify which jobs to keep after finishing. If both age and count are
* specified, then the jobs kept will be the ones that satisfies both
* properties.
*/
export interface KeepJobs {
/**
* Maximum age in seconds for job to be kept.
*/
age?: number;

/**
* Maximum count of jobs to be kept.
*/
count?: number;
}

0 comments on commit 7911ebd

Please sign in to comment.