-
Notifications
You must be signed in to change notification settings - Fork 13
/
reporter.go
331 lines (294 loc) · 11.1 KB
/
reporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
package reporter
import (
"context"
"strings"
"k8s.io/client-go/util/retry"
"github.com/hashicorp/go-multierror"
"github.com/solo-io/go-utils/contextutils"
"github.com/solo-io/go-utils/hashutils"
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
"github.com/solo-io/solo-kit/pkg/api/v1/resources"
"github.com/solo-io/solo-kit/pkg/api/v1/resources/core"
"github.com/solo-io/solo-kit/pkg/errors"
)
// Report collects the validation outcome recorded for a single resource.
type Report struct {
// Warnings holds the warning messages recorded for the resource.
// StatusFromReport joins them into the status reason.
Warnings []string
// Errors holds the error(s) recorded for the resource; nil means none.
// AddError stores the result of multierror.Append here.
Errors error
}
// ResourceReports maps each input resource to the Report recorded for it.
// Being a map, its methods mutate the receiver in place.
type ResourceReports map[resources.InputResource]Report
// Accept stores an empty Report for each of the given resources, replacing
// any report already stored for them, and returns the receiver so that calls
// can be chained.
func (e ResourceReports) Accept(res ...resources.InputResource) ResourceReports {
	for i := range res {
		e[res[i]] = Report{}
	}
	return e
}
// Merge merges the given resourceReports into this resourceReports.
// Any resources which appear in both resourceReports will
// have their warnings and errors merged.
// Warnings and errors from resErrs that are already present in the merged
// report, as determined by their strings, are not added again.
// Resources which appear only in resErrs are stored as-is.
func (e ResourceReports) Merge(resErrs ResourceReports) {
	for res, incoming := range resErrs {
		existing, found := e[res]
		if !found {
			// Resource in 2nd report is not yet in 1st report
			e[res] = incoming
			continue
		}

		// Merge warnings lists. Every appended warning is recorded in the
		// set as well, so a warning repeated inside the incoming report is
		// only added once (mirrors the handling of errors below).
		seenWarnings := make(map[string]struct{}, len(existing.Warnings)+len(incoming.Warnings))
		for _, warning := range existing.Warnings {
			seenWarnings[warning] = struct{}{}
		}
		for _, warning := range incoming.Warnings {
			if _, dup := seenWarnings[warning]; dup {
				continue
			}
			seenWarnings[warning] = struct{}{}
			existing.Warnings = append(existing.Warnings, warning)
		}

		switch {
		case existing.Errors == nil:
			// Only 2nd has errs (or neither does)
			existing.Errors = incoming.Errors
		case incoming.Errors == nil:
			// Only 1st has errs: nothing to merge
		default:
			// Both first and second have errors for the same resource:
			// Any errors which are identical won't be duplicated,
			// Any errors which are unique will be added to the final list
			errs1, isFirstMulti := existing.Errors.(*multierror.Error)
			errs2, isSecondMulti := incoming.Errors.(*multierror.Error)
			// If the errors are not multiErrs, wrap them in multiErrs:
			if !isFirstMulti {
				errs1 = &multierror.Error{Errors: []error{existing.Errors}}
			}
			if !isSecondMulti {
				errs2 = &multierror.Error{Errors: []error{incoming.Errors}}
			}

			// Build a fresh multierror rather than appending to errs1 in
			// place: the existing *multierror.Error may be shared with
			// another ResourceReports (reports are stored by reference
			// above), and must not change underneath it.
			merged := &multierror.Error{
				Errors:      make([]error, 0, len(errs1.Errors)+len(errs2.Errors)),
				ErrorFormat: errs1.ErrorFormat,
			}
			seenErrs := make(map[string]struct{}, len(errs1.Errors)+len(errs2.Errors))
			for _, err := range errs1.Errors {
				seenErrs[err.Error()] = struct{}{}
				merged.Errors = append(merged.Errors, err)
			}
			for _, err := range errs2.Errors {
				msg := err.Error()
				if _, dup := seenErrs[msg]; dup {
					continue
				}
				seenErrs[msg] = struct{}{}
				merged.Errors = append(merged.Errors, err)
			}
			existing.Errors = merged
		}

		e[res] = existing
	}
}
// AddErrors records each of the given errors against res by calling AddError
// once per error, in order.
func (e ResourceReports) AddErrors(res resources.InputResource, errs ...error) {
	for i := 0; i < len(errs); i++ {
		e.AddError(res, errs[i])
	}
}
// AddError appends err to the errors recorded for res. A nil err is ignored.
// If res has no report yet, one is created.
func (e ResourceReports) AddError(res resources.InputResource, err error) {
	if err == nil {
		return
	}

	// Map values are not addressable: update a copy and store it back.
	report := e[res]
	report.Errors = multierror.Append(report.Errors, err)
	e[res] = report
}
// AddWarnings records each of the given warnings against res by calling
// AddWarning once per warning, in order.
func (e ResourceReports) AddWarnings(res resources.InputResource, warning ...string) {
	for i := 0; i < len(warning); i++ {
		e.AddWarning(res, warning[i])
	}
}
// AddWarning appends warning to the warnings recorded for res. An empty
// warning is ignored. If res has no report yet, one is created.
func (e ResourceReports) AddWarning(res resources.InputResource, warning string) {
	if len(warning) == 0 {
		return
	}

	// Map values are not addressable: update a copy and store it back.
	report := e[res]
	report.Warnings = append(report.Warnings, warning)
	e[res] = report
}
// Find returns the first resource encountered whose kind equals kind and
// whose metadata ref equals ref, together with its report. When nothing
// matches it returns a nil resource and an empty Report.
func (e ResourceReports) Find(kind string, ref core.ResourceRef) (resources.InputResource, Report) {
	for candidate, report := range e {
		if resources.Kind(candidate) != kind {
			continue
		}
		if candidate.GetMetadata().Ref() != ref {
			continue
		}
		return candidate, report
	}
	return nil, Report{}
}
// Validate combines the errors of all reports into a single error and
// returns nil when no report has errors. Warnings are ignored.
//
// The leading "invalid resource" error is created only once, for the first
// resource with errors that the map iteration happens to visit.
func (e ResourceReports) Validate() error {
	var combined error
	for resource, report := range e {
		if report.Errors == nil {
			continue
		}
		if combined == nil {
			combined = errors.Errorf("invalid resource %v.%v", resource.GetMetadata().Namespace, resource.GetMetadata().Name)
		}
		combined = multierror.Append(combined, report.Errors)
	}
	return combined
}
// ValidateStrict behaves like Validate but does not ignore warnings: the
// warnings of every report that has any are appended to the result as an
// additional "WARN" error. It returns nil only when no report has errors or
// warnings.
func (e ResourceReports) ValidateStrict() error {
	combined := e.Validate()
	for resource, report := range e {
		if len(report.Warnings) == 0 {
			continue
		}
		// Only reached with a nil result when Validate found no errors at all.
		if combined == nil {
			combined = errors.Errorf("invalid resource %v.%v", resource.GetMetadata().Namespace, resource.GetMetadata().Name)
		}
		combined = multierror.Append(combined, errors.Errorf("WARN: \n %v", report.Warnings))
	}
	return combined
}
// ReporterResourceClient is the minimal set of client operations required for reporters.
type ReporterResourceClient interface {
// Kind returns the resource kind handled by this client.
// NewReporter uses it as the key under which the client is registered.
Kind() string
// Read looks up the resource with the given namespace and name.
Read(namespace, name string, opts clients.ReadOpts) (resources.Resource, error)
// Write stores the given resource and returns the resource as written.
Write(resource resources.Resource, opts clients.WriteOpts) (resources.Resource, error)
}
// Reporter writes the outcome recorded in a set of ResourceReports.
type Reporter interface {
// WriteReports writes the given reports, together with the given
// subresource statuses, and returns an error if writing failed.
WriteReports(ctx context.Context, errs ResourceReports, subresourceStatuses map[string]*core.Status) error
}
// StatusReporter is a Reporter that can also convert a single Report into
// the core.Status it would write.
type StatusReporter interface {
Reporter
// StatusFromReport builds the status that corresponds to the given report
// and subresource statuses.
StatusFromReport(report Report, subresourceStatuses map[string]*core.Status) core.Status
}
// reporter is the StatusReporter implementation returned by NewReporter.
type reporter struct {
// clients holds the registered clients, keyed by the value of their Kind().
clients map[string]ReporterResourceClient
// ref is set as ReportedBy on every status built by StatusFromReport.
ref string
}
// NewReporter returns a StatusReporter that reports as reporterRef and uses
// the given clients to write statuses. Clients are registered by their
// Kind(); if several clients share a kind, the last one given wins.
func NewReporter(reporterRef string, reporterClients ...ReporterResourceClient) StatusReporter {
	rep := &reporter{
		ref:     reporterRef,
		clients: make(map[string]ReporterResourceClient),
	}
	for _, c := range reporterClients {
		rep.clients[c.Kind()] = c
	}
	return rep
}
// WriteReports derives a status from each report and writes it to the
// corresponding resource using the client registered for the resource's kind.
//
// ResourceReports may be modified, and end up with fewer resources than originally requested.
// If resources referenced in the resourceErrs don't exist, they will be removed.
//
// A resource whose current status already equals the derived status is
// skipped. Write failures are logged and collected, and processing continues
// with the remaining resources; the collected failures are returned at the
// end. A resource whose kind has no registered client aborts the call
// immediately with an error.
func (r *reporter) WriteReports(ctx context.Context, resourceErrs ResourceReports, subresourceStatuses map[string]*core.Status) error {
	ctx = contextutils.WithLogger(ctx, "reporter")
	logger := contextutils.LoggerFrom(ctx)

	// Iterate over a snapshot so that entries for resources which no longer
	// exist can be deleted from the caller's map while looping.
	snapshot := make(ResourceReports, len(resourceErrs))
	for res, rpt := range resourceErrs {
		snapshot[res] = rpt
	}

	var failures *multierror.Error
	for original, rpt := range snapshot {
		kind := resources.Kind(original)
		client, registered := r.clients[kind]
		if !registered {
			return errors.Errorf("reporter: was passed resource of kind %v but no client to support it", kind)
		}

		desired := r.StatusFromReport(rpt, subresourceStatuses)
		pending := resources.Clone(original).(resources.InputResource)
		if desired.Equal(original.GetStatus()) {
			logger.Debugf("skipping report for %v as it has not changed", pending.GetMetadata().Ref())
			continue
		}
		pending.SetStatus(desired)

		// Each attempt may replace pending with a refreshed copy of the
		// resource, which the next retry then uses.
		var written resources.Resource
		retryErr := errors.RetryOnConflict(retry.DefaultBackoff, func() error {
			var attemptErr error
			written, pending, attemptErr = attemptUpdateStatus(ctx, client, pending)
			return attemptErr
		})
		if retryErr != nil {
			wrapped := errors.Wrapf(retryErr, "failed to write status %v for resource %v", desired, original.GetMetadata().Name)
			logger.Warn(wrapped)
			failures = multierror.Append(failures, wrapped)
			continue
		}

		if written == nil {
			// No error and no written resource: the resource was not found.
			logger.Debugf("did not write report for %v : %v because resource was not found", pending.GetMetadata().Ref(), desired)
			delete(resourceErrs, original)
			continue
		}
		logger.Debugf("wrote report for %v : %v", written.GetMetadata().Ref(), desired)
	}
	return failures.ErrorOrNil()
}
// attemptUpdateStatus makes a single attempt at writing the status carried by
// resourceToWrite through client.
//
// It returns, in order: the resource as stored (nil if nothing was written or
// read back), the resource the caller should pass to the next attempt, and an
// error. The outcomes are:
//   - the initial read reports that the resource does not exist:
//     (nil, resourceToWrite, nil); the resource is not re-created.
//   - the write succeeds: (written resource, resourceToWrite, nil).
//   - the write fails and the follow-up read fails too:
//     (nil, resourceToWrite, error). A resource version error is wrapped
//     before being returned; any other write error is returned as-is.
//   - the write fails and the re-read resource does not hash-equal
//     resourceToWrite: (re-read resource, resourceToWrite, nil).
//   - the write fails and the re-read resource hash-equals resourceToWrite:
//     (re-read resource, clone of it with the desired status, the write error).
//
// Ideally, this and its caller, WriteReports, would just take the resource ref and its status, rather than the resource itself,
// to avoid confusion about whether this may update the resource rather than just its status.
// However, this change is not worth the effort and risk right now. (Ariana, June 2020)
func attemptUpdateStatus(ctx context.Context, client ReporterResourceClient, resourceToWrite resources.InputResource) (resources.Resource, resources.InputResource, error) {
// readErr is declared here and reused by the short variable declaration
// below (only resourceFromRead is new there) and by the later re-read.
var readErr error
resourceFromRead, readErr := client.Read(resourceToWrite.GetMetadata().Namespace, resourceToWrite.GetMetadata().Name, clients.ReadOpts{Ctx: ctx})
if readErr != nil && errors.IsNotExist(readErr) { // resource has been deleted, don't re-create
return nil, resourceToWrite, nil
}
// Any other read error is not returned: the write below is still attempted
// with resourceToWrite as it was passed in.
if readErr == nil {
// set resourceToWrite to the resource we read but with the new status
// Note: it's possible that this resourceFromRead is newer than the resourceToWrite and therefore the status will be out of sync.
// If so, we will soon recalculate the status. The interim incorrect status is not dangerous since the status is informational only.
// Also, the status is accurate for the resource as it's stored in Gloo's memory in the interim.
// This is explained further here: https://github.com/solo-io/solo-kit/pull/360#discussion_r433397163
if inputResourceFromRead, ok := resourceFromRead.(resources.InputResource); ok {
status := resourceToWrite.GetStatus()
resourceToWrite = inputResourceFromRead
resourceToWrite.SetStatus(status)
}
}
updatedResource, writeErr := client.Write(resourceToWrite, clients.WriteOpts{Ctx: ctx, OverwriteExisting: true})
if writeErr == nil {
return updatedResource, resourceToWrite, nil
}
// The write failed: read the resource again to decide whether another
// attempt makes sense.
updatedResource, readErr = client.Read(resourceToWrite.GetMetadata().Namespace, resourceToWrite.GetMetadata().Name, clients.ReadOpts{Ctx: ctx})
if readErr != nil {
if errors.IsResourceVersion(writeErr) {
// we don't want to return the unwrapped resource version writeErr if we also had a read error
// otherwise we could get into infinite retry loop if reads repeatedly failed (e.g., no read RBAC)
return nil, resourceToWrite, errors.Wrapf(writeErr, "unable to read updated resource, no reason to retry resource version conflict; readErr %v", readErr)
}
return nil, resourceToWrite, writeErr
}
// we successfully read an updated version of the resource we are
// trying to update. let's update resourceToWrite for the next iteration
// The second result of HashableEqual is discarded; only the equality flag
// is used.
equal, _ := hashutils.HashableEqual(updatedResource, resourceToWrite)
if !equal {
// different hash, something important was done, do not try again:
return updatedResource, resourceToWrite, nil
}
// Same hash: hand back a clone of the freshly read resource carrying the
// desired status, along with the write error.
resourceToWriteUpdated := resources.Clone(updatedResource).(resources.InputResource)
resourceToWriteUpdated.SetStatus(resourceToWrite.GetStatus())
return updatedResource, resourceToWriteUpdated, writeErr
}
// StatusFromReport converts a report into a status reported by this reporter:
//   - Rejected when the report has errors; the reason is the error text,
//     followed by the warnings if there are any.
//   - Warning when the report has only warnings; the reason lists them.
//   - Accepted, with no reason, otherwise.
//
// The given subresourceStatuses are attached to the status in every case.
func (r *reporter) StatusFromReport(report Report, subresourceStatuses map[string]*core.Status) core.Status {
	// Start from the accepted status and refine it below.
	status := core.Status{
		State:               core.Status_Accepted,
		ReportedBy:          r.ref,
		SubresourceStatuses: subresourceStatuses,
	}

	var warnings string
	if len(report.Warnings) > 0 {
		warnings = "warning: \n " + strings.Join(report.Warnings, "\n")
	}

	switch {
	case report.Errors != nil:
		status.State = core.Status_Rejected
		status.Reason = report.Errors.Error()
		if warnings != "" {
			status.Reason += "\n" + warnings
		}
	case warnings != "":
		status.State = core.Status_Warning
		status.Reason = warnings
	}
	return status
}