Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #32275 #32269 #32354 #32456

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions pkg/api/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,11 @@ func ValidateObjectMetaUpdate(newMeta, oldMeta *api.ObjectMeta, fldPath *field.P
allErrs = append(allErrs, field.Invalid(fldPath.Child("deletionTimestamp"), newMeta.DeletionTimestamp, "field is immutable; may only be changed via deletion"))
}

// Finalizers cannot be added if the object is already being deleted.
if oldMeta.DeletionTimestamp != nil {
allErrs = append(allErrs, ValidateNoNewFinalizers(newMeta.Finalizers, oldMeta.Finalizers, fldPath.Child("finalizers"))...)
}

// Reject updates that don't specify a resource version
if len(newMeta.ResourceVersion) == 0 {
allErrs = append(allErrs, field.Invalid(fldPath.Child("resourceVersion"), newMeta.ResourceVersion, "must be specified for an update"))
Expand All @@ -461,6 +466,16 @@ func ValidateObjectMetaUpdate(newMeta, oldMeta *api.ObjectMeta, fldPath *field.P
return allErrs
}

func ValidateNoNewFinalizers(newFinalizers []string, oldFinalizers []string, fldPath *field.Path) field.ErrorList {
const newFinalizersErrorMsg string = `no new finalizers can be added if the object is being deleted`
allErrs := field.ErrorList{}
extra := sets.NewString(newFinalizers...).Difference(sets.NewString(oldFinalizers...))
if len(extra) != 0 {
allErrs = append(allErrs, field.Forbidden(fldPath, fmt.Sprintf("no new finalizers can be added if the object is being deleted, found new finalizers %#v", extra.List())))
}
return allErrs
}

func validateVolumes(volumes []api.Volume, fldPath *field.Path) (sets.String, field.ErrorList) {
allErrs := field.ErrorList{}

Expand Down
39 changes: 39 additions & 0 deletions pkg/api/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,45 @@ func TestValidateObjectMetaUpdateIgnoresCreationTimestamp(t *testing.T) {
}
}

func TestValidateFinalizersUpdate(t *testing.T) {
testcases := map[string]struct {
Old api.ObjectMeta
New api.ObjectMeta
ExpectedErr string
}{
"invalid adding finalizers": {
Old: api.ObjectMeta{Name: "test", ResourceVersion: "1", DeletionTimestamp: &unversioned.Time{}, Finalizers: []string{"x/a"}},
New: api.ObjectMeta{Name: "test", ResourceVersion: "1", DeletionTimestamp: &unversioned.Time{}, Finalizers: []string{"x/a", "y/b"}},
ExpectedErr: "y/b",
},
"invalid changing finalizers": {
Old: api.ObjectMeta{Name: "test", ResourceVersion: "1", DeletionTimestamp: &unversioned.Time{}, Finalizers: []string{"x/a"}},
New: api.ObjectMeta{Name: "test", ResourceVersion: "1", DeletionTimestamp: &unversioned.Time{}, Finalizers: []string{"x/b"}},
ExpectedErr: "x/b",
},
"valid removing finalizers": {
Old: api.ObjectMeta{Name: "test", ResourceVersion: "1", DeletionTimestamp: &unversioned.Time{}, Finalizers: []string{"x/a", "y/b"}},
New: api.ObjectMeta{Name: "test", ResourceVersion: "1", DeletionTimestamp: &unversioned.Time{}, Finalizers: []string{"x/a"}},
ExpectedErr: "",
},
"valid adding finalizers for objects not being deleted": {
Old: api.ObjectMeta{Name: "test", ResourceVersion: "1", Finalizers: []string{"x/a"}},
New: api.ObjectMeta{Name: "test", ResourceVersion: "1", Finalizers: []string{"x/a", "y/b"}},
ExpectedErr: "",
},
}
for name, tc := range testcases {
errs := ValidateObjectMetaUpdate(&tc.New, &tc.Old, field.NewPath("field"))
if len(errs) == 0 {
if len(tc.ExpectedErr) != 0 {
t.Errorf("case: %q, expected error to contain %q", name, tc.ExpectedErr)
}
} else if e, a := tc.ExpectedErr, errs.ToAggregate().Error(); !strings.Contains(a, e) {
t.Errorf("case: %q, expected error to contain %q, got error %q", name, e, a)
}
}
}

func TestValidateObjectMetaUpdatePreventsDeletionFieldMutation(t *testing.T) {
now := unversioned.NewTime(time.Unix(1000, 0).UTC())
later := unversioned.NewTime(time.Unix(2000, 0).UTC())
Expand Down
40 changes: 36 additions & 4 deletions pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ type Cacher struct {
watcherIdx int
watchers indexedWatchers

// Incoming events that should be dispatched to watchers.
incoming chan watchCacheEvent

// Handling graceful termination.
stopLock sync.RWMutex
stopped bool
Expand Down Expand Up @@ -197,6 +200,8 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
allWatchers: make(map[int]*cacheWatcher),
valueWatchers: make(map[string]watchersMap),
},
// TODO: Figure out the correct value for the buffer size.
incoming: make(chan watchCacheEvent, 100),
// We need to (potentially) stop both:
// - wait.Until go-routine
// - reflector.ListAndWatch
Expand All @@ -205,6 +210,7 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
stopCh: make(chan struct{}),
}
watchCache.SetOnEvent(cacher.processEvent)
go cacher.dispatchEvents()

stopCh := cacher.stopCh
cacher.stopWg.Add(1)
Expand Down Expand Up @@ -403,8 +409,32 @@ func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
return result, len(result) > 0
}

