This repository has been archived by the owner on Nov 1, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
loop.go
221 lines (195 loc) · 6.31 KB
/
loop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package daemon
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/weaveworks/flux/git"
fluxmetrics "github.com/weaveworks/flux/metrics"
fluxsync "github.com/weaveworks/flux/sync"
)
// LoopVars holds the settings and request channels used by the daemon's
// main Loop. Loop reads these fields and calls these methods directly on
// its *Daemon receiver, so LoopVars is presumably embedded in Daemon —
// confirm against Daemon's definition.
//
// The unexported channels are allocated lazily by ensureInit, which the
// AskFor* methods call before touching them.
type LoopVars struct {
	// SyncInterval is the longest Loop waits between syncs; the sync
	// timer is reset to it after every sync.
	SyncInterval time.Duration
	// AutomationInterval is the period between image polls for
	// automated workloads.
	AutomationInterval time.Duration
	// GitTimeout bounds each git call Loop makes: reading the new head
	// after a mirror notification, and refreshing after a successful job.
	GitTimeout time.Duration
	// GitVerifySignatures, when true, makes Loop take the new head from
	// latestValidRevision instead of the plain branch head.
	GitVerifySignatures bool
	// SyncState is where the sync marker revision is read (GetRevision)
	// and written (UpdateMarker), via lastKnownSyncState.
	SyncState fluxsync.State
	// initOnce guards the one-time allocation of the channels below.
	initOnce sync.Once
	// syncSoon holds at most one pending sync request (buffer size 1).
	syncSoon chan struct{}
	// automatedWorkloadsSoon holds at most one pending image-poll
	// request (buffer size 1).
	automatedWorkloadsSoon chan struct{}
}
// ensureInit allocates the request channels on first use. sync.Once makes
// every call after the first a no-op, so it can be called freely (and
// concurrently) before any access to syncSoon or automatedWorkloadsSoon.
// Each channel has a buffer of one, so at most one request is pending.
func (loop *LoopVars) ensureInit() {
	makeChannels := func() {
		loop.automatedWorkloadsSoon = make(chan struct{}, 1)
		loop.syncSoon = make(chan struct{}, 1)
	}
	loop.initOnce.Do(makeChannels)
}
// Loop is the daemon's main event loop. It runs in a single goroutine
// until a value is received from (or close of) stop, and calls wg.Done on
// exit. It serialises:
//
//   - syncing the cluster, when requested via AskForSync or at least every
//     SyncInterval;
//   - polling for new images for automated workloads, when requested via
//     AskForAutomatedWorkloadImageUpdates or every AutomationInterval —
//     skipped entirely while the git repo is read-only;
//   - reacting to git mirror notifications by requesting a sync when the
//     (optionally signature-verified) head has moved;
//   - running queued jobs, refreshing the git mirror after each successful
//     one.
func (d *Daemon) Loop(stop chan struct{}, wg *sync.WaitGroup, logger log.Logger) {
	defer wg.Done()

	// We want to sync at least every `SyncInterval`. Being told to
	// sync, or completing a job, may intervene (in which case,
	// reschedule the next sync).
	syncTimer := time.NewTimer(d.SyncInterval)
	// Stop the timers on exit so they are not left armed after the loop
	// has returned.
	defer syncTimer.Stop()

	// Similarly checking to see if any controllers have new images
	// available.
	automatedWorkloadTimer := time.NewTimer(d.AutomationInterval)
	defer automatedWorkloadTimer.Stop()

	// Keep track of current, verified (if signature verification is
	// enabled), HEAD, so we can know when to treat a repo
	// mirror notification as a change. Otherwise, we'll just sync
	// every timer tick as well as every mirror refresh.
	syncHead := ""

	// In-memory sync tag state
	ratchet := &lastKnownSyncState{logger: logger, state: d.SyncState}

	// If the git repo is read-only, the image update will fail; to
	// avoid repeated failures in the log, mention it here and
	// otherwise skip it when it comes around.
	if d.Repo.Readonly() {
		logger.Log("info", "Repo is read-only; no image updates will be attempted")
	}

	// Ask for a sync, and to check
	d.AskForSync()
	d.AskForAutomatedWorkloadImageUpdates()

	for {
		select {
		case <-stop:
			logger.Log("stopping", "true")
			return

		case <-d.automatedWorkloadsSoon:
			// Stop the timer, draining a tick that already fired, so the
			// Reset below starts a fresh interval.
			if !automatedWorkloadTimer.Stop() {
				select {
				case <-automatedWorkloadTimer.C:
				default:
				}
			}
			if d.Repo.Readonly() {
				// don't bother trying to update images, and don't
				// bother setting the timer again
				continue
			}
			d.pollForNewAutomatedWorkloadImages(logger)
			automatedWorkloadTimer.Reset(d.AutomationInterval)

		case <-automatedWorkloadTimer.C:
			d.AskForAutomatedWorkloadImageUpdates()

		case <-d.syncSoon:
			// Stop and drain, as above, so Reset schedules a full interval
			// from the end of this sync.
			if !syncTimer.Stop() {
				select {
				case <-syncTimer.C:
				default:
				}
			}
			started := time.Now().UTC()
			err := d.Sync(context.Background(), started, syncHead, ratchet)
			syncDuration.With(
				fluxmetrics.LabelSuccess, fmt.Sprint(err == nil),
			).Observe(time.Since(started).Seconds())
			if err != nil {
				logger.Log("err", err)
			}
			syncTimer.Reset(d.SyncInterval)

		case <-syncTimer.C:
			d.AskForSync()

		case <-d.Repo.C:
			var newSyncHead string
			var invalidCommit git.Commit
			var err error

			ctx, cancel := context.WithTimeout(context.Background(), d.GitTimeout)
			if d.GitVerifySignatures {
				newSyncHead, invalidCommit, err = latestValidRevision(ctx, d.Repo, d.SyncState)
			} else {
				newSyncHead, err = d.Repo.BranchHead(ctx)
			}
			cancel()

			if err != nil {
				logger.Log("url", d.Repo.Origin().URL, "err", err)
				continue
			}
			if invalidCommit.Revision != "" {
				logger.Log("err", "found invalid GPG signature for commit", "revision", invalidCommit.Revision, "key", invalidCommit.Signature.Key)
			}

			logger.Log("event", "refreshed", "url", d.Repo.Origin().URL, "branch", d.GitConfig.Branch, "HEAD", newSyncHead)
			// Only a moved head counts as a change worth syncing for.
			if newSyncHead != syncHead {
				syncHead = newSyncHead
				d.AskForSync()
			}

		case job := <-d.Jobs.Ready():
			queueLength.Set(float64(d.Jobs.Len()))
			jobLogger := log.With(logger, "jobID", job.ID)
			jobLogger.Log("state", "in-progress")
			// It's assumed that (successful) jobs will push commits
			// to the upstream repo, and therefore we probably want to
			// pull from there and sync the cluster afterwards.
			start := time.Now()
			err := job.Do(jobLogger)
			jobDuration.With(
				fluxmetrics.LabelSuccess, fmt.Sprint(err == nil),
			).Observe(time.Since(start).Seconds())
			if err != nil {
				jobLogger.Log("state", "done", "success", "false", "err", err)
				continue
			}
			jobLogger.Log("state", "done", "success", "true")

			ctx, cancel := context.WithTimeout(context.Background(), d.GitTimeout)
			refreshErr := d.Repo.Refresh(ctx)
			cancel()
			if refreshErr != nil {
				// Log against the job so a failed refresh can be traced
				// back to the job that triggered it.
				jobLogger.Log("err", refreshErr)
			}
		}
	}
}
// AskForSync requests a sync from Loop without blocking. If a sync request
// is already pending, this call is a no-op: the pending request covers it.
func (loop *LoopVars) AskForSync() {
	loop.ensureInit()
	select {
	case loop.syncSoon <- struct{}{}:
	default:
		// A sync is already queued; coalesce with it.
	}
}
// AskForAutomatedWorkloadImageUpdates requests an image poll for automated
// workloads from Loop without blocking. If a poll request is already
// pending, this call is a no-op: the pending request covers it.
func (loop *LoopVars) AskForAutomatedWorkloadImageUpdates() {
	loop.ensureInit()
	select {
	case loop.automatedWorkloadsSoon <- struct{}{}:
	default:
		// A poll is already queued; coalesce with it.
	}
}
// -- internals to keep track of sync tag state

