Skip to content

Commit 5a52888

Browse files
chore: wip
1 parent 29d9587 commit 5a52888

File tree

4 files changed

+51
-42
lines changed

4 files changed

+51
-42
lines changed

app/Jobs/ExampleJob.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ export default new Job({
77
tries: 3, // optional, defaults to 3 retries, in case of failures
88
backoff: 3, // optional, defaults to 3-second delays between retries
99
rate: Every.Minute, // optional, '* * * * *' in cron syntax
10-
handle: () => {
11-
console.log('hello world')
10+
handle: (payload: any) => {
11+
console.log(payload)
1212
},
1313
// action: 'SendWelcomeEmail', // instead of handle, you may target an action or `action: () => {`
1414
})

storage/framework/core/actions/src/queue/work.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ const options = parseArgs()
66

77
const queue = options.queue
88

9-
console.log(queue)
10-
119
const result = await processJobs(queue)
1210

1311
if (result?.isErr()) {

storage/framework/core/queue/src/job.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { storeJob } from './utils'
77
const queueDriver = 'database'
88

99
interface JobConfig {
10-
handle?: () => Promise<void>
10+
handle?: (payload?: any) => Promise<void>
1111
action?: string | (() => Promise<void>)
1212
}
1313

@@ -66,7 +66,7 @@ export async function runJob(name: string, options: JobOptions = {}): Promise<vo
6666
}
6767
// If handle is defined, execute it
6868
else if (job.handle) {
69-
await job.handle()
69+
await job.handle(options.payload)
7070
}
7171
// If no handle or action, try to execute the module directly
7272
else if (typeof jobModule.default === 'function') {

storage/framework/core/queue/src/process.ts

Lines changed: 47 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { ok, type Ok } from '@stacksjs/error-handling'
22
import { log } from '@stacksjs/logging'
3-
import { Job } from '../../../orm/src/models/Job'
3+
import { Job, type JobModel } from '../../../orm/src/models/Job'
44
import { runJob } from './job'
55

66
interface QueuePayload {
@@ -12,51 +12,62 @@ interface QueuePayload {
1212
}
1313

1414
export async function processJobs(queue: string | undefined): Promise<Ok<string, never>> {
15-
setInterval(async () => {
16-
await executeJobs(queue)
17-
}, 1000)
15+
async function process() {
16+
try {
17+
await executeJobs(queue)
18+
} catch (error) {
19+
log.error('Error processing jobs:', error)
20+
}
21+
22+
setTimeout(process, 1000)
23+
}
24+
25+
process()
1826

19-
return ok('All jobs processed successfully!')
27+
return ok('Job processing has started successfully!')
2028
}
2129

2230
async function executeJobs(queue: string | undefined): Promise<void> {
23-
const jobs = await Job.when(queue !== undefined, (query: any) => {
24-
return query.where('queue', queue)
25-
}).get()
31+
const jobs = await Job.when(queue !== undefined, (query: any) => query.where('queue', queue)).get()
2632

2733
for (const job of jobs) {
28-
if (job.payload) {
29-
if (job.available_at && job.available_at > timestampNow())
30-
return
31-
32-
const payload: QueuePayload = JSON.parse(job.payload)
33-
const currentAttempts = job.attempts || 0
34-
35-
log.info(`Running ${payload.displayName}`)
36-
37-
await job.update({ attempts: currentAttempts + 1 })
38-
39-
try {
40-
await runJob(payload.name, {
41-
queue: job.queue,
42-
payload: {},
43-
context: '',
44-
maxTries: payload.maxTries,
45-
timeout: 60,
46-
})
47-
48-
await job.delete()
49-
}
50-
catch (error) {
51-
log.info(`${payload.displayName} failed`)
52-
log.error(error)
53-
}
54-
55-
log.info(`Successfully ran ${payload.displayName}`)
34+
35+
if (!job.payload) continue
36+
37+
if (job.available_at && job.available_at > timestampNow()) continue
38+
39+
const payload: QueuePayload = JSON.parse(job.payload)
40+
const currentAttempts = job.attempts || 0
41+
42+
log.info(`Running job: ${payload.displayName}`)
43+
44+
await updateJobAttempts(job, currentAttempts)
45+
46+
try {
47+
await runJob(payload.name, {
48+
queue: job.queue,
49+
payload: {},
50+
context: '',
51+
maxTries: payload.maxTries,
52+
timeout: 60,
53+
})
54+
55+
await job.delete()
56+
log.info(`Successfully ran job: ${payload.displayName}`)
57+
} catch (error) {
58+
log.error(`Job failed: ${payload.displayName}`, error)
5659
}
5760
}
5861
}
5962

63+
async function updateJobAttempts(job: any, currentAttempts: number): Promise<void> {
64+
try {
65+
await job.update({ attempts: currentAttempts + 1 })
66+
} catch (error) {
67+
log.error('Failed to update job attempts:', error)
68+
}
69+
}
70+
6071
function timestampNow(): number {
6172
const now = Date.now()
6273
return Math.floor(now / 1000)

0 commit comments

Comments
 (0)