// TODO: Most probably splitting this method to a separate thread will visibily
// improve throughput of our watch machinery. So what we should do is to:
// - OnEvent handler simply put an element to channel
// - processEvent be another goroutine processing events from that channel
// Additionally, if we make this channel buffered, cacher will be more resistant
// to single watchers being slow - see cacheWatcher::add method.
func (c *Cacher) processEvent(event watchCacheEvent) {
triggerValues, supported := c.triggerValues(&event)
c.incoming <- event
}

func (c *Cacher) dispatchEvents() {
for {
select {
case event, ok := <-c.incoming:
if !ok {
return
}
c.dispatchEvent(&event)
case <-c.stopCh:
return
}
}
}

func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
triggerValues, supported := c.triggerValues(event)

c.Lock()
defer c.Unlock()
Expand Down Expand Up @@ -608,17 +638,18 @@ func (c *cacheWatcher) stop() {

var timerPool sync.Pool

func (c *cacheWatcher) add(event watchCacheEvent) {
func (c *cacheWatcher) add(event *watchCacheEvent) {
// Try to send the event immediately, without blocking.
select {
case c.input <- event:
case c.input <- *event:
return
default:
}

// OK, block sending, but only for up to 5 seconds.
// cacheWatcher.add is called very often, so arrange
// to reuse timers instead of constantly allocating.
startTime := time.Now()
const timeout = 5 * time.Second
t, ok := timerPool.Get().(*time.Timer)
if ok {
Expand All @@ -629,7 +660,7 @@ func (c *cacheWatcher) add(event watchCacheEvent) {
defer timerPool.Put(t)

select {
case c.input <- event:
case c.input <- *event:
stopped := t.Stop()
if !stopped {
// Consume triggered (but not yet received) timer event
Expand All @@ -643,6 +674,7 @@ func (c *cacheWatcher) add(event watchCacheEvent) {
c.forget(false)
c.stop()
}
glog.V(2).Infof("cacheWatcher add function blocked processing for %v", time.Since(startTime))
}

func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
Expand Down
17 changes: 11 additions & 6 deletions pkg/storage/etcd/etcd_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package etcd
import (
"fmt"
"net/http"
"reflect"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -107,6 +108,10 @@ type etcdWatcher struct {
// Injectable for testing. Send the event down the outgoing channel.
emit func(watch.Event)

// HighWaterMarks for performance debugging.
incomingHWM HighWaterMark
outgoingHWM HighWaterMark

cache etcdCache
}

Expand Down Expand Up @@ -150,6 +155,10 @@ func newEtcdWatcher(
cancel: nil,
}
w.emit = func(e watch.Event) {
if curLen := int64(len(w.outgoing)); w.outgoingHWM.Update(curLen) {
// Monitor if this gets backed up, and how much.
glog.V(1).Infof("watch (%v): %v objects queued in outgoing channel.", reflect.TypeOf(e.Object).String(), curLen)
}
// Give up on user stop, without this we leak a lot of goroutines in tests.
select {
case w.outgoing <- e:
Expand Down Expand Up @@ -262,10 +271,6 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming
incoming <- &copied
}

var (
watchChannelHWM HighWaterMark
)

// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
// called as a goroutine.
func (w *etcdWatcher) translate() {
Expand Down Expand Up @@ -308,9 +313,9 @@ func (w *etcdWatcher) translate() {
return
case res, ok := <-w.etcdIncoming:
if ok {
if curLen := int64(len(w.etcdIncoming)); watchChannelHWM.Update(curLen) {
if curLen := int64(len(w.etcdIncoming)); w.incomingHWM.Update(curLen) {
// Monitor if this gets backed up, and how much.
glog.V(2).Infof("watch: %v objects queued in channel.", curLen)
glog.V(1).Infof("watch: %v objects queued in incoming channel.", curLen)
}
w.sendResult(res)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/etcd3/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
if res == nil {
continue
}
if len(wc.resultChan) == outgoingBufSize {
glog.Warningf("Fast watcher, slow processing. Number of buffered events: %d."+
"Probably caused by slow dispatching events to watchers", outgoingBufSize)
}
// If user couldn't receive results fast enough, we also block incoming events from watcher.
// Because storing events in local will cause more memory usage.
// The worst case would be closing the fast watcher.
Expand Down Expand Up @@ -300,7 +304,7 @@ func (wc *watchChan) sendError(err error) {

func (wc *watchChan) sendEvent(e *event) {
if len(wc.incomingEventChan) == incomingBufSize {
glog.V(2).Infof("Fast watcher, slow processing. Number of buffered events: %d."+
glog.Warningf("Fast watcher, slow processing. Number of buffered events: %d."+
"Probably caused by slow decoding, user not receiving fast, or other processing logic",
incomingBufSize)
}
Expand Down