This repository has been archived by the owner on Nov 1, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
job.go
157 lines (136 loc) · 3.68 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package job
import (
"sync"
"github.com/go-kit/kit/log"
"github.com/weaveworks/flux/update"
)
// ID uniquely identifies a job.
type ID string

// JobFunc is the work a job performs; it is given a logger and
// returns a non-nil error if the work failed.
type JobFunc func(log.Logger) error

// Job pairs an ID with the function to run for that job.
type Job struct {
	ID ID
	Do JobFunc
}
// StatusString names the lifecycle state of a job.
type StatusString string

// The states a job can be in: it starts queued, becomes running,
// and finishes as either failed or succeeded.
const (
	StatusQueued    StatusString = "queued"
	StatusRunning   StatusString = "running"
	StatusFailed    StatusString = "failed"
	StatusSucceeded StatusString = "succeeded"
)
// Result looks like CommitEventMetadata, because that's what we
// used to send. But in the interest of breaking cycles before
// they happen, it's (almost) duplicated here.
type Result struct {
	Revision string        `json:"revision,omitempty"` // presumably the SCM revision the job produced — confirm against producers
	Spec     *update.Spec  `json:"spec,omitempty"`     // the update spec associated with the job, if any
	Result   update.Result `json:"result,omitempty"`   // job-specific result details
}
// Status holds the possible states of a job; either,
// 1. queued or otherwise pending
// 2. succeeded with a job-specific result
// 3. failed, resulting in an error and possibly a job-specific result
type Status struct {
	Result       Result       // job-specific result, when available
	Err          string       // error text when the job failed; empty otherwise
	StatusString StatusString // which lifecycle state the job is in
}

// Error implements the error interface by returning the recorded
// error text; it is the empty string when the job has not failed.
func (s Status) Error() string {
	return s.Err
}
// Queue is an unbounded queue of jobs; enqueuing a job will always
// proceed, while dequeuing is done by receiving from a channel. It is
// also possible to iterate over the current list of jobs.
type Queue struct {
	ready       chan *Job     // jobs are dequeued by receiving from this channel (see Ready)
	incoming    chan *Job     // Enqueue hands jobs to the loop goroutine via this channel
	waiting     []*Job        // accepted-but-not-dequeued jobs; mutated only by loop, guarded by waitingLock for other readers
	waitingLock sync.Mutex    // protects waiting when read outside the loop goroutine
	sync        chan struct{} // rendezvous channel used by Sync to wait out prior operations
}
// NewQueue constructs a queue and starts its worker goroutine. The
// goroutine runs until stop is closed; wg is incremented here and
// decremented when the goroutine exits.
func NewQueue(stop <-chan struct{}, wg *sync.WaitGroup) *Queue {
	queue := &Queue{
		ready:    make(chan *Job),
		incoming: make(chan *Job),
		waiting:  []*Job{},
		sync:     make(chan struct{}),
	}
	wg.Add(1)
	go queue.loop(stop, wg)
	return queue
}
// Len reports how many jobs are currently waiting. The count is not
// guaranteed to be up-to-date; i.e., it is possible to receive from
// `q.Ready()` or enqueue an item, then momentarily still observe the
// previous length.
func (q *Queue) Len() int {
	q.waitingLock.Lock()
	count := len(q.waiting)
	q.waitingLock.Unlock()
	return count
}
// Enqueue puts a job onto the queue. It blocks until the queue's
// loop accepts the job; that acceptance does _not_ depend on any job
// being dequeued, so Enqueue always proceeds eventually.
func (q *Queue) Enqueue(j *Job) {
	q.incoming <- j
}
// Ready returns a receive-only channel for dequeuing jobs. Dequeuing
// is not atomic with respect to ForEach: a just-dequeued item may
// still show up in ForEach for a short time.
func (q *Queue) Ready() <-chan *Job {
	readyChan := q.ready
	return readyChan
}
// ForEach invokes fn once per waiting job, in queue order, passing
// the job's position and the job itself. Iteration stops early when
// fn returns false. The slice header is copied under the lock, so fn
// runs without the lock held.
func (q *Queue) ForEach(fn func(int, *Job) bool) {
	q.waitingLock.Lock()
	snapshot := q.waiting
	q.waitingLock.Unlock()
	for idx := 0; idx < len(snapshot); idx++ {
		if !fn(idx, snapshot[idx]) {
			break
		}
	}
}
// Sync blocks until any previous operations have completed. Note
// that this is only meaningful if you are using the queue from a
// single other goroutine; i.e., it makes sense to do, say,
//
//	q.Enqueue(j)
//	q.Sync()
//	fmt.Printf("Queue length is %d\n", q.Len())
//
// but only because those statements are sequential in a single
// thread. So this is really only useful for testing.
func (q *Queue) Sync() {
	var token struct{}
	q.sync <- token
}
// loop is the queue's single worker goroutine. It is the only writer
// of q.waiting (hence the unlocked length check below is safe in this
// goroutine), accepting new jobs from q.incoming and offering the
// head of the queue to receivers of q.ready. It exits when stop is
// closed, signalling wg on the way out.
func (q *Queue) loop(stop <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		// Only offer a job when one is waiting: a nil channel blocks
		// forever on send, which disables the dequeue case of the
		// select for this iteration.
		var out chan *Job = nil
		if len(q.waiting) > 0 {
			out = q.ready
		}
		select {
		case <-stop:
			return
		case <-q.sync:
			// Sync() rendezvous: completing this receive means every
			// previously submitted operation has been processed.
			continue
		case in := <-q.incoming:
			// Accept a new job; lock so Len/ForEach/nextOrNil see a
			// consistent slice.
			q.waitingLock.Lock()
			q.waiting = append(q.waiting, in)
			q.waitingLock.Unlock()
		case out <- q.nextOrNil(): // cannot proceed if out is nil
			// A receiver took the head job; drop it from the list.
			q.waitingLock.Lock()
			q.waiting = q.waiting[1:]
			q.waitingLock.Unlock()
		}
	}
}
// nextOrNil returns the head of the queue, or nil when the queue is
// empty.
func (q *Queue) nextOrNil() *Job {
	q.waitingLock.Lock()
	defer q.waitingLock.Unlock()
	if len(q.waiting) == 0 {
		return nil
	}
	return q.waiting[0]
}