Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use apiservice.status to break apart controller and handling concerns #45432

Merged
merged 2 commits into from
May 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,37 @@ func APIServiceNameToGroupVersion(apiServiceName string) schema.GroupVersion {
tokens := strings.SplitN(apiServiceName, ".", 2)
return schema.GroupVersion{Group: tokens[1], Version: tokens[0]}
}

// SetAPIServiceCondition sets the status condition. It either overwrites the existing one or
// creates a new one
func SetAPIServiceCondition(apiService *APIService, newCondition APIServiceCondition) {
var existingCondition *APIServiceCondition
for i := range apiService.Status.Conditions {
if apiService.Status.Conditions[i].Type == newCondition.Type {
existingCondition = &apiService.Status.Conditions[i]
break
}
}
if existingCondition == nil {
apiService.Status.Conditions = append(apiService.Status.Conditions, newCondition)
return
}

if existingCondition.Status != newCondition.Status {
existingCondition.Status = newCondition.Status
existingCondition.LastTransitionTime = newCondition.LastTransitionTime
}

existingCondition.Reason = newCondition.Reason
existingCondition.Message = newCondition.Message
}

// IsAPIServiceConditionTrue indicates if the condition is present and strictly true
func IsAPIServiceConditionTrue(apiService *APIService, conditionType APIServiceConditionType) bool {
for _, condition := range apiService.Status.Conditions {
if condition.Type == conditionType && condition.Status == ConditionTrue {
return true
}
}
return false
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
utilvalidation "k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/validation/field"

discoveryapi "k8s.io/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
)

