Skip to content

Commit b1ef28d

Browse files
authored
feat: allow where in payload.jobs.run (#9877)
Example: ```ts await payload.jobs.queue({ task: 'MyTask', input: { message: `secret`, }, }) await payload.jobs.run({ where: { 'input.message': { equals: 'secret' } } }) ```
1 parent 09246a4 commit b1ef28d

File tree

4 files changed

+98
-0
lines changed

4 files changed

+98
-0
lines changed

docs/jobs-queue/queues.mdx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ const results = await payload.jobs.run()
105105

106106
// You can customize the queue name and limit by passing them as arguments:
107107
await payload.jobs.run({ queue: 'nightly', limit: 100 })
108+
109+
// You can provide a where clause to filter the jobs that should be run:
110+
await payload.jobs.run({ where: { 'input.message': { equals: 'secret' } } })
108111
```
109112

110113
**Run a single job:**

packages/payload/src/queues/localAPI.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
type PayloadRequest,
77
type RunningJob,
88
type TypedJobs,
9+
type Where,
910
} from '../index.js'
1011
import { runJobs } from './operations/runJobs/index.js'
1112

@@ -70,13 +71,15 @@ export const getJobsLocalAPI = (payload: Payload) => ({
7071
overrideAccess?: boolean
7172
queue?: string
7273
req?: PayloadRequest
74+
where?: Where
7375
}): Promise<ReturnType<typeof runJobs>> => {
7476
const newReq: PayloadRequest = args?.req ?? (await createLocalReq({}, payload))
7577
const result = await runJobs({
7678
limit: args?.limit,
7779
overrideAccess: args?.overrideAccess !== false,
7880
queue: args?.queue,
7981
req: newReq,
82+
where: args?.where,
8083
})
8184
return result
8285
},

packages/payload/src/queues/operations/runJobs/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export type RunJobsArgs = {
2525
overrideAccess?: boolean
2626
queue?: string
2727
req: PayloadRequest
28+
where?: Where
2829
}
2930

3031
export type RunJobsResult = {
@@ -45,6 +46,7 @@ export const runJobs = async ({
4546
overrideAccess,
4647
queue,
4748
req,
49+
where: whereFromProps,
4850
}: RunJobsArgs): Promise<RunJobsResult> => {
4951
if (!overrideAccess) {
5052
const hasAccess = await req.payload.config.jobs.access.run({ req })
@@ -94,6 +96,10 @@ export const runJobs = async ({
9496
})
9597
}
9698

99+
if (whereFromProps) {
100+
where.and.push(whereFromProps)
101+
}
102+
97103
// Find all jobs and ensure we set job to processing: true as early as possible to reduce the chance of
98104
// the same job being picked up by another worker
99105
const jobsQuery: {

test/queues/int.spec.ts

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -965,4 +965,90 @@ describe('Queues', () => {
965965
expect(allCompletedJobs.totalDocs).toBe(1)
966966
expect(allCompletedJobs.docs[0].id).toBe(lastJobID)
967967
})
968+
969+
it('ensure where query for id in payload.jobs.run works and only runs the specified job', async () => {
970+
payload.config.jobs.deleteJobOnComplete = false
971+
972+
let lastJobID: string = null
973+
for (let i = 0; i < 3; i++) {
974+
const job = await payload.jobs.queue({
975+
task: 'CreateSimple',
976+
input: {
977+
message: 'from single task',
978+
},
979+
})
980+
lastJobID = job.id
981+
}
982+
983+
await payload.jobs.run({
984+
where: {
985+
id: {
986+
equals: lastJobID,
987+
},
988+
},
989+
})
990+
991+
const allSimples = await payload.find({
992+
collection: 'simple',
993+
limit: 100,
994+
})
995+
996+
expect(allSimples.totalDocs).toBe(1)
997+
expect(allSimples.docs[0].title).toBe('from single task')
998+
999+
const allCompletedJobs = await payload.find({
1000+
collection: 'payload-jobs',
1001+
limit: 100,
1002+
where: {
1003+
completedAt: {
1004+
exists: true,
1005+
},
1006+
},
1007+
})
1008+
1009+
expect(allCompletedJobs.totalDocs).toBe(1)
1010+
expect(allCompletedJobs.docs[0].id).toBe(lastJobID)
1011+
})
1012+
1013+
it('ensure where query for input data in payload.jobs.run works and only runs the specified job', async () => {
1014+
payload.config.jobs.deleteJobOnComplete = false
1015+
1016+
for (let i = 0; i < 3; i++) {
1017+
await payload.jobs.queue({
1018+
task: 'CreateSimple',
1019+
input: {
1020+
message: `from single task ${i}`,
1021+
},
1022+
})
1023+
}
1024+
1025+
await payload.jobs.run({
1026+
where: {
1027+
'input.message': {
1028+
equals: 'from single task 2',
1029+
},
1030+
},
1031+
})
1032+
1033+
const allSimples = await payload.find({
1034+
collection: 'simple',
1035+
limit: 100,
1036+
})
1037+
1038+
expect(allSimples.totalDocs).toBe(1)
1039+
expect(allSimples.docs[0].title).toBe('from single task 2')
1040+
1041+
const allCompletedJobs = await payload.find({
1042+
collection: 'payload-jobs',
1043+
limit: 100,
1044+
where: {
1045+
completedAt: {
1046+
exists: true,
1047+
},
1048+
},
1049+
})
1050+
1051+
expect(allCompletedJobs.totalDocs).toBe(1)
1052+
expect((allCompletedJobs.docs[0].input as any).message).toBe('from single task 2')
1053+
})
9681054
})

0 commit comments

Comments
 (0)