/
argo_status_updater.go
240 lines (201 loc) · 7.55 KB
/
argo_status_updater.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
package argocd
import (
"bytes"
"errors"
"fmt"
"slices"
"strings"
"sync"
"time"
"github.com/shini4i/argo-watcher/internal/helpers"
"github.com/avast/retry-go/v4"
"github.com/rs/zerolog/log"
"github.com/shini4i/argo-watcher/internal/models"
)
// failedToUpdateTaskStatusTemplate is the log format used when persisting a
// task status fails; its single %s verb receives the error that was returned.
const failedToUpdateTaskStatusTemplate = "Failed to change task status: %s"
// MutexMap hands out one *sync.Mutex per string key (see Get).
// It is backed by a sync.Map, so the zero value is ready to use and a
// MutexMap must not be copied after first use.
type MutexMap struct {
// m maps a key to the *sync.Mutex stored for it by Get.
m sync.Map
}
// Get returns the mutex associated with key, creating and storing a new one
// the first time the key is seen. Every call with the same key yields the
// same *sync.Mutex. Get does not lock the mutex; that is left to the caller.
func (mm *MutexMap) Get(key string) *sync.Mutex {
	log.Debug().Msgf("acquiring mutex for %s app", key)

	// Fast path: the key is usually already present, so avoid allocating a
	// throwaway mutex just to hand it to LoadOrStore.
	if existing, ok := mm.m.Load(key); ok {
		return existing.(*sync.Mutex) // nolint:forcetypeassert // Get only ever stores *sync.Mutex values
	}

	// Slow path: LoadOrStore resolves the race between concurrent first
	// callers, so all of them end up with the same stored mutex.
	stored, _ := mm.m.LoadOrStore(key, &sync.Mutex{})
	return stored.(*sync.Mutex) // nolint:forcetypeassert // Get only ever stores *sync.Mutex values
}
// ArgoStatusUpdater waits for an Argo application rollout to finish and
// records the outcome as the task status. Populate it with Init before use.
// It embeds a MutexMap (sync.Map inside), so it must not be copied after
// first use; all methods use a pointer receiver.
type ArgoStatusUpdater struct {
// argo gives access to the application API, the metrics and the task state.
argo Argo
// registryProxyUrl is forwarded to Application.GetRolloutStatus.
registryProxyUrl string
// retryOptions configure the polling loop in waitRollout.
retryOptions []retry.Option
// mutex holds one mutex per application name to avoid concurrent updates
// of the same application.
mutex MutexMap
// acceptSuspended is forwarded to Application.GetRolloutStatus.
acceptSuspended bool
}
// Init stores the dependencies and settings of the updater and builds the
// retry policy used while polling for rollout completion: a fixed delay of
// retryDelay between attempts, at most retryAttempts attempts, and only the
// last error reported by the retry helper.
func (updater *ArgoStatusUpdater) Init(argo Argo, retryAttempts uint, retryDelay time.Duration, registryProxyUrl string, acceptSuspended bool) {
	// Assemble the polling policy first; the option order is kept as-is.
	pollingOptions := make([]retry.Option, 0, 4)
	pollingOptions = append(pollingOptions,
		retry.DelayType(retry.FixedDelay),
		retry.Attempts(retryAttempts),
		retry.Delay(retryDelay),
		retry.LastErrorOnly(true),
	)

	updater.argo = argo
	updater.registryProxyUrl = registryProxyUrl
	updater.acceptSuspended = acceptSuspended
	updater.retryOptions = pollingOptions
}
// collectInitialAppStatus fetches the application named by task.App and
// stores a snapshot of it in task.SavedAppStatus: the current rollout status
// and a hash of the application's image list. The image list is sorted in
// place before hashing so the hash does not depend on image order.
// It returns the API error if the application cannot be fetched.
func (updater *ArgoStatusUpdater) collectInitialAppStatus(task *models.Task) error {
	app, err := updater.argo.api.GetApplication(task.App)
	if err != nil {
		return err
	}

	rolloutStatus := app.GetRolloutStatus(task.ListImages(), updater.registryProxyUrl, updater.acceptSuspended)

	// Sorting happens on the application's own slice (same backing array),
	// which keeps the hash stable regardless of the order reported by the API.
	images := app.Status.Summary.Images
	slices.Sort(images)
	joinedImages := strings.Join(images, ",")

	task.SavedAppStatus = models.SavedAppStatus{
		Status:     rolloutStatus,
		ImagesHash: helpers.GenerateHash(joinedImages),
	}

	return nil
}
// WaitForRollout drives a single deployment task to its final state.
// It waits for the application to be deployed (or for the wait to give up),
// updates the failed-deployment metric, and writes the resulting status to
// the task state: deployed on success, failed with a reason otherwise.
// API-level errors are delegated to handleArgoAPIFailure.
func (updater *ArgoStatusUpdater) WaitForRollout(task models.Task) {
	application, err := updater.waitForApplicationDeployment(task)
	if err != nil {
		// The wait itself failed: count it and let the API-failure handler
		// choose the task status.
		updater.argo.metrics.AddFailedDeployment(task.App)
		updater.handleArgoAPIFailure(task, err)
		return
	}

	rolloutStatus := application.GetRolloutStatus(task.ListImages(), updater.registryProxyUrl, updater.acceptSuspended)

	if rolloutStatus != models.ArgoRolloutAppSuccess {
		log.Info().Str("id", task.Id).Msg("App deployment failed.")
		updater.argo.metrics.AddFailedDeployment(task.App)

		// Build a human-readable explanation out of the rollout status and
		// the application's own rollout message.
		failureReason := fmt.Sprintf(
			"Application deployment failed. Rollout status \"%s\"\n\n%s",
			rolloutStatus,
			application.GetRolloutMessage(rolloutStatus, task.ListImages()),
		)

		if setErr := updater.argo.State.SetTaskStatus(task.Id, models.StatusFailedMessage, failureReason); setErr != nil {
			log.Error().Str("id", task.Id).Msgf(failedToUpdateTaskStatusTemplate, setErr)
		}
		return
	}

	log.Info().Str("id", task.Id).Msg("App is running on the expected version.")
	updater.argo.metrics.ResetFailedDeployment(task.App)

	if setErr := updater.argo.State.SetTaskStatus(task.Id, models.StatusDeployedMessage, ""); setErr != nil {
		log.Error().Str("id", task.Id).Msgf(failedToUpdateTaskStatusTemplate, setErr)
	}
}
// waitForApplicationDeployment prepares a deployment and then waits for it.
// Steps, in order:
//  1. fetch the application and snapshot its initial status into the task;
//  2. take the per-application mutex so the same application is not updated
//     concurrently;
//  3. if the application is managed by the watcher and the task is validated,
//     update the git repository, otherwise skip that step;
//  4. poll until the rollout reaches a final state or the retries run out.
//
// It returns the last fetched application together with the latest API error,
// or (nil, err) if any step before the polling fails.
func (updater *ArgoStatusUpdater) waitForApplicationDeployment(task models.Task) (*models.Application, error) {
	app, err := updater.argo.api.GetApplication(task.App)
	if err != nil {
		return nil, err
	}

	// Remember the initial status so the final one can be compared with it.
	if snapshotErr := updater.collectInitialAppStatus(&task); snapshotErr != nil {
		return nil, snapshotErr
	}

	// The lock is taken before the managed/validated check and released on
	// both paths: directly below when the git update is skipped, or inside
	// updateGitRepo when it runs.
	appLock := updater.mutex.Get(task.App)
	appLock.Lock()

	if !app.IsManagedByWatcher() || !task.Validated {
		appLock.Unlock()
		log.Debug().Str("id", task.Id).Msg("Skipping git repo update: Application does not have the necessary annotations or token is missing.")
		return updater.waitRollout(task)
	}

	if updateErr := updater.updateGitRepo(app, task, appLock); updateErr != nil {
		return nil, updateErr
	}

	return updater.waitRollout(task)
}
// updateGitRepo writes the task's image tag to the application's git
// repository through app.UpdateGitImageTag. The call is attempted up to five
// times with a back-off delay between attempts, which is the simplest way to
// deal with potential git conflicts (to be replaced with a more sophisticated
// solution after the PoC).
//
// mutex must be passed in already locked by the caller. updateGitRepo takes
// over responsibility for it and always releases it before returning, even
// if the update panics.
//
// It returns the last error from the update, or nil on success.
func (updater *ArgoStatusUpdater) updateGitRepo(app *models.Application, task models.Task, mutex *sync.Mutex) error {
	// Deferred so the per-application lock can never be left held, whatever
	// happens inside the retry loop.
	defer mutex.Unlock()

	log.Debug().Str("id", task.Id).Msg("Application managed by watcher. Initiating git repo update.")

	err := retry.Do(
		func() error {
			if err := app.UpdateGitImageTag(&task); err != nil {
				return err
			}
			return nil
		},
		retry.DelayType(retry.BackOffDelay),
		retry.Attempts(5),
		retry.OnRetry(func(_ uint, err error) {
			log.Warn().Str("id", task.Id).Msgf("Failed to update git repo. Error: %s, retrying...", err.Error())
		}),
		retry.LastErrorOnly(true),
	)
	if err != nil {
		log.Error().Str("id", task.Id).Msgf("Failed to update git repo. Error: %s", err.Error())
		return err
	}

	return nil
}
// waitRollout polls the application named by task.App, using the updater's
// retry options, until one of the following happens:
//   - the rollout status is ArgoRolloutAppSuccess;
//   - the application is degraded and its image set differs from the one
//     saved in task.SavedAppStatus (the degradation is new);
//   - the application is reported as not found;
//   - the retry attempts are exhausted.
//
// It returns the most recently fetched application and the error of the most
// recent GetApplication call. The "force retry" and "application has
// degraded" errors only steer the retry loop and are never returned, so a nil
// error does NOT imply success: the caller must inspect the rollout status of
// the returned application.
func (updater *ArgoStatusUpdater) waitRollout(task models.Task) (*models.Application, error) {
	var application *models.Application
	var err error

	log.Debug().Str("id", task.Id).Msg("Waiting for rollout")

	// The result of retry.Do is discarded on purpose: only API errors,
	// captured in err by the closure, are reported to the caller.
	_ = retry.Do(func() error {
		application, err = updater.argo.api.GetApplication(task.App)
		if err != nil {
			// check if ArgoCD didn't have the app
			if task.IsAppNotFoundError(err) {
				// no need to retry in such cases
				return retry.Unrecoverable(err)
			}
			// print application api failure here
			log.Debug().Str("id", task.Id).Msgf("Failed fetching application status. Error: %s", err.Error())
			return err
		}

		status := application.GetRolloutStatus(task.ListImages(), updater.registryProxyUrl, updater.acceptSuspended)

		switch status {
		case models.ArgoRolloutAppDegraded:
			log.Debug().Str("id", task.Id).Msgf("Application is degraded")
			// The saved hash was computed over the sorted image list, so
			// sort here too; otherwise a mere reordering of the same images
			// would look like a change. A copy is sorted so the returned
			// application is left untouched.
			images := slices.Clone(application.Status.Summary.Images)
			slices.Sort(images)
			hash := helpers.GenerateHash(strings.Join(images, ","))
			// A different image set means the degradation appeared after
			// the initial snapshot, so further waiting is pointless.
			if !bytes.Equal(task.SavedAppStatus.ImagesHash, hash) {
				return retry.Unrecoverable(errors.New("application has degraded"))
			}
		case models.ArgoRolloutAppSuccess:
			log.Debug().Str("id", task.Id).Msgf("Application rollout finished")
			return nil
		default:
			log.Debug().Str("id", task.Id).Msgf("Application status is not final. Status received \"%s\"", status)
		}

		// Any non-final state: ask the retry helper for another attempt.
		return errors.New("force retry")
	}, updater.retryOptions...)

	// return application and latest error
	return application, err
}
// handleArgoAPIFailure records a task as finished unsuccessfully because of
// an Argo API error. The stored status is chosen as follows, first match wins:
//   - StatusAborted when the error text contains argoUnavailableErrorMessage;
//   - StatusAppNotFoundMessage when the task reports the error as
//     "application not found";
//   - StatusFailedMessage otherwise.
//
// err must be non-nil. A failure to persist the status is only logged.
func (updater *ArgoStatusUpdater) handleArgoAPIFailure(task models.Task, err error) {
	appMissing := task.IsAppNotFoundError(err)
	argoUnavailable := strings.Contains(err.Error(), argoUnavailableErrorMessage)

	finalStatus := models.StatusFailedMessage
	switch {
	case argoUnavailable:
		// Unavailability takes precedence over every other classification.
		finalStatus = models.StatusAborted
	case appMissing:
		finalStatus = models.StatusAppNotFoundMessage
	}

	failureReason := fmt.Sprintf(ArgoAPIErrorTemplate, err.Error())
	log.Warn().Str("id", task.Id).Msgf("Deployment failed with status \"%s\". Aborting with error: %s", finalStatus, failureReason)

	setErr := updater.argo.State.SetTaskStatus(task.Id, finalStatus, failureReason)
	if setErr != nil {
		log.Error().Str("id", task.Id).Msgf(failedToUpdateTaskStatusTemplate, setErr)
	}
}