Event aggregation: include latest event message in aggregate event #46034

Merged
52 changes: 34 additions & 18 deletions staging/src/k8s.io/client-go/tools/record/events_cache.go
@@ -93,7 +93,7 @@ type EventAggregatorMessageFunc func(event *v1.Event) string

// EventAggregratorByReasonMessageFunc returns an aggregate message by prefixing the incoming message
func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
return "(events with common reason combined)"
return "(combined from similar events): " + event.Message
}
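For illustration, a minimal standalone sketch of the new message behavior; the k8s.io/api/core/v1 import path is the current one and is assumed here, and the event values are made up:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
)

func main() {
	// Hypothetical event; only Message feeds into the aggregate text.
	ev := &v1.Event{Reason: "FailedScheduling", Message: "0/3 nodes are available"}

	// Previously this returned the fixed string "(events with common reason combined)";
	// with this change the latest message is carried along.
	fmt.Println(record.EventAggregatorByReasonMessageFunc(ev))
	// Output: (combined from similar events): 0/3 nodes are available
}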

// EventAggregator identifies similar events and aggregates them into a single event
@@ -141,36 +141,53 @@ type aggregateRecord struct {
lastTimestamp metav1.Time
}

-// EventAggregate identifies similar events and groups into a common event if required
-func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, error) {
-	aggregateKey, localKey := e.keyFunc(newEvent)
+// EventAggregate checks if a similar event has been seen according to the
+// aggregation configuration (max events, max interval, etc) and returns:
+//
+// - The (potentially modified) event that should be created
+// - The cache key for the event, for correlation purposes. This will be set to
+//   the full key for normal events, and to the result of
+//   EventAggregatorMessageFunc for aggregate events.
+func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
now := metav1.NewTime(e.clock.Now())
-	record := aggregateRecord{localKeys: sets.NewString(), lastTimestamp: now}
+	var record aggregateRecord
+	// eventKey is the full cache key for this event
+	eventKey := getEventKey(newEvent)
+	// aggregateKey is for the aggregate event, if one is needed.
+	aggregateKey, localKey := e.keyFunc(newEvent)

// Do we have a record of similar events in our cache?
e.Lock()
defer e.Unlock()
value, found := e.cache.Get(aggregateKey)
if found {
record = value.(aggregateRecord)
}

-	// if the last event was far enough in the past, it is not aggregated, and we must reset state
+	// Is the previous record too old? If so, make a fresh one. Note: if we didn't
+	// find a similar record, its lastTimestamp will be the zero value, so we
+	// create a new one in that case.
maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
interval := now.Time.Sub(record.lastTimestamp.Time)
if interval > maxInterval {
record = aggregateRecord{localKeys: sets.NewString()}
}

// Write the new event into the aggregation record and put it on the cache
record.localKeys.Insert(localKey)
record.lastTimestamp = now
e.cache.Add(aggregateKey, record)

// If we are not yet over the threshold for unique events, don't correlate them
if uint(record.localKeys.Len()) < e.maxEvents {
-		return newEvent, nil
+		return newEvent, eventKey
}

// do not grow our local key set any larger than max
record.localKeys.PopAny()

-	// create a new aggregate event
+	// create a new aggregate event, and return the aggregateKey as the cache key
+	// (so that it can be overwritten.)
eventCopy := &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
@@ -185,7 +202,7 @@ func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, error)
Reason: newEvent.Reason,
Source: newEvent.Source,
}
-	return eventCopy, nil
+	return eventCopy, aggregateKey
}
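A rough caller-side sketch of the new return contract, assuming the NewEventAggregator constructor and the ByReason helpers from this package plus the apimachinery clock package (current import paths; the cache size and thresholds are illustrative, not the package defaults):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/client-go/tools/record"
)

func main() {
	// Aggregate once two distinct messages share an aggregate key within a
	// 10 minute window.
	agg := record.NewEventAggregator(
		100, // LRU cache size
		record.EventAggregatorByReasonFunc,
		record.EventAggregatorByReasonMessageFunc,
		2,   // maxEvents
		600, // maxIntervalInSeconds
		clock.RealClock{},
	)

	for _, msg := range []string{"disk full", "disk nearly full", "disk still full"} {
		ev := &v1.Event{Reason: "FailedMount", Message: msg}
		out, key := agg.EventAggregate(ev)
		// Below the threshold the event passes through with its full cache key;
		// once aggregation kicks in, out.Message is the combined form above and
		// key is the aggregate key, so the logger overwrites one cached entry
		// instead of creating a new event per unique message.
		fmt.Printf("message=%q key=%q\n", out.Message, key)
	}
}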

// eventLog records data about when an event was observed
@@ -215,22 +232,22 @@ func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger {
return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
}

-// eventObserve records the event, and determines if its frequency should update
-func (e *eventLogger) eventObserve(newEvent *v1.Event) (*v1.Event, []byte, error) {
+// eventObserve records an event, or updates an existing one if key is a cache hit
+func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
var (
patch []byte
err error
)
-	key := getEventKey(newEvent)
eventCopy := *newEvent
event := &eventCopy

e.Lock()
defer e.Unlock()

+	// Check if there is an existing event we should update
lastObservation := e.lastEventObservationFromCache(key)

-	// we have seen this event before, so we must prepare a patch
+	// If we found a result, prepare a patch
if lastObservation.count > 0 {
// update the event based on the last observation so patch will work as desired
event.Name = lastObservation.name
@@ -241,6 +258,7 @@ func (e *eventLogger) eventObserve(newEvent *v1.Event) (*v1.Event, []byte, error
eventCopy2 := *event
eventCopy2.Count = 0
eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
eventCopy2.Message = ""

newData, _ := json.Marshal(event)
oldData, _ := json.Marshal(eventCopy2)
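The hunk is cut off before the patch call, but the reason for blanking Message on eventCopy2 can be sketched in isolation: the two-way merge patch is computed between the zeroed copy and the updated event, so a zeroed field always shows up in the diff. This sketch assumes the strategic merge patch helper from apimachinery and current import paths:

package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	// The event as we want it to look after the update (illustrative values).
	updated := v1.Event{Message: "(combined from similar events): disk full", Count: 3}

	// A copy with Count and Message zeroed, mirroring eventCopy2 above, so the
	// resulting patch always carries the current count and the latest message.
	base := updated
	base.Count = 0
	base.Message = ""

	oldData, _ := json.Marshal(base)
	newData, _ := json.Marshal(updated)
	patch, _ := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &updated)
	fmt.Printf("%s\n", patch)
	// Roughly: {"count":3,"message":"(combined from similar events): disk full"}
}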
@@ -337,6 +355,7 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
defaultAggregateMaxEvents,
defaultAggregateIntervalInSeconds,
clock),
+
logger: newEventLogger(cacheSize, clock),
}
}
@@ -346,11 +365,8 @@ func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateRes
if c.filterFunc(newEvent) {
return &EventCorrelateResult{Skip: true}, nil
}
-	aggregateEvent, err := c.aggregator.EventAggregate(newEvent)
-	if err != nil {
-		return &EventCorrelateResult{}, err
-	}
-	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent)
+	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
+	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
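End to end, a minimal sketch of consuming the correlator's result (current import paths assumed; the sink side is simplified to prints):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/client-go/tools/record"
)

func main() {
	correlator := record.NewEventCorrelator(clock.RealClock{})
	ev := &v1.Event{Reason: "FailedScheduling", Message: "0/3 nodes are available"}

	result, err := correlator.EventCorrelate(ev)
	if err != nil {
		panic(err)
	}
	switch {
	case result.Skip:
		// the filter dropped the event entirely
	case result.Patch != nil:
		// an existing cached event should be PATCHed with this body
		fmt.Printf("patch: %s\n", result.Patch)
	default:
		// a brand new event should be POSTed
		fmt.Printf("create: %q\n", result.Event.Message)
	}
}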

staging/src/k8s.io/client-go/tools/record/events_cache_test.go
@@ -157,10 +157,11 @@ func TestEventAggregatorByReasonFunc(t *testing.T) {

// TestEventAggregatorByReasonMessageFunc validates the proper output for an aggregate message
func TestEventAggregatorByReasonMessageFunc(t *testing.T) {
expected := "(events with common reason combined)"
expectedPrefix := "(combined from similar events): "
event1 := makeEvent("end-of-world", "it was fun", makeObjectReference("Pod", "pod1", "other"))
-	if actual := EventAggregatorByReasonMessageFunc(&event1); expected != actual {
-		t.Errorf("Expected %v got %v", expected, actual)
+	actual := EventAggregatorByReasonMessageFunc(&event1)
+	if !strings.HasPrefix(actual, expectedPrefix) {
+		t.Errorf("Expected %v to begin with prefix %v", actual, expectedPrefix)
}
}
