Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve APIService auto-registration for HA/upgrade scenarios #51921

Merged
merged 4 commits into from Sep 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/kube-apiserver/app/BUILD
Expand Up @@ -50,7 +50,6 @@ go_library(
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion:go_default_library",
"//vendor/k8s.io/apiextensions-apiserver/pkg/cmd/server:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
Expand All @@ -68,10 +67,12 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/storage/etcd3/preflight:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apiserver:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/controllers/autoregister:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/common:go_default_library",
],
Expand Down
90 changes: 58 additions & 32 deletions cmd/kube-apiserver/app/aggregator.go
Expand Up @@ -24,21 +24,24 @@ import (
"io/ioutil"
"net/http"
"strings"
"sync"

"github.com/golang/glog"

apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
genericoptions "k8s.io/apiserver/pkg/server/options"
kubeexternalinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion"
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion"
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/master/controller/crdregistration"
Expand Down Expand Up @@ -102,39 +105,23 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega
autoRegistrationController)

aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
go autoRegistrationController.Run(5, context.StopCh)
go crdRegistrationController.Run(5, context.StopCh)
go func() {
// let the CRD controller process the initial set of CRDs before starting the autoregistration controller.
// this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist.
crdRegistrationController.WaitForInitialSync()
autoRegistrationController.Run(5, context.StopCh)
}()
return nil
})
aggregatorServer.GenericAPIServer.AddHealthzChecks(healthz.NamedCheck("autoregister-completion", func(r *http.Request) error {
items, err := aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices().Lister().List(labels.Everything())
if err != nil {
return err
}

missing := []apiregistration.APIService{}
for _, apiService := range apiServices {
found := false
for _, item := range items {
if item.Name != apiService.Name {
continue
}
if apiregistration.IsAPIServiceConditionTrue(item, apiregistration.Available) {
found = true
break
}
}

if !found {
missing = append(missing, *apiService)
}
}

if len(missing) > 0 {
return fmt.Errorf("missing APIService: %v", missing)
}
return nil
}))
aggregatorServer.GenericAPIServer.AddHealthzChecks(
makeAPIServiceAvailableHealthzCheck(
"autoregister-completion",
apiServices,
aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices(),
),
)

return aggregatorServer, nil
}
Expand All @@ -158,6 +145,45 @@ func makeAPIService(gv schema.GroupVersion) *apiregistration.APIService {
}
}

// makeAPIServiceAvailableHealthzCheck returns a healthz check that returns healthy
// once all of the specified services have been observed to be available at least once.
func makeAPIServiceAvailableHealthzCheck(name string, apiServices []*apiregistration.APIService, apiServiceInformer informers.APIServiceInformer) healthz.HealthzChecker {
// Track the auto-registered API services that have not been observed to be available yet
pendingServiceNamesLock := &sync.RWMutex{}
pendingServiceNames := sets.NewString()
for _, service := range apiServices {
pendingServiceNames.Insert(service.Name)
}

// When an APIService in the list is seen as available, remove it from the pending list
handleAPIServiceChange := func(service *apiregistration.APIService) {
pendingServiceNamesLock.Lock()
defer pendingServiceNamesLock.Unlock()
if !pendingServiceNames.Has(service.Name) {
return
}
if apiregistration.IsAPIServiceConditionTrue(service, apiregistration.Available) {
pendingServiceNames.Delete(service.Name)
}
}

// Watch add/update events for APIServices
apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { handleAPIServiceChange(obj.(*apiregistration.APIService)) },
UpdateFunc: func(old, new interface{}) { handleAPIServiceChange(new.(*apiregistration.APIService)) },
})

// Don't return healthy until the pending list is empty
return healthz.NamedCheck(name, func(r *http.Request) error {
pendingServiceNamesLock.RLock()
defer pendingServiceNamesLock.RUnlock()
if pendingServiceNames.Len() > 0 {
return fmt.Errorf("missing APIService: %v", pendingServiceNames.List())
}
return nil
})
}

type priority struct {
group int32
version int32
Expand Down Expand Up @@ -203,7 +229,7 @@ func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget,
for _, curr := range delegateAPIServer.ListedPaths() {
if curr == "/api/v1" {
apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"})
registration.AddAPIServiceToSync(apiService)
registration.AddAPIServiceToSyncOnStart(apiService)
apiServices = append(apiServices, apiService)
continue
}
Expand All @@ -221,7 +247,7 @@ func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget,
if apiService == nil {
continue
}
registration.AddAPIServiceToSync(apiService)
registration.AddAPIServiceToSyncOnStart(apiService)
apiServices = append(apiServices, apiService)
}

Expand Down
Expand Up @@ -53,6 +53,8 @@ type crdRegistrationController struct {

syncHandler func(groupVersion schema.GroupVersion) error

syncedInitialSet chan struct{}

// queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors
// this is actually keyed by a groupVersion
queue workqueue.RateLimitingInterface
Expand All @@ -67,7 +69,8 @@ func NewAutoRegistrationController(crdinformer crdinformers.CustomResourceDefini
crdLister: crdinformer.Lister(),
crdSynced: crdinformer.Informer().HasSynced,
apiServiceRegistration: apiServiceRegistration,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd-autoregister"),
syncedInitialSet: make(chan struct{}),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd-autoregister"),
}
c.syncHandler = c.handleVersionUpdate

Expand Down Expand Up @@ -114,6 +117,18 @@ func (c *crdRegistrationController) Run(threadiness int, stopCh <-chan struct{})
return
}

// process each item in the list once
if crds, err := c.crdLister.List(labels.Everything()); err != nil {
utilruntime.HandleError(err)
} else {
for _, crd := range crds {
if err := c.syncHandler(schema.GroupVersion{Group: crd.Spec.Group, Version: crd.Spec.Version}); err != nil {
utilruntime.HandleError(err)
}
}
}
close(c.syncedInitialSet)

// start up your worker threads based on threadiness. Some controllers have multiple kinds of workers
for i := 0; i < threadiness; i++ {
// runWorker will loop until "something bad" happens. The .Until will then rekick the worker
Expand All @@ -125,6 +140,11 @@ func (c *crdRegistrationController) Run(threadiness int, stopCh <-chan struct{})
<-stopCh
}

// WaitForInitialSync blocks until the initial set of CRD resources has been processed
func (c *crdRegistrationController) WaitForInitialSync() {
<-c.syncedInitialSet
}

func (c *crdRegistrationController) runWorker() {
// hot loop until we're told to stop. processNextWorkItem will automatically wait until there's work
// available, so we don't worry about secondary waits
Expand Down
Expand Up @@ -20,6 +20,7 @@ import (
"sort"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)

Expand Down Expand Up @@ -85,6 +86,17 @@ func APIServiceNameToGroupVersion(apiServiceName string) schema.GroupVersion {
return schema.GroupVersion{Group: tokens[1], Version: tokens[0]}
}

// NewLocalAvailableAPIServiceCondition returns a condition for an available local APIService.
func NewLocalAvailableAPIServiceCondition() APIServiceCondition {
return APIServiceCondition{
Type: Available,
Status: ConditionTrue,
LastTransitionTime: metav1.Now(),
Reason: "Local",
Message: "Local APIServices are always available",
}
}

// SetAPIServiceCondition sets the status condition. It either overwrites the existing one or
// creates a new one
func SetAPIServiceCondition(apiService *APIService, newCondition APIServiceCondition) {
Expand Down
Expand Up @@ -27,6 +27,7 @@ go_library(
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
Expand Down