Skip to content

Commit

Permalink
create apiservice controller
Browse files Browse the repository at this point in the history
  • Loading branch information
deads2k committed Dec 17, 2019
1 parent 3d23ea6 commit 3cccf75
Show file tree
Hide file tree
Showing 9 changed files with 632 additions and 289 deletions.
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -4,6 +4,7 @@ go 1.13

require (
github.com/coreos/etcd v3.3.15+incompatible
github.com/davecgh/go-spew v1.1.1
github.com/getsentry/raven-go v0.2.1-0.20190513200303-c977f96e1095 // indirect
github.com/ghodss/yaml v1.0.0
github.com/gonum/graph v0.0.0-20170401004347-50b27dea7ebb
Expand Down
78 changes: 78 additions & 0 deletions pkg/operator/apiservicecontroller/apigroup.go
@@ -0,0 +1,78 @@
package apiservicecontroller

import (
"fmt"
"net/http"

"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/rest"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
)

func NewEndpointPrecondition(kubeInformers kubeinformers.SharedInformerFactory, apiServices []*apiregistrationv1.APIService) wait.ConditionFunc {
// this is outside the func so it always registers before the informers start
endpointsLister := kubeInformers.Core().V1().Endpoints().Lister()

type coordinate struct {
namespace string
name string
}
coordinates := []coordinate{}
for _, apiService := range apiServices {
curr := coordinate{namespace: apiService.Spec.Service.Namespace, name: apiService.Spec.Service.Name}
exists := false
for _, j := range coordinates {
if j == curr {
exists = true
break
}
}
if !exists {
coordinates = append(coordinates)
}
}

return func() (bool, error) {
for _, curr := range coordinates {
endpoints, err := endpointsLister.Endpoints(curr.namespace).Get(curr.name)
if err != nil {
return false, err
}
if len(endpoints.Subsets) == 0 {
return false, nil
}

exists := false
for _, subset := range endpoints.Subsets {
if len(subset.Addresses) > 0 {
exists = true
break
}
}
if !exists {
return false, nil
}
}

return true, nil
}
}

func checkDiscoveryForByAPIServices(recorder events.Recorder, restclient rest.Interface, apiServices []*apiregistrationv1.APIService) []string {
missingMessages := []string{}
for _, apiService := range apiServices {
url := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version

statusCode := 0
result := restclient.Get().AbsPath(url).Do().StatusCode(&statusCode)
if statusCode != http.StatusOK {
groupVersionString := fmt.Sprintf("%s.%s", apiService.Spec.Group, apiService.Spec.Version)
recorder.Warningf("OpenShiftAPICheckFailed", fmt.Sprintf("%q failed with HTTP status code %d (%v)", groupVersionString, statusCode, result.Error()))
missingMessages = append(missingMessages, fmt.Sprintf("%q is not ready: %d (%v)", groupVersionString, statusCode, result.Error()))
}
}

return missingMessages
}
232 changes: 232 additions & 0 deletions pkg/operator/apiservicecontroller/apiservice_controller.go
@@ -0,0 +1,232 @@
package apiservicecontroller

import (
"fmt"
"sort"
"strings"
"time"

operatorsv1 "github.com/openshift/api/operator/v1"
operatorv1 "github.com/openshift/api/operator/v1"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
"github.com/openshift/library-go/pkg/operator/status"
"github.com/openshift/library-go/pkg/operator/v1helpers"
"k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
apiregistrationv1client "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
apiregistrationinformers "k8s.io/kube-aggregator/pkg/client/informers/externalversions"
)

const (
workQueueKey = "key"
)

type APIServiceController struct {
name string
apiServices []*apiregistrationv1.APIService
// precondition must return true before the apiservices will be created
precondition wait.ConditionFunc

versionRecorder status.VersionGetter
operatorClient v1helpers.OperatorClient
kubeClient kubernetes.Interface
apiregistrationv1Client apiregistrationv1client.ApiregistrationV1Interface
eventRecorder events.Recorder

// queue only ever has one item, but it has nice error handling backoff/retry semantics
queue workqueue.RateLimitingInterface
}

func NewAPIServiceController(
name string,
apiServices []*apiregistrationv1.APIService,
operatorClient v1helpers.OperatorClient,
apiregistrationInformers apiregistrationinformers.SharedInformerFactory,
apiregistrationv1Client apiregistrationv1client.ApiregistrationV1Interface,
kubeInformersForOperandNamespace kubeinformers.SharedInformerFactory,
kubeClient kubernetes.Interface,
eventRecorder events.Recorder,
) *APIServiceController {
fullname := "APIServiceController_" + name
c := &APIServiceController{
name: fullname,
apiServices: apiServices,
precondition: NewEndpointPrecondition(kubeInformersForOperandNamespace, apiServices),

operatorClient: operatorClient,
apiregistrationv1Client: apiregistrationv1Client,
kubeClient: kubeClient,
eventRecorder: eventRecorder.WithComponentSuffix("apiservice-" + name + "-controller"),

queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fullname),
}

kubeInformersForOperandNamespace.Core().V1().Services().Informer().AddEventHandler(c.eventHandler())
kubeInformersForOperandNamespace.Core().V1().Endpoints().Informer().AddEventHandler(c.eventHandler())
apiregistrationInformers.Apiregistration().V1().APIServices().Informer().AddEventHandler(c.eventHandler())

return c
}

