Skip to content

Commit

Permalink
Simplify cmd/controller/main.go
Browse files Browse the repository at this point in the history
- Extract ctlConfig type to store configuration
- Move viper and pflag calls into separate function
- Ensure all controllers have same method signature for `Run`
- Use runner interface to start all controllers
- Create healthServer to match controllers
- Use Run signature to match Kubernetes precedent
  • Loading branch information
enocom authored and markmandel committed May 18, 2018
1 parent b8d38ca commit 8ff7632
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 99 deletions.
207 changes: 117 additions & 90 deletions cmd/controller/main.go
Expand Up @@ -19,6 +19,7 @@ import (
"net/http"
"os"
"path/filepath"
"reflect"
"strings"
"time"

Expand All @@ -43,21 +44,82 @@ import (
)

const (
sidecarFlag = "sidecar"
pullSidecarFlag = "always-pull-sidecar"
minPortFlag = "min-port"
maxPortFlag = "max-port"
certFileFlag = "cert-file"
keyFileFlag = "key-file"
controllerThreadiness = 2
sidecarFlag = "sidecar"
pullSidecarFlag = "always-pull-sidecar"
minPortFlag = "min-port"
maxPortFlag = "max-port"
certFileFlag = "cert-file"
keyFileFlag = "key-file"
workers = 2
defaultResync = 30 * time.Second
)

var (
logger = runtime.NewLoggerWithSource("main")
)

// main starts the operator for the gameserver CRD
func main() { // nolint: gocyclo
func main() {
ctlConf := parseEnvFlags()
if err := ctlConf.validate(); err != nil {
logger.WithError(err).Fatal("Could not create controller from environment or flags")
}

clientConf, err := rest.InClusterConfig()
if err != nil {
logger.WithError(err).Fatal("Could not create in cluster config")
}

kubeClient, err := kubernetes.NewForConfig(clientConf)
if err != nil {
logger.WithError(err).Fatal("Could not create the kubernetes clientset")
}

extClient, err := extclientset.NewForConfig(clientConf)
if err != nil {
logger.WithError(err).Fatal("Could not create the api extension clientset")
}

agonesClient, err := versioned.NewForConfig(clientConf)
if err != nil {
logger.WithError(err).Fatal("Could not create the agones api clientset")
}

health := healthcheck.NewHandler()
wh := webhooks.NewWebHook(ctlConf.certFile, ctlConf.keyFile)
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformationFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)

gsController := gameservers.NewController(wh, health,
ctlConf.minPort, ctlConf.maxPort, ctlConf.sidecarImage, ctlConf.alwaysPullSidecar,
kubeClient, kubeInformationFactory, extClient, agonesClient, agonesInformerFactory)
gsSetController := gameserversets.NewController(wh, health,
kubeClient, extClient, agonesClient, agonesInformerFactory)
fleetController := fleets.NewController(wh, health,
kubeClient, extClient, agonesClient, agonesInformerFactory)
faController := fleetallocation.NewController(wh, kubeClient, extClient, agonesClient, agonesInformerFactory)

stop := signals.NewStopChannel()

kubeInformationFactory.Start(stop)
agonesInformerFactory.Start(stop)

rs := []runner{
wh, gsController, gsSetController, fleetController, faController, healthServer{handler: health},
}
for _, r := range rs {
go func(rr runner) {
if runErr := rr.Run(workers, stop); runErr != nil {
logger.WithError(runErr).Fatalf("could not start runner: %s", reflect.TypeOf(rr))
}
}(r)
}

<-stop
logger.Info("Shut down agones controllers")
}

func parseEnvFlags() config {
exec, err := os.Executable()
if err != nil {
logger.WithError(err).Fatal("Could not get executable path")
Expand Down Expand Up @@ -101,95 +163,60 @@ func main() { // nolint: gocyclo
WithField("alwaysPullSidecarImage", alwaysPullSidecar).
WithField("Version", pkg.Version).Info("starting gameServer operator...")

if minPort <= 0 || maxPort <= 0 {
logger.Fatal("Min Port and Max Port values are required.")
} else if maxPort < minPort {
logger.Fatal("Max Port cannot be set less that the Min Port")
}

config, err := rest.InClusterConfig()
if err != nil {
logger.WithError(err).Fatal("Could not create in cluster config")
return config{
minPort: minPort,
maxPort: maxPort,
sidecarImage: sidecarImage,
alwaysPullSidecar: alwaysPullSidecar,
keyFile: keyFile,
certFile: certFile,
}
}

kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
logger.WithError(err).Fatal("Could not create the kubernetes clientset")
}
// config stores all required configuration to create a game server controller.
type config struct {
minPort int32
maxPort int32
sidecarImage string
alwaysPullSidecar bool
keyFile string
certFile string
}

extClient, err := extclientset.NewForConfig(config)
if err != nil {
logger.WithError(err).Fatal("Could not create the api extension clientset")
// validate ensures the ctlConfig data is valid.
func (c config) validate() error {
if c.minPort <= 0 || c.maxPort <= 0 {
return errors.New("min Port and Max Port values are required")
}

agonesClient, err := versioned.NewForConfig(config)
if err != nil {
logger.WithError(err).Fatal("Could not create the agones api clientset")
if c.maxPort < c.minPort {
return errors.New("max Port cannot be set less that the Min Port")
}
return nil
}

health := healthcheck.NewHandler()
wh := webhooks.NewWebHook(certFile, keyFile)
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, 30*time.Second)
kubeInformationFactory := informers.NewSharedInformerFactory(kubeClient, 30*time.Second)

gsController := gameservers.NewController(wh, health, minPort, maxPort, sidecarImage, alwaysPullSidecar, kubeClient, kubeInformationFactory, extClient, agonesClient, agonesInformerFactory)
gsSetController := gameserversets.NewController(wh, health, kubeClient, extClient, agonesClient, agonesInformerFactory)
fleetController := fleets.NewController(wh, health, kubeClient, extClient, agonesClient, agonesInformerFactory)
faController := fleetallocation.NewController(wh, kubeClient, extClient, agonesClient, agonesInformerFactory)

stop := signals.NewStopChannel()

kubeInformationFactory.Start(stop)
agonesInformerFactory.Start(stop)
type runner interface {
Run(workers int, stop <-chan struct{}) error
}

go func() {
if err := wh.Run(stop); err != nil { // nolint: vetshadow
logger.WithError(err).Fatal("could not run webhook server")
}
}()
go func() {
err = gsController.Run(controllerThreadiness, stop)
if err != nil {
logger.WithError(err).Fatal("Could not run gameserver controller")
}
}()
go func() {
err = gsSetController.Run(controllerThreadiness, stop)
if err != nil {
logger.WithError(err).Fatal("Could not run gameserverset controller")
}
}()
go func() {
err = fleetController.Run(controllerThreadiness, stop)
if err != nil {
logger.WithError(err).Fatal("Could not run fleet controller")
}
}()
go func() {
err = faController.Run(stop)
if err != nil {
logger.WithError(err).Fatal("Could not run fleet controller")
}
}()
type healthServer struct {
handler http.Handler
}

go func() {
logger.Info("Starting health check...")
srv := &http.Server{
Addr: ":8080",
Handler: health,
}
defer srv.Close() // nolint: errcheck

if err := srv.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
logger.WithError(err).Info("health check: http server closed")
} else {
err := errors.Wrap(err, "Could not listen on :8080")
runtime.HandleError(logger.WithError(err), err)
}
func (h healthServer) Run(workers int, stop <-chan struct{}) error {
logger.Info("Starting health check...")
srv := &http.Server{
Addr: ":8080",
Handler: h.handler,
}
defer srv.Close() // nolint: errcheck

if err := srv.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
logger.WithError(err).Info("health check: http server closed")
} else {
wrappedErr := errors.Wrap(err, "Could not listen on :8080")
runtime.HandleError(logger.WithError(wrappedErr), wrappedErr)
}
}()

<-stop
logger.Info("Shut down agones controllers")
}
return nil
}
2 changes: 1 addition & 1 deletion pkg/fleetallocation/controller.go
Expand Up @@ -104,7 +104,7 @@ func NewController(

// Run runs this controller. This controller doesn't (currently)
// have a worker/queue, and as such, does not block.
func (c *Controller) Run(stop <-chan struct{}) error {
func (c *Controller) Run(workers int, stop <-chan struct{}) error {
err := crd.WaitForEstablishedCRD(c.crdGetter, "fleetallocations."+stable.GroupName, c.logger)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/fleets/controller.go
Expand Up @@ -160,7 +160,7 @@ func (c *Controller) creationMutationHandler(review admv1beta1.AdmissionReview)

// Run the Fleet controller. Will block until stop is closed.
// Runs threadiness number workers to process the rate limited queue
func (c *Controller) Run(threadiness int, stop <-chan struct{}) error {
func (c *Controller) Run(workers int, stop <-chan struct{}) error {
err := crd.WaitForEstablishedCRD(c.crdGetter, "fleets.stable.agones.dev", c.logger)
if err != nil {
return err
Expand All @@ -171,7 +171,7 @@ func (c *Controller) Run(threadiness int, stop <-chan struct{}) error {
return errors.New("failed to wait for caches to sync")
}

c.workerqueue.Run(threadiness, stop)
c.workerqueue.Run(workers, stop)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/gameservers/controller.go
Expand Up @@ -220,7 +220,7 @@ func (c *Controller) creationValidationHandler(review admv1beta1.AdmissionReview

// Run the GameServer controller. Will block until stop is closed.
// Runs threadiness number workers to process the rate limited queue
func (c *Controller) Run(threadiness int, stop <-chan struct{}) error {
func (c *Controller) Run(workers int, stop <-chan struct{}) error {
err := crd.WaitForEstablishedCRD(c.crdGetter, "gameservers.stable.agones.dev", c.logger)
if err != nil {
return err
Expand All @@ -239,7 +239,7 @@ func (c *Controller) Run(threadiness int, stop <-chan struct{}) error {
// Run the Health Controller
go c.healthController.Run(stop)

c.workerqueue.Run(threadiness, stop)
c.workerqueue.Run(workers, stop)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/gameserversets/controller.go
Expand Up @@ -126,7 +126,7 @@ func NewController(

// Run the GameServerSet controller. Will block until stop is closed.
// Runs threadiness number workers to process the rate limited queue
func (c *Controller) Run(threadiness int, stop <-chan struct{}) error {
func (c *Controller) Run(workers int, stop <-chan struct{}) error {
err := crd.WaitForEstablishedCRD(c.crdGetter, "gameserversets."+stable.GroupName, c.logger)
if err != nil {
return err
Expand All @@ -137,7 +137,7 @@ func (c *Controller) Run(threadiness int, stop <-chan struct{}) error {
return errors.New("failed to wait for caches to sync")
}

c.workerqueue.Run(threadiness, stop)
c.workerqueue.Run(workers, stop)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/util/webhooks/webhooks.go
Expand Up @@ -75,7 +75,7 @@ func NewWebHook(certFile, keyFile string) *WebHook {

// Run runs the webhook server, starting a https listener.
// Will block on stop channel
func (wh *WebHook) Run(stop <-chan struct{}) error {
func (wh *WebHook) Run(workers int, stop <-chan struct{}) error {
go func() {
<-stop
wh.server.Close() // nolint: errcheck
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/webhooks/webhooks_test.go
Expand Up @@ -116,7 +116,7 @@ func TestWebHookAddHandler(t *testing.T) {
})
}

err := wh.Run(stop)
err := wh.Run(0, stop)
assert.Nil(t, err)

client := ts.server.Client()
Expand Down

0 comments on commit 8ff7632

Please sign in to comment.