Don't spawn a goroutine for every event recording #95664

Merged
28 changes: 28 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/watch/mux.go
@@ -74,6 +74,22 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
return m
}

// NewLongQueueBroadcaster functions nearly identically to NewBroadcaster,
// except that the incoming queue is the same size as the outgoing queues
// (specified by queueLength).
func NewLongQueueBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
m := &Broadcaster{
watchers: map[int64]*broadcasterWatcher{},
incoming: make(chan Event, queueLength),
Contributor:
So effectively, the incoming queue length is increased from 25 to 1000. Any idea why it was only 25 in the first place? The comment below indicates overflow should rarely happen; did you actually see events dropped due to the incoming queue length?

// Buffer the incoming queue a little bit even though it should rarely ever accumulate
// anything, just in case a few events are received in such a short window that
// Broadcaster can't move them onto the watchers' queues fast enough.

Contributor Author:

Since we're no longer blocking, I figured I'd extend it to match the outgoing queues, which already have drop-on-full behavior. As much as possible, I wanted to avoid breaking something in k/k itself, so matching the outgoing queues seemed like a safe bet.
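For readers unfamiliar with the pattern, here is a minimal standalone sketch (an editor's illustration, not code from this PR) of the drop-on-full semantics the incoming queue now shares with the outgoing watcher queues: a non-blocking send on a buffered channel falls through to the default case when the buffer is full, dropping the event instead of blocking the sender.

package main

import "fmt"

func main() {
	// A buffered channel stands in for the Broadcaster's incoming queue.
	queue := make(chan string, 2)

	sent, dropped := 0, 0
	for i := 0; i < 5; i++ {
		select {
		case queue <- fmt.Sprintf("event-%d", i):
			sent++ // there was room in the buffer
		default:
			dropped++ // buffer full: drop on the floor rather than block
		}
	}
	// Nothing drains the queue here, so only the first two sends fit.
	fmt.Printf("sent=%d dropped=%d\n", sent, dropped) // sent=2 dropped=3
}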

stopped: make(chan struct{}),
watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior,
}
m.distributing.Add(1)
go m.loop()
return m
}

const internalRunFunctionMarker = "internal-do-function"

// a function type we can shoehorn into the queue.
@@ -198,6 +214,18 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
m.incoming <- Event{action, obj}
}

// ActionOrDrop distributes the given event among all watchers, or drops it on the floor
// if too many incoming actions are queued up. Returns true if the action was sent,
// false if dropped.
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
select {
case m.incoming <- Event{action, obj}:
return true
default:
return false
}
}

// Shutdown disconnects all watchers (but any queued events will still be distributed).
// You must not call Action or Watch* after calling Shutdown. This call blocks
// until all events have been distributed through the outbound channels. Note
19 changes: 11 additions & 8 deletions staging/src/k8s.io/client-go/tools/record/event.go
@@ -155,21 +155,21 @@ func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, re
// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: defaultSleepDuration,
}
}

func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: sleepDuration,
}
}

func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: defaultSleepDuration,
options: options,
}
@@ -338,11 +338,14 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m
event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
event.Source = recorder.source

go func() {
// NOTE: events should be a non-blocking operation
defer utilruntime.HandleCrash()
recorder.Action(watch.Added, event)
}()
// NOTE: events should be a non-blocking operation, but we also need to not
// put this in a goroutine, otherwise we'll race to write to a closed channel
// when we go to shut down this broadcaster. Just drop events if we get overloaded,
// and log an error if that happens (we've configured the broadcaster to drop
// outgoing events anyway).
if sent := recorder.ActionOrDrop(watch.Added, event); !sent {
klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
Contributor:
Is dropping an event an error? Could it spam the logs when too many events are dropped?

Contributor Author:
IIRC, we log when we drop from the other end. It's unlikely that this happens, you probably want to know when it does, and we log events anyway.
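If log spam did become a problem in practice, the boolean returned by ActionOrDrop would let callers throttle on their side. A hedged sketch of one possible mitigation (recordEvent and dropCount are hypothetical names, not part of this PR): count drops atomically and log only a periodic summary.

package record

import (
	"sync/atomic"

	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/klog/v2"
)

// dropCount tracks events dropped because the broadcaster's queue was full.
var dropCount int64

// recordEvent logs only every 100th drop, bounding log volume under
// sustained overload while still surfacing that drops are happening.
func recordEvent(m *watch.Broadcaster, e watch.Event) {
	if !m.ActionOrDrop(e.Type, e.Object) {
		if n := atomic.AddInt64(&dropCount, 1); n%100 == 1 {
			klog.Errorf("broadcaster queue full; %d events dropped so far", n)
		}
	}
}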

}
}

func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
24 changes: 24 additions & 0 deletions staging/src/k8s.io/client-go/tools/record/event_test.go
@@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"strconv"
"sync"
"testing"
"time"

@@ -101,6 +102,29 @@ func OnPatchFactory(testCache map[string]*v1.Event, patchEvent chan<- *v1.Event)
}
}

func TestNonRacyShutdown(t *testing.T) {
// Attempt to simulate previously racy conditions, and ensure that no race
Contributor:
The test by itself does not assert the absence of a race condition; it relies on being run with the race detector enabled.

Contributor Author:
IIRC, that's not true -- the "race" is easily detected as a write to a closed channel, which is caught even without the race detector (it just panics).
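The author's point is ordinary Go channel semantics and is easy to demonstrate without -race; a standalone sketch (editor's illustration):

package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	close(ch)

	defer func() {
		// A send on a closed channel always panics, race detector or not.
		if r := recover(); r != nil {
			fmt.Println("recovered:", r) // recovered: send on closed channel
		}
	}()
	ch <- 1 // panics immediately
}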

// occurs: Nominally, calling "Eventf" *followed by* shutdown from the same
// thread should be a safe operation, but it's not if we launch recorder.Action
// in a goroutine.

caster := NewBroadcasterForTests(0)
clock := clock.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(v1.EventSource{Component: "eventTest"}, caster, clock)

var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
recorder.Eventf(&v1.ObjectReference{}, v1.EventTypeNormal, "Started", "blah")
}()
}

wg.Wait()
caster.Shutdown()
}
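For completeness, running the test under the race detector still adds value, since -race can also catch interleavings that don't happen to hit the closed channel. Assuming a checkout of the published k8s.io/client-go repository, the invocation would be something like:

go test -race -run TestNonRacyShutdown ./tools/record/

Without -race, a reintroduced goroutine-per-event race would still surface as a "send on closed channel" panic, as discussed above.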

func TestEventf(t *testing.T) {
testPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{