cvo: Release our leader lease when we are gracefully terminated
On an upgrade the CVO is always restarted, but we don't release our
lease, which forces the new CVO pod to wait for the old lease to
expire. We should release our lease on exit.

Kube 1.14 (PR 71490) adds a new flag to the leader lease that allows
the caller to step down gracefully. Because backporting that change
to client-go is complicated, we emulate the logic here instead. Once
that code is available we can simplify.

cmd: Refactor how the CVO is started so the integration test is consistent

Move the integration test logic into a new package and reuse the
startup logic so that we have a much cleaner start command than before
and so that we are testing exactly what the command runs. Remove
rootOpts and startOpts and replace them with nested options.

Add a test that verifies we send leader election events.
smarterclayton committed Jan 18, 2019
1 parent 71c91fc commit 2b81f47
Showing 10 changed files with 597 additions and 483 deletions.
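
For context: the mechanism the commit message references landed upstream as a
ReleaseOnCancel option on client-go's leader-election config. A minimal sketch
of that flow, assuming a client-go vendored at Kube 1.14 or later (the lock and
run arguments stand in for the wiring this commit moves into pkg/start):

package leaderexample

import (
	"context"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/golang/glog"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runWithGracefulRelease sketches the upstream step-down flow; lock and run
// correspond to the ConfigMap lock and controller-start closure in cmd/start.go.
func runWithGracefulRelease(lock resourcelock.Interface, run func(ctx context.Context)) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Cancel the election context on SIGTERM so the lease is released before
	// the pod exits, rather than forcing the successor to wait out the lease.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGTERM)
	go func() {
		<-sigCh
		cancel()
	}()

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true, // step down gracefully when ctx is cancelled
		LeaseDuration:   90 * time.Second,
		RenewDeadline:   45 * time.Second,
		RetryPeriod:     30 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() { glog.Info("leader lease released or lost") },
		},
	})
}

On cancel the elector clears the holder identity in the lock record so a
successor can acquire immediately; that is essentially what the emulation in
pkg/start has to do by hand until this client-go change is vendored.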
5 changes: 0 additions & 5 deletions cmd/main.go
@@ -18,15 +18,10 @@ var (
 		Short: "Run Cluster Version Controller",
 		Long:  "",
 	}
-
-	rootOpts struct {
-		releaseImage string
-	}
 )
 
 func init() {
 	rootCmd.PersistentFlags().AddGoFlagSet(flag.CommandLine)
-	rootCmd.PersistentFlags().StringVar(&rootOpts.releaseImage, "release-image", "", "The Openshift release image url.")
 }
 
 func main() {
8 changes: 5 additions & 3 deletions cmd/render.go
@@ -18,13 +18,15 @@ var (
 	}
 
 	renderOpts struct {
-		outputDir string
+		releaseImage string
+		outputDir    string
 	}
 )
 
 func init() {
 	rootCmd.AddCommand(renderCmd)
 	renderCmd.PersistentFlags().StringVar(&renderOpts.outputDir, "output-dir", "", "The output directory where the manifests will be rendered.")
+	renderCmd.PersistentFlags().StringVar(&renderOpts.releaseImage, "release-image", "", "The Openshift release image url.")
 }
 
 func runRenderCmd(cmd *cobra.Command, args []string) {
@@ -34,10 +36,10 @@ func runRenderCmd(cmd *cobra.Command, args []string) {
 	if renderOpts.outputDir == "" {
 		glog.Fatalf("missing --output-dir flag, it is required")
 	}
-	if rootOpts.releaseImage == "" {
+	if renderOpts.releaseImage == "" {
 		glog.Fatalf("missing --release-image flag, it is required")
 	}
-	if err := cvo.Render(renderOpts.outputDir, rootOpts.releaseImage); err != nil {
+	if err := cvo.Render(renderOpts.outputDir, renderOpts.releaseImage); err != nil {
 		glog.Fatalf("Render command failed: %v", err)
 	}
 }
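
Note that --release-image now belongs to the subcommands that consume it:
render here and start below, each validating its own flags. A render
invocation would pass both flags to the subcommand, e.g.
cluster-version-operator render --output-dir=<dir> --release-image=<image>
(binary name assumed; it is not shown in this diff).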
282 changes: 17 additions & 265 deletions cmd/start.go
@@ -2,284 +2,36 @@ package main
 
 import (
 	"flag"
-	"fmt"
-	"math/rand"
-	"net/http"
-	"os"
-	"time"
 
 	"github.com/golang/glog"
-	"github.com/google/uuid"
-	clientset "github.com/openshift/client-go/config/clientset/versioned"
-	informers "github.com/openshift/client-go/config/informers/externalversions"
-	"github.com/openshift/cluster-version-operator/pkg/autoupdate"
-	"github.com/openshift/cluster-version-operator/pkg/cvo"
+	"github.com/openshift/cluster-version-operator/pkg/start"
 	"github.com/openshift/cluster-version-operator/pkg/version"
-	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/spf13/cobra"
-	v1 "k8s.io/api/core/v1"
-	apiext "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/clientcmd"
-	"k8s.io/client-go/tools/leaderelection"
-	"k8s.io/client-go/tools/leaderelection/resourcelock"
-	"k8s.io/client-go/tools/record"
 )
 
-const (
-	minResyncPeriod = 2 * time.Minute
-
-	leaseDuration = 90 * time.Second
-	renewDeadline = 45 * time.Second
-	retryPeriod   = 30 * time.Second
-)
-
-var (
-	startCmd = &cobra.Command{
+func init() {
+	opts := start.NewOptions()
+	cmd := &cobra.Command{
 		Use:   "start",
 		Short: "Starts Cluster Version Operator",
 		Long:  "",
-		Run: runStartCmd,
-	}
-
-	startOpts struct {
-		// name is provided for testing only to allow multiple CVO's to be running at once
-		name string
-		// namespace is provided for testing only
-		namespace string
-
-		kubeconfig string
-		nodeName   string
-		listenAddr string
-
-		enableAutoUpdate bool
-	}
-)
-
-func init() {
-	rootCmd.AddCommand(startCmd)
-	startCmd.PersistentFlags().StringVar(&startOpts.listenAddr, "listen", "0.0.0.0:9099", "Address to listen on for metrics")
-	startCmd.PersistentFlags().StringVar(&startOpts.kubeconfig, "kubeconfig", "", "Kubeconfig file to access a remote cluster (testing only)")
-	startCmd.PersistentFlags().StringVar(&startOpts.nodeName, "node-name", "", "kubernetes node name CVO is scheduled on.")
-	startCmd.PersistentFlags().BoolVar(&startOpts.enableAutoUpdate, "enable-auto-update", true, "Enables the autoupdate controller.")
-}
-
-func runStartCmd(cmd *cobra.Command, args []string) {
-	flag.Set("logtostderr", "true")
-	flag.Parse()
-
-	// To help debugging, immediately log version
-	glog.Infof("%s", version.String)
-
-	if startOpts.nodeName == "" {
-		name, ok := os.LookupEnv("NODE_NAME")
-		if !ok || name == "" {
-			glog.Fatalf("node-name is required")
-		}
-		startOpts.nodeName = name
-	}
-
-	// exposed for end-to-end testing only
-	startOpts.name = os.Getenv("CVO_NAME")
-	if len(startOpts.name) == 0 {
-		startOpts.name = componentName
-	}
-	startOpts.namespace = os.Getenv("CVO_NAMESPACE")
-	if len(startOpts.name) == 0 {
-		startOpts.namespace = componentNamespace
-	}
-
-	if rootOpts.releaseImage == "" {
-		glog.Fatalf("missing --release-image flag, it is required")
-	}
-
-	if len(startOpts.listenAddr) > 0 {
-		mux := http.NewServeMux()
-		mux.Handle("/metrics", promhttp.Handler())
-		go func() {
-			if err := http.ListenAndServe(startOpts.listenAddr, mux); err != nil {
-				glog.Fatalf("Unable to start metrics server: %v", err)
-			}
-		}()
-	}
-
-	cb, err := newClientBuilder(startOpts.kubeconfig)
-	if err != nil {
-		glog.Fatalf("error creating clients: %v", err)
-	}
-	stopCh := make(chan struct{})
-	run := func(stop <-chan struct{}) {
-
-		ctx := createControllerContext(cb, startOpts.name, stopCh)
-		if err := startControllers(ctx); err != nil {
-			glog.Fatalf("error starting controllers: %v", err)
-		}
-
-		ctx.CVInformerFactory.Start(ctx.Stop)
-		ctx.InformerFactory.Start(ctx.Stop)
-		close(ctx.InformersStarted)
-
-		select {}
-	}
-
-	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
-		Lock:          createResourceLock(cb),
-		LeaseDuration: leaseDuration,
-		RenewDeadline: renewDeadline,
-		RetryPeriod:   retryPeriod,
-		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: run,
-			OnStoppedLeading: func() {
-				glog.Fatalf("leaderelection lost")
-			},
-		},
-	})
-	panic("unreachable")
-}
-
-func createResourceLock(cb *clientBuilder) resourcelock.Interface {
-	recorder := record.
-		NewBroadcaster().
-		NewRecorder(runtime.NewScheme(), v1.EventSource{Component: componentName})
-
-	id, err := os.Hostname()
-	if err != nil {
-		glog.Fatalf("error creating lock: %v", err)
-	}
-
-	uuid, err := uuid.NewRandom()
-	if err != nil {
-		glog.Fatalf("Failed to generate UUID: %v", err)
-	}
-
-	// add a uniquifier so that two processes on the same host don't accidentally both become active
-	id = id + "_" + uuid.String()
-
-	return &resourcelock.ConfigMapLock{
-		ConfigMapMeta: metav1.ObjectMeta{
-			Namespace: componentNamespace,
-			Name:      componentName,
-		},
-		Client: cb.KubeClientOrDie("leader-election").CoreV1(),
-		LockConfig: resourcelock.ResourceLockConfig{
-			Identity:      id,
-			EventRecorder: recorder,
-		},
-	}
-}
-
-func resyncPeriod() func() time.Duration {
-	return func() time.Duration {
-		factor := rand.Float64() + 1
-		return time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
-	}
-}
-
-type clientBuilder struct {
-	config *rest.Config
-}
-
-func (cb *clientBuilder) RestConfig() *rest.Config {
-	c := rest.CopyConfig(cb.config)
-	return c
-}
-
-func (cb *clientBuilder) ClientOrDie(name string) clientset.Interface {
-	return clientset.NewForConfigOrDie(rest.AddUserAgent(cb.config, name))
-}
-
-func (cb *clientBuilder) KubeClientOrDie(name string) kubernetes.Interface {
-	return kubernetes.NewForConfigOrDie(rest.AddUserAgent(cb.config, name))
-}
-
-func (cb *clientBuilder) APIExtClientOrDie(name string) apiext.Interface {
-	return apiext.NewForConfigOrDie(rest.AddUserAgent(cb.config, name))
-}
-
-func newClientBuilder(kubeconfig string) (*clientBuilder, error) {
-	var config *rest.Config
-	var err error
-
-	if kubeconfig != "" {
-		glog.V(4).Infof("Loading kube client config from path %q", kubeconfig)
-		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
-	} else {
-		glog.V(4).Infof("Using in-cluster kube client config")
-		config, err = rest.InClusterConfig()
-	}
-	if err != nil {
-		return nil, err
-	}
-
-	return &clientBuilder{
-		config: config,
-	}, nil
-}
-
-type controllerContext struct {
-	ClientBuilder *clientBuilder
-
-	CVInformerFactory informers.SharedInformerFactory
-	InformerFactory   informers.SharedInformerFactory
-
-	Stop <-chan struct{}
-
-	InformersStarted chan struct{}
-
-	ResyncPeriod func() time.Duration
-}
-
-func createControllerContext(cb *clientBuilder, name string, stop <-chan struct{}) *controllerContext {
-	client := cb.ClientOrDie("shared-informer")
-
-	cvInformer := informers.NewFilteredSharedInformerFactory(client, resyncPeriod()(), "", func(opts *metav1.ListOptions) {
-		opts.FieldSelector = fmt.Sprintf("metadata.name=%s", name)
-	})
-	sharedInformers := informers.NewSharedInformerFactory(client, resyncPeriod()())
-
-	return &controllerContext{
-		ClientBuilder:     cb,
-		CVInformerFactory: cvInformer,
-		InformerFactory:   sharedInformers,
-		Stop:              stop,
-		InformersStarted:  make(chan struct{}),
-		ResyncPeriod:      resyncPeriod(),
-	}
-}
-
-func startControllers(ctx *controllerContext) error {
-	overrideDirectory := os.Getenv("PAYLOAD_OVERRIDE")
-	if len(overrideDirectory) > 0 {
-		glog.Warningf("Using an override payload directory for testing only: %s", overrideDirectory)
-	}
-
-	go cvo.New(
-		startOpts.nodeName,
-		startOpts.namespace, startOpts.name,
-		rootOpts.releaseImage,
-		overrideDirectory,
-		ctx.ResyncPeriod(),
-		ctx.CVInformerFactory.Config().V1().ClusterVersions(),
-		ctx.InformerFactory.Config().V1().ClusterOperators(),
-		ctx.ClientBuilder.RestConfig(),
-		ctx.ClientBuilder.ClientOrDie(componentName),
-		ctx.ClientBuilder.KubeClientOrDie(componentName),
-		ctx.ClientBuilder.APIExtClientOrDie(componentName),
-		true,
-	).Run(2, ctx.Stop)
-
-	if startOpts.enableAutoUpdate {
-		go autoupdate.New(
-			componentNamespace, componentName,
-			ctx.CVInformerFactory.Config().V1().ClusterVersions(),
-			ctx.InformerFactory.Config().V1().ClusterOperators(),
-			ctx.ClientBuilder.ClientOrDie(componentName),
-			ctx.ClientBuilder.KubeClientOrDie(componentName),
-		).Run(2, ctx.Stop)
-	}
-
-	return nil
+		Run: func(cmd *cobra.Command, args []string) {
+			flag.Set("logtostderr", "true")
+			flag.Parse()
+
+			// To help debugging, immediately log version
+			glog.Infof("%s", version.String)
+
+			if err := opts.Run(); err != nil {
+				glog.Fatalf("error: %v", err)
+			}
+		},
+	}
+
+	cmd.PersistentFlags().StringVar(&opts.ListenAddr, "listen", opts.ListenAddr, "Address to listen on for metrics")
+	cmd.PersistentFlags().StringVar(&opts.Kubeconfig, "kubeconfig", opts.Kubeconfig, "Kubeconfig file to access a remote cluster (testing only)")
+	cmd.PersistentFlags().StringVar(&opts.NodeName, "node-name", opts.NodeName, "kubernetes node name CVO is scheduled on.")
+	cmd.PersistentFlags().BoolVar(&opts.EnableAutoUpdate, "enable-auto-update", opts.EnableAutoUpdate, "Enables the autoupdate controller.")
+	cmd.PersistentFlags().StringVar(&opts.ReleaseImage, "release-image", opts.ReleaseImage, "The Openshift release image url.")
+	rootCmd.AddCommand(cmd)
 }
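
The new pkg/start package itself is outside this view. From the flag wiring
above one can infer roughly the shape below; the five flag-bound fields and the
NewOptions/Run names come from cmd/start.go, everything else is an assumption:

package start

// Options is a hypothetical sketch of the nested options that replace the old
// rootOpts/startOpts globals; the real definition is not shown in this diff.
type Options struct {
	ReleaseImage     string
	Kubeconfig       string
	NodeName         string
	ListenAddr       string
	EnableAutoUpdate bool
}

// NewOptions presumably seeds the defaults the old flags carried, since the
// new flag registrations above use the existing field values as defaults.
func NewOptions() *Options {
	return &Options{
		ListenAddr:       "0.0.0.0:9099",
		EnableAutoUpdate: true,
	}
}

// Run would carry the logic deleted above: validate inputs, build clients,
// serve /metrics, run leader election (now releasing the lease on graceful
// termination), and start the CVO and autoupdate controllers.
func (o *Options) Run() error {
	return nil // body elided; moved wholesale from runStartCmd/startControllers
}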
4 changes: 2 additions & 2 deletions pkg/autoupdate/autoupdate.go
@@ -8,7 +8,7 @@ import (
 	"github.com/blang/semver"
 
 	"github.com/golang/glog"
-	"github.com/openshift/api/config/v1"
+	v1 "github.com/openshift/api/config/v1"
 	clientset "github.com/openshift/client-go/config/clientset/versioned"
 	"github.com/openshift/client-go/config/clientset/versioned/scheme"
 	configinformersv1 "github.com/openshift/client-go/config/informers/externalversions/config/v1"
@@ -65,7 +65,7 @@ func New(
 ) *Controller {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
-	eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
+	eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events(namespace)})
 
 	ctrl := &Controller{
 		namespace: namespace,
9 changes: 7 additions & 2 deletions pkg/cvo/cvo.go
@@ -137,7 +137,7 @@ func New(
 ) *Operator {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
-	eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
+	eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events(namespace)})
 
 	optr := &Operator{
 		nodename: nodename,
@@ -162,7 +162,7 @@ func New(
 
 	optr.configSync = NewSyncWorker(
 		optr.defaultPayloadRetriever(),
-		optr.defaultResourceBuilder(),
+		NewResourceBuilder(optr.restConfig),
 		minimumInterval,
 		wait.Backoff{
 			Duration: time.Second * 10,
@@ -452,3 +452,8 @@ func (optr *Operator) currentVersion() configv1.Update {
 		Payload: optr.releaseImage,
 	}
 }
+
+// SetSyncWorkerForTesting updates the sync worker for whitebox testing.
+func (optr *Operator) SetSyncWorkerForTesting(worker ConfigSyncWorker) {
+	optr.configSync = worker
+}
