diff --git a/go.mod b/go.mod index 5bd4a104f..a37adf0d4 100644 --- a/go.mod +++ b/go.mod @@ -186,7 +186,7 @@ replace ( k8s.io/api => k8s.io/api v0.27.7 k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.27.7 k8s.io/apimachinery => k8s.io/apimachinery v0.27.7 - k8s.io/apiserver => github.com/openshift/kubernetes-apiserver v0.0.0-20231102175227-e58ea8bb8517 + k8s.io/apiserver => github.com/openshift/kubernetes-apiserver v0.0.0-20240112180301-bc101dc8b952 k8s.io/cli-runtime => k8s.io/cli-runtime v0.27.7 k8s.io/client-go => k8s.io/client-go v0.27.7 k8s.io/cloud-provider => k8s.io/cloud-provider v0.27.7 diff --git a/go.sum b/go.sum index 9b300a6f9..e1d8def98 100644 --- a/go.sum +++ b/go.sum @@ -1022,8 +1022,8 @@ github.com/openshift/docker-distribution/v3 v3.0.0-20230613095533-f65dc997445a h github.com/openshift/docker-distribution/v3 v3.0.0-20230613095533-f65dc997445a/go.mod h1:+fqBJ4vPYo4Uu1ZE4d+bUtTLRXfdSL3NvCZIZ9GHv58= github.com/openshift/kubernetes v0.0.0-20231102175105-d61d2d7405d7 h1:aIA+4q6ZDDRJBkfLuw78WlnvLJmLxjHLe/DuLkKqOcE= github.com/openshift/kubernetes v0.0.0-20231102175105-d61d2d7405d7/go.mod h1:9RuASG1ooCxhMDOxMG5nJ0Vw/wap9iylq4Y/Uzn/R/0= -github.com/openshift/kubernetes-apiserver v0.0.0-20231102175227-e58ea8bb8517 h1:6igw70rzurPqW5qqLTiFkpORG0KzNLe+xM/+iwtDm3A= -github.com/openshift/kubernetes-apiserver v0.0.0-20231102175227-e58ea8bb8517/go.mod h1:OrLG9RwCOerutAlo8QJW5EHzUG9Dad7k6rgcDUNSO/w= +github.com/openshift/kubernetes-apiserver v0.0.0-20240112180301-bc101dc8b952 h1:bnMz+L8IjuURKTZHCmCBz3LS1jbTIU67pXGP3yi/sQc= +github.com/openshift/kubernetes-apiserver v0.0.0-20240112180301-bc101dc8b952/go.mod h1:OrLG9RwCOerutAlo8QJW5EHzUG9Dad7k6rgcDUNSO/w= github.com/openshift/library-go v0.0.0-20230808150704-ce4395c85e8c h1:UJjxHFSTcasHxRXtDc3od9p7UJUBJxUKjhZHFyp2uUQ= github.com/openshift/library-go v0.0.0-20230808150704-ce4395c85e8c/go.mod h1:ZFwNwC3opc/7aOvzUbU95zp33Lbxet48h80ryH3p6DY= github.com/openshift/moby-moby v0.0.0-20190308215630-da810a85109d h1:fLITXDjxMSvUDjnXs/zljIWktbST9+Om8XbrmmM7T4I= diff --git a/vendor/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient.go b/vendor/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient.go new file mode 100644 index 000000000..12bd733f3 --- /dev/null +++ b/vendor/k8s.io/apiserver/pkg/storage/etcd3/etcd3retry/retry_etcdclient.go @@ -0,0 +1,201 @@ +package etcd3retry + +import ( + "context" + "time" + + etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + "google.golang.org/grpc/codes" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/etcd3/metrics" + "k8s.io/klog/v2" +) + +var defaultRetry = wait.Backoff{ + Duration: 300 * time.Millisecond, + Factor: 2, // double the timeout for every failure + Jitter: 0.1, + Steps: 6, // .3 + .6 + 1.2 + 2.4 + 4.8 = 10ish this lets us smooth out short bumps but not long ones and keeps retry behavior closer. +} + +type retryClient struct { + // embed because we only want to override a few states + storage.Interface +} + +// New returns an etcd3 implementation of storage.Interface. +func NewRetryingEtcdStorage(delegate storage.Interface) storage.Interface { + return &retryClient{Interface: delegate} +} + +// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live +// in seconds (0 means forever). If no error is returned and out is not nil, out will be +// set to the read value from database. +func (c *retryClient) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { + return onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + return c.Interface.Create(ctx, key, obj, out, ttl) + }) +} + +// Delete removes the specified key and returns the value that existed at that spot. +// If key didn't exist, it will return NotFound storage error. +func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error { + return onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + return c.Interface.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject) + }) +} + +// Watch begins watching the specified key. Events are decoded into API objects, +// and any items selected by 'p' are sent down to returned watch.Interface. +// resourceVersion may be used to specify what version to begin watching, +// which should be the current resourceVersion, and no longer rv+1 +// (e.g. reconnecting without missing any updates). +// If resource version is "0", this interface will get current object at given key +// and send it in an "ADDED" event, before watch starts. +func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + var ret watch.Interface + err := onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + var innerErr error + ret, innerErr = c.Interface.Watch(ctx, key, opts) + return innerErr + }) + return ret, err +} + +// Get unmarshals json found at key into objPtr. On a not found error, will either +// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'. +// Treats empty responses and nil response nodes exactly like a not found error. +// The returned contents may be delayed, but it is guaranteed that they will +// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'. +func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { + return onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + return c.Interface.Get(ctx, key, opts, objPtr) + }) +} + +// GetList unmarshalls objects found at key into a *List api object (an object +// that satisfies runtime.IsList definition). +// If 'opts.Recursive' is false, 'key' is used as an exact match. If `opts.Recursive' +// is true, 'key' is used as a prefix. +// The returned contents may be delayed, but it is guaranteed that they will +// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'. +func (c *retryClient) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + return onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + return c.Interface.GetList(ctx, key, opts, listObj) + }) +} + +// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'destination') +// retrying the update until success if there is index conflict. +// Note that object passed to tryUpdate may change across invocations of tryUpdate() if +// other writers are simultaneously updating it, so tryUpdate() needs to take into account +// the current contents of the object when deciding how the update object should look. +// If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false +// else `destination` will be set to the zero value of it's type. +// If the eventual successful invocation of `tryUpdate` returns an output with the same serialized +// contents as the input, it won't perform any update, but instead set `destination` to an object with those +// contents. +// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the +// current version of the object to avoid read operation from storage to get it. +// However, the implementations have to retry in case suggestion is stale. +// +// Example: +// +// s := /* implementation of Interface */ +// err := s.GuaranteedUpdate( +// +// "myKey", &MyType{}, true, preconditions, +// func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) { +// // Before each invocation of the user defined function, "input" is reset to +// // current contents for "myKey" in database. +// curr := input.(*MyType) // Guaranteed to succeed. +// +// // Make the modification +// curr.Counter++ +// +// // Return the modified object - return an error to stop iterating. Return +// // a uint64 to alter the TTL on the object, or nil to keep it the same value. +// return cur, nil, nil +// }, cachedExistingObject +// +// ) +func (c *retryClient) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, + preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error { + return onError(ctx, defaultRetry, isRetriableEtcdError, func() error { + return c.Interface.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject) + }) +} + +// isRetriableEtcdError returns true if a retry should be attempted, otherwise false. +// errorLabel is set to a non-empty value that reflects the type of error encountered. +func isRetriableEtcdError(err error) (errorLabel string, retry bool) { + if err != nil { + if etcdError, ok := etcdrpc.Error(err).(etcdrpc.EtcdError); ok { + if etcdError.Code() == codes.Unavailable { + errorLabel = "Unavailable" + retry = true + } + } + } + return +} + +// onError allows the caller to retry fn in case the error returned by fn is retriable +// according to the provided function. backoff defines the maximum retries and the wait +// interval between two retries. +func onError(ctx context.Context, backoff wait.Backoff, retriable func(error) (string, bool), fn func() error) error { + var lastErr error + var lastErrLabel string + var retry bool + var retryCounter int + err := backoffWithRequestContext(ctx, backoff, func() (bool, error) { + err := fn() + if retry { + klog.V(1).Infof("etcd retry - counter: %v, lastErrLabel: %s lastError: %v, error: %v", retryCounter, lastErrLabel, lastErr, err) + metrics.UpdateEtcdRequestRetry(lastErrLabel) + } + if err == nil { + return true, nil + } + + lastErrLabel, retry = retriable(err) + if retry { + lastErr = err + retryCounter++ + return false, nil + } + + return false, err + }) + if err == wait.ErrWaitTimeout && lastErr != nil { + err = lastErr + } + return err +} + +// backoffWithRequestContext works with a request context and a Backoff. It ensures that the retry wait never +// exceeds the deadline specified by the request context. +func backoffWithRequestContext(ctx context.Context, backoff wait.Backoff, condition wait.ConditionFunc) error { + for backoff.Steps > 0 { + if ok, err := condition(); err != nil || ok { + return err + } + + if backoff.Steps == 1 { + break + } + + waitBeforeRetry := backoff.Step() + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(waitBeforeRetry): + } + } + + return wait.ErrWaitTimeout +} diff --git a/vendor/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go b/vendor/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go index 6f155c0ad..f50fa023a 100644 --- a/vendor/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go +++ b/vendor/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go @@ -131,6 +131,14 @@ var ( }, []string{"resource"}, ) + etcdRequestRetry = compbasemetrics.NewCounterVec( + &compbasemetrics.CounterOpts{ + Name: "etcd_request_retry_total", + Help: "Etcd request retry total", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"error"}, + ) ) var registerMetrics sync.Once @@ -149,6 +157,7 @@ func Register() { legacyregistry.MustRegister(listStorageNumSelectorEvals) legacyregistry.MustRegister(listStorageNumReturned) legacyregistry.MustRegister(decodeErrorCounts) + legacyregistry.MustRegister(etcdRequestRetry) }) } @@ -199,6 +208,11 @@ func UpdateLeaseObjectCount(count int64) { etcdLeaseObjectCounts.WithLabelValues().Observe(float64(count)) } +// UpdateEtcdRequestRetry sets the etcd_request_retry_total metric. +func UpdateEtcdRequestRetry(errorCode string) { + etcdRequestRetry.WithLabelValues(errorCode).Inc() +} + // RecordListEtcd3Metrics notes various metrics of the cost to serve a LIST request func RecordStorageListMetrics(resource string, numFetched, numEvald, numReturned int) { listStorageCount.WithLabelValues(resource).Inc() diff --git a/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index 64bcabadb..f79c2717a 100644 --- a/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -46,6 +46,7 @@ import ( "k8s.io/apiserver/pkg/server/egressselector" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3" + "k8s.io/apiserver/pkg/storage/etcd3/etcd3retry" "k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/value/encrypt/identity" @@ -436,7 +437,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime. if transformer == nil { transformer = identity.NewEncryptCheckTransformer() } - return etcd3.New(client, c.Codec, newFunc, c.Prefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil + return etcd3retry.NewRetryingEtcdStorage(etcd3.New(client, c.Codec, newFunc, c.Prefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig)), destroyFunc, nil } // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the diff --git a/vendor/modules.txt b/vendor/modules.txt index a936b1ca1..748a523d3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1253,7 +1253,7 @@ k8s.io/apimachinery/pkg/watch k8s.io/apimachinery/third_party/forked/golang/json k8s.io/apimachinery/third_party/forked/golang/netutil k8s.io/apimachinery/third_party/forked/golang/reflect -# k8s.io/apiserver v0.27.7 => github.com/openshift/kubernetes-apiserver v0.0.0-20231102175227-e58ea8bb8517 +# k8s.io/apiserver v0.27.7 => github.com/openshift/kubernetes-apiserver v0.0.0-20240112180301-bc101dc8b952 ## explicit; go 1.20 k8s.io/apiserver/pkg/admission k8s.io/apiserver/pkg/admission/cel @@ -1372,6 +1372,7 @@ k8s.io/apiserver/pkg/storage/cacher k8s.io/apiserver/pkg/storage/cacher/metrics k8s.io/apiserver/pkg/storage/errors k8s.io/apiserver/pkg/storage/etcd3 +k8s.io/apiserver/pkg/storage/etcd3/etcd3retry k8s.io/apiserver/pkg/storage/etcd3/metrics k8s.io/apiserver/pkg/storage/etcd3/testing k8s.io/apiserver/pkg/storage/etcd3/testserver @@ -2234,7 +2235,7 @@ sigs.k8s.io/yaml # k8s.io/api => k8s.io/api v0.27.7 # k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.27.7 # k8s.io/apimachinery => k8s.io/apimachinery v0.27.7 -# k8s.io/apiserver => github.com/openshift/kubernetes-apiserver v0.0.0-20231102175227-e58ea8bb8517 +# k8s.io/apiserver => github.com/openshift/kubernetes-apiserver v0.0.0-20240112180301-bc101dc8b952 # k8s.io/cli-runtime => k8s.io/cli-runtime v0.27.7 # k8s.io/client-go => k8s.io/client-go v0.27.7 # k8s.io/cloud-provider => k8s.io/cloud-provider v0.27.7