From 8ff7632b11eb6b489607acd654a01058ec355fc1 Mon Sep 17 00:00:00 2001 From: Eno Compton Date: Thu, 17 May 2018 14:45:16 -0600 Subject: [PATCH] Simplify cmd/controller/main.go - Extract ctlConfig type to store configuration - Move viper and pflag calls into separate function - Ensure all controllers have same method signature for `Run` - Use runner interface to start all controllers - Create healthServer to match controllers - Use Run signature to match Kubernetes precedent --- cmd/controller/main.go | 207 ++++++++++++++++------------- pkg/fleetallocation/controller.go | 2 +- pkg/fleets/controller.go | 4 +- pkg/gameservers/controller.go | 4 +- pkg/gameserversets/controller.go | 4 +- pkg/util/webhooks/webhooks.go | 2 +- pkg/util/webhooks/webhooks_test.go | 2 +- 7 files changed, 126 insertions(+), 99 deletions(-) diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 904367cf45..11b16c55a0 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -19,6 +19,7 @@ import ( "net/http" "os" "path/filepath" + "reflect" "strings" "time" @@ -43,13 +44,14 @@ import ( ) const ( - sidecarFlag = "sidecar" - pullSidecarFlag = "always-pull-sidecar" - minPortFlag = "min-port" - maxPortFlag = "max-port" - certFileFlag = "cert-file" - keyFileFlag = "key-file" - controllerThreadiness = 2 + sidecarFlag = "sidecar" + pullSidecarFlag = "always-pull-sidecar" + minPortFlag = "min-port" + maxPortFlag = "max-port" + certFileFlag = "cert-file" + keyFileFlag = "key-file" + workers = 2 + defaultResync = 30 * time.Second ) var ( @@ -57,7 +59,67 @@ var ( ) // main starts the operator for the gameserver CRD -func main() { // nolint: gocyclo +func main() { + ctlConf := parseEnvFlags() + if err := ctlConf.validate(); err != nil { + logger.WithError(err).Fatal("Could not create controller from environment or flags") + } + + clientConf, err := rest.InClusterConfig() + if err != nil { + logger.WithError(err).Fatal("Could not create in cluster config") + } + + kubeClient, err := kubernetes.NewForConfig(clientConf) + if err != nil { + logger.WithError(err).Fatal("Could not create the kubernetes clientset") + } + + extClient, err := extclientset.NewForConfig(clientConf) + if err != nil { + logger.WithError(err).Fatal("Could not create the api extension clientset") + } + + agonesClient, err := versioned.NewForConfig(clientConf) + if err != nil { + logger.WithError(err).Fatal("Could not create the agones api clientset") + } + + health := healthcheck.NewHandler() + wh := webhooks.NewWebHook(ctlConf.certFile, ctlConf.keyFile) + agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync) + kubeInformationFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync) + + gsController := gameservers.NewController(wh, health, + ctlConf.minPort, ctlConf.maxPort, ctlConf.sidecarImage, ctlConf.alwaysPullSidecar, + kubeClient, kubeInformationFactory, extClient, agonesClient, agonesInformerFactory) + gsSetController := gameserversets.NewController(wh, health, + kubeClient, extClient, agonesClient, agonesInformerFactory) + fleetController := fleets.NewController(wh, health, + kubeClient, extClient, agonesClient, agonesInformerFactory) + faController := fleetallocation.NewController(wh, kubeClient, extClient, agonesClient, agonesInformerFactory) + + stop := signals.NewStopChannel() + + kubeInformationFactory.Start(stop) + agonesInformerFactory.Start(stop) + + rs := []runner{ + wh, gsController, gsSetController, fleetController, faController, healthServer{handler: health}, + } + for _, r := range rs { + go func(rr runner) { + if runErr := rr.Run(workers, stop); runErr != nil { + logger.WithError(runErr).Fatalf("could not start runner: %s", reflect.TypeOf(rr)) + } + }(r) + } + + <-stop + logger.Info("Shut down agones controllers") +} + +func parseEnvFlags() config { exec, err := os.Executable() if err != nil { logger.WithError(err).Fatal("Could not get executable path") @@ -101,95 +163,60 @@ func main() { // nolint: gocyclo WithField("alwaysPullSidecarImage", alwaysPullSidecar). WithField("Version", pkg.Version).Info("starting gameServer operator...") - if minPort <= 0 || maxPort <= 0 { - logger.Fatal("Min Port and Max Port values are required.") - } else if maxPort < minPort { - logger.Fatal("Max Port cannot be set less that the Min Port") - } - - config, err := rest.InClusterConfig() - if err != nil { - logger.WithError(err).Fatal("Could not create in cluster config") + return config{ + minPort: minPort, + maxPort: maxPort, + sidecarImage: sidecarImage, + alwaysPullSidecar: alwaysPullSidecar, + keyFile: keyFile, + certFile: certFile, } +} - kubeClient, err := kubernetes.NewForConfig(config) - if err != nil { - logger.WithError(err).Fatal("Could not create the kubernetes clientset") - } +// config stores all required configuration to create a game server controller. +type config struct { + minPort int32 + maxPort int32 + sidecarImage string + alwaysPullSidecar bool + keyFile string + certFile string +} - extClient, err := extclientset.NewForConfig(config) - if err != nil { - logger.WithError(err).Fatal("Could not create the api extension clientset") +// validate ensures the ctlConfig data is valid. +func (c config) validate() error { + if c.minPort <= 0 || c.maxPort <= 0 { + return errors.New("min Port and Max Port values are required") } - - agonesClient, err := versioned.NewForConfig(config) - if err != nil { - logger.WithError(err).Fatal("Could not create the agones api clientset") + if c.maxPort < c.minPort { + return errors.New("max Port cannot be set less that the Min Port") } + return nil +} - health := healthcheck.NewHandler() - wh := webhooks.NewWebHook(certFile, keyFile) - agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, 30*time.Second) - kubeInformationFactory := informers.NewSharedInformerFactory(kubeClient, 30*time.Second) - - gsController := gameservers.NewController(wh, health, minPort, maxPort, sidecarImage, alwaysPullSidecar, kubeClient, kubeInformationFactory, extClient, agonesClient, agonesInformerFactory) - gsSetController := gameserversets.NewController(wh, health, kubeClient, extClient, agonesClient, agonesInformerFactory) - fleetController := fleets.NewController(wh, health, kubeClient, extClient, agonesClient, agonesInformerFactory) - faController := fleetallocation.NewController(wh, kubeClient, extClient, agonesClient, agonesInformerFactory) - - stop := signals.NewStopChannel() - - kubeInformationFactory.Start(stop) - agonesInformerFactory.Start(stop) +type runner interface { + Run(workers int, stop <-chan struct{}) error +} - go func() { - if err := wh.Run(stop); err != nil { // nolint: vetshadow - logger.WithError(err).Fatal("could not run webhook server") - } - }() - go func() { - err = gsController.Run(controllerThreadiness, stop) - if err != nil { - logger.WithError(err).Fatal("Could not run gameserver controller") - } - }() - go func() { - err = gsSetController.Run(controllerThreadiness, stop) - if err != nil { - logger.WithError(err).Fatal("Could not run gameserverset controller") - } - }() - go func() { - err = fleetController.Run(controllerThreadiness, stop) - if err != nil { - logger.WithError(err).Fatal("Could not run fleet controller") - } - }() - go func() { - err = faController.Run(stop) - if err != nil { - logger.WithError(err).Fatal("Could not run fleet controller") - } - }() +type healthServer struct { + handler http.Handler +} - go func() { - logger.Info("Starting health check...") - srv := &http.Server{ - Addr: ":8080", - Handler: health, - } - defer srv.Close() // nolint: errcheck - - if err := srv.ListenAndServe(); err != nil { - if err == http.ErrServerClosed { - logger.WithError(err).Info("health check: http server closed") - } else { - err := errors.Wrap(err, "Could not listen on :8080") - runtime.HandleError(logger.WithError(err), err) - } +func (h healthServer) Run(workers int, stop <-chan struct{}) error { + logger.Info("Starting health check...") + srv := &http.Server{ + Addr: ":8080", + Handler: h.handler, + } + defer srv.Close() // nolint: errcheck + + if err := srv.ListenAndServe(); err != nil { + if err == http.ErrServerClosed { + logger.WithError(err).Info("health check: http server closed") + } else { + wrappedErr := errors.Wrap(err, "Could not listen on :8080") + runtime.HandleError(logger.WithError(wrappedErr), wrappedErr) } - }() - - <-stop - logger.Info("Shut down agones controllers") + } + return nil } diff --git a/pkg/fleetallocation/controller.go b/pkg/fleetallocation/controller.go index cc7736d6a3..db9740a40f 100644 --- a/pkg/fleetallocation/controller.go +++ b/pkg/fleetallocation/controller.go @@ -104,7 +104,7 @@ func NewController( // Run runs this controller. This controller doesn't (currently) // have a worker/queue, and as such, does not block. -func (c *Controller) Run(stop <-chan struct{}) error { +func (c *Controller) Run(workers int, stop <-chan struct{}) error { err := crd.WaitForEstablishedCRD(c.crdGetter, "fleetallocations."+stable.GroupName, c.logger) if err != nil { return err diff --git a/pkg/fleets/controller.go b/pkg/fleets/controller.go index df6d85ec7c..c2133a8c74 100644 --- a/pkg/fleets/controller.go +++ b/pkg/fleets/controller.go @@ -160,7 +160,7 @@ func (c *Controller) creationMutationHandler(review admv1beta1.AdmissionReview) // Run the Fleet controller. Will block until stop is closed. // Runs threadiness number workers to process the rate limited queue -func (c *Controller) Run(threadiness int, stop <-chan struct{}) error { +func (c *Controller) Run(workers int, stop <-chan struct{}) error { err := crd.WaitForEstablishedCRD(c.crdGetter, "fleets.stable.agones.dev", c.logger) if err != nil { return err @@ -171,7 +171,7 @@ func (c *Controller) Run(threadiness int, stop <-chan struct{}) error { return errors.New("failed to wait for caches to sync") } - c.workerqueue.Run(threadiness, stop) + c.workerqueue.Run(workers, stop) return nil } diff --git a/pkg/gameservers/controller.go b/pkg/gameservers/controller.go index 60e208560b..5f07a4087c 100644 --- a/pkg/gameservers/controller.go +++ b/pkg/gameservers/controller.go @@ -220,7 +220,7 @@ func (c *Controller) creationValidationHandler(review admv1beta1.AdmissionReview // Run the GameServer controller. Will block until stop is closed. // Runs threadiness number workers to process the rate limited queue -func (c *Controller) Run(threadiness int, stop <-chan struct{}) error { +func (c *Controller) Run(workers int, stop <-chan struct{}) error { err := crd.WaitForEstablishedCRD(c.crdGetter, "gameservers.stable.agones.dev", c.logger) if err != nil { return err @@ -239,7 +239,7 @@ func (c *Controller) Run(threadiness int, stop <-chan struct{}) error { // Run the Health Controller go c.healthController.Run(stop) - c.workerqueue.Run(threadiness, stop) + c.workerqueue.Run(workers, stop) return nil } diff --git a/pkg/gameserversets/controller.go b/pkg/gameserversets/controller.go index bf0aa5a09d..574d42f463 100644 --- a/pkg/gameserversets/controller.go +++ b/pkg/gameserversets/controller.go @@ -126,7 +126,7 @@ func NewController( // Run the GameServerSet controller. Will block until stop is closed. // Runs threadiness number workers to process the rate limited queue -func (c *Controller) Run(threadiness int, stop <-chan struct{}) error { +func (c *Controller) Run(workers int, stop <-chan struct{}) error { err := crd.WaitForEstablishedCRD(c.crdGetter, "gameserversets."+stable.GroupName, c.logger) if err != nil { return err @@ -137,7 +137,7 @@ func (c *Controller) Run(threadiness int, stop <-chan struct{}) error { return errors.New("failed to wait for caches to sync") } - c.workerqueue.Run(threadiness, stop) + c.workerqueue.Run(workers, stop) return nil } diff --git a/pkg/util/webhooks/webhooks.go b/pkg/util/webhooks/webhooks.go index 53d13f9fa9..fea21bc0ec 100644 --- a/pkg/util/webhooks/webhooks.go +++ b/pkg/util/webhooks/webhooks.go @@ -75,7 +75,7 @@ func NewWebHook(certFile, keyFile string) *WebHook { // Run runs the webhook server, starting a https listener. // Will block on stop channel -func (wh *WebHook) Run(stop <-chan struct{}) error { +func (wh *WebHook) Run(workers int, stop <-chan struct{}) error { go func() { <-stop wh.server.Close() // nolint: errcheck diff --git a/pkg/util/webhooks/webhooks_test.go b/pkg/util/webhooks/webhooks_test.go index 8d722f520c..dda5bdec42 100644 --- a/pkg/util/webhooks/webhooks_test.go +++ b/pkg/util/webhooks/webhooks_test.go @@ -116,7 +116,7 @@ func TestWebHookAddHandler(t *testing.T) { }) } - err := wh.Run(stop) + err := wh.Run(0, stop) assert.Nil(t, err) client := ts.server.Client()