Address comments
richabanker committed Mar 11, 2024
1 parent 550fd56 commit b4a3ed5
Showing 5 changed files with 66 additions and 89 deletions.
@@ -194,7 +194,7 @@ type storageVersionUpdate struct {
// channel is closed.
processedCh <-chan struct{}

// processedCh is closed by the storage version manager when it
// errCh is closed by the storage version manager when it
// encounters an error while trying to update a storage version.
// The API server will block the serve 503 for CR write requests if
// this channel is closed.
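These two channels are what the CR write handlers block on before serving a request. A minimal, self-contained sketch of that consumer-side pattern follows; the gate type, field names, and wait method are illustrative assumptions, not the committed waitForStorageVersionUpdate:

```go
// Hypothetical sketch: how a handler might block until the storage version
// update either completes (processedCh) or fails (errCh). Names are assumed
// for illustration only.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type storageVersionGate struct {
	processedCh <-chan struct{} // closed when the storage version update succeeded
	errCh       <-chan struct{} // closed when the update failed; callers serve 503
}

func (g *storageVersionGate) wait(ctx context.Context) error {
	select {
	case <-g.processedCh:
		return nil
	case <-g.errCh:
		return errors.New("storage version update failed")
	case <-ctx.Done():
		return fmt.Errorf("request aborted while waiting for storage version update: %w", ctx.Err())
	}
}

func main() {
	processed := make(chan struct{})
	errCh := make(chan struct{})
	gate := &storageVersionGate{processedCh: processed, errCh: errCh}

	// Simulate the storage version manager finishing successfully.
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(processed)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println("wait result:", gate.wait(ctx))
}
```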
@@ -456,11 +456,6 @@ func (r *crdHandler) serveResource(w http.ResponseWriter, req *http.Request, req
if justCreated {
time.Sleep(2 * time.Second)
}
if err := crdInfo.waitForStorageVersionUpdate(req.Context()); err != nil {
err := apierrors.NewServiceUnavailable(err.Error())
responsewriters.ErrorNegotiated(err, Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
return nil
}
// Get the latest CRD to make sure it's not terminating or deleted
crd, err := r.crdLister.Get(crdName)
if err != nil || crd.UID != crdUID || apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Terminating) {
@@ -469,6 +464,12 @@ func (r *crdHandler) serveResource(w http.ResponseWriter, req *http.Request, req
responsewriters.ErrorNegotiated(err, Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
return nil
}

if err := crdInfo.waitForStorageVersionUpdate(req.Context()); err != nil {
err := apierrors.NewServiceUnavailable(err.Error())
responsewriters.ErrorNegotiated(err, Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
return nil
}
return handlers.CreateResource(storage, requestScope, r.admission)
case "update":
if err := crdInfo.waitForStorageVersionUpdate(req.Context()); err != nil {
@@ -590,12 +591,10 @@ func (r *crdHandler) createCustomResourceDefinition(obj interface{}) {
}
if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) &&
utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
processedCh := make(chan struct{})
errCh := make(chan struct{})
ctx := apirequest.NewContext()
// customStorageLock will be released even if UpdateStorageVersion() fails. This is safe
// since we are deleting the old storage here and not creating a new one.
err := r.storageVersionManager.UpdateStorageVersion(ctx, crd, tearDownFinishedCh, processedCh, errCh)
err := r.storageVersionManager.UpdateStorageVersion(ctx, crd, tearDownFinishedCh, nil, nil)
if err != nil {
klog.Errorf("error updating storage version for %v: %v", crd, err)
}
@@ -644,10 +643,8 @@ func (r *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{})
// Update storage version with the latest info in the watch event
if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) &&
utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
processedCh := make(chan struct{})
errCh := make(chan struct{})
ctx := apirequest.NewContext()
err := r.storageVersionManager.UpdateStorageVersion(ctx, newCRD, tearDownFinishedCh, processedCh, errCh)
err := r.storageVersionManager.UpdateStorageVersion(ctx, newCRD, tearDownFinishedCh, nil, nil)
if err != nil {
klog.Errorf("error updating storage version for %v: %v", newCRD, err)
}
@@ -1187,27 +1184,13 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
var processedCh, errCh chan struct{}
ctx := apirequest.NewContext()
var retry int
retries := 3

done := false
for retry < retries {
// spawn storage version update in background and use channels to make handlers wait
processedCh = make(chan struct{})
errCh = make(chan struct{})
err = r.storageVersionManager.UpdateStorageVersion(ctx, crd, nil, processedCh, errCh)
select {
case <-errCh:
klog.Errorf("retry %d, failed to update storage version for %v : %v", retry, crd, err)
retry++
continue
case <-processedCh:
done = true
}

if done {
break
}
// spawn storage version update in background and use channels to make handlers wait
processedCh = make(chan struct{})
errCh = make(chan struct{})
err = r.storageVersionManager.UpdateStorageVersion(ctx, crd, nil, processedCh, errCh)
if err != nil {
klog.Errorf("error updating storage version for %v: %v", crd, err)
}

ret.storageVersionUpdate = &storageVersionUpdate{
@@ -81,10 +81,14 @@ func (m *manager) UpdateStorageVersion(ctx context.Context, crd *apiextensionsv1
case <-waitCh:
done = true
case <-ctx.Done():
close(errCh)
if errCh != nil {
close(errCh)
}
return fmt.Errorf("aborted updating CRD storage version update: %v", ctx.Err())
case <-time.After(1 * time.Minute):
close(errCh)
if errCh != nil {
close(errCh)
}
return fmt.Errorf("timeout waiting for waitCh to close before proceeding with storageversion update for %v", crd)
}
if done {
@@ -95,7 +99,9 @@ func (m *manager) UpdateStorageVersion(ctx context.Context, crd *apiextensionsv1

if err := m.updateCRDStorageVersion(ctx, crd); err != nil {
utilruntime.HandleError(err)
close(errCh)
if errCh != nil {
close(errCh)
}
return fmt.Errorf("error while updating storage version for crd %v: %v", crd, err)
}
// close processCh after the update is done
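Note on the nil checks added above: the CRD create/update event paths now pass nil for processedCh and errCh, so the manager only closes a channel that was actually supplied (closing a nil channel would panic). A standalone sketch of that guard idiom; the helper name is invented for illustration:

```go
// Hypothetical sketch of the nil-guard used before closing optional
// notification channels. Callers that do not need to be notified
// (e.g. the CRD teardown path) can simply pass nil.
package main

import "fmt"

// closeIfNonNil is an illustrative helper, not part of the committed code.
func closeIfNonNil(ch chan struct{}) {
	if ch != nil {
		close(ch)
	}
}

func main() {
	// Caller that wants notifications.
	errCh := make(chan struct{})
	closeIfNonNil(errCh)
	_, open := <-errCh
	fmt.Println("errCh open after close:", open) // false

	// Caller that does not care passes nil; the guard skips the close.
	closeIfNonNil(nil)
	fmt.Println("nil channel skipped without panic")
}
```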
25 changes: 19 additions & 6 deletions staging/src/k8s.io/apiserver/pkg/storageversion/updater.go
@@ -25,6 +25,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
)

// Client has the methods required to update the storage version.
@@ -125,13 +126,25 @@ func setStatusCondition(conditions *[]v1alpha1.StorageVersionCondition, newCondi

// UpdateStorageVersionFor updates the storage version object for the resource.
func UpdateStorageVersionFor(ctx context.Context, c Client, apiserverID string, gr schema.GroupResource, encodingVersion string, decodableVersions []string, servedVersions []string, postProcessFunc processStorageVersionFunc) error {
err := singleUpdate(ctx, c, apiserverID, gr, encodingVersion, decodableVersions, servedVersions, postProcessFunc)
if err != nil {
time.Sleep(1 * time.Second)
return err
retries := 3
var retry int
var err error
for retry < retries {
err = singleUpdate(ctx, c, apiserverID, gr, encodingVersion, decodableVersions, servedVersions, postProcessFunc)
if err == nil {
return nil
}
if apierrors.IsAlreadyExists(err) || apierrors.IsConflict(err) {
time.Sleep(1 * time.Second)
continue
}
if err != nil {
klog.Errorf("retry %d, failed to update storage version for %v: %v", retry, gr, err)
retry++
time.Sleep(1 * time.Second)
}
}

return nil
return err
}

func singleUpdate(ctx context.Context, c Client, apiserverID string, gr schema.GroupResource, encodingVersion string, decodableVersions []string, servedVersions []string, postProcessFunc processStorageVersionFunc) error {
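The new retry loop in UpdateStorageVersionFor treats AlreadyExists/Conflict as optimistic-concurrency failures that are retried after a short sleep without advancing the retry counter, while any other error is logged and counted against the three attempts. A simplified, self-contained sketch of that classification; the updateWithRetries wrapper and the stand-in update function are assumptions for the example, only the apierrors predicates are the real apimachinery helpers:

```go
// Illustrative sketch of the retry classification: conflict-style errors are
// retried without consuming the retry budget, other errors are logged and
// counted. Sleep durations and names are placeholders.
package main

import (
	"errors"
	"fmt"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func updateWithRetries(updateFn func() error) error {
	const retries = 3
	var retry int
	var err error
	for retry < retries {
		err = updateFn()
		if err == nil {
			return nil
		}
		if apierrors.IsAlreadyExists(err) || apierrors.IsConflict(err) {
			// Optimistic-concurrency failure: back off and try again
			// without advancing the retry counter.
			time.Sleep(10 * time.Millisecond)
			continue
		}
		fmt.Printf("retry %d failed: %v\n", retry, err)
		retry++
		time.Sleep(10 * time.Millisecond)
	}
	return err
}

func main() {
	gr := schema.GroupResource{Group: "example.io", Resource: "widgets"}
	attempts := 0
	err := updateWithRetries(func() error {
		attempts++
		if attempts == 1 {
			// First attempt hits a conflict, second succeeds.
			return apierrors.NewConflict(gr, "widgets.example.io", errors.New("object was modified"))
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "err:", err)
}
```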
@@ -162,24 +162,20 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
return
}

// no peer endpoints found could mean one of the following things:
// 1. apiservers were found in storage version informer but either they had no endpoint
// lease or their endpoint lease had invalid hostport information. Serve 503 in this case.
// 2. no apiservers were found in storage version informer cache meaning
// this resource is not served by anything in this cluster. Pass request to
// next handler, that should eventually serve 404.
gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}

if serviceableByResp.errorFetchingAddressFromLease {
klog.ErrorS(err, "error fetching ip and port of remote server while proxying")
responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable("Error getting ip and port info of the remote server while proxying"), h.serializer, gv, w, r)
return
}

// no apiservers were found that could serve the request, pass request to
// next handler, that should eventually serve 404

// TODO: maintain locally serviceable GVRs somewhere so that we dont have to
// consult the storageversion-informed map for those
if len(serviceableByResp.peerEndpoints) == 0 {
gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}

if serviceableByResp.errorFetchingAddressFromLease {
klog.ErrorS(err, "error fetching ip and port of remote server while proxying")
responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable("Error getting ip and port info of the remote server while proxying"), h.serializer, gv, w, r)
return
}

klog.Errorf(fmt.Sprintf("GVR %v is not served by anything in this cluster", gvr))
handler.ServeHTTP(w, r)
return
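With this reordering, the handler only chooses between 503 and fall-through once it knows no peer endpoints were returned: a lease/address lookup failure yields 503, otherwise the request moves on to the next handler and eventually a 404. A simplified stand-in for that decision; the types and names below are illustrative, not the committed serviceableByResponse, and the proxy branch is implied by the surrounding handler:

```go
// Illustrative decision helper mirroring the reordered logic above:
// the 503 path is only taken when no peer endpoints were found AND the
// address lookup from the endpoint lease failed.
package main

import "fmt"

type lookupResult struct {
	peerEndpoints                 []string
	errorFetchingAddressFromLease bool
}

type action string

const (
	proxyToPeer  action = "proxy to a peer apiserver"
	serveUnavail action = "serve 503 Service Unavailable"
	passToNext   action = "pass to next handler (eventually 404)"
)

func decide(r lookupResult) action {
	if len(r.peerEndpoints) == 0 {
		if r.errorFetchingAddressFromLease {
			return serveUnavail
		}
		return passToNext
	}
	return proxyToPeer
}

func main() {
	fmt.Println(decide(lookupResult{errorFetchingAddressFromLease: true}))      // 503
	fmt.Println(decide(lookupResult{}))                                         // next handler
	fmt.Println(decide(lookupResult{peerEndpoints: []string{"10.0.0.2:6443"}})) // proxy
}
```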
@@ -207,6 +203,7 @@ func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResou
apiservers.Range(func(key, value interface{}) bool {
apiserverKey := key.(string)
if apiserverKey == localAPIServerId {
response.errorFetchingAddressFromLease = true
response.locallyServiceable = true
// stop iteration
return false
@@ -232,10 +229,7 @@ func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResou
return true
})

if len(peerServerEndpoints) > 0 {
response.errorFetchingAddressFromLease = false
response.peerEndpoints = peerServerEndpoints
}
response.peerEndpoints = peerServerEndpoints
return response, nil
}
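For reference, the Range loop above relies on sync.Map's contract that iteration stops when the callback returns false, which is how a locally serviceable GVR short-circuits the peer lookup. A small self-contained sketch of that pattern with stand-in names (the real code collects peer addresses from endpoint leases rather than map keys):

```go
// Illustrative sketch of iterating the per-GVR map of serving apiservers:
// stop as soon as the local server is found, otherwise collect the peers.
package main

import (
	"fmt"
	"sync"
)

func serviceableBy(apiservers *sync.Map, localAPIServerId string) (locally bool, peers []string) {
	apiservers.Range(func(key, value interface{}) bool {
		apiserverKey := key.(string)
		if apiserverKey == localAPIServerId {
			locally = true
			return false // stop iteration
		}
		peers = append(peers, apiserverKey)
		return true
	})
	return locally, peers
}

func main() {
	m := &sync.Map{}
	m.Store("apiserver-a", true)
	m.Store("apiserver-b", true)

	locally, peers := serviceableBy(m, "apiserver-z")
	fmt.Println("locally serviceable:", locally, "peers:", peers)
}
```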

@@ -59,8 +59,8 @@ const (
)

type FakeSVMapData struct {
gvr schema.GroupVersionResource
serverIds []string
gvr schema.GroupVersionResource
serverId string
}

type reconciler struct {
@@ -116,7 +116,7 @@ func TestPeerProxy(t *testing.T) {
Group: "core",
Version: "bar",
Resource: "baz"},
serverIds: []string{}},
serverId: ""},
},
{
desc: "503 if no endpoint fetched from lease",
@@ -128,7 +128,7 @@ func TestPeerProxy(t *testing.T) {
Group: "core",
Version: "foo",
Resource: "bar"},
serverIds: []string{remoteServerId}},
serverId: remoteServerId},
},
{
desc: "200 if locally serviceable",
@@ -140,7 +140,7 @@ func TestPeerProxy(t *testing.T) {
Group: "core",
Version: "foo",
Resource: "bar"},
serverIds: []string{localServerId}},
serverId: localServerId},
},
{
desc: "503 unreachable peer bind address",
@@ -152,7 +152,7 @@ func TestPeerProxy(t *testing.T) {
Group: "core",
Version: "foo",
Resource: "bar"},
serverIds: []string{remoteServerId}},
serverId: remoteServerId},
reconcilerConfig: reconciler{
do: true,
publicIP: "1.2.3.4",
@@ -177,7 +177,7 @@ func TestPeerProxy(t *testing.T) {
Group: "core",
Version: "foo",
Resource: "bar"},
serverIds: []string{remoteServerId}},
serverId: remoteServerId},
reconcilerConfig: reconciler{
do: true,
publicIP: "1.2.3.4",
Expand All @@ -192,23 +192,6 @@ func TestPeerProxy(t *testing.T) {
apiserver_rerouted_request_total{code="503"} 2
`,
},
{
desc: "503 if one apiserver's endpoint lease wasnt found but another valid apiserver was found",
requestPath: "/api/foo/bar",
expectedStatus: http.StatusServiceUnavailable,
informerFinishedSync: true,
svdata: FakeSVMapData{
gvr: schema.GroupVersionResource{
Group: "core",
Version: "foo",
Resource: "bar"},
serverIds: []string{"aggregated-apiserver", remoteServerId}},
reconcilerConfig: reconciler{
do: true,
publicIP: "1.2.3.4",
serverId: remoteServerId,
},
},
}

metrics.Register()
@@ -307,18 +290,16 @@ func newFakePeerProxyHandler(informerFinishedSync bool, reconciler reconcilers.P
}
ppI := NewPeerProxyHandler(informerFactory, storageversion.NewDefaultManager(), proxyRoundTripper, id, reconciler, s)
if testDataExists(svdata.gvr) {
ppI.addToStorageVersionMap(svdata.gvr, svdata.serverIds)
ppI.addToStorageVersionMap(svdata.gvr, svdata.serverId)
}
return ppI, nil
}

func (h *peerProxyHandler) addToStorageVersionMap(gvr schema.GroupVersionResource, serverIds []string) {
func (h *peerProxyHandler) addToStorageVersionMap(gvr schema.GroupVersionResource, serverId string) {
apiserversi, _ := h.svMap.LoadOrStore(gvr, &sync.Map{})
apiservers := apiserversi.(*sync.Map)
for _, serverId := range serverIds {
if serverId != "" {
apiservers.Store(serverId, true)
}
if serverId != "" {
apiservers.Store(serverId, true)
}
}
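The updated test helper registers a single apiserver ID per GVR in a nested sync.Map (outer map keyed by GVR, inner map keyed by server ID), skipping empty IDs. A runnable sketch of that structure with illustrative names:

```go
// Illustrative sketch of the nested map the test helper fills in.
// An empty serverId registers nothing, matching the guard above.
package main

import (
	"fmt"
	"sync"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

func addServerForGVR(svMap *sync.Map, gvr schema.GroupVersionResource, serverId string) {
	apiserversi, _ := svMap.LoadOrStore(gvr, &sync.Map{})
	apiservers := apiserversi.(*sync.Map)
	if serverId != "" {
		apiservers.Store(serverId, true)
	}
}

func main() {
	svMap := &sync.Map{}
	gvr := schema.GroupVersionResource{Group: "core", Version: "foo", Resource: "bar"}

	addServerForGVR(svMap, gvr, "remote-apiserver")
	addServerForGVR(svMap, gvr, "") // empty ID: nothing stored

	inner, _ := svMap.Load(gvr)
	count := 0
	inner.(*sync.Map).Range(func(key, value interface{}) bool {
		count++
		fmt.Println("registered server:", key)
		return true
	})
	fmt.Println("total registered:", count)
}
```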
