Skip to content

Commit caf9150

Browse files
authored
fix: stop workflows retrying forever when no retries are configured (#16465)
Backport of #16450
1 parent 0facc44 commit caf9150

7 files changed

Lines changed: 212 additions & 17 deletions

File tree

packages/payload/src/queues/errors/handleWorkflowError.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,18 @@ export async function handleWorkflowError({
4444
stack: error.stack,
4545
}
4646

47-
const { hasFinalError, maxWorkflowRetries, waitUntil } = getWorkflowRetryBehavior({
48-
job,
49-
retriesConfig: workflowConfig.retries!,
50-
})
47+
// No retries configured => permanently fail. Errors reaching this handler are
48+
// workflow-level (task errors are routed to handleTaskError first), so there's
49+
// nothing else to bound them.
50+
const hasNoRetriesConfigured =
51+
workflowConfig.retries === undefined || workflowConfig.retries === null
52+
53+
const { hasFinalError, maxWorkflowRetries, waitUntil } = hasNoRetriesConfigured
54+
? { hasFinalError: true, maxWorkflowRetries: undefined, waitUntil: undefined }
55+
: getWorkflowRetryBehavior({
56+
job,
57+
retriesConfig: workflowConfig.retries,
58+
})
5159

5260
if (!hasFinalError) {
5361
if (job.waitUntil) {

packages/payload/src/queues/operations/runJobs/index.ts

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -335,25 +335,48 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
335335
}
336336
const jobReq = isolateObjectProperty(req, 'transactionID')
337337

338-
const workflowConfig: WorkflowConfig =
339-
job.workflowSlug && jobsConfig.workflows?.length
340-
? jobsConfig.workflows.find(({ slug }) => slug === job.workflowSlug)!
341-
: {
342-
slug: 'singleTask',
343-
handler: async ({ job, tasks }) => {
344-
await tasks[job.taskSlug as string]!('1', {
345-
input: job.input,
346-
})
347-
},
348-
}
338+
let workflowConfig: undefined | WorkflowConfig = undefined
339+
340+
if (job.workflowSlug && jobsConfig.workflows?.length) {
341+
workflowConfig = jobsConfig.workflows.find(({ slug }) => slug === job.workflowSlug)
342+
} else if (job.taskSlug && jobsConfig.tasks?.length) {
343+
const taskExists = jobsConfig.tasks.some(({ slug }) => slug === job.taskSlug)
344+
if (taskExists) {
345+
workflowConfig = {
346+
slug: 'singleTask',
347+
handler: async ({ job, tasks }) => {
348+
await tasks[job.taskSlug as string]!('1', {
349+
input: job.input,
350+
})
351+
},
352+
}
353+
}
354+
}
349355

350356
if (!workflowConfig) {
357+
// Permanently fail jobs whose task/workflow slug is no longer registered in config — they can never complete.
358+
const errorMessage = `${job.taskSlug ? `Task '${job.taskSlug}'` : `Workflow '${job.workflowSlug}'`} is not registered in payload.config.jobs.`
359+
360+
if (!silent || (typeof silent === 'object' && !silent.error)) {
361+
payload.logger.error({
362+
msg: `Error running job ${job.workflowSlug || `Task: ${job.taskSlug}`} id: ${job.id} - ${errorMessage}`,
363+
})
364+
}
365+
366+
const updateJob = getUpdateJobFunction(job, jobReq)
367+
await updateJob({
368+
error: { message: errorMessage },
369+
hasError: true,
370+
processing: false,
371+
totalTried: (job.totalTried ?? 0) + 1,
372+
})
373+
351374
return {
352375
id: job.id,
353376
result: {
354-
status: 'error',
377+
status: 'error-reached-max-retries',
355378
},
356-
} // Skip jobs with no workflow configuration
379+
}
357380
}
358381

359382
try {

test/queues/getConfig.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ import { selfCancelWorkflow } from './workflows/selfCancel.js'
3838
import { subTaskWorkflow } from './workflows/subTask.js'
3939
import { subTaskFailsWorkflow } from './workflows/subTaskFails.js'
4040
import { supersedesConcurrencyWorkflow } from './workflows/supersedesConcurrency.js'
41+
import { throwsInHandlerNoRetriesWorkflow } from './workflows/throwsInHandlerNoRetries.js'
42+
import { throwsInHandlerRetries1Workflow } from './workflows/throwsInHandlerRetries1.js'
4143
import { updatePostWorkflow } from './workflows/updatePost.js'
4244
import { updatePostJSONWorkflow } from './workflows/updatePostJSON.js'
4345
import { workflowAndTasksRetriesUndefinedWorkflow } from './workflows/workflowAndTasksRetriesUndefined.js'
@@ -179,6 +181,8 @@ export const getConfig: () => Partial<Config> = () => ({
179181
noConcurrencyWorkflow,
180182
queueSpecificConcurrencyWorkflow,
181183
supersedesConcurrencyWorkflow,
184+
throwsInHandlerNoRetriesWorkflow,
185+
throwsInHandlerRetries1Workflow,
182186
],
183187
},
184188
editor: lexicalEditor(),

test/queues/int.spec.ts

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -938,6 +938,115 @@ describe('Queues - Payload', () => {
938938
expect(allSimples.docs[0]?.title).toBe('from single task')
939939
})
940940

941+
describe('when a queued task slug is no longer registered in config', () => {
942+
let originalTasks: typeof payload.config.jobs.tasks
943+
944+
beforeEach(() => {
945+
originalTasks = payload.config.jobs.tasks
946+
})
947+
948+
afterEach(() => {
949+
payload.config.jobs.tasks = originalTasks
950+
})
951+
952+
it('should permanently fail the job after one attempt instead of retrying forever', async () => {
953+
payload.config.jobs.deleteJobOnComplete = false
954+
955+
const job = await payload.jobs.queue({
956+
task: 'CreateSimple',
957+
input: {
958+
message: 'queued before task removal',
959+
},
960+
})
961+
962+
// Simulate a deploy that removed the 'CreateSimple' task from config
963+
payload.config.jobs.tasks = originalTasks!.filter((t) => t.slug !== 'CreateSimple')
964+
965+
await payload.jobs.run({ silent: true })
966+
967+
const jobAfterRun = await payload.findByID({
968+
collection: 'payload-jobs',
969+
id: job.id,
970+
})
971+
972+
expect(jobAfterRun.hasError).toBe(true)
973+
expect(jobAfterRun.processing).toBe(false)
974+
expect(jobAfterRun.totalTried).toBe(1)
975+
})
976+
977+
it('should permanently fail when a workflow handler calls a task that has been removed', async () => {
978+
payload.config.jobs.deleteJobOnComplete = false
979+
980+
const job = await payload.jobs.queue({
981+
workflow: 'workflowNoRetriesSet',
982+
input: {
983+
message: 'queued before referenced task removal',
984+
},
985+
})
986+
987+
payload.config.jobs.tasks = originalTasks!.filter((t) => t.slug !== 'CreateSimple')
988+
989+
await payload.jobs.run({ silent: true })
990+
991+
const jobAfterRun = await payload.findByID({
992+
collection: 'payload-jobs',
993+
id: job.id,
994+
})
995+
996+
expect(jobAfterRun.hasError).toBe(true)
997+
expect(jobAfterRun.processing).toBe(false)
998+
expect(jobAfterRun.totalTried).toBe(1)
999+
})
1000+
})
1001+
1002+
it('should not retry a workflow with no retries configured when its handler throws', async () => {
1003+
payload.config.jobs.deleteJobOnComplete = false
1004+
1005+
const job = await payload.jobs.queue({
1006+
workflow: 'throwsInHandlerNoRetries',
1007+
input: {},
1008+
})
1009+
1010+
await payload.jobs.run({ silent: true })
1011+
1012+
const jobAfterRun = await payload.findByID({
1013+
collection: 'payload-jobs',
1014+
id: job.id,
1015+
})
1016+
1017+
expect(jobAfterRun.hasError).toBe(true)
1018+
expect(jobAfterRun.processing).toBe(false)
1019+
expect(jobAfterRun.totalTried).toBe(1)
1020+
})
1021+
1022+
it('should retry a workflow with retries=1 exactly once when its handler throws', async () => {
1023+
payload.config.jobs.deleteJobOnComplete = false
1024+
1025+
const job = await payload.jobs.queue({
1026+
workflow: 'throwsInHandlerRetries1',
1027+
input: {},
1028+
})
1029+
1030+
let hasJobsRemaining = true
1031+
while (hasJobsRemaining) {
1032+
const response = await payload.jobs.run({ silent: true })
1033+
if (response.noJobsRemaining) {
1034+
hasJobsRemaining = false
1035+
}
1036+
}
1037+
1038+
const jobAfterRun = await payload.findByID({
1039+
collection: 'payload-jobs',
1040+
id: job.id,
1041+
})
1042+
1043+
// Initial attempt + 1 retry = 2. Once hasError is true the queue stops picking it up,
1044+
// so the loop naturally bounds at exactly 2 attempts.
1045+
expect(jobAfterRun.totalTried).toBe(2)
1046+
expect(jobAfterRun.hasError).toBe(true)
1047+
expect(jobAfterRun.processing).toBe(false)
1048+
})
1049+
9411050
it('can queue and run via the endpoint single tasks without workflows', async () => {
9421051
const workflowsRef = payload.config.jobs.workflows
9431052
delete payload.config.jobs.workflows

test/queues/payload-types.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ export interface Config {
9494
globals: {};
9595
globalsSelect: {};
9696
locale: null;
97+
widgets: {
98+
collections: CollectionsWidget;
99+
};
97100
user: User;
98101
jobs: {
99102
tasks: {
@@ -140,6 +143,8 @@ export interface Config {
140143
noConcurrency: WorkflowNoConcurrency;
141144
queueSpecificConcurrency: WorkflowQueueSpecificConcurrency;
142145
supersedesConcurrency: WorkflowSupersedesConcurrency;
146+
throwsInHandlerNoRetries: WorkflowThrowsInHandlerNoRetries;
147+
throwsInHandlerRetries1: WorkflowThrowsInHandlerRetries1;
143148
};
144149
};
145150
}
@@ -365,6 +370,8 @@ export interface PayloadJob {
365370
| 'noConcurrency'
366371
| 'queueSpecificConcurrency'
367372
| 'supersedesConcurrency'
373+
| 'throwsInHandlerNoRetries'
374+
| 'throwsInHandlerRetries1'
368375
)
369376
| null;
370377
taskSlug?:
@@ -571,6 +578,16 @@ export interface PayloadMigrationsSelect<T extends boolean = true> {
571578
updatedAt?: T;
572579
createdAt?: T;
573580
}
581+
/**
582+
* This interface was referenced by `Config`'s JSON-Schema
583+
* via the `definition` "collections_widget".
584+
*/
585+
export interface CollectionsWidget {
586+
data?: {
587+
[k: string]: unknown;
588+
};
589+
width: 'full';
590+
}
574591
/**
575592
* This interface was referenced by `Config`'s JSON-Schema
576593
* via the `definition` "MyUpdatePostType".
@@ -932,6 +949,20 @@ export interface WorkflowSupersedesConcurrency {
932949
delayMs?: number | null;
933950
};
934951
}
952+
/**
953+
* This interface was referenced by `Config`'s JSON-Schema
954+
* via the `definition` "WorkflowThrowsInHandlerNoRetries".
955+
*/
956+
export interface WorkflowThrowsInHandlerNoRetries {
957+
input?: unknown;
958+
}
959+
/**
960+
* This interface was referenced by `Config`'s JSON-Schema
961+
* via the `definition` "WorkflowThrowsInHandlerRetries1".
962+
*/
963+
export interface WorkflowThrowsInHandlerRetries1 {
964+
input?: unknown;
965+
}
935966
/**
936967
* This interface was referenced by `Config`'s JSON-Schema
937968
* via the `definition` "auth".
test/queues/workflows/throwsInHandlerNoRetries.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import type { WorkflowConfig } from 'payload'
2+
3+
export const throwsInHandlerNoRetriesWorkflow: WorkflowConfig<'throwsInHandlerNoRetries'> = {
4+
slug: 'throwsInHandlerNoRetries',
5+
inputSchema: [],
6+
// Intentionally no `retries` set — exercises the default no-retries-on-workflow-error behavior
7+
handler: () => {
8+
throw new Error('This workflow throws in its handler')
9+
},
10+
}
test/queues/workflows/throwsInHandlerRetries1.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import type { WorkflowConfig } from 'payload'
2+
3+
export const throwsInHandlerRetries1Workflow: WorkflowConfig<'throwsInHandlerRetries1'> = {
4+
slug: 'throwsInHandlerRetries1',
5+
inputSchema: [],
6+
retries: 1,
7+
handler: () => {
8+
throw new Error('This workflow throws in its handler')
9+
},
10+
}

0 commit comments

Comments
 (0)