func ValidateAPIService(apiService *discoveryapi.APIService) field.ErrorList {
func ValidateAPIService(apiService *apiregistration.APIService) field.ErrorList {
requiredName := apiService.Spec.Version + "." + apiService.Spec.Group

allErrs := validation.ValidateObjectMeta(&apiService.ObjectMeta, false,
Expand Down Expand Up @@ -86,9 +86,30 @@ func ValidateAPIService(apiService *discoveryapi.APIService) field.ErrorList {
return allErrs
}

func ValidateAPIServiceUpdate(newAPIService *discoveryapi.APIService, oldAPIService *discoveryapi.APIService) field.ErrorList {
func ValidateAPIServiceUpdate(newAPIService *apiregistration.APIService, oldAPIService *apiregistration.APIService) field.ErrorList {
allErrs := validation.ValidateObjectMetaUpdate(&newAPIService.ObjectMeta, &oldAPIService.ObjectMeta, field.NewPath("metadata"))
allErrs = append(allErrs, ValidateAPIService(newAPIService)...)

return allErrs
}

func ValidateAPIServiceStatus(status *apiregistration.APIServiceStatus, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}

for i, condition := range status.Conditions {
if condition.Status != apiregistration.ConditionTrue &&
condition.Status != apiregistration.ConditionFalse &&
condition.Status != apiregistration.ConditionUnknown {
allErrs = append(allErrs, field.NotSupported(fldPath.Child("conditions").Index(i).Child("status"), condition.Status, []string{
string(apiregistration.ConditionTrue), string(apiregistration.ConditionFalse), string(apiregistration.ConditionUnknown)}))
}
}

return allErrs
}

func ValidateAPIServiceStatusUpdate(newAPIService *apiregistration.APIService, oldAPIService *apiregistration.APIService) field.ErrorList {
allErrs := validation.ValidateObjectMetaUpdate(&newAPIService.ObjectMeta, &oldAPIService.ObjectMeta, field.NewPath("metadata"))
allErrs = append(allErrs, ValidateAPIServiceStatus(&newAPIService.Status, field.NewPath("status"))...)
return allErrs
}
1 change: 1 addition & 0 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ go_library(
"//vendor/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/controllers:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/controllers/status:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/registry/apiservice/etcd:go_default_library",
],
)
61 changes: 33 additions & 28 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
kubeinformers "k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/pkg/version"

"k8s.io/kube-aggregator/pkg/apis/apiregistration"
Expand All @@ -41,6 +40,7 @@ import (
"k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset"
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion"
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion"
statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status"
apiservicestorage "k8s.io/kube-aggregator/pkg/registry/apiservice/etcd"
)

Expand Down Expand Up @@ -102,11 +102,6 @@ type APIAggregator struct {
// controller state
lister listers.APIServiceLister

// serviceLister is used by the aggregator handler to determine whether or not to try to expose the group
serviceLister v1listers.ServiceLister
// endpointsLister is used by the aggregator handler to determine whether or not to try to expose the group
endpointsLister v1listers.EndpointsLister

// provided for easier embedding
APIRegistrationInformers informers.SharedInformerFactory
}
Expand Down Expand Up @@ -140,46 +135,54 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
return nil, err
}

apiregistrationClient, err := internalclientset.NewForConfig(c.Config.GenericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
}
informerFactory := informers.NewSharedInformerFactory(
internalclientset.NewForConfigOrDie(c.Config.GenericConfig.LoopbackClientConfig),
apiregistrationClient,
5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on.
)
kubeInformers := kubeinformers.NewSharedInformerFactory(c.CoreAPIServerClient, 5*time.Minute)

s := &APIAggregator{
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
contextMapper: c.GenericConfig.RequestContextMapper,
proxyClientCert: c.ProxyClientCert,
proxyClientKey: c.ProxyClientKey,
proxyHandlers: map[string]*proxyHandler{},
handledGroups: sets.String{},
lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(),
serviceLister: kubeInformers.Core().V1().Services().Lister(),
endpointsLister: kubeInformers.Core().V1().Endpoints().Lister(),
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
contextMapper: c.GenericConfig.RequestContextMapper,
proxyClientCert: c.ProxyClientCert,
proxyClientKey: c.ProxyClientKey,
proxyHandlers: map[string]*proxyHandler{},
handledGroups: sets.String{},
lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(),
APIRegistrationInformers: informerFactory,
}

apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, registry, Scheme, metav1.ParameterCodec, Codecs)
apiGroupInfo.GroupMeta.GroupVersion = v1alpha1.SchemeGroupVersion
v1alpha1storage := map[string]rest.Storage{}
v1alpha1storage["apiservices"] = apiservicestorage.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
apiServiceREST := apiservicestorage.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
v1alpha1storage["apiservices"] = apiServiceREST
v1alpha1storage["apiservices/status"] = apiservicestorage.NewStatusREST(Scheme, apiServiceREST)
apiGroupInfo.VersionedResourcesStorageMap["v1alpha1"] = v1alpha1storage

if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}

apisHandler := &apisHandler{
codecs: Codecs,
lister: s.lister,
serviceLister: s.serviceLister,
endpointsLister: s.endpointsLister,
codecs: Codecs,
lister: s.lister,
}
s.GenericAPIServer.Handler.PostGoRestfulMux.Handle("/apis", apisHandler)
s.GenericAPIServer.Handler.PostGoRestfulMux.UnlistedHandle("/apis/", apisHandler)

apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), kubeInformers.Core().V1().Services(), s)
availableController := statuscontrollers.NewAvailableConditionController(
informerFactory.Apiregistration().InternalVersion().APIServices(),
kubeInformers.Core().V1().Services(),
kubeInformers.Core().V1().Endpoints(),
apiregistrationClient.Apiregistration(),
)

s.GenericAPIServer.AddPostStartHook("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
informerFactory.Start(stopCh)
Expand All @@ -190,6 +193,10 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
go apiserviceRegistrationController.Run(stopCh)
return nil
})
s.GenericAPIServer.AddPostStartHook("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
go availableController.Run(stopCh)
return nil
})

