Skip to content

Commit

Permalink
UPSTREAM: 93286: wait for apiservices on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
deads2k authored and damemi committed Dec 6, 2021
1 parent 55953e0 commit dedf56d
Showing 1 changed file with 40 additions and 2 deletions.
42 changes: 40 additions & 2 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ import (
"context"
"fmt"
"net/http"
"strings"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/features"
Expand All @@ -35,6 +38,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/version"
"k8s.io/klog/v2"
openapicommon "k8s.io/kube-openapi/pkg/common"

"k8s.io/apiserver/pkg/server/dynamiccertificates"
Expand Down Expand Up @@ -289,6 +293,33 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
go availableController.Run(5, context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHook("apiservice-wait-for-first-sync", func(context genericapiserver.PostStartHookContext) error {
// when the aggregator first starts, it should make sure that it has proxy handlers for all the known good API services at this time
// we only need to do this once.
err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
// fix race
handledAPIServices := sets.StringKeySet(s.proxyHandlers)
apiservices, err := s.lister.List(labels.Everything())
if err != nil {
return false, err
}
expectedAPIServices := sets.NewString()
for _, apiservice := range apiservices {
if v1helper.IsAPIServiceConditionTrue(apiservice, v1.Available) {
expectedAPIServices.Insert(apiservice.Name)
}
}

notYetHandledAPIServices := expectedAPIServices.Difference(handledAPIServices)
if len(notYetHandledAPIServices) == 0 {
return true, nil
}
klog.Infof("still waiting on handling APIServices: %v", strings.Join(notYetHandledAPIServices.List(), ","))

return false, nil
}, context.StopCh)
return err
})

if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
Expand Down Expand Up @@ -425,12 +456,19 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
}
proxyHandler.updateAPIService(apiService)
if s.openAPIAggregationController != nil {
s.openAPIAggregationController.AddAPIService(proxyHandler, apiService)
// this is calling a controller. It should already handle being async.
go func() {
defer utilruntime.HandleCrash()
s.openAPIAggregationController.AddAPIService(proxyHandler, apiService)
}()
}
if s.openAPIV3AggregationController != nil {
s.openAPIV3AggregationController.AddAPIService(proxyHandler, apiService)
}
s.proxyHandlers[apiService.Name] = proxyHandler
// we want to update the registration bit last after all the pieces are wired together
defer func() {
s.proxyHandlers[apiService.Name] = proxyHandler
}()
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)

Expand Down

0 comments on commit dedf56d

Please sign in to comment.