Skip to content

Commit

Permalink
fix(stalled): consider ignoreDependencyOnFailure option
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Apr 24, 2024
1 parent 0fb9b85 commit 0da62f4
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/commands/moveStalledJobsToWait-9.lua
Expand Up @@ -28,6 +28,7 @@ local rcall = redis.call
--- @include "includes/batches"
--- @include "includes/getTargetQueueList"
--- @include "includes/moveParentFromWaitingChildrenToFailed"
--- @include "includes/moveParentToWaitIfNeeded"
--- @include "includes/removeJob"
--- @include "includes/removeJobsByMaxAge"
--- @include "includes/removeJobsByMaxCount"
Expand Down Expand Up @@ -105,6 +106,16 @@ if (#stalling > 0) then
jobKey,
timestamp
)
elseif opts['idof'] then
local parentData = cjson.decode(rawParentData)
local parentKey = parentData['queueKey'] .. ':' .. parentData['id']
local dependenciesSet = parentKey .. ":dependencies"
if rcall("SREM", dependenciesSet, jobKey) == 1 then
moveParentToWaitIfNeeded(parentData['queueKey'], dependenciesSet,
parentKey, parentData['id'], timestamp)
local failedSet = parentKey .. ":failed"
rcall("HSET", failedSet, jobKey, failedReason)
end
end
if removeOnFailType == "number" then
removeJobsByMaxCount(opts["removeOnFail"],
Expand Down
89 changes: 89 additions & 0 deletions tests/test_stalled_jobs.ts
Expand Up @@ -467,6 +467,95 @@ describe('stalled jobs', function () {
});
});

describe('when ignoreDependencyOnFailure is provided as true', function () {
it('should move parent to waiting when child is moved to failed', async function () {
this.timeout(6000);
const concurrency = 4;
const parentQueueName = `parent-queue-${v4()}`;

const parentQueue = new Queue(parentQueueName, {
connection,
prefix,
});

const flow = new FlowProducer({ connection, prefix });

const worker = new Worker(
queueName,
async () => {
return delay(10000);
},
{
connection,
prefix,
lockDuration: 1000,
stalledInterval: 100,
maxStalledCount: 0,
concurrency,
},
);

const allActive = new Promise(resolve => {
worker.on('active', after(concurrency, resolve));
});

await worker.waitUntilReady();

const { job: parent } = await flow.add({
name: 'parent-job',
queueName: parentQueueName,
data: {},
children: [
{
name: 'test',
data: { foo: 'bar' },
queueName,
opts: { ignoreDependencyOnFailure: true },
},
],
});

const jobs = Array.from(Array(3).keys()).map(index => ({
name: 'test',
data: { index },
}));

await queue.addBulk(jobs);
await allActive;
await worker.close(true);

const worker2 = new Worker(queueName, async job => {}, {
connection,
prefix,
stalledInterval: 100,
maxStalledCount: 0,
concurrency,
});

const errorMessage = 'job stalled more than allowable limit';
const allFailed = new Promise<void>(resolve => {
worker2.on(
'failed',
after(concurrency, async (job, failedReason, prev) => {
const parentState = await parent.getState();

expect(parentState).to.be.equal('waiting');
expect(prev).to.be.equal('active');
expect(failedReason.message).to.be.equal(errorMessage);
resolve();
}),
);
});

await allFailed;

await worker2.close();
await parentQueue.close();
await flow.close();
await removeAllQueueData(new IORedis(redisHost), parentQueueName);
});
});

describe('when removeOnFail is provided as a number', function () {
it('keeps the specified number of jobs in failed', async function () {
this.timeout(6000);
Expand Down

0 comments on commit 0da62f4

Please sign in to comment.