-
Notifications
You must be signed in to change notification settings - Fork 1
/
QueueService.groovy
117 lines (98 loc) · 3 KB
/
QueueService.groovy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package gittig
import org.codehaus.groovy.grails.support.PersistenceContextInterceptor
class QueueService {
static transactional = false
def gitService
def grailsApplication
PersistenceContextInterceptor persistenceInterceptor
def taskExecutor
def list(params) {
// list completed/cancelled/discarded/error jobs from this date on
def queueTimeout = grailsApplication.config.app.queueTimeout
def date = new Date(System.currentTimeMillis() - (1000 * queueTimeout))
HookJob.withCriteria {
or {
eq('status', HookJob.HookJobStatus.QUEUED)
eq('status', HookJob.HookJobStatus.WAITING)
eq('status', HookJob.HookJobStatus.RUNNING)
ge('dateCreated', date)
}
}
}
def enqueue(url) {
new HookJob(url: url, status: HookJob.HookJobStatus.QUEUED).save(failOnError: true, flush: true)
}
def dequeue() {
HookJob.withNewSession {
def queued = HookJob.executeQuery "from HookJob j where j.status = ? and j.url not in (select jr.url from HookJob jr where jr.status = ? or jr.status = ?)",
[HookJob.HookJobStatus.QUEUED, HookJob.HookJobStatus.WAITING, HookJob.HookJobStatus.RUNNING]
if (queued) {
def job = queued.first()
log.info "Dequeuing job for ${job.url}"
// initialize the progress and set to WAITING status
job.progress = new HookJobProgress(job: job)
job.status = HookJob.HookJobStatus.WAITING
job.save(failOnError: true, flush: true)
// discard other queue jobs of the same url
def discard = HookJob.withCriteria {
eq('status', HookJob.HookJobStatus.QUEUED)
eq('url', job.url)
}.each {
it.status = HookJob.HookJobStatus.DISCARDED
it.result = "${job.id}"
it.save(failOnError: true, flush: true)
}
return job
}
}
}
def dequeueAndRun() {
def job = dequeue()
if (job) {
// submit the job for background execution
def jobId = job.id
def url = job.url
taskExecutor.submit {
persistenceInterceptor.init()
try {
// set job to RUNNING state
HookJob.withNewSession {
def j = HookJob.get(jobId)
j.status = HookJob.HookJobStatus.RUNNING
j.save(failOnError: true, flush: true)
log.info "Running job for ${j.url}"
}
def status
def error
def result
try {
def hookJobProgressMonitor = new HookJobProgressMonitor(jobId)
result = gitService.cloneOrUpdate(url, hookJobProgressMonitor)
status = HookJob.HookJobStatus.COMPLETED
} catch (HookJobException e) {
status = HookJob.HookJobStatus.ERROR
error = e.message
}
// save the job final state
HookJob.withNewSession {
def j = HookJob.get(jobId)
j.status = status
j.error = error
j.result = result
j.save(failOnError: true, flush: true)
}
} finally {
persistenceInterceptor.flush()
persistenceInterceptor.destroy()
}
}
}
}
def cancel(id) {
HookJob.withNewSession {
def job = HookJob.get(id)
job.status = HookJob.HookJobStatus.CANCELLED
job.save(failOnError: true, flush: true)
}
}
}