Skip to content

Commit 032c424

Browse files
perf: use direct db calls in job-queue system (#11489)
Previously, our job queue system relied on `payload.*` operations, which ran very frequently: - whenever job execution starts, as all jobs need to be set to `processing: true` - every single time a task completes or fails, as the job log needs to be updated - whenever job execution stops, to mark it as completed and to delete it (if `deleteJobOnComplete` is set) This PR replaces these with direct `payload.db.*` calls, which are significantly faster than payload operations. Given how often the job queue system communicates with the database, this should be a massive performance improvement. ## How it affects running hooks To generate the task status, we previously used an `afterRead` hook. Since direct db adapter calls no longer execute hooks, this PR introduces new `updateJob` and `updateJobs` helpers to handle task status generation outside the normal payload hook lifecycle. Additionally, a new `runHooks` property has been added to the global job configuration. While setting this to `true` can be useful if custom hooks were added to the `payload-jobs` collection config, this will revert the job system to use normal payload operations. This should be avoided as it degrades performance. In most cases, the `onSuccess` or `onFail` properties in the job config will be sufficient and much faster. Furthermore, if the `depth` property is set in the global job configuration, the job queue system will also fall back to the slower, normal payload operations. --------- Co-authored-by: Dan Ribbens <DanRibbens@users.noreply.github.com>
1 parent 43cdccd commit 032c424

20 files changed

+299
-72
lines changed

packages/payload/src/config/sanitize.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,20 @@ export const sanitizeConfig = async (incomingConfig: Config): Promise<SanitizedC
286286
defaultJobsCollection = configWithDefaults.jobs.jobsCollectionOverrides({
287287
defaultJobsCollection,
288288
})
289+
290+
const hooks = defaultJobsCollection?.hooks
291+
// @todo - delete this check in 4.0
292+
if (hooks && config?.jobs?.runHooks !== true) {
293+
for (const hook of Object.keys(hooks)) {
294+
const defaultAmount = hook === 'afterRead' || hook === 'beforeChange' ? 1 : 0
295+
if (hooks[hook]?.length > defaultAmount) {
296+
console.warn(
297+
`The jobsCollectionOverrides function is returning a collection with an additional ${hook} hook defined. These hooks will not run unless the jobs.runHooks option is set to true. Setting this option to true will negatively impact performance.`,
298+
)
299+
break
300+
}
301+
}
302+
}
289303
}
290304
const sanitizedJobsCollection = await sanitizeCollection(
291305
config as unknown as Config,

packages/payload/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1390,7 +1390,9 @@ export type {
13901390
PreferenceUpdateRequest,
13911391
TabsPreferences,
13921392
} from './preferences/types.js'
1393+
export { jobAfterRead } from './queues/config/index.js'
13931394
export type { JobsConfig, RunJobAccess, RunJobAccessArgs } from './queues/config/types/index.js'
1395+
13941396
export type {
13951397
RunInlineTaskFunction,
13961398
RunTaskFunction,

packages/payload/src/queues/config/index.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { CollectionConfig } from '../../collections/config/types.js'
2-
import type { Config } from '../../config/types.js'
2+
import type { Config, SanitizedConfig } from '../../config/types.js'
33
import type { Field } from '../../fields/config/types.js'
4+
import type { BaseJob } from './types/workflowTypes.js'
45

56
import { runJobsEndpoint } from '../restEndpointRun.js'
67
import { getJobTaskStatus } from '../utilities/getJobTaskStatus.js'
@@ -211,12 +212,7 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu
211212
({ doc, req }) => {
212213
// This hook is used to add the virtual `tasks` field to the document, that is computed from the `log` field
213214

214-
doc.taskStatus = getJobTaskStatus({
215-
jobLog: doc.log,
216-
tasksConfig: req.payload.config.jobs.tasks,
217-
})
218-
219-
return doc
215+
return jobAfterRead({ config: req.payload.config, doc })
220216
},
221217
],
222218
/**
@@ -240,3 +236,11 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu
240236

241237
return jobsCollection
242238
}
239+
240+
export function jobAfterRead({ config, doc }: { config: SanitizedConfig; doc: BaseJob }): BaseJob {
241+
doc.taskStatus = getJobTaskStatus({
242+
jobLog: doc.log || [],
243+
tasksConfig: config.jobs.tasks,
244+
})
245+
return doc
246+
}

packages/payload/src/queues/config/types/index.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,27 @@ export type JobsConfig = {
6868
/**
6969
* Specify depth for retrieving jobs from the queue.
7070
* This should be as low as possible in order for job retrieval
71-
* to be as efficient as possible. Defaults to 0.
71+
* to be as efficient as possible. Setting it to anything higher than
72+
* 0 will drastically affect performance, as less efficient database
73+
* queries will be used.
74+
*
75+
* @default 0
7276
*/
7377
depth?: number
7478
/**
7579
* Override any settings on the default Jobs collection. Accepts the default collection and allows you to return
7680
* a new collection.
7781
*/
7882
jobsCollectionOverrides?: (args: { defaultJobsCollection: CollectionConfig }) => CollectionConfig
83+
/**
84+
* By default, the job system uses direct database calls for optimal performance.
85+
* If you added custom hooks to your jobs collection, you can set this to true to
86+
* use the standard Payload API for all job operations. This is discouraged, as it will
87+
* drastically affect performance.
88+
*
89+
* @default false
90+
*/
91+
runHooks?: boolean
7992
/**
8093
* A function that will be executed before Payload picks up jobs which are configured by the `jobs.autorun` function.
8194
* If this function returns true, jobs will be queried and picked up. If it returns false, jobs will not be run.

packages/payload/src/queues/localAPI.ts

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ import {
88
type TypedJobs,
99
type Where,
1010
} from '../index.js'
11-
import { jobsCollectionSlug } from './config/index.js'
11+
import { jobAfterRead, jobsCollectionSlug } from './config/index.js'
1212
import { runJobs } from './operations/runJobs/index.js'
13+
import { updateJob, updateJobs } from './utilities/updateJob.js'
1314

1415
export const getJobsLocalAPI = (payload: Payload) => ({
1516
queue: async <
@@ -72,13 +73,27 @@ export const getJobsLocalAPI = (payload: Payload) => ({
7273
data.taskSlug = args.task as string
7374
}
7475

75-
return (await payload.create({
76-
collection: jobsCollectionSlug,
77-
data,
78-
req: args.req,
79-
})) as TTaskOrWorkflowSlug extends keyof TypedJobs['workflows']
76+
type ReturnType = TTaskOrWorkflowSlug extends keyof TypedJobs['workflows']
8077
? RunningJob<TTaskOrWorkflowSlug>
8178
: RunningJobFromTask<TTaskOrWorkflowSlug> // Type assertion is still needed here
79+
80+
if (payload?.config?.jobs?.depth || payload?.config?.jobs?.runHooks) {
81+
return (await payload.create({
82+
collection: jobsCollectionSlug,
83+
data,
84+
depth: payload.config.jobs.depth ?? 0,
85+
req: args.req,
86+
})) as ReturnType
87+
} else {
88+
return jobAfterRead({
89+
config: payload.config,
90+
doc: await payload.db.create({
91+
collection: jobsCollectionSlug,
92+
data,
93+
req: args.req,
94+
}),
95+
}) as unknown as ReturnType
96+
}
8297
},
8398

8499
run: async (args?: {
@@ -143,8 +158,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({
143158
})
144159
}
145160

146-
await payload.db.updateMany({
147-
collection: jobsCollectionSlug,
161+
await updateJobs({
148162
data: {
149163
completedAt: null,
150164
error: {
@@ -153,13 +167,14 @@ export const getJobsLocalAPI = (payload: Payload) => ({
153167
hasError: true,
154168
processing: false,
155169
waitUntil: null,
156-
} as Partial<
157-
{
158-
completedAt: null
159-
waitUntil: null
160-
} & BaseJob
161-
>,
170+
} as {
171+
completedAt: null
172+
waitUntil: null
173+
} & BaseJob,
174+
depth: 0, // No depth, since we're not returning
175+
disableTransaction: true,
162176
req: newReq,
177+
returning: false,
163178
where: { and },
164179
})
165180
},
@@ -171,9 +186,8 @@ export const getJobsLocalAPI = (payload: Payload) => ({
171186
}): Promise<void> => {
172187
const newReq: PayloadRequest = args.req ?? (await createLocalReq({}, payload))
173188

174-
await payload.db.updateOne({
189+
await updateJob({
175190
id: args.id,
176-
collection: jobsCollectionSlug,
177191
data: {
178192
completedAt: null,
179193
error: {
@@ -186,7 +200,10 @@ export const getJobsLocalAPI = (payload: Payload) => ({
186200
completedAt: null
187201
waitUntil: null
188202
} & BaseJob,
203+
depth: 0, // No depth, since we're not returning
204+
disableTransaction: true,
189205
req: newReq,
206+
returning: false,
190207
})
191208
},
192209
})

packages/payload/src/queues/operations/runJobs/index.ts

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import type { RunJobResult } from './runJob/index.js'
1313
import { Forbidden } from '../../../errors/Forbidden.js'
1414
import isolateObjectProperty from '../../../utilities/isolateObjectProperty.js'
1515
import { jobsCollectionSlug } from '../../config/index.js'
16+
import { updateJob, updateJobs } from '../../utilities/updateJob.js'
1617
import { getUpdateJobFunction } from './runJob/getUpdateJobFunction.js'
1718
import { importHandlerPath } from './runJob/importHandlerPath.js'
1819
import { runJob } from './runJob/index.js'
@@ -106,40 +107,45 @@ export const runJobs = async ({
106107
// the same job being picked up by another worker
107108
const jobsQuery: {
108109
docs: BaseJob[]
109-
} = id
110-
? {
111-
docs: [
112-
(await req.payload.update({
113-
id,
114-
collection: jobsCollectionSlug,
115-
data: {
116-
processing: true,
117-
seenByWorker: true,
118-
},
119-
depth: req.payload.config.jobs.depth,
120-
disableTransaction: true,
121-
showHiddenFields: true,
122-
})) as BaseJob,
123-
],
124-
}
125-
: ((await req.payload.update({
126-
collection: jobsCollectionSlug,
110+
} = { docs: [] }
111+
112+
if (id) {
113+
// Only one job to run
114+
jobsQuery.docs = [
115+
await updateJob({
116+
id,
127117
data: {
128118
processing: true,
129-
seenByWorker: true,
130119
},
131120
depth: req.payload.config.jobs.depth,
132121
disableTransaction: true,
133-
limit,
134-
showHiddenFields: true,
135-
where,
136-
})) as unknown as PaginatedDocs<BaseJob>)
122+
req,
123+
returning: true,
124+
}),
125+
]
126+
} else {
127+
const updatedDocs = await updateJobs({
128+
data: {
129+
processing: true,
130+
},
131+
depth: req.payload.config.jobs.depth,
132+
disableTransaction: true,
133+
limit,
134+
req,
135+
returning: true,
136+
where,
137+
})
138+
139+
if (updatedDocs) {
140+
jobsQuery.docs = updatedDocs
141+
}
142+
}
137143

138144
/**
139145
* Just for logging purposes, we want to know how many jobs are new and how many are existing (= already been tried).
140146
* This is only for logs - in the end we still want to run all jobs, regardless of whether they are new or existing.
141147
*/
142-
const { newJobs } = jobsQuery.docs.reduce(
148+
const { existingJobs, newJobs } = jobsQuery.docs.reduce(
143149
(acc, job) => {
144150
if (job.totalTried > 0) {
145151
acc.existingJobs.push(job)
@@ -159,7 +165,11 @@ export const runJobs = async ({
159165
}
160166

161167
if (jobsQuery?.docs?.length) {
162-
req.payload.logger.info(`Running ${jobsQuery.docs.length} jobs.`)
168+
req.payload.logger.info({
169+
msg: `Running ${jobsQuery.docs.length} jobs.`,
170+
new: newJobs?.length,
171+
retrying: existingJobs?.length,
172+
})
163173
}
164174
const jobsToDelete: (number | string)[] | undefined = req.payload.config.jobs.deleteJobOnComplete
165175
? []
@@ -253,11 +263,19 @@ export const runJobs = async ({
253263

254264
if (jobsToDelete && jobsToDelete.length > 0) {
255265
try {
256-
await req.payload.delete({
257-
collection: jobsCollectionSlug,
258-
req,
259-
where: { id: { in: jobsToDelete } },
260-
})
266+
if (req.payload.config.jobs.runHooks) {
267+
await req.payload.delete({
268+
collection: jobsCollectionSlug,
269+
depth: 0, // can be 0 since we're not returning anything
270+
disableTransaction: true,
271+
where: { id: { in: jobsToDelete } },
272+
})
273+
} else {
274+
await req.payload.db.deleteMany({
275+
collection: jobsCollectionSlug,
276+
where: { id: { in: jobsToDelete } },
277+
})
278+
}
261279
} catch (err) {
262280
req.payload.logger.error({
263281
err,

packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,29 @@
22
import type { PayloadRequest } from '../../../../types/index.js'
33
import type { BaseJob } from '../../../config/types/workflowTypes.js'
44

5-
import { jobsCollectionSlug } from '../../../config/index.js'
5+
import { updateJob } from '../../../utilities/updateJob.js'
66

77
export type UpdateJobFunction = (jobData: Partial<BaseJob>) => Promise<BaseJob>
88

99
export function getUpdateJobFunction(job: BaseJob, req: PayloadRequest): UpdateJobFunction {
1010
return async (jobData) => {
11-
const updatedJob = (await req.payload.update({
11+
const updatedJob = await updateJob({
1212
id: job.id,
13-
collection: jobsCollectionSlug,
1413
data: jobData,
15-
depth: 0,
14+
depth: req.payload.config.jobs.depth,
1615
disableTransaction: true,
17-
})) as BaseJob
16+
req,
17+
})
1818

1919
// Update job object like this to modify the original object - that way, incoming changes (e.g. taskStatus field that will be re-generated through the hook) will be reflected in the calling function
2020
for (const key in updatedJob) {
2121
job[key] = updatedJob[key]
2222
}
2323

24-
if ((updatedJob.error as any)?.cancelled) {
25-
throw new Error('Job cancelled')
24+
if ((updatedJob.error as Record<string, unknown>)?.cancelled) {
25+
const cancelledError = new Error('Job cancelled') as { cancelled: boolean } & Error
26+
cancelledError.cancelled = true
27+
throw cancelledError
2628
}
2729

2830
return updatedJob

packages/payload/src/queues/operations/runJobs/runJob/handleWorkflowError.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export function handleWorkflowError({
2626
} {
2727
const jobLabel = job.workflowSlug || `Task: ${job.taskSlug}`
2828

29-
let hasFinalError = state.reachedMaxRetries // If any TASK reached max retries, the job has an error
29+
let hasFinalError = state.reachedMaxRetries || !!('cancelled' in error && error.cancelled) // If any TASK reached max retries, the job has an error
3030
const maxWorkflowRetries: number =
3131
(typeof workflowConfig.retries === 'object'
3232
? workflowConfig.retries.attempts

packages/payload/src/queues/operations/runJobs/runJob/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ export const runJob = async ({
6060
const errorJSON = hasFinalError
6161
? {
6262
name: err.name,
63+
cancelled: Boolean('cancelled' in err && err.cancelled),
6364
message: err.message,
6465
stack: err.stack,
6566
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import ObjectIdImport from 'bson-objectid'
2+
3+
import type { BaseJob } from '../config/types/workflowTypes.js'
4+
5+
const ObjectId = (ObjectIdImport.default ||
6+
ObjectIdImport) as unknown as typeof ObjectIdImport.default
7+
8+
/**
9+
* Our payload operations sanitize the input data to, for example, add missing IDs to array rows.
10+
* This function is used to manually sanitize the data for direct db adapter operations
11+
*/
12+
export function sanitizeUpdateData({ data }: { data: Partial<BaseJob> }): Partial<BaseJob> {
13+
if (data.log) {
14+
const sanitizedData = { ...data }
15+
sanitizedData.log = sanitizedData?.log?.map((log) => {
16+
if (log.id) {
17+
return log
18+
}
19+
return {
20+
...log,
21+
id: new ObjectId().toHexString(),
22+
}
23+
})
24+
return sanitizedData
25+
}
26+
27+
return data
28+
}

0 commit comments

Comments
 (0)