Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #51921 #51969

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/kube-apiserver/app/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ go_library(
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion:go_default_library",
"//vendor/k8s.io/apiextensions-apiserver/pkg/cmd/server:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/openapi:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
Expand All @@ -98,10 +97,12 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apiserver:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/controllers/autoregister:go_default_library",
],
)
Expand Down
90 changes: 58 additions & 32 deletions cmd/kube-apiserver/app/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,24 @@ import (
"io/ioutil"
"net/http"
"strings"
"sync"

"github.com/golang/glog"

apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
genericoptions "k8s.io/apiserver/pkg/server/options"
kubeexternalinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion"
apiregistrationinformers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion"
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
Expand Down Expand Up @@ -104,39 +107,23 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega
autoRegistrationController)

aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
go autoRegistrationController.Run(5, context.StopCh)
go tprRegistrationController.Run(5, context.StopCh)
go func() {
// let the CRD controller process the initial set of CRDs before starting the autoregistration controller.
// this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist.
tprRegistrationController.WaitForInitialSync()
autoRegistrationController.Run(5, context.StopCh)
}()
return nil
})
aggregatorServer.GenericAPIServer.AddHealthzChecks(healthz.NamedCheck("autoregister-completion", func(r *http.Request) error {
items, err := aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices().Lister().List(labels.Everything())
if err != nil {
return err
}

missing := []apiregistration.APIService{}
for _, apiService := range apiServices {
found := false
for _, item := range items {
if item.Name != apiService.Name {
continue
}
if apiregistration.IsAPIServiceConditionTrue(item, apiregistration.Available) {
found = true
break
}
}

if !found {
missing = append(missing, *apiService)
}
}

if len(missing) > 0 {
return fmt.Errorf("missing APIService: %v", missing)
}
return nil
}))
aggregatorServer.GenericAPIServer.AddHealthzChecks(
makeAPIServiceAvailableHealthzCheck(
"autoregister-completion",
apiServices,
aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices(),
),
)

return aggregatorServer, nil
}
Expand All @@ -160,6 +147,45 @@ func makeAPIService(gv schema.GroupVersion) *apiregistration.APIService {
}
}

// makeAPIServiceAvailableHealthzCheck returns a healthz check that returns healthy
// once all of the specified services have been observed to be available at least once.
func makeAPIServiceAvailableHealthzCheck(name string, apiServices []*apiregistration.APIService, apiServiceInformer apiregistrationinformers.APIServiceInformer) healthz.HealthzChecker {
// Track the auto-registered API services that have not been observed to be available yet
pendingServiceNamesLock := &sync.RWMutex{}
pendingServiceNames := sets.NewString()
for _, service := range apiServices {
pendingServiceNames.Insert(service.Name)
}

// When an APIService in the list is seen as available, remove it from the pending list
handleAPIServiceChange := func(service *apiregistration.APIService) {
pendingServiceNamesLock.Lock()
defer pendingServiceNamesLock.Unlock()
if !pendingServiceNames.Has(service.Name) {
return
}
if apiregistration.IsAPIServiceConditionTrue(service, apiregistration.Available) {
pendingServiceNames.Delete(service.Name)
}
}

// Watch add/update events for APIServices
apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { handleAPIServiceChange(obj.(*apiregistration.APIService)) },
UpdateFunc: func(old, new interface{}) { handleAPIServiceChange(new.(*apiregistration.APIService)) },
})

// Don't return healthy until the pending list is empty
return healthz.NamedCheck(name, func(r *http.Request) error {
pendingServiceNamesLock.RLock()
defer pendingServiceNamesLock.RUnlock()
if pendingServiceNames.Len() > 0 {
return fmt.Errorf("missing APIService: %v", pendingServiceNames.List())
}
return nil
})
}