// lastKnownSyncState wraps a fluxsync.State and remembers, in memory, the
// revision this process last wrote to it. Loop creates one per run and
// passes it to Sync; Update compares the caller's old revision with the
// remembered one to notice when something else has moved the marker.
type lastKnownSyncState struct {
	// logger receives the external-change warning and a line per marker
	// update.
	logger log.Logger
	// state is the persistent store: read by Current, written by Update.
	state fluxsync.State

	// bookkeeping
	// revision is the revision most recently written by Update; it is ""
	// until the first successful write.
	revision string
	// warnedAboutChange records that the external-change warning has been
	// logged, so it is emitted at most once.
	warnedAboutChange bool
}
// Current reads the marker revision directly from the underlying state.
// The in-memory revision is neither consulted nor updated, and any error
// from the state is returned unchanged.
func (s *lastKnownSyncState) Current(ctx context.Context) (string, error) {
	rev, err := s.state.GetRevision(ctx)
	return rev, err
}
// Update records newRev as the synced revision in the persistent state
// (the sync.State) and remembers it in memory. It reports whether the
// state was written.
//
// Before writing, it checks that oldRev matches the revision this instance
// last recorded. A mismatch suggests another Flux daemon is using the same
// state, and a warning is logged the first time it is seen. If newRev is
// already the remembered revision, nothing is written and (false, nil) is
// returned. A write error is returned together with false, and the
// remembered revision is then left as it was.
func (s *lastKnownSyncState) Update(ctx context.Context, oldRev, newRev string) (bool, error) {
	// Several fluxd instances fighting over one sync tag can make fluxd
	// miss manifest changes, so flag (once) a marker that moved under us.
	movedExternally := s.revision != "" && oldRev != s.revision
	if movedExternally && !s.warnedAboutChange {
		s.warnedAboutChange = true
		s.logger.Log("warning",
			"detected external change in sync state; the sync state should not be shared by fluxd instances",
			"state", s.state.String())
	}

	if newRev == s.revision {
		// Already recorded; nothing to write.
		return false, nil
	}

	err := s.state.UpdateMarker(ctx, newRev)
	if err != nil {
		return false, err
	}

	s.revision = newRev
	s.logger.Log("state", s.state.String(), "old", oldRev, "new", newRev)
	return true, nil
}