return s, nil
}
Expand Down Expand Up @@ -235,12 +242,10 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService, de
// it's time to register the group aggregation endpoint
groupPath := "/apis/" + apiService.Spec.Group
groupDiscoveryHandler := &apiGroupHandler{
codecs: Codecs,
groupName: apiService.Spec.Group,
lister: s.lister,
serviceLister: s.serviceLister,
endpointsLister: s.endpointsLister,
delegate: s.delegateHandler,
codecs: Codecs,
groupName: apiService.Spec.Group,
lister: s.lister,
delegate: s.delegateHandler,
}
// aggregation is protected
s.GenericAPIServer.Handler.PostGoRestfulMux.Handle(groupPath, groupDiscoveryHandler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ func (c *APIServiceRegistrationController) sync(key string) error {
return err
}

// remove registration handling for APIServices which are not available
if !apiregistration.IsAPIServiceConditionTrue(apiService, apiregistration.Available) {
c.apiHandlerManager.RemoveAPIService(key)
return nil
}

// TODO move the destination host to status so that you can see where its going
c.apiHandlerManager.AddAPIService(apiService, c.getDestinationHost(apiService))
return nil
}
Expand Down
36 changes: 5 additions & 31 deletions staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
v1listers "k8s.io/client-go/listers/core/v1"

apiregistrationapi "k8s.io/kube-aggregator/pkg/apis/apiregistration"
apiregistrationv1alpha1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1alpha1"
Expand All @@ -37,9 +36,6 @@ import (
type apisHandler struct {
codecs serializer.CodecFactory
lister listers.APIServiceLister

serviceLister v1listers.ServiceLister
endpointsLister v1listers.EndpointsLister
}

var discoveryGroup = metav1.APIGroup{
Expand Down Expand Up @@ -74,7 +70,7 @@ func (r *apisHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if len(apiGroupServers[0].Spec.Group) == 0 {
continue
}
discoveryGroup := convertToDiscoveryAPIGroup(apiGroupServers, r.serviceLister, r.endpointsLister)
discoveryGroup := convertToDiscoveryAPIGroup(apiGroupServers)
if discoveryGroup != nil {
discoveryGroupList.Groups = append(discoveryGroupList.Groups, *discoveryGroup)
}
Expand All @@ -85,33 +81,14 @@ func (r *apisHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {

// convertToDiscoveryAPIGroup takes apiservices in a single group and returns a discovery compatible object.
// if none of the services are available, it will return nil.
func convertToDiscoveryAPIGroup(apiServices []*apiregistrationapi.APIService, serviceLister v1listers.ServiceLister, endpointsLister v1listers.EndpointsLister) *metav1.APIGroup {
func convertToDiscoveryAPIGroup(apiServices []*apiregistrationapi.APIService) *metav1.APIGroup {
apiServicesByGroup := apiregistrationapi.SortedByGroup(apiServices)[0]

var discoveryGroup *metav1.APIGroup

for _, apiService := range apiServicesByGroup {
if apiService.Spec.Service != nil {
// skip any API services without actual services
if _, err := serviceLister.Services(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name); err != nil {
continue
}

hasActiveEndpoints := false
endpoints, err := endpointsLister.Endpoints(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
// skip any API services without endpoints
if err != nil {
continue
}
for _, subset := range endpoints.Subsets {
if len(subset.Addresses) > 0 {
hasActiveEndpoints = true
break
}
}
if !hasActiveEndpoints {
continue
}
if !apiregistrationapi.IsAPIServiceConditionTrue(apiService, apiregistrationapi.Available) {
continue
}

// the first APIService which is valid becomes the default
Expand Down Expand Up @@ -143,9 +120,6 @@ type apiGroupHandler struct {

lister listers.APIServiceLister

serviceLister v1listers.ServiceLister
endpointsLister v1listers.EndpointsLister

delegate http.Handler
}

Expand All @@ -172,7 +146,7 @@ func (r *apiGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}

discoveryGroup := convertToDiscoveryAPIGroup(apiServicesForGroup, r.serviceLister, r.endpointsLister)
discoveryGroup := convertToDiscoveryAPIGroup(apiServicesForGroup)
if discoveryGroup == nil {
http.Error(w, "", http.StatusNotFound)
return
Expand Down