duplicated_events.go
package pathologicaleventlibrary

import (
	"context"
	"fmt"
	"strconv"
	"strings"

	v1 "github.com/openshift/api/config/v1"
	configclient "github.com/openshift/client-go/config/clientset/versioned"
	operatorv1client "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1"
	"github.com/openshift/origin/pkg/monitor/monitorapi"
	"github.com/openshift/origin/pkg/monitortestlibrary/platformidentification"
	"github.com/openshift/origin/pkg/test/ginkgo/junitapi"
	"github.com/sirupsen/logrus"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/rest"
)

// TestDuplicatedEventForUpgrade builds the pathological-event junits for upgrade jobs, using the
// upgrade-specific allowance matchers.
func TestDuplicatedEventForUpgrade(events monitorapi.Intervals, kubeClientConfig *rest.Config) []*junitapi.JUnitTestCase {
	registry := NewUpgradePathologicalEventMatchers(kubeClientConfig, events)
	evaluator := duplicateEventsEvaluator{
		registry: registry,
	}

	platform, topology, err := GetClusterInfraInfo(kubeClientConfig)
	if err != nil {
		logrus.WithError(err).Error("could not fetch cluster infra info")
	} else {
		// These could come back as empty strings in theory.
		evaluator.platform = platform
		evaluator.topology = topology
	}

	tests := []*junitapi.JUnitTestCase{}
	tests = append(tests, evaluator.testDuplicatedCoreNamespaceEvents(events, kubeClientConfig)...)
	tests = append(tests, evaluator.testDuplicatedE2ENamespaceEvents(events, kubeClientConfig)...)
	return tests
}

// TestDuplicatedEventForStableSystem builds the pathological-event junits for non-upgrade (stable)
// jobs, using the universal allowance matchers.
func TestDuplicatedEventForStableSystem(events monitorapi.Intervals, clientConfig *rest.Config) []*junitapi.JUnitTestCase {
	registry := NewUniversalPathologicalEventMatchers(clientConfig, events)
	evaluator := duplicateEventsEvaluator{
		registry: registry,
	}

	platform, topology, err := GetClusterInfraInfo(clientConfig)
	if err != nil {
		logrus.WithError(err).Error("could not fetch cluster infra info")
	} else {
		// These could come back as empty strings in theory.
		evaluator.platform = platform
		evaluator.topology = topology
	}

	tests := []*junitapi.JUnitTestCase{}
	tests = append(tests, evaluator.testDuplicatedCoreNamespaceEvents(events, clientConfig)...)
	tests = append(tests, evaluator.testDuplicatedE2ENamespaceEvents(events, clientConfig)...)
	return tests
}
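
// Illustrative usage (a sketch, not part of this package): a monitor test would typically call
// one of the entry points above from its evaluation hook, e.g.
//
//	junits := TestDuplicatedEventForStableSystem(recordedIntervals, restConfig)
//
// where recordedIntervals and restConfig are assumed to be supplied by the monitor framework.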

type duplicateEventsEvaluator struct {
	registry *AllowedPathologicalEventRegistry

	// platform contains the current platform of the cluster under test.
	platform v1.PlatformType

	// topology contains the topology of the cluster under test.
	topology v1.TopologyMode
}

// testDuplicatedCoreNamespaceEvents checks events outside the e2e test namespaces; see the comment
// on testDuplicatedEvents for why we identify events via the monitor rather than a [late] test.
func (d *duplicateEventsEvaluator) testDuplicatedCoreNamespaceEvents(events monitorapi.Intervals, kubeClientConfig *rest.Config) []*junitapi.JUnitTestCase {
const testName = "[sig-arch] events should not repeat pathologically"
return d.testDuplicatedEvents(testName, false, events.Filter(monitorapi.Not(monitorapi.IsInE2ENamespace)), kubeClientConfig, false)
}

// testDuplicatedE2ENamespaceEvents checks events inside the e2e test namespaces; see the comment
// on testDuplicatedEvents for why we identify events via the monitor rather than a [late] test.
func (d *duplicateEventsEvaluator) testDuplicatedE2ENamespaceEvents(events monitorapi.Intervals, kubeClientConfig *rest.Config) []*junitapi.JUnitTestCase {
const testName = "[sig-arch] events should not repeat pathologically in e2e namespaces"
return d.testDuplicatedEvents(testName, true, events.Filter(monitorapi.IsInE2ENamespace), kubeClientConfig, true)
}

// appendToFirstLine appends add to the end of the first line of s.
func appendToFirstLine(s string, add string) string {
splits := strings.Split(s, "\n")
splits[0] += add
return strings.Join(splits, "\n")
}
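
// For example (illustrative values), appendToFirstLine("5 events\ndetail", " result=reject ")
// returns "5 events result=reject \ndetail".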

// getJUnitName appends the namespace, when present, to the base test name.
func getJUnitName(testName string, namespace string) string {
jUnitName := testName
if namespace != "" {
jUnitName = jUnitName + " for ns/" + namespace
}
return jUnitName
}

func getNamespacesForJUnits() sets.String {
	namespaces := platformidentification.KnownNamespaces.Clone()
	// The empty string is the catch-all bucket for events in namespaces we do not track individually.
	namespaces.Insert("")
	return namespaces
}

type eventResult struct {
	failures []string
	flakes   []string
}

func generateFailureOutput(failures []string, flakes []string) string {
var output string
if len(failures) > 0 {
output = fmt.Sprintf("%d events happened too frequently\n\n%v", len(failures), strings.Join(failures, "\n"))
}
if len(flakes) > 0 {
if output != "" {
output += "\n\n"
}
output += fmt.Sprintf("%d events with known BZs\n\n%v", len(flakes), strings.Join(flakes, "\n"))
}
return output
}
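
// A hypothetical output from generateFailureOutput might look like:
//
//	2 events happened too frequently
//
//	event happened 21 times, something is wrong: ... result=reject ...
//	event happened 25 times, something is wrong: ... result=reject ...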

func generateJUnitTestCasesCoreNamespaces(testName string, nsResults map[string]*eventResult) []*junitapi.JUnitTestCase {
var tests []*junitapi.JUnitTestCase
namespaces := getNamespacesForJUnits()
for namespace := range namespaces {
jUnitName := getJUnitName(testName, namespace)
if result, ok := nsResults[namespace]; ok {
output := generateFailureOutput(result.failures, result.flakes)
tests = append(tests, &junitapi.JUnitTestCase{
Name: jUnitName,
FailureOutput: &junitapi.FailureOutput{
Output: output,
},
})
// Add a success for flakes
if len(result.failures) == 0 && len(result.flakes) > 0 {
tests = append(tests, &junitapi.JUnitTestCase{Name: jUnitName})
}
} else {
tests = append(tests, &junitapi.JUnitTestCase{Name: jUnitName})
}
}
return tests
}

func generateJUnitTestCasesE2ENamespaces(testName string, nsResults map[string]*eventResult) []*junitapi.JUnitTestCase {
var tests []*junitapi.JUnitTestCase
if result, ok := nsResults[""]; ok {
if len(result.failures) > 0 || len(result.flakes) > 0 {
output := generateFailureOutput(result.failures, result.flakes)
tests = append(tests, &junitapi.JUnitTestCase{
Name: testName,
FailureOutput: &junitapi.FailureOutput{
Output: output,
},
})
}
if len(result.failures) == 0 {
// Add success for flake
tests = append(tests, &junitapi.JUnitTestCase{Name: testName})
}
}
if len(tests) == 0 {
tests = append(tests, &junitapi.JUnitTestCase{Name: testName})
}
return tests
}
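
// Note on the junit convention assumed above: emitting both a failing and a passing
// JUnitTestCase under the same name is how this suite marks a result as a flake rather
// than a hard failure.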

// testDuplicatedEvents identifies events based on the monitor because it is (currently) our only
// spot that tracks events over time for every run. This means we see events that are created during
// updates and in e2e tests themselves. A [late] test is easier to author, but less complete in its
// view.
// I hate regexes, so I only do this because I really have to.
func (d *duplicateEventsEvaluator) testDuplicatedEvents(testName string, flakeOnly bool, events monitorapi.Intervals, kubeClientConfig *rest.Config, isE2E bool) []*junitapi.JUnitTestCase {
// displayToCount maps a static display message to the matching repeating interval we saw with the highest count
displayToCount := map[string]monitorapi.Interval{}
for _, event := range events {
times := GetTimesAnEventHappened(event.StructuredMessage)
if times > DuplicateEventThreshold {
			// Check if we have an allowance for this event. This code used to just check if the event had an
			// interesting flag, implying it matched some pattern, but that happened even for upgrade patterns
			// occurring in non-upgrade jobs, so patterns meant to be allowed only in upgrade jobs were being
			// allowed in all jobs. The list of allowed patterns passed to this object wasn't even used.
if allowed, _ := d.registry.AllowedByAny(event, d.topology); allowed {
continue
}
			// Build the key used to identify the repeating interval; we may encounter
			// the same interval multiple times.
eventDisplayMessage := fmt.Sprintf("%s - reason/%s %s", event.Locator,
event.StructuredMessage.Reason, event.StructuredMessage.HumanMessage)
if _, ok := displayToCount[eventDisplayMessage]; !ok {
displayToCount[eventDisplayMessage] = event
}
if times > GetTimesAnEventHappened(displayToCount[eventDisplayMessage].StructuredMessage) {
// Update to the latest interval we saw with the higher count, so from/to are more accurate
displayToCount[eventDisplayMessage] = event
}
}
}

	nsResults := map[string]*eventResult{}
for intervalDisplayMsg, interval := range displayToCount {
namespace := interval.StructuredLocator.Keys[monitorapi.LocatorNamespaceKey]
intervalMsgWithTime := intervalDisplayMsg + " From: " + interval.From.Format("15:04:05Z") + " To: " + interval.To.Format("15:04:05Z")
msg := fmt.Sprintf("event happened %d times, something is wrong: %v",
GetTimesAnEventHappened(interval.StructuredMessage), intervalMsgWithTime)
		// We only create junits for known namespaces; everything else falls into the catch-all bucket.
		if !platformidentification.KnownNamespaces.Has(namespace) {
			namespace = ""
		}
if _, ok := nsResults[namespace]; !ok {
tmp := &eventResult{}
nsResults[namespace] = tmp
}
if flakeOnly {
nsResults[namespace].flakes = append(nsResults[namespace].flakes, appendToFirstLine(msg, " result=allow "))
} else {
nsResults[namespace].failures = append(nsResults[namespace].failures, appendToFirstLine(msg, " result=reject "))
}
}

	var tests []*junitapi.JUnitTestCase
if isE2E {
tests = generateJUnitTestCasesE2ENamespaces(testName, nsResults)
} else {
tests = generateJUnitTestCasesCoreNamespaces(testName, nsResults)
}
return tests
}
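
// A displayToCount key built above looks roughly like (illustrative values):
//
//	ns/openshift-etcd pod/etcd-guard - reason/ProbeError Readiness probe error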

// GetTimesAnEventHappened returns the count annotation on the message, 1 when the annotation
// is absent, or 0 when it cannot be parsed.
func GetTimesAnEventHappened(msg monitorapi.Message) int {
countStr, ok := msg.Annotations[monitorapi.AnnotationCount]
if !ok {
return 1
}
times, err := strconv.ParseInt(countStr, 10, 0)
if err != nil { // not an int somehow
logrus.Warnf("interval had a non-integer count? %+v", msg)
return 0
}
return int(times)
}

// GetClusterInfraInfo returns the platform and control-plane topology of the cluster, or empty
// values when no client config is available.
func GetClusterInfraInfo(c *rest.Config) (platform v1.PlatformType, topology v1.TopologyMode, err error) {
	if c == nil {
		// Without a client config there is nothing to query; return empty values.
		return
	}
oc, err := configclient.NewForConfig(c)
if err != nil {
return "", "", err
}
infra, err := oc.ConfigV1().Infrastructures().Get(context.Background(), "cluster", metav1.GetOptions{})
if err != nil {
return "", "", err
}
if infra.Status.PlatformStatus != nil && infra.Status.PlatformStatus.Type != "" {
platform = infra.Status.PlatformStatus.Type
}
if infra.Status.ControlPlaneTopology != "" {
topology = infra.Status.ControlPlaneTopology
}
return platform, topology, nil
}
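
// Illustrative: on an AWS cluster with a highly-available control plane this would return
// (v1.AWSPlatformType, v1.HighlyAvailableTopologyMode, nil); with a nil config it returns
// empty values and a nil error.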

// getBiggestRevisionForEtcdOperator calculates the biggest revision among replicas of the most
// recently successful deployment.
func getBiggestRevisionForEtcdOperator(ctx context.Context, operatorClient operatorv1client.OperatorV1Interface) (int, error) {
	etcd, err := operatorClient.Etcds().Get(ctx, "cluster", metav1.GetOptions{})
	if err != nil {
		// Instead of panicking when there is no etcd operator (e.g. MicroShift), just estimate the biggest revision to be 0.
		if apierrors.IsNotFound(err) {
			return 0, nil
		}
		return 0, err
	}
biggestRevision := 0
for _, nodeStatus := range etcd.Status.NodeStatuses {
if int(nodeStatus.CurrentRevision) > biggestRevision {
biggestRevision = int(nodeStatus.CurrentRevision)
}
}
return biggestRevision, nil
}
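
// For example, node statuses with CurrentRevision values 7, 8, and 8 yield 8; a missing
// etcd operator yields 0.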

// BuildTestDupeKubeEvent is a test utility that makes creating these specific intervals a little
// more concise.
func BuildTestDupeKubeEvent(namespace, pod, reason, msg string, count int) monitorapi.Interval {
l := monitorapi.Locator{
Type: monitorapi.LocatorTypePod,
Keys: map[monitorapi.LocatorKey]string{},
}
if namespace != "" {
l.Keys[monitorapi.LocatorNamespaceKey] = namespace
}
if pod != "" {
l.Keys[monitorapi.LocatorPodKey] = pod
}
i := monitorapi.NewInterval(monitorapi.SourceKubeEvent, monitorapi.Info).
Locator(l).
Message(
monitorapi.NewMessage().
Reason(monitorapi.IntervalReason(reason)).
HumanMessage(msg).
WithAnnotation(monitorapi.AnnotationCount, fmt.Sprintf("%d", count))).
BuildNow()
return i
}
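
// Illustrative usage in a unit test (hypothetical values; note that a nil rest.Config skips the
// cluster infra lookup):
//
//	interval := BuildTestDupeKubeEvent("openshift-etcd", "etcd-guard", "ProbeError",
//		"Readiness probe error", 25)
//	junits := TestDuplicatedEventForStableSystem(monitorapi.Intervals{interval}, nil)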