Skip to content

Commit

Permalink
Add client side event rate limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
derekwaynecarr committed Jul 21, 2017
1 parent ab40f52 commit b62fa1d
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 15 deletions.
1 change: 1 addition & 0 deletions staging/src/k8s.io/client-go/tools/record/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/reference:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)
21 changes: 12 additions & 9 deletions staging/src/k8s.io/client-go/tools/record/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,8 @@ func TestWriteEventError(t *testing.T) {
},
}

eventCorrelator := NewEventCorrelator(clock.RealClock{})
clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second}
eventCorrelator := NewEventCorrelator(&clock)
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))

for caseName, ent := range table {
Expand All @@ -435,7 +436,8 @@ func TestWriteEventError(t *testing.T) {
}

func TestUpdateExpiredEvent(t *testing.T) {
eventCorrelator := NewEventCorrelator(clock.RealClock{})
clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second}
eventCorrelator := NewEventCorrelator(&clock)
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))

var createdEvent *v1.Event
Expand Down Expand Up @@ -497,14 +499,15 @@ func TestLotsOfEvents(t *testing.T) {
loggerCalled <- struct{}{}
})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "eventTest"})
ref := &v1.ObjectReference{
Kind: "Pod",
Name: "foo",
Namespace: "baz",
UID: "bar",
APIVersion: "version",
}
for i := 0; i < maxQueuedEvents; i++ {
// we want a unique object to stop spam filtering
ref := &v1.ObjectReference{
Kind: "Pod",
Name: fmt.Sprintf("foo-%v", i),
Namespace: "baz",
UID: "bar",
APIVersion: "version",
}
// we need to vary the reason to prevent aggregation
go recorder.Eventf(ref, v1.EventTypeNormal, "Reason-"+string(i), strconv.Itoa(i))
}
Expand Down
98 changes: 94 additions & 4 deletions staging/src/k8s.io/client-go/tools/record/events_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/util/flowcontrol"
)

const (
Expand All @@ -39,6 +40,13 @@ const (
// more than 10 times in a 10 minute period, aggregate the event
defaultAggregateMaxEvents = 10
defaultAggregateIntervalInSeconds = 600

// by default, allow a source to send 25 events about an object
// but control the refill rate to 1 new event every 5 minutes
// this helps control the long-tail of events for things that are always
// unhealthy
defaultSpamBurst = 25
defaultSpamQPS = 1. / 300.
)

// getEventKey builds unique event key based on source, involvedObject, reason, message
Expand All @@ -59,6 +67,20 @@ func getEventKey(event *v1.Event) string {
"")
}

// getSpamKey builds unique event key based on source, involvedObject
func getSpamKey(event *v1.Event) string {
return strings.Join([]string{
event.Source.Component,
event.Source.Host,
event.InvolvedObject.Kind,
event.InvolvedObject.Namespace,
event.InvolvedObject.Name,
string(event.InvolvedObject.UID),
event.InvolvedObject.APIVersion,
},
"")
}

// EventFilterFunc is a function that returns true if the event should be skipped
type EventFilterFunc func(event *v1.Event) bool

Expand All @@ -67,6 +89,69 @@ func DefaultEventFilterFunc(event *v1.Event) bool {
return false
}

// EventSourceObjectSpamFilter is responsible for throttling
// the amount of events a source and object can produce.
type EventSourceObjectSpamFilter struct {
sync.RWMutex

// the cache that manages last synced state
cache *lru.Cache

// burst is the amount of events we allow per source + object
burst int

// qps is the refill rate of the token bucket in queries per second
qps float32

// clock is used to allow for testing over a time interval
clock clock.Clock
}

// NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock) *EventSourceObjectSpamFilter {
return &EventSourceObjectSpamFilter{
cache: lru.New(lruCacheSize),
burst: burst,
qps: qps,
clock: clock,
}
}

// spamRecord holds data used to perform spam filtering decisions.
type spamRecord struct {
// rateLimiter controls the rate of events about this object
rateLimiter flowcontrol.RateLimiter
}

// Filter controls that a given source+object are not exceeding the allowed rate.
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
var record spamRecord

// controls our cached information about this event (source+object)
eventKey := getSpamKey(event)

