Skip to content

Commit 08fb159

Browse files
authored
feat: allow running sub-tasks from tasks (#10373)
Task handlers now receive `inlineTask` as an arg, which can be used to run inline sub-tasks. In the task log, those inline tasks will have a `parent` property that points to the parent task. Example: ```ts { slug: 'subTask', inputSchema: [ { name: 'message', type: 'text', required: true, }, ], handler: async ({ job, inlineTask }) => { await inlineTask('create two docs', { task: async ({ input, inlineTask }) => { const { newSimple } = await inlineTask('create doc 1', { task: async ({ req }) => { const newSimple = await req.payload.create({ collection: 'simple', req, data: { title: input.message, }, }) return { output: { newSimple, }, } }, }) const { newSimple2 } = await inlineTask('create doc 2', { task: async ({ req }) => { const newSimple2 = await req.payload.create({ collection: 'simple', req, data: { title: input.message, }, }) return { output: { newSimple2, }, } }, }) return { output: { simpleID1: newSimple.id, simpleID2: newSimple2.id, }, } }, input: { message: job.input.message, }, }) }, } as WorkflowConfig<'subTask'> ``` Job log example: ```ts [ { executedAt: '2025-01-06T03:55:44.682Z', completedAt: '2025-01-06T03:55:44.684Z', taskSlug: 'inline', taskID: 'create doc 1', output: { newSimple: [Object] }, parent: { taskSlug: 'inline', taskID: 'create two docs' }, // <= New state: 'succeeded', id: '677b5440ba35d345d1214d1b' }, { executedAt: '2025-01-06T03:55:44.690Z', completedAt: '2025-01-06T03:55:44.692Z', taskSlug: 'inline', taskID: 'create doc 2', output: { newSimple2: [Object] }, parent: { taskSlug: 'inline', taskID: 'create two docs' }, // <= New state: 'succeeded', id: '677b5440ba35d345d1214d1c' }, { executedAt: '2025-01-06T03:55:44.681Z', completedAt: '2025-01-06T03:55:44.697Z', taskSlug: 'inline', taskID: 'create two docs', input: { message: 'hello!' }, output: { simpleID1: '677b54401e34772cc63c8693', simpleID2: '677b54401e34772cc63c8697' }, parent: {}, state: 'succeeded', id: '677b5440ba35d345d1214d1d' } ] ```
1 parent ab53aba commit 08fb159

File tree

10 files changed

+416
-48
lines changed

10 files changed

+416
-48
lines changed

docs/jobs-queue/tasks.mdx

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,3 +203,49 @@ export default buildConfig({
203203
}
204204
})
205205
```
206+
207+
## Nested tasks
208+
209+
You can run sub-tasks within an existing task, by using the `tasks` or `ìnlineTask` arguments passed to the task `handler` function:
210+
211+
212+
```ts
213+
export default buildConfig({
214+
// ...
215+
jobs: {
216+
// It is recommended to set `addParentToTaskLog` to `true` when using nested tasks, so that the parent task is included in the task log
217+
// This allows for better observability and debugging of the task execution
218+
addParentToTaskLog: true,
219+
tasks: [
220+
{
221+
slug: 'parentTask',
222+
inputSchema: [
223+
{
224+
name: 'text',
225+
type: 'text'
226+
},
227+
],
228+
handler: async ({ input, req, tasks, inlineTask }) => {
229+
230+
await inlineTask('Sub Task 1', {
231+
task: () => {
232+
// Do something
233+
return {
234+
output: {},
235+
}
236+
},
237+
})
238+
239+
await tasks.CreateSimple('Sub Task 2', {
240+
input: { message: 'hello' },
241+
})
242+
243+
return {
244+
output: {},
245+
}
246+
}
247+
} as TaskConfig<'parentTask'>,
248+
]
249+
}
250+
})
251+
```

packages/payload/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,6 +1304,7 @@ export type { JobsConfig, RunJobAccess, RunJobAccessArgs } from './queues/config
13041304
export type {
13051305
RunInlineTaskFunction,
13061306
RunTaskFunction,
1307+
RunTaskFunctions,
13071308
TaskConfig,
13081309
TaskHandler,
13091310
TaskHandlerArgs,

packages/payload/src/queues/config/jobsCollection.ts

Lines changed: 66 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,70 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu
3030
})
3131
}
3232

33+
const logFields: Field[] = [
34+
{
35+
name: 'executedAt',
36+
type: 'date',
37+
required: true,
38+
},
39+
{
40+
name: 'completedAt',
41+
type: 'date',
42+
required: true,
43+
},
44+
{
45+
name: 'taskSlug',
46+
type: 'select',
47+
options: [...taskSlugs],
48+
required: true,
49+
},
50+
{
51+
name: 'taskID',
52+
type: 'text',
53+
required: true,
54+
},
55+
{
56+
name: 'input',
57+
type: 'json',
58+
},
59+
{
60+
name: 'output',
61+
type: 'json',
62+
},
63+
{
64+
name: 'state',
65+
type: 'radio',
66+
options: ['failed', 'succeeded'],
67+
required: true,
68+
},
69+
{
70+
name: 'error',
71+
type: 'json',
72+
admin: {
73+
condition: (_, data) => data.state === 'failed',
74+
},
75+
required: true,
76+
},
77+
]
78+
79+
if (config?.jobs?.addParentToTaskLog) {
80+
logFields.push({
81+
name: 'parent',
82+
type: 'group',
83+
fields: [
84+
{
85+
name: 'taskSlug',
86+
type: 'select',
87+
options: [...taskSlugs],
88+
},
89+
{
90+
name: 'taskID',
91+
type: 'text',
92+
},
93+
],
94+
})
95+
}
96+
3397
const jobsCollection: CollectionConfig = {
3498
slug: 'payload-jobs',
3599
admin: {
@@ -89,51 +153,7 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu
89153
admin: {
90154
description: 'Task execution log',
91155
},
92-
fields: [
93-
{
94-
name: 'executedAt',
95-
type: 'date',
96-
required: true,
97-
},
98-
{
99-
name: 'completedAt',
100-
type: 'date',
101-
required: true,
102-
},
103-
{
104-
name: 'taskSlug',
105-
type: 'select',
106-
options: [...taskSlugs],
107-
required: true,
108-
},
109-
{
110-
name: 'taskID',
111-
type: 'text',
112-
required: true,
113-
},
114-
{
115-
name: 'input',
116-
type: 'json',
117-
},
118-
{
119-
name: 'output',
120-
type: 'json',
121-
},
122-
{
123-
name: 'state',
124-
type: 'radio',
125-
options: ['failed', 'succeeded'],
126-
required: true,
127-
},
128-
{
129-
name: 'error',
130-
type: 'json',
131-
admin: {
132-
condition: (_, data) => data.state === 'failed',
133-
},
134-
required: true,
135-
},
136-
],
156+
fields: logFields,
137157
},
138158
],
139159
label: 'Status',
@@ -204,5 +224,6 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu
204224
},
205225
lockDocuments: false,
206226
}
227+
207228
return jobsCollection
208229
}

packages/payload/src/queues/config/types/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,14 @@ export type JobsConfig = {
1919
*/
2020
run?: RunJobAccess
2121
}
22+
/**
23+
* Adds information about the parent job to the task log. This is useful for debugging and tracking the flow of tasks.
24+
*
25+
* In 4.0, this will default to `true`.
26+
*
27+
* @default false
28+
*/
29+
addParentToTaskLog?: boolean
2230
/**
2331
* Determine whether or not to delete a job after it has successfully completed.
2432
*/

packages/payload/src/queues/config/types/taskTypes.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,18 @@ export type TaskHandlerArgs<
2020
TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput,
2121
TWorkflowSlug extends keyof TypedJobs['workflows'] = string,
2222
> = {
23+
/**
24+
* Use this function to run a sub-task from within another task.
25+
*/
26+
inlineTask: RunInlineTaskFunction
2327
input: TTaskSlugOrInputOutput extends keyof TypedJobs['tasks']
2428
? TypedJobs['tasks'][TTaskSlugOrInputOutput]['input']
2529
: TTaskSlugOrInputOutput extends TaskInputOutput // Check if it's actually TaskInputOutput type
2630
? TTaskSlugOrInputOutput['input']
2731
: never
2832
job: RunningJob<TWorkflowSlug>
2933
req: PayloadRequest
34+
tasks: RunTaskFunctions
3035
}
3136

3237
/**
@@ -92,7 +97,13 @@ export type RunInlineTaskFunction = <TTaskInput extends object, TTaskOutput exte
9297
*/
9398
retries?: number | RetryConfig | undefined
9499
// This is the same as TaskHandler, but typed out explicitly in order to improve type inference
95-
task: (args: { input: TTaskInput; job: RunningJob<any>; req: PayloadRequest }) =>
100+
task: (args: {
101+
inlineTask: RunInlineTaskFunction
102+
input: TTaskInput
103+
job: RunningJob<any>
104+
req: PayloadRequest
105+
tasks: RunTaskFunctions
106+
}) =>
96107
| {
97108
output: TTaskOutput
98109
state?: 'failed' | 'succeeded'

packages/payload/src/queues/config/types/workflowTypes.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { Field } from '../../../fields/config/types.js'
22
import type { PayloadRequest, StringKeyOf, TypedCollection, TypedJobs } from '../../../index.js'
3+
import type { TaskParent } from '../../operations/runJobs/runJob/getRunTaskFunction.js'
34
import type {
45
RetryConfig,
56
RunInlineTaskFunction,
@@ -18,8 +19,12 @@ export type JobLog = {
1819
* ID added by the array field when the log is saved in the database
1920
*/
2021
id?: string
21-
input?: any
22-
output?: any
22+
input?: Record<string, any>
23+
output?: Record<string, any>
24+
/**
25+
* Sub-tasks (tasks that are run within a task) will have a parent task ID
26+
*/
27+
parent?: TaskParent
2328
state: 'failed' | 'succeeded'
2429
taskID: string
2530
taskSlug: string

packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export async function handleTaskFailed({
4444
job,
4545
maxRetries,
4646
output,
47+
parent,
4748
req,
4849
retriesConfig,
4950
runnerOutput,
@@ -60,6 +61,7 @@ export async function handleTaskFailed({
6061
job: BaseJob
6162
maxRetries: number
6263
output: object
64+
parent?: TaskParent
6365
req: PayloadRequest
6466
retriesConfig: number | RetryConfig
6567
runnerOutput?: TaskHandlerResult<string>
@@ -93,6 +95,7 @@ export async function handleTaskFailed({
9395
executedAt: executedAt.toISOString(),
9496
input,
9597
output,
98+
parent: req?.payload?.config?.jobs?.addParentToTaskLog ? parent : undefined,
9699
state: 'failed',
97100
taskID,
98101
taskSlug,
@@ -142,13 +145,19 @@ export async function handleTaskFailed({
142145
}
143146
}
144147

148+
export type TaskParent = {
149+
taskID: string
150+
taskSlug: string
151+
}
152+
145153
export const getRunTaskFunction = <TIsInline extends boolean>(
146154
state: RunTaskFunctionState,
147155
job: BaseJob,
148156
workflowConfig: WorkflowConfig<string>,
149157
req: PayloadRequest,
150158
isInline: TIsInline,
151159
updateJob: UpdateJobFunction,
160+
parent?: TaskParent,
152161
): TIsInline extends true ? RunInlineTaskFunction : RunTaskFunctions => {
153162
const runTask: <TTaskSlug extends string>(
154163
taskSlug: TTaskSlug,
@@ -240,6 +249,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
240249
completedAt: new Date().toISOString(),
241250
error: errorMessage,
242251
executedAt: executedAt.toISOString(),
252+
parent: req?.payload?.config?.jobs?.addParentToTaskLog ? parent : undefined,
243253
state: 'failed',
244254
taskID,
245255
taskSlug,
@@ -269,9 +279,17 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
269279

270280
try {
271281
const runnerOutput = await runner({
282+
inlineTask: getRunTaskFunction(state, job, workflowConfig, req, true, updateJob, {
283+
taskID,
284+
taskSlug,
285+
}),
272286
input,
273287
job: job as unknown as RunningJob<WorkflowTypes>, // TODO: Type this better
274288
req,
289+
tasks: getRunTaskFunction(state, job, workflowConfig, req, false, updateJob, {
290+
taskID,
291+
taskSlug,
292+
}),
275293
})
276294

277295
if (runnerOutput.state === 'failed') {
@@ -281,6 +299,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
281299
job,
282300
maxRetries,
283301
output,
302+
parent,
284303
req,
285304
retriesConfig: finalRetriesConfig,
286305
runnerOutput,
@@ -303,6 +322,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
303322
job,
304323
maxRetries,
305324
output,
325+
parent,
306326
req,
307327
retriesConfig: finalRetriesConfig,
308328
state,
@@ -327,6 +347,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
327347
executedAt: executedAt.toISOString(),
328348
input,
329349
output,
350+
parent: req?.payload?.config?.jobs?.addParentToTaskLog ? parent : undefined,
330351
state: 'succeeded',
331352
taskID,
332353
taskSlug,

0 commit comments

Comments
 (0)