@@ -4,6 +4,7 @@ import { log } from '@stacksjs/logging'
4
4
import FailedJob from '../../../orm/src/models/FailedJob'
5
5
import { Job } from '../../../orm/src/models/Job'
6
6
import { runJob } from './job'
7
+ import type { JobOptions } from '@stacksjs/types'
7
8
8
9
interface QueuePayload {
9
10
path : string
@@ -36,36 +37,51 @@ async function executeJobs(queue: string | undefined): Promise<void> {
36
37
const jobs = await Job . when ( queue !== undefined , ( query : JobModel ) => query . where ( 'queue' , queue ) ) . get ( )
37
38
38
39
for ( const job of jobs ) {
39
- if ( ! job . payload )
40
- continue
40
+ let currentAttempts = job . attempts || 1 // Assuming the job has an `attempts` field tracking its attempts
41
41
42
- if ( job . available_at && job . available_at > timestampNow ( ) )
43
- continue
42
+ if ( ! job . payload ) continue
43
+
44
+ if ( job . available_at && job . available_at > timestampNow ( ) ) continue
44
45
45
46
const body : QueuePayload = JSON . parse ( job . payload )
46
- const currentAttempts = job . attempts || 0
47
+ const classPayload = JSON . parse ( job . payload ) as JobOptions
48
+
49
+ const maxTries = Number ( classPayload . tries || 3 )
47
50
48
51
log . info ( `Running job: ${ body . path } ` )
49
52
53
+ // Increment attempts before running the job
50
54
await updateJobAttempts ( job , currentAttempts )
51
55
52
56
try {
57
+ // Run the job
53
58
await runJob ( body . name , {
54
59
queue : job . queue ,
55
60
payload : body . params ,
56
61
context : '' ,
57
- maxTries : body . maxTries ,
62
+ maxTries,
58
63
timeout : 60 ,
59
64
} )
60
65
66
+ // If job is successful, delete it
61
67
await job . delete ( )
62
68
log . info ( `Successfully ran job: ${ body . path } ` )
63
69
}
64
70
catch ( error ) {
65
- const stringifiedError = JSON . stringify ( error )
66
-
67
- storeFailedJob ( job , stringifiedError )
68
- log . error ( `Job failed: ${ body . path } ` , stringifiedError )
71
+ // Increment the attempt count
72
+ currentAttempts ++
73
+
74
+ if ( currentAttempts > maxTries ) {
75
+ // If attempts exceed maxTries, store as failed job and delete
76
+ const stringifiedError = JSON . stringify ( error )
77
+ storeFailedJob ( job , stringifiedError )
78
+ await job . delete ( ) // Delete job only after exceeding maxTries
79
+ log . error ( `Job failed after ${ maxTries } attempts: ${ body . path } ` , stringifiedError )
80
+ } else {
81
+ // If attempts are below maxTries, just update the job's attempt count
82
+ await updateJobAttempts ( job , currentAttempts )
83
+ log . error ( `Job failed, retrying... Attempt ${ currentAttempts } /${ maxTries } : ${ body . path } ` )
84
+ }
69
85
}
70
86
}
71
87
}
@@ -95,9 +111,9 @@ function now(): string {
95
111
return `${ year } -${ month } -${ day } ${ hours } :${ minutes } :${ seconds } `
96
112
}
97
113
98
- async function updateJobAttempts ( job : any , currentAttempts : number ) : Promise < void > {
114
+ async function updateJobAttempts ( job : JobModel , currentAttempts : number ) : Promise < void > {
99
115
try {
100
- await job . update ( { attempts : currentAttempts + 1 } )
116
+ await job . update ( { attempts : currentAttempts } )
101
117
}
102
118
catch ( error ) {
103
119
log . error ( 'Failed to update job attempts:' , error )
0 commit comments