// do we have a record of similar events in our cache?
f.Lock()
defer f.Unlock()
value, found := f.cache.Get(eventKey)
if found {
record = value.(spamRecord)
}

// verify we have a rate limiter for this record
if record.rateLimiter == nil {
record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
}

// ensure we have available rate
filter := !record.rateLimiter.TryAccept()

// update the cache
f.cache.Add(eventKey, record)

return filter
}

// EventAggregatorKeyFunc is responsible for grouping events for aggregation
// It returns a tuple of the following:
// aggregateKey - key the identifies the aggregate group to bucket this event
Expand Down Expand Up @@ -337,18 +422,20 @@ type EventCorrelateResult struct {
// prior to interacting with the API server to record the event.
//
// The default behavior is as follows:
// * No events are filtered from being recorded
// * Aggregation is performed if a similar event is recorded 10 times in a
// in a 10 minute rolling interval. A similar event is an event that varies only by
// the Event.Message field. Rather than recording the precise event, aggregation
// will create a new event whose message reports that it has combined events with
// the same reason.
// * Events are incrementally counted if the exact same event is encountered multiple
// times.
// * A source may burst 25 events about an object, but has a refill rate budget
// per object of 1 event every 5 minutes to control long-tail of spam.
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
cacheSize := maxLruCacheEntries
spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock)
return &EventCorrelator{
filterFunc: DefaultEventFilterFunc,
filterFunc: spamFilter.Filter,
aggregator: NewEventAggregator(
cacheSize,
EventAggregatorByReasonFunc,
Expand All @@ -363,11 +450,14 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator {

// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
if c.filterFunc(newEvent) {
return &EventCorrelateResult{Skip: true}, nil
if newEvent == nil {
return nil, fmt.Errorf("event is nil")
}
aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
if c.filterFunc(observedEvent) {
return &EventCorrelateResult{Skip: true}, nil
}
return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}

Expand Down
26 changes: 24 additions & 2 deletions staging/src/k8s.io/client-go/tools/record/events_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func TestEventCorrelator(t *testing.T) {
newEvent v1.Event
expectedEvent v1.Event
intervalSeconds int
expectedSkip bool
}{
"create-a-single-event": {
previousEvents: []v1.Event{},
Expand All @@ -198,7 +199,13 @@ func TestEventCorrelator(t *testing.T) {
previousEvents: makeEvents(defaultAggregateMaxEvents, duplicateEvent),
newEvent: duplicateEvent,
expectedEvent: setCount(duplicateEvent, defaultAggregateMaxEvents+1),
intervalSeconds: 5,
intervalSeconds: 30, // larger interval induces aggregation but not spam.
},
"the-same-event-is-spam-if-happens-too-frequently": {
previousEvents: makeEvents(defaultSpamBurst+1, duplicateEvent),
newEvent: duplicateEvent,
expectedSkip: true,
intervalSeconds: 1,
},
"create-many-unique-events": {
previousEvents: makeUniqueEvents(30),
Expand Down Expand Up @@ -245,7 +252,10 @@ func TestEventCorrelator(t *testing.T) {
if err != nil {
t.Errorf("scenario %v: unexpected error playing back prevEvents %v", testScenario, err)
}
correlator.UpdateState(result.Event)
// if we are skipping the event, we can avoid updating state
if !result.Skip {
correlator.UpdateState(result.Event)
}
}

// update the input to current clock value
Expand All @@ -257,6 +267,18 @@ func TestEventCorrelator(t *testing.T) {
t.Errorf("scenario %v: unexpected error correlating input event %v", testScenario, err)
}

// verify we did not get skip from filter function unexpectedly...
if result.Skip != testInput.expectedSkip {
t.Errorf("scenario %v: expected skip %v, but got %v", testScenario, testInput.expectedSkip, result.Skip)
continue
}

// we wanted to actually skip, so no event is needed to validate
if testInput.expectedSkip {
continue
}

// validate event
_, err = validateEvent(testScenario, result.Event, &testInput.expectedEvent, t)
if err != nil {
t.Errorf("scenario %v: unexpected error validating result %v", testScenario, err)
Expand Down

0 comments on commit b62fa1d

Please sign in to comment.