diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go
index 71ff2220dba37..e63e03212f8de 100644
--- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go
+++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go
@@ -194,7 +194,7 @@ type storageVersionUpdate struct {
 	// channel is closed.
 	processedCh <-chan struct{}
 
-	// processedCh is closed by the storage version manager when it
+	// errCh is closed by the storage version manager when it
 	// encounters an error while trying to update a storage version.
 	// The API server will block the serve 503 for CR write requests if
 	// this channel is closed.
@@ -456,11 +456,6 @@ func (r *crdHandler) serveResource(w http.ResponseWriter, req *http.Request, req
 		if justCreated {
 			time.Sleep(2 * time.Second)
 		}
-		if err := crdInfo.waitForStorageVersionUpdate(req.Context()); err != nil {
-			err := apierrors.NewServiceUnavailable(err.Error())
-			responsewriters.ErrorNegotiated(err, Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
-			return nil
-		}
 		// Get the latest CRD to make sure it's not terminating or deleted
 		crd, err := r.crdLister.Get(crdName)
 		if err != nil || crd.UID != crdUID || apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Terminating) {
@@ -469,6 +464,12 @@ func (r *crdHandler) serveResource(w http.ResponseWriter, req *http.Request, req
 			responsewriters.ErrorNegotiated(err, Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
 			return nil
 		}
+
+		if err := crdInfo.waitForStorageVersionUpdate(req.Context()); err != nil {
+			err := apierrors.NewServiceUnavailable(err.Error())
+			responsewriters.ErrorNegotiated(err, Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
+			return nil
+		}
 		return handlers.CreateResource(storage, requestScope, r.admission)
 	case "update":
 		if err := crdInfo.waitForStorageVersionUpdate(req.Context()); err != nil {
@@ -590,12 +591,10 @@ func (r *crdHandler) createCustomResourceDefinition(obj interface{}) {
 	}
 
 	if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) &&
 		utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
-		processedCh := make(chan struct{})
-		errCh := make(chan struct{})
 		ctx := apirequest.NewContext()
 		// customStorageLock will be released even if UpdateStorageVersion() fails. This is safe
 		// since we are deleting the old storage here and not creating a new one.
-		err := r.storageVersionManager.UpdateStorageVersion(ctx, crd, tearDownFinishedCh, processedCh, errCh)
+		err := r.storageVersionManager.UpdateStorageVersion(ctx, crd, tearDownFinishedCh, nil, nil)
 		if err != nil {
 			klog.Errorf("error updating storage version for %v: %v", crd, err)
 		}
@@ -644,10 +643,8 @@ func (r *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{})
 	// Update storage version with the latest info in the watch event
 	if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) &&
 		utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
-		processedCh := make(chan struct{})
-		errCh := make(chan struct{})
 		ctx := apirequest.NewContext()
-		err := r.storageVersionManager.UpdateStorageVersion(ctx, newCRD, tearDownFinishedCh, processedCh, errCh)
+		err := r.storageVersionManager.UpdateStorageVersion(ctx, newCRD, tearDownFinishedCh, nil, nil)
 		if err != nil {
 			klog.Errorf("error updating storage version for %v: %v", newCRD, err)
 		}
@@ -1187,27 +1184,13 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
 		utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
 		var processedCh, errCh chan struct{}
 		ctx := apirequest.NewContext()
-		var retry int
-		retries := 3
-
-		done := false
-		for retry < retries {
-			// spawn storage version update in background and use channels to make handlers wait
-			processedCh = make(chan struct{})
-			errCh = make(chan struct{})
-			err = r.storageVersionManager.UpdateStorageVersion(ctx, crd, nil, processedCh, errCh)
-			select {
-			case <-errCh:
-				klog.Errorf("retry %d, failed to update storage version for %v : %v", retry, crd, err)
-				retry++
-				continue
-			case <-processedCh:
-				done = true
-			}
-			if done {
-				break
-			}
+		// spawn storage version update in background and use channels to make handlers wait
+		processedCh = make(chan struct{})
+		errCh = make(chan struct{})
+		err = r.storageVersionManager.UpdateStorageVersion(ctx, crd, nil, processedCh, errCh)
+		if err != nil {
+			klog.Errorf("error updating storage version for %v: %v", crd, err)
 		}
 
 		ret.storageVersionUpdate = &storageVersionUpdate{
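For context on what the relocated waitForStorageVersionUpdate call gates: CR write handlers block on the processedCh/errCh pair documented in the storageVersionUpdate struct above, and with this patch the wait happens only after the terminating/deleted check on the CRD. The snippet below is a minimal, self-contained sketch of that wait pattern; the wait method body, the timeout value, and the main function are illustrative assumptions, not the code in customresource_handler.go.

```go
// Illustrative sketch of gating a CR write on the processedCh/errCh pair
// described in the storageVersionUpdate struct above. The timeout and the
// helper shape are assumptions for the example only.
package main

import (
	"context"
	"fmt"
	"time"
)

type storageVersionUpdate struct {
	processedCh <-chan struct{} // closed once the storage version update succeeds
	errCh       <-chan struct{} // closed if the storage version update fails
}

func (s *storageVersionUpdate) wait(ctx context.Context) error {
	select {
	case <-s.processedCh:
		return nil // storage version recorded; safe to serve the write
	case <-s.errCh:
		return fmt.Errorf("storage version update failed; write requests get a 503")
	case <-ctx.Done():
		return fmt.Errorf("aborted while waiting for storage version update: %w", ctx.Err())
	case <-time.After(10 * time.Second): // illustrative bound, not the real one
		return fmt.Errorf("timed out waiting for storage version update")
	}
}

func main() {
	processed := make(chan struct{})
	errs := make(chan struct{})
	u := &storageVersionUpdate{processedCh: processed, errCh: errs}

	close(processed)                          // simulate the manager completing the update
	fmt.Println(u.wait(context.Background())) // <nil>
}
```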
diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/storageversion/manager.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/storageversion/manager.go
index 04091905db524..78d8aef5269e7 100644
--- a/staging/src/k8s.io/apiextensions-apiserver/pkg/storageversion/manager.go
+++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/storageversion/manager.go
@@ -81,10 +81,14 @@ func (m *manager) UpdateStorageVersion(ctx context.Context, crd *apiextensionsv1
 		case <-waitCh:
 			done = true
 		case <-ctx.Done():
-			close(errCh)
+			if errCh != nil {
+				close(errCh)
+			}
 			return fmt.Errorf("aborted updating CRD storage version update: %v", ctx.Err())
 		case <-time.After(1 * time.Minute):
-			close(errCh)
+			if errCh != nil {
+				close(errCh)
+			}
 			return fmt.Errorf("timeout waiting for waitCh to close before proceeding with storageversion update for %v", crd)
 		}
 		if done {
@@ -95,7 +99,9 @@ func (m *manager) UpdateStorageVersion(ctx context.Context, crd *apiextensionsv1
 
 	if err := m.updateCRDStorageVersion(ctx, crd); err != nil {
 		utilruntime.HandleError(err)
-		close(errCh)
+		if errCh != nil {
+			close(errCh)
+		}
 		return fmt.Errorf("error while updating storage version for crd %v: %v", crd, err)
 	}
 	// close processCh after the update is done
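The nil guards above exist because closing a nil channel panics at runtime; with them in place, callers that do not care about the outcome (the CRD create/update event handlers, which now pass nil, nil) can skip allocating the channels. Below is a standalone sketch of the guard, using a hypothetical safeClose helper that is not part of manager.go.

```go
// Sketch of the nil-channel guard UpdateStorageVersion now applies before
// closing caller-supplied channels. safeClose is a hypothetical helper name
// used only for this example.
package main

import "fmt"

func safeClose(ch chan struct{}) {
	if ch != nil {
		close(ch) // close(nil) would panic with "close of nil channel"
	}
}

func main() {
	done := make(chan struct{})
	safeClose(done) // closes the channel
	safeClose(nil)  // no-op rather than a panic

	_, open := <-done
	fmt.Println(open) // false: receiving from a closed channel reports !ok
}
```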
diff --git a/staging/src/k8s.io/apiserver/pkg/storageversion/updater.go b/staging/src/k8s.io/apiserver/pkg/storageversion/updater.go
index 79c66a6a12eed..f6c488c80160d 100644
--- a/staging/src/k8s.io/apiserver/pkg/storageversion/updater.go
+++ b/staging/src/k8s.io/apiserver/pkg/storageversion/updater.go
@@ -25,6 +25,7 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/klog/v2"
 )
 
 // Client has the methods required to update the storage version.
@@ -125,13 +126,25 @@ func setStatusCondition(conditions *[]v1alpha1.StorageVersionCondition, newCondi
 
 // UpdateStorageVersionFor updates the storage version object for the resource.
 func UpdateStorageVersionFor(ctx context.Context, c Client, apiserverID string, gr schema.GroupResource, encodingVersion string, decodableVersions []string, servedVersions []string, postProcessFunc processStorageVersionFunc) error {
-	err := singleUpdate(ctx, c, apiserverID, gr, encodingVersion, decodableVersions, servedVersions, postProcessFunc)
-	if err != nil {
-		time.Sleep(1 * time.Second)
-		return err
+	retries := 3
+	var retry int
+	var err error
+	for retry < retries {
+		err = singleUpdate(ctx, c, apiserverID, gr, encodingVersion, decodableVersions, servedVersions, postProcessFunc)
+		if err == nil {
+			return nil
+		}
+		if apierrors.IsAlreadyExists(err) || apierrors.IsConflict(err) {
+			time.Sleep(1 * time.Second)
+			continue
+		}
+		if err != nil {
+			klog.Errorf("retry %d, failed to update storage version for %v: %v", retry, gr, err)
+			retry++
+			time.Sleep(1 * time.Second)
+		}
 	}
-
-	return nil
+	return err
}
 
 func singleUpdate(ctx context.Context, c Client, apiserverID string, gr schema.GroupResource, encodingVersion string, decodableVersions []string, servedVersions []string, postProcessFunc processStorageVersionFunc) error {
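The new retry loop leans on the apimachinery error predicates to decide which failures are retried without consuming one of the three attempts: conflicts and already-exists errors are waited out, anything else burns an attempt. A small sketch of how those predicates classify errors is below; the GroupResource and error values are made up for the example.

```go
// Sketch of the error classification the retry loop above relies on:
// conflicts and already-exists errors are expected under concurrent storage
// version updates and are retried, anything else consumes a retry attempt.
package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	gr := schema.GroupResource{Group: "apps", Resource: "deployments"}

	conflict := apierrors.NewConflict(gr, "example", fmt.Errorf("object was modified"))
	notFound := apierrors.NewNotFound(gr, "example")

	fmt.Println(apierrors.IsConflict(conflict))      // true: retried without burning an attempt
	fmt.Println(apierrors.IsAlreadyExists(conflict)) // false
	fmt.Println(apierrors.IsConflict(notFound))      // false: counts against the retry budget
}
```

Note that, as written in the patch, conflict and already-exists failures do not increment retry, so they are retried until they stop occurring rather than counting against the three attempts.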
diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go
index 1a41654072eee..bc342165b2150 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go
@@ -162,24 +162,20 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
 			return
 		}
 
-		// no peer endpoints found could mean one of the following things:
-		// 1. apiservers were found in storage version informer but either they had no endpoint
-		// lease or their endpoint lease had invalid hostport information. Serve 503 in this case.
-		// 2. no apiservers were found in storage version informer cache meaning
-		// this resource is not served by anything in this cluster. Pass request to
-		// next handler, that should eventually serve 404.
+		gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}
+
+		if serviceableByResp.errorFetchingAddressFromLease {
+			klog.ErrorS(err, "error fetching ip and port of remote server while proxying")
+			responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable("Error getting ip and port info of the remote server while proxying"), h.serializer, gv, w, r)
+			return
+		}
+
+		// no apiservers were found that could serve the request, pass request to
+		// next handler, that should eventually serve 404
 		// TODO: maintain locally serviceable GVRs somewhere so that we dont have to
 		// consult the storageversion-informed map for those
 		if len(serviceableByResp.peerEndpoints) == 0 {
-			gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}
-
-			if serviceableByResp.errorFetchingAddressFromLease {
-				klog.ErrorS(err, "error fetching ip and port of remote server while proxying")
-				responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable("Error getting ip and port info of the remote server while proxying"), h.serializer, gv, w, r)
-				return
-			}
-
 			klog.Errorf(fmt.Sprintf("GVR %v is not served by anything in this cluster", gvr))
 			handler.ServeHTTP(w, r)
 			return
@@ -207,6 +203,7 @@ func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResou
 	apiservers.Range(func(key, value interface{}) bool {
 		apiserverKey := key.(string)
 		if apiserverKey == localAPIServerId {
+			response.errorFetchingAddressFromLease = true
 			response.locallyServiceable = true
 			// stop iteration
 			return false
@@ -232,10 +229,7 @@ func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResou
 		return true
 	})
 
-	if len(peerServerEndpoints) > 0 {
-		response.errorFetchingAddressFromLease = false
-		response.peerEndpoints = peerServerEndpoints
-	}
+	response.peerEndpoints = peerServerEndpoints
 
 	return response, nil
 }
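The reordering above means a peer that exists but has an unusable endpoint lease now yields a 503 before the empty-endpoints check can fall through to the next handler. Below is a compact sketch of the resulting decision order; only the field names of the response struct come from the patch, while the struct name, the route function, and its string outcomes are illustrative scaffolding.

```go
// Illustrative decision order after the change in WrapHandler. Only the
// field names of serviceableByResponse come from the patch; everything else
// is example scaffolding.
package main

import "fmt"

type serviceableByResponse struct {
	locallyServiceable            bool
	errorFetchingAddressFromLease bool
	peerEndpoints                 []string
}

func route(resp serviceableByResponse) string {
	switch {
	case resp.locallyServiceable:
		return "serve locally"
	case resp.errorFetchingAddressFromLease:
		return "503: a peer should serve this, but its endpoint lease is unusable"
	case len(resp.peerEndpoints) == 0:
		return "pass to next handler (eventually a 404)"
	default:
		return "proxy to a peer endpoint"
	}
}

func main() {
	fmt.Println(route(serviceableByResponse{errorFetchingAddressFromLease: true}))
	fmt.Println(route(serviceableByResponse{}))
	fmt.Println(route(serviceableByResponse{peerEndpoints: []string{"10.0.0.2:6443"}}))
}
```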
diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go
index bfd42a955e910..9b1cc9b5eb809 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go
@@ -59,8 +59,8 @@ const (
 )
 
 type FakeSVMapData struct {
-	gvr       schema.GroupVersionResource
-	serverIds []string
+	gvr      schema.GroupVersionResource
+	serverId string
 }
 
 type reconciler struct {
@@ -116,7 +116,7 @@ func TestPeerProxy(t *testing.T) {
 					Group:    "core",
 					Version:  "bar",
 					Resource: "baz"},
-				serverIds: []string{}},
+				serverId: ""},
 		},
 		{
 			desc: "503 if no endpoint fetched from lease",
@@ -128,7 +128,7 @@ func TestPeerProxy(t *testing.T) {
 					Group:    "core",
 					Version:  "foo",
 					Resource: "bar"},
-				serverIds: []string{remoteServerId}},
+				serverId: remoteServerId},
 		},
 		{
 			desc: "200 if locally serviceable",
@@ -140,7 +140,7 @@ func TestPeerProxy(t *testing.T) {
 					Group:    "core",
 					Version:  "foo",
 					Resource: "bar"},
-				serverIds: []string{localServerId}},
+				serverId: localServerId},
 		},
 		{
 			desc: "503 unreachable peer bind address",
@@ -152,7 +152,7 @@ func TestPeerProxy(t *testing.T) {
 					Group:    "core",
 					Version:  "foo",
 					Resource: "bar"},
-				serverIds: []string{remoteServerId}},
+				serverId: remoteServerId},
 			reconcilerConfig: reconciler{
 				do:       true,
 				publicIP: "1.2.3.4",
@@ -177,7 +177,7 @@ func TestPeerProxy(t *testing.T) {
 					Group:    "core",
 					Version:  "foo",
 					Resource: "bar"},
-				serverIds: []string{remoteServerId}},
+				serverId: remoteServerId},
 			reconcilerConfig: reconciler{
 				do:       true,
 				publicIP: "1.2.3.4",
				serverId: remoteServerId,
			},
@@ -192,23 +192,6 @@ func TestPeerProxy(t *testing.T) {
 				apiserver_rerouted_request_total{code="503"} 2
 				`,
 		},
-		{
-			desc:                 "503 if one apiserver's endpoint lease wasnt found but another valid apiserver was found",
-			requestPath:          "/api/foo/bar",
-			expectedStatus:       http.StatusServiceUnavailable,
-			informerFinishedSync: true,
-			svdata: FakeSVMapData{
-				gvr: schema.GroupVersionResource{
-					Group:    "core",
-					Version:  "foo",
-					Resource: "bar"},
-				serverIds: []string{"aggregated-apiserver", remoteServerId}},
-			reconcilerConfig: reconciler{
-				do:       true,
-				publicIP: "1.2.3.4",
-				serverId: remoteServerId,
-			},
-		},
 	}
 
 	metrics.Register()
@@ -307,18 +290,16 @@ func newFakePeerProxyHandler(informerFinishedSync bool, reconciler reconcilers.P
 	}
 	ppI := NewPeerProxyHandler(informerFactory, storageversion.NewDefaultManager(), proxyRoundTripper, id, reconciler, s)
 	if testDataExists(svdata.gvr) {
-		ppI.addToStorageVersionMap(svdata.gvr, svdata.serverIds)
+		ppI.addToStorageVersionMap(svdata.gvr, svdata.serverId)
 	}
 	return ppI, nil
 }
 
-func (h *peerProxyHandler) addToStorageVersionMap(gvr schema.GroupVersionResource, serverIds []string) {
+func (h *peerProxyHandler) addToStorageVersionMap(gvr schema.GroupVersionResource, serverId string) {
 	apiserversi, _ := h.svMap.LoadOrStore(gvr, &sync.Map{})
 	apiservers := apiserversi.(*sync.Map)
-	for _, serverId := range serverIds {
-		if serverId != "" {
-			apiservers.Store(serverId, true)
-		}
+	if serverId != "" {
+		apiservers.Store(serverId, true)
 	}
 }
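For reference, the svMap the simplified test helper populates is a sync.Map keyed by GVR whose values are per-GVR sync.Maps of apiserver IDs. A minimal sketch of that layout follows; the server ID string is a placeholder, not a value from the tests.

```go
// Sketch of the nested sync.Map layout addToStorageVersionMap fills in:
// GVR -> (apiserver ID -> true). The server ID below is a placeholder.
package main

import (
	"fmt"
	"sync"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	var svMap sync.Map

	gvr := schema.GroupVersionResource{Group: "core", Version: "foo", Resource: "bar"}

	apiserversi, _ := svMap.LoadOrStore(gvr, &sync.Map{})
	apiservers := apiserversi.(*sync.Map)
	apiservers.Store("remote-apiserver-id", true)

	apiservers.Range(func(key, value interface{}) bool {
		fmt.Printf("%v is served by %v\n", gvr, key)
		return true
	})
}
```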