Skip to content

Commit

Permalink
fix(job): increase attemptsMade when moving job to active (#1009) fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Jan 25, 2022
1 parent 07735a0 commit 0974ae0
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 35 deletions.
6 changes: 2 additions & 4 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ export class Job<

let command: string;
const multi = client.multi();
this.saveAttempt(multi, err);
this.saveStacktrace(multi, err);

//
// Check if an automatic retry should be performed
Expand Down Expand Up @@ -919,8 +919,7 @@ export class Job<
);
}

private saveAttempt(multi: Pipeline, err: Error) {
this.attemptsMade++;
private saveStacktrace(multi: Pipeline, err: Error) {
this.stacktrace = this.stacktrace || [];

if (err?.stack) {
Expand All @@ -931,7 +930,6 @@ export class Job<
}

const params = {
attemptsMade: this.attemptsMade,
stacktrace: JSON.stringify(this.stacktrace),
failedReason: err?.message,
};
Expand Down
1 change: 1 addition & 0 deletions src/commands/moveToActive-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ if jobId then
rcall("XADD", KEYS[4], "*", "event", "active", "jobId", jobId, "prev", "waiting")

rcall("HSET", jobKey, "processedOn", ARGV[4])
rcall("HINCRBY", jobKey, "attemptsMade", 1)

return {rcall("HGETALL", jobKey), jobId} -- get job data
else
Expand Down
61 changes: 30 additions & 31 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1468,34 +1468,33 @@ describe('workers', function () {

await worker.waitUntilReady();

await queue.add(
'test',
{ foo: 'bar' },
{
attempts: 1,
},
);
await queue.add(
'test',
{ foo: 'baz' },
{
attempts: 1,
},
);

await new Promise<void>((resolve, reject) => {
const completing = new Promise<void>((resolve, reject) => {
queueEvents.on('retries-exhausted', async () => {
reject();
});

queueEvents.on(
'completed',
after(2, async function ({ jobId, returnvalue }) {
after(3, async function ({ jobId, returnvalue }) {
resolve();
}),
);
});

await Promise.all(
times(3, () =>
queue.add(
'test',
{ foo: 'baz' },
{
attempts: 1,
},
),
),
);

await completing;

await worker.close();
});
});
Expand Down Expand Up @@ -1537,8 +1536,8 @@ describe('workers', function () {
const worker = new Worker(
queueName,
async job => {
expect(job.attemptsMade).to.be.eql(tries);
tries++;
expect(job.attemptsMade).to.be.eql(tries);
if (job.attemptsMade < 2) {
throw new Error('Not yet!');
}
Expand Down Expand Up @@ -1570,10 +1569,10 @@ describe('workers', function () {
queueName,
async job => {
tries++;
if (job.attemptsMade < 3) {
if (job.attemptsMade < 4) {
throw new Error('Not yet!');
}
expect(job.attemptsMade).to.be.eql(tries - 1);
expect(job.attemptsMade).to.be.eql(tries);
},
{ connection },
);
Expand Down Expand Up @@ -1603,15 +1602,15 @@ describe('workers', function () {
});

it('should retry a job after a delay if a fixed backoff is given', async function () {
this.timeout(12000);
this.timeout(10000);

const queueScheduler = new QueueScheduler(queueName);
await queueScheduler.waitUntilReady();

const worker = new Worker(
queueName,
async job => {
if (job.attemptsMade < 2) {
if (job.attemptsMade < 3) {
throw new Error('Not yet!');
}
},
Expand Down Expand Up @@ -1643,15 +1642,15 @@ describe('workers', function () {
});

it('should retry a job after a delay if an exponential backoff is given', async function () {
this.timeout(12000);
this.timeout(10000);

const queueScheduler = new QueueScheduler(queueName, { connection });
await queueScheduler.waitUntilReady();

const worker = new Worker(
queueName,
async job => {
if (job.attemptsMade < 2) {
if (job.attemptsMade < 3) {
throw new Error('Not yet!');
}
},
Expand Down Expand Up @@ -1687,14 +1686,14 @@ describe('workers', function () {
});

it('should retry a job after a delay if a custom backoff is given', async function () {
this.timeout(12000);
this.timeout(10000);
const queueScheduler = new QueueScheduler(queueName, { connection });
await queueScheduler.waitUntilReady();

const worker = new Worker(
queueName,
async job => {
if (job.attemptsMade < 2) {
if (job.attemptsMade < 3) {
throw new Error('Not yet!');
}
},
Expand Down Expand Up @@ -1794,7 +1793,7 @@ describe('workers', function () {
const worker = new Worker(
queueName,
async job => {
if (job.attemptsMade < 2) {
if (job.attemptsMade < 3) {
throw new CustomError('Hey, custom error!');
}
},
Expand Down Expand Up @@ -1899,15 +1898,15 @@ describe('workers', function () {
});

it('should be able to handle a custom backoff if it returns a promise', async function () {
this.timeout(12000);
this.timeout(10000);

const queueScheduler = new QueueScheduler(queueName, { connection });
await queueScheduler.waitUntilReady();

const worker = new Worker(
queueName,
async (job: Job) => {
if (job.attemptsMade < 2) {
if (job.attemptsMade < 3) {
throw new Error('some error');
}
},
Expand Down Expand Up @@ -1954,7 +1953,7 @@ describe('workers', function () {
let attempts = 0;
const worker = new Worker(
queueName,
async job => {
async () => {
if (attempts === 0) {
attempts++;
throw failedError;
Expand Down Expand Up @@ -2011,7 +2010,7 @@ describe('workers', function () {

const worker = new Worker(
queueName,
async job => {
async () => {
if (attempts === 0) {
attempts++;
throw failedError;
Expand Down

0 comments on commit 0974ae0

Please sign in to comment.