func (c *APIServiceController) sync() error {
operatorConfigSpec, _, _, err := c.operatorClient.GetOperatorState()
if err != nil {
return err
}

switch operatorConfigSpec.ManagementState {
case operatorsv1.Managed:
case operatorsv1.Unmanaged:
return nil
case operatorsv1.Removed:
errs := []error{}
for _, apiService := range c.apiServices {
if err := c.apiregistrationv1Client.APIServices().Delete(apiService.Name, nil); err != nil {
errs = append(errs, err)
}
}
return errors.NewAggregate(errs)
default:
c.eventRecorder.Warningf("ManagementStateUnknown", "Unrecognized operator management state %q", operatorConfigSpec.ManagementState)
return nil
}

ready, err := c.precondition()
if err != nil {
v1helpers.UpdateStatus(c.operatorClient, v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
Type: "APIServicesAvailable",
Status: operatorv1.ConditionFalse,
Reason: "ErrorCheckingPrecondition",
Message: err.Error(),
}))
return err
}
if !ready {
v1helpers.UpdateStatus(c.operatorClient, v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
Type: "APIServicesAvailable",
Status: operatorv1.ConditionFalse,
Reason: "PreconditionNotReady",
Message: "PreconditionNotReady",
}))
return err
}

err = c.syncAPIServices()

// update failing condition
cond := operatorv1.OperatorCondition{
Type: "APIServicesAvailable",
Status: operatorv1.ConditionTrue,
}
if err != nil {
cond.Status = operatorv1.ConditionFalse
cond.Reason = "Error"
cond.Message = err.Error()
}
if _, _, updateError := v1helpers.UpdateStatus(c.operatorClient, v1helpers.UpdateConditionFn(cond)); updateError != nil {
if err == nil {
return updateError
}
}

return err
}

func (c *APIServiceController) syncAPIServices() error {
errs := []error{}
var availableConditionMessages []string

for _, apiService := range c.apiServices {
apiregistrationv1.SetDefaults_ServiceReference(apiService.Spec.Service)
apiService, _, err := resourceapply.ApplyAPIService(c.apiregistrationv1Client, c.eventRecorder, apiService)
if err != nil {
errs = append(errs, err)
continue
}

for _, condition := range apiService.Status.Conditions {
if condition.Type == apiregistrationv1.Available {
if condition.Status == apiregistrationv1.ConditionFalse {
availableConditionMessages = append(availableConditionMessages, fmt.Sprintf("apiservices.apiregistration.k8s.io/%v: not available: %v", apiService.Name, condition.Message))
}
break
}
}
}
if len(errs) > 0 {
return errors.NewAggregate(errs)
}
if len(availableConditionMessages) > 0 {
sort.Sort(sort.StringSlice(availableConditionMessages))
return fmt.Errorf(strings.Join(availableConditionMessages, "\n"))
}

// if the apiservices themselves check out ok, try to actually hit the discovery endpoints. We have a history in clusterup
// of something delaying them. This isn't perfect because of round-robining, but let's see if we get an improvement
if len(availableConditionMessages) == 0 && c.kubeClient.Discovery().RESTClient() != nil {
missingAPIMessages := checkDiscoveryForByAPIServices(c.eventRecorder, c.kubeClient.Discovery().RESTClient(), c.apiServices)
availableConditionMessages = append(availableConditionMessages, missingAPIMessages...)
}

if len(availableConditionMessages) > 0 {
sort.Sort(sort.StringSlice(availableConditionMessages))
return fmt.Errorf(strings.Join(availableConditionMessages, "\n"))
}

return nil
}

// Run starts the openshift-apiserver and blocks until stopCh is closed.
func (c *APIServiceController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Infof("Starting %v", c.name)
defer klog.Infof("Shutting down %v", c.name)

// doesn't matter what workers say, only start one.
go wait.Until(c.runWorker, time.Second, stopCh)

<-stopCh
}

func (c *APIServiceController) runWorker() {
for c.processNextWorkItem() {
}
}

func (c *APIServiceController) processNextWorkItem() bool {
dsKey, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(dsKey)

err := c.sync()
if err == nil {
c.queue.Forget(dsKey)
return true
}

utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
c.queue.AddRateLimited(dsKey)

return true
}

// eventHandler queues the operator to check spec and status
func (c *APIServiceController) eventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.queue.Add(workQueueKey) },
UpdateFunc: func(old, new interface{}) { c.queue.Add(workQueueKey) },
DeleteFunc: func(obj interface{}) { c.queue.Add(workQueueKey) },
}
}

0 comments on commit 3cccf75

Please sign in to comment.