Skip to content

Commit c844b4c

Browse files
authored
feat: configurable job queue processing order (LIFO/FIFO), allow sequential execution of jobs (#11897)
Previously, jobs were executed in FIFO order on MongoDB, and LIFO on Postgres, with no way to configure this behavior. This PR makes FIFO the default on both MongoDB and Postgres and introduces the following new options to configure the processing order globally or on a queue-by-queue basis: - a `processingOrder` property to the jobs config - a `processingOrder` argument to `payload.jobs.run()` to override what's set in the jobs config It also adds a new `sequential` option to `payload.jobs.run()`, which can be useful for debugging.
1 parent 9c88af4 commit c844b4c

File tree

9 files changed

+340
-21
lines changed

9 files changed

+340
-21
lines changed

docs/jobs-queue/queues.mdx

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ Then, you could configure two different runner strategies:
2828

2929
As mentioned above, you can queue jobs, but the jobs won't run unless a worker picks up your jobs and runs them. This can be done in four ways:
3030

31-
#### Cron jobs
31+
### Cron jobs
3232

3333
You can use the `jobs.autoRun` property to configure cron jobs:
3434

@@ -63,7 +63,7 @@ export default buildConfig({
6363
and should not be used on serverless platforms like Vercel.
6464
</Banner>
6565

66-
#### Endpoint
66+
### Endpoint
6767

6868
You can execute jobs by making a fetch request to the `/api/payload-jobs/run` endpoint:
6969

@@ -130,7 +130,7 @@ This works because Vercel automatically makes the `CRON_SECRET` environment vari
130130

131131
After the project is deployed to Vercel, the Vercel Cron job will automatically trigger the `/api/payload-jobs/run` endpoint in the specified schedule, running the queued jobs in the background.
132132

133-
#### Local API
133+
### Local API
134134

135135
If you want to process jobs programmatically from your server-side code, you can use the Local API:
136136

@@ -156,7 +156,7 @@ const results = await payload.jobs.runByID({
156156
})
157157
```
158158

159-
#### Bin script
159+
### Bin script
160160

161161
Finally, you can process jobs via the bin script that comes with Payload out of the box.
162162

@@ -169,3 +169,76 @@ In addition, the bin script allows you to pass a `--cron` flag to the `jobs:run`
169169
```sh
170170
npx payload jobs:run --cron "*/5 * * * *"
171171
```
172+
173+
## Processing Order
174+
175+
By default, jobs are processed first in, first out (FIFO). This means that the first job added to the queue will be the first one processed. However, you can also configure the order in which jobs are processed.
176+
177+
### Jobs Configuration
178+
179+
You can configure the order in which jobs are processed in the jobs configuration by passing the `processingOrder` property. This mimics the Payload [sort](../queries/sort) property that's used for functionality such as `payload.find()`.
180+
181+
```ts
182+
export default buildConfig({
183+
// Other configurations...
184+
jobs: {
185+
tasks: [
186+
// your tasks here
187+
],
188+
processingOrder: '-createdAt', // Process jobs in reverse order of creation = LIFO
189+
},
190+
})
191+
```
192+
193+
You can also set this on a queue-by-queue basis:
194+
195+
```ts
196+
export default buildConfig({
197+
// Other configurations...
198+
jobs: {
199+
tasks: [
200+
// your tasks here
201+
],
202+
processingOrder: {
203+
default: 'createdAt', // FIFO
204+
queues: {
205+
nightly: '-createdAt', // LIFO
206+
myQueue: '-createdAt', // LIFO
207+
},
208+
},
209+
},
210+
})
211+
```
212+
213+
If you need even more control over the processing order, you can pass a function that returns the processing order - this function will be called every time a queue starts processing jobs.
214+
215+
```ts
216+
export default buildConfig({
217+
// Other configurations...
218+
jobs: {
219+
tasks: [
220+
// your tasks here
221+
],
222+
processingOrder: ({ queue }) => {
223+
if (queue === 'myQueue') {
224+
return '-createdAt' // LIFO
225+
}
226+
return 'createdAt' // FIFO
227+
},
228+
},
229+
})
230+
```
231+
232+
### Local API
233+
234+
You can configure the order in which jobs are processed in the `payload.jobs.queue` method by passing the `processingOrder` property.
235+
236+
```ts
237+
const createdJob = await payload.jobs.queue({
238+
workflow: 'createPostAndUpdate',
239+
input: {
240+
title: 'my title',
241+
},
242+
processingOrder: '-createdAt', // Process jobs in reverse order of creation = LIFO
243+
})
244+
```

packages/db-mongodb/src/updateJobs.ts

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@ import type { BaseJob, UpdateJobs, Where } from 'payload'
44
import type { MongooseAdapter } from './index.js'
55

66
import { buildQuery } from './queries/buildQuery.js'
7+
import { buildSortParam } from './queries/buildSortParam.js'
78
import { getCollection } from './utilities/getEntity.js'
89
import { getSession } from './utilities/getSession.js'
910
import { handleError } from './utilities/handleError.js'
1011
import { transform } from './utilities/transform.js'
1112

1213
export const updateJobs: UpdateJobs = async function updateMany(
1314
this: MongooseAdapter,
14-
{ id, data, limit, req, returning, where: whereArg },
15+
{ id, data, limit, req, returning, sort: sortArg, where: whereArg },
1516
) {
1617
if (!(data?.log as object[])?.length) {
1718
delete data.log
@@ -23,6 +24,14 @@ export const updateJobs: UpdateJobs = async function updateMany(
2324
collectionSlug: 'payload-jobs',
2425
})
2526

27+
const sort: Record<string, unknown> | undefined = buildSortParam({
28+
adapter: this,
29+
config: this.payload.config,
30+
fields: collectionConfig.flattenedFields,
31+
sort: sortArg || collectionConfig.defaultSort,
32+
timestamps: true,
33+
})
34+
2635
const options: MongooseUpdateQueryOptions = {
2736
lean: true,
2837
new: true,
@@ -54,7 +63,7 @@ export const updateJobs: UpdateJobs = async function updateMany(
5463
const documentsToUpdate = await Model.find(
5564
query,
5665
{},
57-
{ ...options, limit, projection: { _id: 1 } },
66+
{ ...options, limit, projection: { _id: 1 }, sort },
5867
)
5968
if (documentsToUpdate.length === 0) {
6069
return null
@@ -69,7 +78,14 @@ export const updateJobs: UpdateJobs = async function updateMany(
6978
return null
7079
}
7180

72-
result = await Model.find(query, {}, options)
81+
result = await Model.find(
82+
query,
83+
{},
84+
{
85+
...options,
86+
sort,
87+
},
88+
)
7389
}
7490
} catch (error) {
7591
handleError({ collection: collectionConfig.slug, error, req })

packages/payload/src/queues/config/types/index.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { CollectionConfig } from '../../../index.js'
2-
import type { Payload, PayloadRequest } from '../../../types/index.js'
2+
import type { Payload, PayloadRequest, Sort } from '../../../types/index.js'
3+
import type { RunJobsArgs } from '../../operations/runJobs/index.js'
34
import type { TaskConfig } from './taskTypes.js'
45
import type { WorkflowConfig } from './workflowTypes.js'
56

@@ -80,6 +81,22 @@ export type JobsConfig = {
8081
* a new collection.
8182
*/
8283
jobsCollectionOverrides?: (args: { defaultJobsCollection: CollectionConfig }) => CollectionConfig
84+
/**
85+
* Adjust the job processing order using a Payload sort string. This can be set globally or per queue.
86+
*
87+
* FIFO would equal `createdAt` and LIFO would equal `-createdAt`.
88+
*
89+
* @default all jobs for all queues will be executed in FIFO order.
90+
*/
91+
processingOrder?:
92+
| ((args: RunJobsArgs) => Promise<Sort> | Sort)
93+
| {
94+
default?: Sort
95+
queues: {
96+
[queue: string]: Sort
97+
}
98+
}
99+
| Sort
83100
/**
84101
* By default, the job system uses direct database calls for optimal performance.
85102
* If you added custom hooks to your jobs collection, you can set this to true to

packages/payload/src/queues/localAPI.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
type Payload,
66
type PayloadRequest,
77
type RunningJob,
8+
type Sort,
89
type TypedJobs,
910
type Where,
1011
} from '../index.js'
@@ -99,17 +100,30 @@ export const getJobsLocalAPI = (payload: Payload) => ({
99100
run: async (args?: {
100101
limit?: number
101102
overrideAccess?: boolean
103+
/**
104+
* Adjust the job processing order using a Payload sort string.
105+
*
106+
* FIFO would equal `createdAt` and LIFO would equal `-createdAt`.
107+
*/
108+
processingOrder?: Sort
102109
queue?: string
103110
req?: PayloadRequest
111+
/**
112+
* By default, jobs are run in parallel.
113+
* If you want to run them in sequence, set this to true.
114+
*/
115+
sequential?: boolean
104116
where?: Where
105117
}): Promise<ReturnType<typeof runJobs>> => {
106118
const newReq: PayloadRequest = args?.req ?? (await createLocalReq({}, payload))
107119

108120
return await runJobs({
109121
limit: args?.limit,
110122
overrideAccess: args?.overrideAccess !== false,
123+
processingOrder: args?.processingOrder,
111124
queue: args?.queue,
112125
req: newReq,
126+
sequential: args?.sequential,
113127
where: args?.where,
114128
})
115129
},

packages/payload/src/queues/operations/runJobs/index.ts

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
// @ts-strict-ignore
2-
import type { PaginatedDocs } from '../../../database/types.js'
3-
import type { PayloadRequest, Where } from '../../../types/index.js'
2+
import type { PayloadRequest, Sort, Where } from '../../../types/index.js'
43
import type { WorkflowJSON } from '../../config/types/workflowJSONTypes.js'
54
import type {
65
BaseJob,
@@ -26,8 +25,21 @@ export type RunJobsArgs = {
2625
id?: number | string
2726
limit?: number
2827
overrideAccess?: boolean
28+
/**
29+
* Adjust the job processing order
30+
*
31+
* FIFO would equal `createdAt` and LIFO would equal `-createdAt`.
32+
*
33+
* @default all jobs for all queues will be executed in FIFO order.
34+
*/
35+
processingOrder?: Sort
2936
queue?: string
3037
req: PayloadRequest
38+
/**
39+
* By default, jobs are run in parallel.
40+
* If you want to run them in sequence, set this to true.
41+
*/
42+
sequential?: boolean
3143
where?: Where
3244
}
3345

@@ -43,14 +55,18 @@ export type RunJobsResult = {
4355
remainingJobsFromQueried: number
4456
}
4557

46-
export const runJobs = async ({
47-
id,
48-
limit = 10,
49-
overrideAccess,
50-
queue,
51-
req,
52-
where: whereFromProps,
53-
}: RunJobsArgs): Promise<RunJobsResult> => {
58+
export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
59+
const {
60+
id,
61+
limit = 10,
62+
overrideAccess,
63+
processingOrder,
64+
queue,
65+
req,
66+
sequential,
67+
where: whereFromProps,
68+
} = args
69+
5470
if (!overrideAccess) {
5571
const hasAccess = await req.payload.config.jobs.access.run({ req })
5672
if (!hasAccess) {
@@ -124,6 +140,21 @@ export const runJobs = async ({
124140
}),
125141
]
126142
} else {
143+
let defaultProcessingOrder: Sort =
144+
req.payload.collections[jobsCollectionSlug].config.defaultSort ?? 'createdAt'
145+
146+
const processingOrderConfig = req.payload.config.jobs?.processingOrder
147+
if (typeof processingOrderConfig === 'function') {
148+
defaultProcessingOrder = await processingOrderConfig(args)
149+
} else if (typeof processingOrderConfig === 'object' && !Array.isArray(processingOrderConfig)) {
150+
if (queue && processingOrderConfig.queues && processingOrderConfig.queues[queue]) {
151+
defaultProcessingOrder = processingOrderConfig.queues[queue]
152+
} else if (processingOrderConfig.default) {
153+
defaultProcessingOrder = processingOrderConfig.default
154+
}
155+
} else if (typeof processingOrderConfig === 'string') {
156+
defaultProcessingOrder = processingOrderConfig
157+
}
127158
const updatedDocs = await updateJobs({
128159
data: {
129160
processing: true,
@@ -133,6 +164,7 @@ export const runJobs = async ({
133164
limit,
134165
req,
135166
returning: true,
167+
sort: processingOrder ?? defaultProcessingOrder,
136168
where,
137169
})
138170

@@ -175,7 +207,7 @@ export const runJobs = async ({
175207
? []
176208
: undefined
177209

178-
const jobPromises = jobsQuery.docs.map(async (job) => {
210+
const runSingleJob = async (job) => {
179211
if (!job.workflowSlug && !job.taskSlug) {
180212
throw new Error('Job must have either a workflowSlug or a taskSlug')
181213
}
@@ -257,9 +289,20 @@ export const runJobs = async ({
257289

258290
return { id: job.id, result }
259291
}
260-
})
292+
}
261293

262-
const resultsArray = await Promise.all(jobPromises)
294+
let resultsArray: { id: number | string; result: RunJobResult }[] = []
295+
if (sequential) {
296+
for (const job of jobsQuery.docs) {
297+
const result = await runSingleJob(job)
298+
if (result !== null) {
299+
resultsArray.push(result)
300+
}
301+
}
302+
} else {
303+
const jobPromises = jobsQuery.docs.map(runSingleJob)
304+
resultsArray = await Promise.all(jobPromises)
305+
}
263306

264307
if (jobsToDelete && jobsToDelete.length > 0) {
265308
try {

test/queues/config.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import { updatePostJSONWorkflow } from './workflows/updatePostJSON.js'
2424
import { workflowAndTasksRetriesUndefinedWorkflow } from './workflows/workflowAndTasksRetriesUndefined.js'
2525
import { workflowRetries2TasksRetries0Workflow } from './workflows/workflowRetries2TasksRetries0.js'
2626
import { workflowRetries2TasksRetriesUndefinedWorkflow } from './workflows/workflowRetries2TasksRetriesUndefined.js'
27+
import { inlineTaskTestDelayedWorkflow } from './workflows/inlineTaskTestDelayed.js'
2728
import { parallelTaskWorkflow } from './workflows/parallelTaskWorkflow.js'
2829

2930
const filename = fileURLToPath(import.meta.url)
@@ -104,6 +105,11 @@ export default buildConfigWithDefaults({
104105
},
105106
}
106107
},
108+
processingOrder: {
109+
queues: {
110+
lifo: '-createdAt',
111+
},
112+
},
107113
tasks: [
108114
{
109115
retries: 2,
@@ -376,6 +382,7 @@ export default buildConfigWithDefaults({
376382
workflowRetries2TasksRetriesUndefinedWorkflow,
377383
workflowRetries2TasksRetries0Workflow,
378384
inlineTaskTestWorkflow,
385+
inlineTaskTestDelayedWorkflow,
379386
externalWorkflow,
380387
retriesBackoffTestWorkflow,
381388
subTaskWorkflow,

0 commit comments

Comments
 (0)