forked from hashicorp/nomad
/
batcher.go
139 lines (116 loc) · 3.42 KB
/
batcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package deploymentwatcher
import (
"context"
"time"
"github.com/hashicorp/nomad/nomad/structs"
)
// AllocUpdateBatcher is used to batch the updates to the desired transitions
// of allocations and the creation of evals. Updates are queued through
// CreateUpdate and committed together by a background goroutine that is
// started by NewAllocUpdateBatcher.
type AllocUpdateBatcher struct {
	// batch is the batching duration: how long the batcher goroutine waits
	// after the first update of a batch arrives before committing the batch.
	batch time.Duration
	// raft is used to actually commit the updates
	// (via UpdateAllocDesiredTransition).
	raft DeploymentRaftEndpoints
	// workCh is used to pass queued updates from CreateUpdate to the batcher
	// goroutine.
	workCh chan *updateWrapper
	// ctx is used to exit the batcher goroutine; the goroutine returns once
	// ctx is done.
	ctx context.Context
}
// NewAllocUpdateBatcher builds an AllocUpdateBatcher that commits allocation
// desired transition updates and new evaluations through the given raft
// endpoints, collecting updates for batchDuration before each commit.
//
// The background batcher goroutine is started before this function returns
// and exits once ctx is done.
func NewAllocUpdateBatcher(batchDuration time.Duration, raft DeploymentRaftEndpoints, ctx context.Context) *AllocUpdateBatcher {
	aub := new(AllocUpdateBatcher)
	aub.batch = batchDuration
	aub.raft = raft
	aub.ctx = ctx
	// Small buffer so that a handful of producers do not block while the
	// batcher goroutine is busy.
	aub.workCh = make(chan *updateWrapper, 10)

	go aub.batcher()
	return aub
}
// CreateUpdate batches the allocation desired transition update and returns a
// future that tracks the completion of the request.
//
// allocs holds the desired transitions to merge into the pending batch and
// eval is the evaluation to create alongside them. eval must be non-nil: the
// batcher goroutine keys pending evaluations by eval.DeploymentID.
//
// If the batcher's context is done before the update has been accepted into a
// batch, CreateUpdate does not block; it returns a future that is already
// completed with index 0 and the context's error.
func (b *AllocUpdateBatcher) CreateUpdate(allocs map[string]*structs.DesiredTransition, eval *structs.Evaluation) *BatchFuture {
	wrapper := &updateWrapper{
		allocs: allocs,
		e:      eval,
		f:      make(chan *BatchFuture, 1),
	}

	// Hand the update to the batcher goroutine and wait for the future of the
	// batch it joined. Both steps also watch the context: once it is done the
	// batcher goroutine has exited (or is about to), so an unconditional
	// send/receive here could block forever.
	select {
	case b.workCh <- wrapper:
		select {
		case f := <-wrapper.f:
			return f
		case <-b.ctx.Done():
			// Prefer the real future if the batcher managed to answer
			// before shutting down.
			select {
			case f := <-wrapper.f:
				return f
			default:
			}
		}
	case <-b.ctx.Done():
	}

	// The batcher is shut down; report that through an already-completed
	// future so the caller's Results call returns immediately.
	f := NewBatchFuture()
	f.Set(0, b.ctx.Err())
	return f
}
// updateWrapper is a single queued update travelling from CreateUpdate to the
// batcher goroutine over workCh.
type updateWrapper struct {
	// allocs are the desired transitions to merge into the pending batch.
	allocs map[string]*structs.DesiredTransition
	// e is the evaluation to include in the pending batch; the batcher
	// stores it under its DeploymentID.
	e *structs.Evaluation
	// f is the reply channel on which the batcher sends the future of the
	// batch this update was added to.
	f chan *BatchFuture
}
// batcher is the long lived batcher goroutine.
//
// It accumulates the updates received on workCh. The first update of a batch
// arms a timer of length b.batch; when the timer fires, the accumulated
// desired transitions and evaluations are submitted in a single request
// through raft.UpdateAllocDesiredTransition. Every update that joined the
// batch is handed the same future, which is completed with the result of that
// request.
//
// The goroutine returns when b.ctx is done. The batch that is pending at that
// point is not committed; its future is completed with index 0 and the
// context's error so that callers already waiting on it are released.
func (b *AllocUpdateBatcher) batcher() {
	// timerCh is nil while no batch is pending; a nil channel never becomes
	// ready, which disables the timer case of the select below.
	var timerCh <-chan time.Time
	allocs := make(map[string]*structs.DesiredTransition)
	evals := make(map[string]*structs.Evaluation)
	future := NewBatchFuture()
	for {
		select {
		case <-b.ctx.Done():
			// Futures of already dispatched batches are completed by their
			// own goroutine; only the pending one is still owned here.
			// Complete it so nobody blocks forever in Results.
			future.Set(0, b.ctx.Err())
			return
		case w := <-b.workCh:
			// The first update of a batch starts the batching window.
			if timerCh == nil {
				timerCh = time.After(b.batch)
			}

			// Store the eval and alloc updates, and attach the future.
			// Evals are keyed by deployment, so a later eval for the same
			// DeploymentID replaces the earlier one within a batch.
			evals[w.e.DeploymentID] = w.e
			for id, upd := range w.allocs {
				allocs[id] = upd
			}

			// w.f is buffered by CreateUpdate, so this send does not block.
			w.f <- future
		case <-timerCh:
			// Capture the future and create a new one for the next batch.
			f := future
			future = NewBatchFuture()

			// Create the request
			req := &structs.AllocUpdateDesiredTransitionRequest{
				Allocs: allocs,
				Evals:  make([]*structs.Evaluation, 0, len(evals)),
			}
			for _, e := range evals {
				req.Evals = append(req.Evals, e)
			}

			// Commit the batch in its own goroutine. The call must be made
			// inside the closure: the arguments of a `go` statement are
			// evaluated in the calling goroutine, so
			// `go f.Set(b.raft.UpdateAllocDesiredTransition(req))` would
			// perform the raft call right here and stall the batcher loop.
			go func() {
				f.Set(b.raft.UpdateAllocDesiredTransition(req))
			}()

			// Reset the pending state and the timer. Fresh maps are
			// allocated because req still references the old allocs map.
			evals = make(map[string]*structs.Evaluation)
			allocs = make(map[string]*structs.DesiredTransition)
			timerCh = nil
		}
	}
}
// BatchFuture is a future that can be used to retrieve the index the eval was
// created at or any error in the creation process. The result is published
// with Set and read with Results, which blocks until Set has been called.
type BatchFuture struct {
	// index is the index passed to Set.
	index uint64
	// err is the error passed to Set, if any.
	err error
	// waitCh is closed by Set to release callers blocked in Results.
	waitCh chan struct{}
}
// NewBatchFuture returns a new, not yet completed BatchFuture. Its wait
// channel is open, so Results blocks until Set is called on the future.
func NewBatchFuture() *BatchFuture {
	f := new(BatchFuture)
	f.waitCh = make(chan struct{})
	return f
}
// Set records the outcome of the batch on the future and then closes the wait
// channel, releasing every caller blocked in Results. The fields are written
// before the channel is closed so that released callers observe them.
func (f *BatchFuture) Set(index uint64, err error) {
	f.index, f.err = index, err
	close(f.waitCh)
}
// Results blocks until the future has been completed with Set and then
// returns the recorded creation index and error.
func (f *BatchFuture) Results() (uint64, error) {
	// The wait channel is only ever closed, never sent on; the receive
	// returns once Set has closed it.
	<-f.waitCh

	index, err := f.index, f.err
	return index, err
}