/
run_queue.go
127 lines (103 loc) · 2.63 KB
/
run_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package services
import (
"fmt"
"sync"
uuid "github.com/satori/go.uuid"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
	// numberRunsQueued counts every call that enqueues a run, including
	// runs coalesced onto an already-active worker.
	numberRunsQueued = promauto.NewCounter(prometheus.CounterOpts{
		Name: "run_queue_runs_queued",
		Help: "The total number of runs that have been queued",
	})
	// numberRunQueueWorkers tracks the number of distinct run IDs with an
	// active worker (len of runQueue.workers), updated on every
	// increment/decrement.
	numberRunQueueWorkers = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "run_queue_queue_size",
		Help: "The size of the run queue",
	})
)
//go:generate mockery --name RunQueue --output ../internal/mocks/ --case=underscore

// RunQueue safely handles coordinating job runs.
type RunQueue interface {
	// Start prepares the queue for accepting runs. Currently a no-op.
	Start() error
	// Stop requests shutdown and blocks until all workers have drained.
	Stop()
	// Run queues the run with the given ID for execution; requests for an
	// ID that already has a worker are coalesced onto that worker.
	Run(uuid.UUID)
	// WorkerCount reports how many run IDs currently have active workers.
	WorkerCount() int
}
// runQueue is the default RunQueue implementation: one worker goroutine
// per active run ID, with pending requests for the same ID coalesced via
// a per-ID counter.
type runQueue struct {
	workersMutex  sync.RWMutex   // guards workers and stopRequested
	workers       map[string]int // run ID -> number of queued executions
	workersWg     sync.WaitGroup // tracks live worker goroutines; Stop waits on it
	stopRequested bool           // once true, Run becomes a no-op

	runExecutor RunExecutor
}
// NewRunQueue initializes a RunQueue backed by the given executor.
func NewRunQueue(runExecutor RunExecutor) RunQueue {
	queue := &runQueue{
		workers:     make(map[string]int),
		runExecutor: runExecutor,
	}
	return queue
}
// Start prepares the job runner for accepting runs to execute.
// It is currently a no-op kept to satisfy the RunQueue interface.
func (rq *runQueue) Start() error {
	return nil
}
// Stop flags the queue as stopped so no new runs are accepted, then
// blocks until every in-flight worker goroutine has finished.
func (rq *runQueue) Stop() {
	// Set the flag inside a scoped closure so the mutex is released
	// before waiting — workers need it to decrement their counters.
	func() {
		rq.workersMutex.Lock()
		defer rq.workersMutex.Unlock()
		rq.stopRequested = true
	}()
	rq.workersWg.Wait()
}
// incrementQueue records one queued execution for runID and reports
// whether this was the first for that ID (i.e. a new worker is needed).
func (rq *runQueue) incrementQueue(runID string) bool {
	// Lock first, then defer the unlock: registering the deferred Unlock
	// before acquiring the lock (as before) would attempt to unlock an
	// unheld mutex if anything between the two statements panicked.
	rq.workersMutex.Lock()
	defer rq.workersMutex.Unlock()

	numberRunsQueued.Inc()
	// A zero count means no worker goroutine is draining this ID yet.
	wasEmpty := rq.workers[runID] == 0
	rq.workers[runID]++
	numberRunQueueWorkers.Set(float64(len(rq.workers)))
	return wasEmpty
}
// decrementQueue records the completion of one execution for runID and
// reports whether the worker for that ID has drained all queued runs
// (in which case the ID is removed and the worker should exit).
func (rq *runQueue) decrementQueue(runID string) bool {
	// Lock first, then defer the unlock (the previous defer-before-Lock
	// ordering was fragile and unidiomatic).
	rq.workersMutex.Lock()
	defer rq.workersMutex.Unlock()

	rq.workers[runID]--
	isEmpty := rq.workers[runID] <= 0
	if isEmpty {
		delete(rq.workers, runID)
	}
	numberRunQueueWorkers.Set(float64(len(rq.workers)))
	return isEmpty
}
// Run tells the job runner to start executing a job run. If a worker is
// already draining runs for this ID, the request is coalesced onto it;
// otherwise a new worker goroutine is spawned. Run is a no-op once Stop
// has been requested.
func (rq *runQueue) Run(runID uuid.UUID) {
	rq.workersMutex.Lock()
	if rq.stopRequested {
		rq.workersMutex.Unlock()
		return
	}
	// Register with the WaitGroup while still holding the mutex. The
	// previous code unlocked before Add(1), so a concurrent Stop could
	// set stopRequested and see Wait() return while this Run went on to
	// spawn a worker goroutine that outlived Stop.
	rq.workersWg.Add(1)
	rq.workersMutex.Unlock()

	id := runID.String()
	if !rq.incrementQueue(id) {
		// An existing worker will pick up this queued run; release the
		// WaitGroup slot reserved above.
		rq.workersWg.Done()
		return
	}

	go func() {
		defer rq.workersWg.Done()
		for {
			if err := rq.runExecutor.Execute(runID); err != nil {
				logger.Errorw(fmt.Sprint("Error executing run ", id), "error", err)
			}
			// Exit once every queued execution for this ID has been drained.
			if rq.decrementQueue(id) {
				return
			}
		}
	}()
}
// WorkerCount returns the number of workers currently processing a job run.
func (rq *runQueue) WorkerCount() int {
	rq.workersMutex.RLock()
	count := len(rq.workers)
	rq.workersMutex.RUnlock()
	return count
}