Skip to content

Commit

Permalink
UPSTREAM: 93286: wait for apiservices on startup
Browse files Browse the repository at this point in the history
OpenShift-Rebase-Source: 5a2488c
  • Loading branch information
deads2k authored and sairameshv committed Dec 3, 2023
1 parent 2302085 commit 365ce8d
Showing 1 changed file with 41 additions and 3 deletions.
44 changes: 41 additions & 3 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
Expand Up @@ -20,11 +20,14 @@ import (
"context"
"fmt"
"net/http"
"strings"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
Expand All @@ -36,8 +39,9 @@ import (
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/version"
"k8s.io/client-go/transport"
"k8s.io/component-base/version"
"k8s.io/klog/v2"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
Expand Down Expand Up @@ -342,6 +346,33 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
go availableController.Run(5, context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHook("apiservice-wait-for-first-sync", func(context genericapiserver.PostStartHookContext) error {
// when the aggregator first starts, it should make sure that it has proxy handlers for all the known good API services at this time
// we only need to do this once.
err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
// fix race
handledAPIServices := sets.StringKeySet(s.proxyHandlers)
apiservices, err := s.lister.List(labels.Everything())
if err != nil {
return false, err
}
expectedAPIServices := sets.NewString()
for _, apiservice := range apiservices {
if v1helper.IsAPIServiceConditionTrue(apiservice, v1.Available) {
expectedAPIServices.Insert(apiservice.Name)
}
}

notYetHandledAPIServices := expectedAPIServices.Difference(handledAPIServices)
if len(notYetHandledAPIServices) == 0 {
return true, nil
}
klog.Infof("still waiting on handling APIServices: %v", strings.Join(notYetHandledAPIServices.List(), ","))

return false, nil
}, context.StopCh)
return err
})

if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
Expand Down Expand Up @@ -506,7 +537,11 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
}
proxyHandler.updateAPIService(apiService)
if s.openAPIAggregationController != nil {
s.openAPIAggregationController.AddAPIService(proxyHandler, apiService)
// this is calling a controller. It should already handle being async.
go func() {
defer utilruntime.HandleCrash()
s.openAPIAggregationController.AddAPIService(proxyHandler, apiService)
}()
}
if s.openAPIV3AggregationController != nil {
s.openAPIV3AggregationController.AddAPIService(proxyHandler, apiService)
Expand All @@ -515,7 +550,10 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
s.discoveryAggregationController.AddAPIService(apiService, proxyHandler)
}

s.proxyHandlers[apiService.Name] = proxyHandler
// we want to update the registration bit last after all the pieces are wired together
defer func() {
s.proxyHandlers[apiService.Name] = proxyHandler
}()
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)

Expand Down

0 comments on commit 365ce8d

Please sign in to comment.