Skip to content

Commit 42176e0

Browse files
chore: wip
1 parent b1c0c17 commit 42176e0

File tree

2 files changed

+70
-4
lines changed

2 files changed

+70
-4
lines changed

app/Jobs/ExampleJob.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { HttpError } from '@stacksjs/error-handling'
1+
// import { HttpError } from '@stacksjs/error-handling'
22
import { Job } from '@stacksjs/queue'
33
import { Every } from '@stacksjs/types'
44

@@ -14,7 +14,7 @@ export default new Job({
1414
factor: 3,
1515
},
1616
handle: (payload: any) => {
17-
throw new HttpError(500, 'test')
17+
console.log('test')
1818
},
1919
// action: 'SendWelcomeEmail', // instead of handle, you may target an action or `action: () => {`
2020
})

storage/framework/core/queue/src/process.ts

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ import type { JitterConfig, JobOptions } from '@stacksjs/types'
22
import type { JobModel } from '../../../orm/src/models/Job'
33
import { ok, type Ok } from '@stacksjs/error-handling'
44
import { log } from '@stacksjs/logging'
5-
import FailedJob from '../../../orm/src/models/FailedJob'
6-
import { Job } from '../../../orm/src/models/Job'
5+
import { Job, FailedJob } from '@stacksjs/orm'
76
import { runJob } from './job'
87

98
interface QueuePayload {
@@ -28,6 +27,73 @@ export async function processJobs(queue: string | undefined): Promise<Ok<string,
2827
return ok('Job processing has started successfully!')
2928
}
3029

30+
export async function executeFailedJobs(): Promise<void> {
31+
const failedJobs = await FailedJob.all()
32+
33+
for (const job of failedJobs) {
34+
if (!job.payload)
35+
continue
36+
37+
const body: QueuePayload = JSON.parse(job.payload)
38+
const jobPayload = JSON.parse(job.payload) as QueuePayload
39+
40+
const classPayload = JSON.parse(jobPayload.classPayload) as JobOptions
41+
42+
const maxTries = Number(classPayload.tries || 3)
43+
44+
log.info(`Retrying job: ${body.path}`)
45+
46+
try {
47+
await runJob(body.name, {
48+
queue: job.queue,
49+
payload: body.params,
50+
context: '',
51+
maxTries,
52+
timeout: 60,
53+
})
54+
55+
await job.delete()
56+
57+
log.info(`Successfully ran job: ${body.path}`)
58+
}
59+
catch (error) {
60+
console.log(error)
61+
}
62+
}
63+
}
64+
65+
export async function retryFailedJob(id: number): Promise<void> {
66+
const failedJob = await FailedJob.find(id)
67+
68+
if (failedJob && failedJob.payload) {
69+
const body: QueuePayload = JSON.parse(failedJob.payload)
70+
const jobPayload = JSON.parse(failedJob.payload) as QueuePayload
71+
72+
const classPayload = JSON.parse(jobPayload.classPayload) as JobOptions
73+
74+
const maxTries = Number(classPayload.tries || 3)
75+
76+
log.info(`Retrying job: ${body.path}`)
77+
78+
try {
79+
await runJob(body.name, {
80+
queue: failedJob.queue,
81+
payload: body.params,
82+
context: '',
83+
maxTries,
84+
timeout: 60,
85+
})
86+
87+
await failedJob.delete()
88+
89+
log.info(`Successfully ran job: ${body.path}`)
90+
}
91+
catch (error) {
92+
console.log(error)
93+
}
94+
}
95+
}
96+
3197
async function executeJobs(queue: string | undefined): Promise<void> {
3298
const jobs = await Job.when(queue !== undefined, (query: JobModel) => query.where('queue', queue)).get()
3399

0 commit comments

Comments
 (0)