Skip to content

Commit d53f166

Browse files
authored
fix: ensure errors returned from tasks are properly logged (#11443)
Fixes #9767 We allow failing a job queue task by returning `{ state: 'failed' }` from the task, instead of throwing an error. However, previously, this threw an error when trying to update the task in the database. Additionally, it was not possible to customize the error message. This PR fixes that by letting you return `errorMessage` alongside `{ state: 'failed' }`, and by ensuring the error is transformed into proper json before saving it to the `error` column.
1 parent dfddee2 commit d53f166

File tree

4 files changed

+160
-40
lines changed

4 files changed

+160
-40
lines changed

packages/payload/src/queues/config/types/taskTypes.ts

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,19 @@ export type TaskInputOutput = {
77
}
88
export type TaskHandlerResult<
99
TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput,
10-
> = {
11-
output: TTaskSlugOrInputOutput extends keyof TypedJobs['tasks']
12-
? TypedJobs['tasks'][TTaskSlugOrInputOutput]['output']
13-
: TTaskSlugOrInputOutput extends TaskInputOutput // Check if it's actually TaskInputOutput type
14-
? TTaskSlugOrInputOutput['output']
15-
: never
16-
state?: 'failed' | 'succeeded'
17-
}
10+
> =
11+
| {
12+
errorMessage?: string
13+
state: 'failed'
14+
}
15+
| {
16+
output: TTaskSlugOrInputOutput extends keyof TypedJobs['tasks']
17+
? TypedJobs['tasks'][TTaskSlugOrInputOutput]['output']
18+
: TTaskSlugOrInputOutput extends TaskInputOutput // Check if it's actually TaskInputOutput type
19+
? TTaskSlugOrInputOutput['output']
20+
: never
21+
state?: 'succeeded'
22+
}
1823

1924
export type TaskHandlerArgs<
2025
TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput,
@@ -84,6 +89,8 @@ export type RunTaskFunctions = {
8489
[TTaskSlug in keyof TypedJobs['tasks']]: RunTaskFunction<TTaskSlug>
8590
}
8691

92+
type MaybePromise<T> = Promise<T> | T
93+
8794
export type RunInlineTaskFunction = <TTaskInput extends object, TTaskOutput extends object>(
8895
taskID: string,
8996
taskArgs: {
@@ -103,12 +110,16 @@ export type RunInlineTaskFunction = <TTaskInput extends object, TTaskOutput exte
103110
job: RunningJob<any>
104111
req: PayloadRequest
105112
tasks: RunTaskFunctions
106-
}) =>
113+
}) => MaybePromise<
114+
| {
115+
errorMessage?: string
116+
state: 'failed'
117+
}
107118
| {
108119
output: TTaskOutput
109-
state?: 'failed' | 'succeeded'
120+
state?: 'succeeded'
110121
}
111-
| Promise<{ output: TTaskOutput; state?: 'failed' | 'succeeded' }>
122+
>
112123
},
113124
) => Promise<TTaskOutput>
114125

packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ export async function handleTaskFailed({
4848
parent,
4949
req,
5050
retriesConfig,
51-
runnerOutput,
5251
state,
5352
taskConfig,
53+
taskHandlerResult,
5454
taskID,
5555
taskSlug,
5656
taskStatus,
@@ -65,9 +65,9 @@ export async function handleTaskFailed({
6565
parent?: TaskParent
6666
req: PayloadRequest
6767
retriesConfig: number | RetryConfig
68-
runnerOutput?: TaskHandlerResult<string>
6968
state: RunTaskFunctionState
7069
taskConfig?: TaskConfig<string>
70+
taskHandlerResult?: TaskHandlerResult<string>
7171
taskID: string
7272
taskSlug: string
7373
taskStatus: null | SingleTaskStatus<string>
@@ -88,7 +88,12 @@ export async function handleTaskFailed({
8888
message: error.message,
8989
stack: error.stack,
9090
}
91-
: runnerOutput.state
91+
: {
92+
message:
93+
taskHandlerResult.state === 'failed'
94+
? (taskHandlerResult.errorMessage ?? taskHandlerResult.state)
95+
: 'failed',
96+
}
9297

9398
job.log.push({
9499
completedAt: new Date().toISOString(),
@@ -262,8 +267,6 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
262267
return
263268
}
264269

265-
let output: object = {}
266-
267270
let maxRetries: number | undefined = finalRetriesConfig?.attempts
268271

269272
if (maxRetries === undefined || maxRetries === null) {
@@ -278,8 +281,11 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
278281
}
279282
}
280283

284+
let taskHandlerResult: TaskHandlerResult<string>
285+
let output: object = {}
286+
281287
try {
282-
const runnerOutput = await runner({
288+
taskHandlerResult = await runner({
283289
inlineTask: getRunTaskFunction(state, job, workflowConfig, req, true, updateJob, {
284290
taskID,
285291
taskSlug,
@@ -292,29 +298,6 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
292298
taskSlug,
293299
}),
294300
})
295-
296-
if (runnerOutput.state === 'failed') {
297-
await handleTaskFailed({
298-
executedAt,
299-
input,
300-
job,
301-
maxRetries,
302-
output,
303-
parent,
304-
req,
305-
retriesConfig: finalRetriesConfig,
306-
runnerOutput,
307-
state,
308-
taskConfig,
309-
taskID,
310-
taskSlug,
311-
taskStatus,
312-
updateJob,
313-
})
314-
throw new Error('Task failed')
315-
} else {
316-
output = runnerOutput.output
317-
}
318301
} catch (err) {
319302
await handleTaskFailed({
320303
error: err,
@@ -336,6 +319,29 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
336319
throw new Error('Task failed')
337320
}
338321

322+
if (taskHandlerResult.state === 'failed') {
323+
await handleTaskFailed({
324+
executedAt,
325+
input,
326+
job,
327+
maxRetries,
328+
output,
329+
parent,
330+
req,
331+
retriesConfig: finalRetriesConfig,
332+
state,
333+
taskConfig,
334+
taskHandlerResult,
335+
taskID,
336+
taskSlug,
337+
taskStatus,
338+
updateJob,
339+
})
340+
throw new Error('Task failed')
341+
} else {
342+
output = taskHandlerResult.output
343+
}
344+
339345
if (taskConfig?.onSuccess) {
340346
await taskConfig.onSuccess()
341347
}

test/queues/config.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,44 @@ export default buildConfigWithDefaults({
323323
],
324324
handler: path.resolve(dirname, 'runners/externalTask.ts') + '#externalTaskHandler',
325325
} as TaskConfig<'ExternalTask'>,
326+
{
327+
retries: 0,
328+
slug: 'ThrowError',
329+
inputSchema: [],
330+
outputSchema: [],
331+
handler: () => {
332+
throw new Error('failed')
333+
},
334+
} as TaskConfig<'ThrowError'>,
335+
{
336+
retries: 0,
337+
slug: 'ReturnError',
338+
inputSchema: [],
339+
outputSchema: [],
340+
handler: () => {
341+
return {
342+
state: 'failed',
343+
}
344+
},
345+
} as TaskConfig<'ReturnError'>,
346+
{
347+
retries: 0,
348+
slug: 'ReturnCustomError',
349+
inputSchema: [
350+
{
351+
name: 'errorMessage',
352+
type: 'text',
353+
required: true,
354+
},
355+
],
356+
outputSchema: [],
357+
handler: ({ input }) => {
358+
return {
359+
state: 'failed',
360+
errorMessage: input.errorMessage,
361+
}
362+
},
363+
} as TaskConfig<'ReturnCustomError'>,
326364
],
327365
workflows: [
328366
updatePostWorkflow,

test/queues/int.spec.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,4 +1128,69 @@ describe('Queues', () => {
11281128
// @ts-expect-error
11291129
expect(jobAfterRun.input.amountTask1Retried).toBe(0)
11301130
})
1131+
1132+
it('can tasks throw error', async () => {
1133+
payload.config.jobs.deleteJobOnComplete = false
1134+
1135+
const job = await payload.jobs.queue({
1136+
task: 'ThrowError',
1137+
input: {},
1138+
})
1139+
1140+
await payload.jobs.run()
1141+
1142+
const jobAfterRun = await payload.findByID({
1143+
collection: 'payload-jobs',
1144+
id: job.id,
1145+
})
1146+
1147+
expect(jobAfterRun.hasError).toBe(true)
1148+
expect(jobAfterRun.log?.length).toBe(1)
1149+
expect(jobAfterRun.log[0].error.message).toBe('failed')
1150+
expect(jobAfterRun.log[0].state).toBe('failed')
1151+
})
1152+
1153+
it('can tasks return error', async () => {
1154+
payload.config.jobs.deleteJobOnComplete = false
1155+
1156+
const job = await payload.jobs.queue({
1157+
task: 'ReturnError',
1158+
input: {},
1159+
})
1160+
1161+
await payload.jobs.run()
1162+
1163+
const jobAfterRun = await payload.findByID({
1164+
collection: 'payload-jobs',
1165+
id: job.id,
1166+
})
1167+
1168+
expect(jobAfterRun.hasError).toBe(true)
1169+
expect(jobAfterRun.log?.length).toBe(1)
1170+
expect(jobAfterRun.log[0].error.message).toBe('failed')
1171+
expect(jobAfterRun.log[0].state).toBe('failed')
1172+
})
1173+
1174+
it('can tasks return error with custom error message', async () => {
1175+
payload.config.jobs.deleteJobOnComplete = false
1176+
1177+
const job = await payload.jobs.queue({
1178+
task: 'ReturnCustomError',
1179+
input: {
1180+
errorMessage: 'custom error message',
1181+
},
1182+
})
1183+
1184+
await payload.jobs.run()
1185+
1186+
const jobAfterRun = await payload.findByID({
1187+
collection: 'payload-jobs',
1188+
id: job.id,
1189+
})
1190+
1191+
expect(jobAfterRun.hasError).toBe(true)
1192+
expect(jobAfterRun.log?.length).toBe(1)
1193+
expect(jobAfterRun.log[0].error.message).toBe('custom error message')
1194+
expect(jobAfterRun.log[0].state).toBe('failed')
1195+
})
11311196
})

0 commit comments

Comments
 (0)