Skip to content

Commit a5c3aa0

Browse files
authored
perf: reduce job queue db calls (#11846)
Continuation of #11489. This adds a new, optional `updateJobs` db adapter method that reduces the amount of database calls for the jobs queue. ## MongoDB ### Previous: running a set of 50 queued jobs - 1x db.find (= 1x `Model.paginate`) - 50x db.updateOne (= 50x `Model.findOneAndUpdate`) ### Now: running a set of 50 queued jobs - 1x db.updateJobs (= 1x `Model.find` and 1x `Model.updateMany`) **=> 51 db round trips before, 2 db round trips after** ### Previous: upon task completion - 1x db.find (= 1x `Model.paginate`) - 1x db.updateOne (= 1x `Model.findOneAndUpdate`) ### Now: upon task completion - 1x db.updateJobs (= 1x `Model.findOneAndUpdate`) **=> 2 db round trips before, 1 db round trip after** ## Drizzle (e.g. Postgres) ### running a set of 50 queued jobs - 1x db.query[tablename].findMany - 50x db.select - 50x upsertRow This is unaffected by this PR and will be addressed in a future PR
1 parent 74f935b commit a5c3aa0

File tree

12 files changed

+279
-47
lines changed

12 files changed

+279
-47
lines changed

packages/db-mongodb/src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import type {
1717
TypeWithVersion,
1818
UpdateGlobalArgs,
1919
UpdateGlobalVersionArgs,
20-
UpdateManyArgs,
2120
UpdateOneArgs,
2221
UpdateVersionArgs,
2322
} from 'payload'
@@ -55,6 +54,7 @@ import { commitTransaction } from './transactions/commitTransaction.js'
5554
import { rollbackTransaction } from './transactions/rollbackTransaction.js'
5655
import { updateGlobal } from './updateGlobal.js'
5756
import { updateGlobalVersion } from './updateGlobalVersion.js'
57+
import { updateJobs } from './updateJobs.js'
5858
import { updateMany } from './updateMany.js'
5959
import { updateOne } from './updateOne.js'
6060
import { updateVersion } from './updateVersion.js'
@@ -227,6 +227,7 @@ export function mongooseAdapter({
227227
mongoMemoryServer,
228228
sessions: {},
229229
transactionOptions: transactionOptions === false ? undefined : transactionOptions,
230+
updateJobs,
230231
updateMany,
231232
url,
232233
versions: {},

packages/db-mongodb/src/updateJobs.ts

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import type { MongooseUpdateQueryOptions } from 'mongoose'
2+
import type { BaseJob, UpdateJobs, Where } from 'payload'
3+
4+
import type { MongooseAdapter } from './index.js'
5+
6+
import { buildQuery } from './queries/buildQuery.js'
7+
import { getCollection } from './utilities/getEntity.js'
8+
import { getSession } from './utilities/getSession.js'
9+
import { handleError } from './utilities/handleError.js'
10+
import { transform } from './utilities/transform.js'
11+
12+
export const updateJobs: UpdateJobs = async function updateMany(
13+
this: MongooseAdapter,
14+
{ id, data, limit, req, returning, where: whereArg },
15+
) {
16+
const where = id ? { id: { equals: id } } : (whereArg as Where)
17+
18+
const { collectionConfig, Model } = getCollection({
19+
adapter: this,
20+
collectionSlug: 'payload-jobs',
21+
})
22+
23+
const options: MongooseUpdateQueryOptions = {
24+
lean: true,
25+
new: true,
26+
session: await getSession(this, req),
27+
}
28+
29+
let query = await buildQuery({
30+
adapter: this,
31+
collectionSlug: collectionConfig.slug,
32+
fields: collectionConfig.flattenedFields,
33+
where,
34+
})
35+
36+
transform({ adapter: this, data, fields: collectionConfig.fields, operation: 'write' })
37+
38+
let result: BaseJob[] = []
39+
40+
try {
41+
if (id) {
42+
if (returning === false) {
43+
await Model.updateOne(query, data, options)
44+
return null
45+
} else {
46+
const doc = await Model.findOneAndUpdate(query, data, options)
47+
result = doc ? [doc] : []
48+
}
49+
} else {
50+
if (typeof limit === 'number' && limit > 0) {
51+
const documentsToUpdate = await Model.find(
52+
query,
53+
{},
54+
{ ...options, limit, projection: { _id: 1 } },
55+
)
56+
if (documentsToUpdate.length === 0) {
57+
return null
58+
}
59+
60+
query = { _id: { $in: documentsToUpdate.map((doc) => doc._id) } }
61+
}
62+
63+
await Model.updateMany(query, data, options)
64+
65+
if (returning === false) {
66+
return null
67+
}
68+
69+
result = await Model.find(query, {}, options)
70+
}
71+
} catch (error) {
72+
handleError({ collection: collectionConfig.slug, error, req })
73+
}
74+
75+
transform({
76+
adapter: this,
77+
data: result,
78+
fields: collectionConfig.fields,
79+
operation: 'read',
80+
})
81+
82+
return result
83+
}

packages/db-postgres/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
rollbackTransaction,
3434
updateGlobal,
3535
updateGlobalVersion,
36+
updateJobs,
3637
updateMany,
3738
updateOne,
3839
updateVersion,
@@ -172,6 +173,7 @@ export function postgresAdapter(args: Args): DatabaseAdapterObj<PostgresAdapter>
172173
find,
173174
findGlobal,
174175
findGlobalVersions,
176+
updateJobs,
175177
// @ts-expect-error - vestiges of when tsconfig was not strict. Feel free to improve
176178
findOne,
177179
findVersions,

packages/db-sqlite/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import {
3434
rollbackTransaction,
3535
updateGlobal,
3636
updateGlobalVersion,
37+
updateJobs,
3738
updateMany,
3839
updateOne,
3940
updateVersion,
@@ -127,6 +128,7 @@ export function sqliteAdapter(args: Args): DatabaseAdapterObj<SQLiteAdapter> {
127128
tables: {},
128129
// @ts-expect-error - vestiges of when tsconfig was not strict. Feel free to improve
129130
transactionOptions: args.transactionOptions || undefined,
131+
updateJobs,
130132
updateMany,
131133
versionsSuffix: args.versionsSuffix || '_v',
132134

packages/db-vercel-postgres/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import {
3434
rollbackTransaction,
3535
updateGlobal,
3636
updateGlobalVersion,
37+
updateJobs,
3738
updateMany,
3839
updateOne,
3940
updateVersion,
@@ -138,6 +139,7 @@ export function vercelPostgresAdapter(args: Args = {}): DatabaseAdapterObj<Verce
138139
tables: {},
139140
tablesFilter: args.tablesFilter,
140141
transactionOptions: args.transactionOptions || undefined,
142+
updateJobs,
141143
versionsSuffix: args.versionsSuffix || '_v',
142144

143145
// DatabaseAdapter

packages/drizzle/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ export { commitTransaction } from './transactions/commitTransaction.js'
3333
export { rollbackTransaction } from './transactions/rollbackTransaction.js'
3434
export { updateGlobal } from './updateGlobal.js'
3535
export { updateGlobalVersion } from './updateGlobalVersion.js'
36+
export { updateJobs } from './updateJobs.js'
3637
export { updateMany } from './updateMany.js'
3738
export { updateOne } from './updateOne.js'
3839
export { updateVersion } from './updateVersion.js'

packages/drizzle/src/updateJobs.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import type { UpdateJobs, Where } from 'payload'
2+
3+
import toSnakeCase from 'to-snake-case'
4+
5+
import type { DrizzleAdapter } from './types.js'
6+
7+
import { findMany } from './find/findMany.js'
8+
import { upsertRow } from './upsertRow/index.js'
9+
import { getTransaction } from './utilities/getTransaction.js'
10+
11+
export const updateJobs: UpdateJobs = async function updateMany(
12+
this: DrizzleAdapter,
13+
{ id, data, limit: limitArg, req, returning, sort: sortArg, where: whereArg },
14+
) {
15+
const whereToUse: Where = id ? { id: { equals: id } } : whereArg
16+
const limit = id ? 1 : limitArg
17+
18+
const db = await getTransaction(this, req)
19+
const collection = this.payload.collections['payload-jobs'].config
20+
const tableName = this.tableNameMap.get(toSnakeCase(collection.slug))
21+
const sort = sortArg !== undefined && sortArg !== null ? sortArg : collection.defaultSort
22+
23+
const jobs = await findMany({
24+
adapter: this,
25+
collectionSlug: 'payload-jobs',
26+
fields: collection.flattenedFields,
27+
limit: id ? 1 : limit,
28+
pagination: false,
29+
req,
30+
sort,
31+
tableName,
32+
where: whereToUse,
33+
})
34+
if (!jobs.docs.length) {
35+
return []
36+
}
37+
38+
const results = []
39+
40+
// TODO: We need to batch this to reduce the amount of db calls. This can get very slow if we are updating a lot of rows.
41+
for (const job of jobs.docs) {
42+
const updateData = {
43+
...job,
44+
...data,
45+
}
46+
47+
const result = await upsertRow({
48+
id: job.id,
49+
adapter: this,
50+
data: updateData,
51+
db,
52+
fields: collection.flattenedFields,
53+
ignoreResult: returning === false,
54+
operation: 'update',
55+
req,
56+
tableName,
57+
})
58+
results.push(result)
59+
}
60+
61+
if (returning === false) {
62+
return null
63+
}
64+
65+
return results
66+
}

packages/payload/src/database/createDatabaseAdapter.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import type {
88
RollbackTransaction,
99
} from './types.js'
1010

11+
import { defaultUpdateJobs } from './defaultUpdateJobs.js'
1112
import { createMigration } from './migrations/createMigration.js'
1213
import { migrate } from './migrations/migrate.js'
1314
import { migrateDown } from './migrations/migrateDown.js'
@@ -31,6 +32,7 @@ export function createDatabaseAdapter<T extends BaseDatabaseAdapter>(
3132
| 'migrateReset'
3233
| 'migrateStatus'
3334
| 'migrationDir'
35+
| 'updateJobs'
3436
>,
3537
): T {
3638
return {
@@ -45,6 +47,7 @@ export function createDatabaseAdapter<T extends BaseDatabaseAdapter>(
4547
migrateReset,
4648
migrateStatus,
4749
rollbackTransaction,
50+
updateJobs: defaultUpdateJobs,
4851

4952
...args,
5053
// Ensure migrationDir is set
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import type { BaseJob, DatabaseAdapter } from '../index.js'
2+
import type { UpdateJobs } from './types.js'
3+
4+
import { jobsCollectionSlug } from '../queues/config/index.js'
5+
import { sanitizeUpdateData } from '../queues/utilities/sanitizeUpdateData.js'
6+
7+
export const defaultUpdateJobs: UpdateJobs = async function updateMany(
8+
this: DatabaseAdapter,
9+
{ id, data, limit, req, returning, where },
10+
) {
11+
const updatedJobs: BaseJob[] | null = []
12+
13+
const jobsToUpdate: BaseJob[] = (
14+
id
15+
? [
16+
await this.findOne({
17+
collection: jobsCollectionSlug,
18+
req,
19+
where: { id: { equals: id } },
20+
}),
21+
]
22+
: (
23+
await this.find({
24+
collection: jobsCollectionSlug,
25+
limit,
26+
pagination: false,
27+
req,
28+
where,
29+
})
30+
).docs
31+
).filter(Boolean) as BaseJob[]
32+
33+
if (!jobsToUpdate) {
34+
return null
35+
}
36+
37+
for (const job of jobsToUpdate) {
38+
const updateData = {
39+
...job,
40+
...data,
41+
}
42+
const updatedJob = await this.updateOne({
43+
id: job.id,
44+
collection: jobsCollectionSlug,
45+
data: sanitizeUpdateData({ data: updateData }),
46+
req,
47+
returning,
48+
})
49+
updatedJobs.push(updatedJob)
50+
}
51+
52+
return updatedJobs
53+
}

packages/payload/src/database/types.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { TypeWithID } from '../collections/config/types.js'
2-
import type { CollectionSlug, GlobalSlug } from '../index.js'
2+
import type { BaseJob, CollectionSlug, GlobalSlug } from '../index.js'
33
import type {
44
Document,
55
JoinQuery,
@@ -147,6 +147,8 @@ export interface BaseDatabaseAdapter {
147147

148148
updateGlobalVersion: UpdateGlobalVersion
149149

150+
updateJobs: UpdateJobs
151+
150152
updateMany: UpdateMany
151153

152154
updateOne: UpdateOne
@@ -540,6 +542,32 @@ export type UpdateManyArgs = {
540542

541543
export type UpdateMany = (args: UpdateManyArgs) => Promise<Document[] | null>
542544

545+
export type UpdateJobsArgs = {
546+
data: Record<string, unknown>
547+
req?: Partial<PayloadRequest>
548+
/**
549+
* If true, returns the updated documents
550+
*
551+
* @default true
552+
*/
553+
returning?: boolean
554+
} & (
555+
| {
556+
id: number | string
557+
limit?: never
558+
sort?: never
559+
where?: never
560+
}
561+
| {
562+
id?: never
563+
limit?: number
564+
sort?: Sort
565+
where: Where
566+
}
567+
)
568+
569+
export type UpdateJobs = (args: UpdateJobsArgs) => Promise<BaseJob[] | null>
570+
543571
export type UpsertArgs = {
544572
collection: CollectionSlug
545573
data: Record<string, unknown>

0 commit comments

Comments
 (0)