type priority struct {
group int32
version int32
Expand Down Expand Up @@ -202,7 +228,7 @@ func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget,
for _, curr := range delegateAPIServer.ListedPaths() {
if curr == "/api/v1" {
apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"})
registration.AddAPIServiceToSync(apiService)
registration.AddAPIServiceToSyncOnStart(apiService)
apiServices = append(apiServices, apiService)
continue
}
Expand All @@ -220,7 +246,7 @@ func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget,
if apiService == nil {
continue
}
registration.AddAPIServiceToSync(apiService)
registration.AddAPIServiceToSyncOnStart(apiService)
apiServices = append(apiServices, apiService)
}

Expand Down
41 changes: 39 additions & 2 deletions pkg/master/thirdparty/tprregistration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type tprRegistrationController struct {

syncHandler func(groupVersion schema.GroupVersion) error

syncedInitialSet chan struct{}

// queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors
// this is actually keyed by a groupVersion
queue workqueue.RateLimitingInterface
Expand All @@ -75,7 +77,8 @@ func NewAutoRegistrationController(tprInformer informers.ThirdPartyResourceInfor
crdLister: crdinformer.Lister(),
crdSynced: crdinformer.Informer().HasSynced,
apiServiceRegistration: apiServiceRegistration,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "tpr-autoregister"),
syncedInitialSet: make(chan struct{}),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "tpr-autoregister"),
}
c.syncHandler = c.handleVersionUpdate

Expand Down Expand Up @@ -145,10 +148,39 @@ func (c *tprRegistrationController) Run(threadiness int, stopCh <-chan struct{})
defer glog.Infof("Shutting down tpr-autoregister controller")

// wait for your secondary caches to fill before starting your work
if !controller.WaitForCacheSync("tpr-autoregister", stopCh, c.tprSynced) {
if !controller.WaitForCacheSync("tpr-autoregister", stopCh, c.tprSynced, c.crdSynced) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waiting for crdSynced was missing from the 1.7 controller previously

return
}

// process each tpr in the list once
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was only in the 1.7 version

if tprs, err := c.tprLister.List(labels.Everything()); err != nil {
utilruntime.HandleError(err)
} else {
for _, tpr := range tprs {
_, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(tpr)
if err != nil {
utilruntime.HandleError(err)
continue
}
for _, version := range tpr.Versions {
if err := c.syncHandler(schema.GroupVersion{Group: group, Version: version.Name}); err != nil {
utilruntime.HandleError(err)
}
}
}
}
// process each item in the list once
if crds, err := c.crdLister.List(labels.Everything()); err != nil {
utilruntime.HandleError(err)
} else {
for _, crd := range crds {
if err := c.syncHandler(schema.GroupVersion{Group: crd.Spec.Group, Version: crd.Spec.Version}); err != nil {
utilruntime.HandleError(err)
}
}
}
close(c.syncedInitialSet)

// start up your worker threads based on threadiness. Some controllers have multiple kinds of workers
for i := 0; i < threadiness; i++ {
// runWorker will loop until "something bad" happens. The .Until will then rekick the worker
Expand All @@ -160,6 +192,11 @@ func (c *tprRegistrationController) Run(threadiness int, stopCh <-chan struct{})
<-stopCh
}

// WaitForInitialSync blocks until the initial set of CRD resources has been processed
func (c *tprRegistrationController) WaitForInitialSync() {
<-c.syncedInitialSet
}

func (c *tprRegistrationController) runWorker() {
// hot loop until we're told to stop. processNextWorkItem will automatically wait until there's work
// available, so we don't worry about secondary waits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sort"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)

Expand Down Expand Up @@ -85,6 +86,17 @@ func APIServiceNameToGroupVersion(apiServiceName string) schema.GroupVersion {
return schema.GroupVersion{Group: tokens[1], Version: tokens[0]}
}

// NewLocalAvailableAPIServiceCondition returns a condition for an available local APIService.
func NewLocalAvailableAPIServiceCondition() APIServiceCondition {
return APIServiceCondition{
Type: Available,
Status: ConditionTrue,
LastTransitionTime: metav1.Now(),
Reason: "Local",
Message: "Local APIServices are always available",
}
}

// SetAPIServiceCondition sets the status condition. It either overwrites the existing one or
// creates a new one
func SetAPIServiceCondition(apiService *APIService, newCondition APIServiceCondition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
Expand Down