
Handle termination gracefully for controller manager and scheduler #76452

Closed
wants to merge 12 commits
1 change: 1 addition & 0 deletions cmd/cloud-controller-manager/app/BUILD
@@ -19,6 +19,7 @@ go_library(
"//pkg/util/configz:go_default_library",
"//pkg/util/flag:go_default_library",
"//pkg/version/verflag:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
26 changes: 22 additions & 4 deletions cmd/cloud-controller-manager/app/controllermanager.go
@@ -26,6 +26,7 @@ import (

"github.com/spf13/cobra"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
@@ -39,6 +40,7 @@ import (
"k8s.io/component-base/cli/globalflag"
"k8s.io/component-base/version"
"k8s.io/klog"

cloudcontrollerconfig "k8s.io/kubernetes/cmd/cloud-controller-manager/app/config"
"k8s.io/kubernetes/cmd/cloud-controller-manager/app/options"
genericcontrollermanager "k8s.io/kubernetes/cmd/controller-manager/app"
@@ -113,6 +115,16 @@ the cloud specific control loops shipped with Kubernetes.`,

// Run runs the ExternalCMServer. This should never exit.
func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
Member: What is the purpose of having both a stop channel and a done channel here? Especially as a context is usually associated with a request and our main run method does not seem related to a request.

Contributor: The stop channel is controlled by the signals (SIGTERM and SIGINT), whereas the done channel is controlled by the Run method and allows for graceful termination. For example, the done channel will be closed when the component cannot create HTTP(S) sockets, when it loses leadership, or when it receives one of the signals. Note that closing either channel is equivalent to requesting that the application shut down.

}
}()
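For reference, a minimal self-contained sketch of the stop-channel-to-context bridge described in the comments above; the package layout, names, and timings are illustrative and not taken from this PR:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// run bridges a legacy stop channel to a context so that either an external
// signal (closing stopCh) or an internal failure (calling cancel) shuts the
// component down.
func run(stopCh <-chan struct{}) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Propagate the external stop signal into the context; exit this
	// goroutine if the context is cancelled internally first.
	go func() {
		select {
		case <-stopCh:
			cancel()
		case <-ctx.Done():
		}
	}()

	// Everything below keys off ctx.Done() only.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(5 * time.Second):
		fmt.Println("finished work without being stopped")
		return nil
	}
}

func main() {
	stopCh := make(chan struct{})
	go func() {
		time.Sleep(time.Second)
		close(stopCh) // simulate SIGTERM after one second
	}()
	fmt.Println(run(stopCh)) // prints "context canceled"
}
```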

// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())

@@ -151,9 +163,13 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error
if c.SecureServing != nil {
unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
if serverStoppedCh, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
Contributor: Serving should use ctx.Done()

return err
} else {
defer func() {
cancel()
<-serverStoppedCh
}()
}
}
if c.InsecureServing != nil {
@@ -207,13 +223,15 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
Contributor: I think that most of this pull is useful even without this change. @mfojtik can you keep this fatal and solve the 90% case first?

cancel()
utilruntime.HandleError(fmt.Errorf("leaderelection lost"))
Member: This needs to be fatal, and this change appears to make it non-fatal.

Contributor: please see my previous comment #76452 (comment)

},
},
WatchDog: electionChecker,
Name: "cloud-controller-manager",
})
panic("unreachable")

return nil
}

// startControllers starts the cloud specific controller loops.
4 changes: 2 additions & 2 deletions cmd/genkubedocs/gen_kube_docs.go
@@ -56,15 +56,15 @@ func main() {
doc.GenMarkdownTree(apiserver, outDir)
case "kube-controller-manager":
// generate docs for kube-controller-manager
controllermanager := cmapp.NewControllerManagerCommand()
controllermanager := cmapp.NewControllerManagerCommand(server.SetupSignalHandler())
doc.GenMarkdownTree(controllermanager, outDir)
case "kube-proxy":
// generate docs for kube-proxy
proxy := proxyapp.NewProxyCommand()
doc.GenMarkdownTree(proxy, outDir)
case "kube-scheduler":
// generate docs for kube-scheduler
scheduler := schapp.NewSchedulerCommand()
scheduler := schapp.NewSchedulerCommand(server.SetupSignalHandler())
doc.GenMarkdownTree(scheduler, outDir)
case "kubelet":
// generate docs for kubelet
4 changes: 2 additions & 2 deletions cmd/genman/gen_kube_man.go
@@ -68,7 +68,7 @@ func main() {
}
case "kube-controller-manager":
// generate manpage for kube-controller-manager
controllermanager := cmapp.NewControllerManagerCommand()
controllermanager := cmapp.NewControllerManagerCommand(server.SetupSignalHandler())
genMarkdown(controllermanager, "", outDir)
for _, c := range controllermanager.Commands() {
genMarkdown(c, "kube-controller-manager", outDir)
@@ -82,7 +82,7 @@ }
}
case "kube-scheduler":
// generate manpage for kube-scheduler
scheduler := schapp.NewSchedulerCommand()
scheduler := schapp.NewSchedulerCommand(server.SetupSignalHandler())
genMarkdown(scheduler, "", outDir)
for _, c := range scheduler.Commands() {
genMarkdown(c, "kube-scheduler", outDir)
1 change: 1 addition & 0 deletions cmd/kube-controller-manager/BUILD
@@ -20,6 +20,7 @@ go_library(
importpath = "k8s.io/kubernetes/cmd/kube-controller-manager",
deps = [
"//cmd/kube-controller-manager/app:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/component-base/logs:go_default_library",
"//staging/src/k8s.io/component-base/metrics/prometheus/clientgo:go_default_library",
],
28 changes: 22 additions & 6 deletions cmd/kube-controller-manager/app/controllermanager.go
@@ -157,6 +157,16 @@ func ResyncPeriod(c *config.CompletedConfig) func() time.Duration {

// Run runs the KubeControllerManagerOptions. This should never exit.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}()

// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())

@@ -180,16 +190,20 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
if c.SecureServing != nil {
unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
if serverStoppedCh, err := c.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
return err
} else {
defer func() {
cancel()
<-serverStoppedCh
}()
}
}
if c.InsecureServing != nil {
unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}}
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn)
if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
if err := c.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
return err
}
}
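A hedged sketch of the Serve/stoppedCh handling shown above: the server returns a channel that is closed once the listener has fully shut down, and the deferred cancel-then-wait gives every return path a graceful exit. The serve helper below is a stand-in for SecureServingInfo.Serve, not its real implementation:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// serve stands in for SecureServingInfo.Serve: it runs until stopCh is closed
// and closes the returned channel once shutdown (connection draining) is done.
func serve(stopCh <-chan struct{}) (<-chan struct{}, error) {
	stoppedCh := make(chan struct{})
	go func() {
		defer close(stoppedCh)
		<-stopCh
		time.Sleep(100 * time.Millisecond) // simulate draining connections
	}()
	return stoppedCh, nil
}

func run(ctx context.Context, cancel context.CancelFunc) error {
	serverStoppedCh, err := serve(ctx.Done())
	if err != nil {
		return err
	}
	// On every return path: request shutdown, then wait for the listener to drain.
	defer func() {
		cancel()
		<-serverStoppedCh
		fmt.Println("server fully stopped")
	}()

	// ... the rest of Run (controllers, leader election) would go here ...
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if err := run(ctx, cancel); err != nil {
		fmt.Println(err)
	}
}
```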
@@ -268,21 +282,23 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
klog.Fatalf("error creating lock: %v", err)
}

leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
cancel()
utilruntime.HandleError(fmt.Errorf("leaderelection lost"))
Member: This needs to be fatal, and this change appears to make it non-fatal.

Contributor: no, this pull introduces a graceful shutdown for kcm, ccm and the scheduler. In this case it means that when the component loses leadership it notifies and waits for all dependent controllers and listeners before shutting down. For example, for kcm it means that it will wait for all its controllers as well as for the HTTPS and HTTP listeners.

Member: The fundamental guarantee we have for controllers right now is that we will not run them concurrently (or at least minimize the window where that might happen). This change violates that guarantee.

As soon as we are told we are not the leader anymore (OnStoppedLeading), we have to assume that another KCM has taken over the leadership role. We also know that we have other threads in this process which are continuing the role of active controllers. They must be stopped immediately to prevent them from making concurrent changes with the new KCM master. This needs to be fatal.

Member (@cheftako, Sep 9, 2019): FYI, I'm not saying that having the process kill itself on OnStoppedLeading is the ideal solution for controller concurrency. I think we can do better. However, I believe this kill-itself behavior is needed for HA clusters until we build a better solution for controller concurrency.

Member: should we just change this to log and os.Exit(0), since this is an expected exit? cc @smarterclayton

Contributor: yes, we probably should. I didn't know that KCM has such strong assumptions in this area, especially the scheduler. Although it seems the leader election library doesn't guarantee anything - it may happen that two KCMs will be running at the same time. Can someone confirm this?

I suspect that KCM cares about efficiency, since correctness will be checked by the API server (resourceVersion). The scheduler, on the other hand, seems to care about correctness - #76452 (review)

Member: Currently the KCMs in HA configurations use the fatal on leader election to ensure that there are not two active KCMs running at the same time. While it would be nice to get additional efficiency by letting multiple KCMs process simultaneously, I do not think we have the necessary correctness guarantees in place for that to be safe. resourceVersion is not sufficient for all controllers to behave correctly. (E.g. fairly sure things like the cron/job controllers will schedule too much work.)
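To make the trade-off in this thread concrete, here is an illustrative, standalone sketch of the two OnStoppedLeading strategies being debated: exit immediately (the current fatal behavior, which guarantees a single active manager) versus cancel a shared context and drain controllers (graceful, but with a brief window in which two instances could overlap). This is not the PR's code; all names are made up:

```go
package main

import (
	"context"
	"fmt"
	"os"
	"sync"
	"time"
)

// runControllers stands in for the controller loops; it reconciles until the
// shared context is cancelled and then reports that it has drained.
func runControllers(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("controller drained")
			return
		case <-ticker.C:
			// reconcile something
		}
	}
}

func main() {
	const exitOnLostLease = true // strategy (a): today's fatal behavior

	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go runControllers(ctx, &wg)

	onStoppedLeading := func() {
		if exitOnLostLease {
			// Another instance may already be acting as leader; stop now so
			// two managers never run controllers concurrently.
			fmt.Println("leaderelection lost")
			os.Exit(0)
		}
		// Strategy (b): graceful shutdown, at the cost of a short window in
		// which the new leader and this instance overlap.
		cancel()
		wg.Wait()
	}

	time.Sleep(500 * time.Millisecond) // pretend the lease is lost here
	onStoppedLeading()
}
```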

},
},
WatchDog: electionChecker,
Name: "kube-controller-manager",
})
panic("unreachable")

return nil
}

// ControllerContext defines the context object for controller
3 changes: 2 additions & 1 deletion cmd/kube-controller-manager/controller-manager.go
@@ -25,6 +25,7 @@ import (
"os"
"time"

"k8s.io/apiserver/pkg/server"
"k8s.io/component-base/logs"
_ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugin
"k8s.io/kubernetes/cmd/kube-controller-manager/app"
@@ -33,7 +34,7 @@
func main() {
rand.Seed(time.Now().UnixNano())

command := app.NewControllerManagerCommand()
command := app.NewControllerManagerCommand(server.SetupSignalHandler())

// TODO: once we switch everything over to Cobra commands, we can go back to calling
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
1 change: 1 addition & 0 deletions cmd/kube-scheduler/BUILD
@@ -20,6 +20,7 @@ go_library(
importpath = "k8s.io/kubernetes/cmd/kube-scheduler",
deps = [
"//cmd/kube-scheduler/app:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/component-base/cli/flag:go_default_library",
"//staging/src/k8s.io/component-base/logs:go_default_library",
"//staging/src/k8s.io/component-base/metrics/prometheus/clientgo:go_default_library",
47 changes: 22 additions & 25 deletions cmd/kube-scheduler/app/server.go
@@ -159,6 +159,15 @@ func runCommand(cmd *cobra.Command, args []string, opts *options.Options, regist
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, registryOptions ...Option) error {
// To help debugging, immediately log version
klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}()

registry := framework.NewRegistry()
for _, option := range registryOptions {
@@ -224,10 +233,14 @@
}
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := cc.SecureServing.Serve(handler, 0, stopCh); err != nil {
if serverStoppedCh, err := cc.SecureServing.Serve(handler, 0, stopCh); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start secure server: %v", err)
} else {
defer func() {
cancel()
<-serverStoppedCh
}()
}
}

@@ -238,27 +251,12 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis
// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(stopCh)

// Prepare a reusable runCommand function.
run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
}

ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
defer cancel()

go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}()

// If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStartedLeading: func(context.Context) {

Reviewer: So... are we reusing the sched object each time? I don't see any code that would exit after graceful cleanup. I'm certain that attempting to re-use this object will fail. The leader election context is being ignored, and we still have a cancelled context in the sched struct. (This is why it would be preferable to pass the context into sched.Run().)

sched.Run()
Contributor: what does this mean when the context is closed? Will leaderElector.Run(ctx) below ever return if this call does not use the context?

Contributor (author): sched.config.StopEverything

It seems it uses this channel to synchronize with the provided context?

Reviewer: Storing a context or stop channel in a struct is generally not preferred. Passing the context into Scheduler.Run would be more idiomatic.
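The point about not storing lifetimes in structs can be illustrated with a small sketch; the types below are illustrative only, not the scheduler's actual API:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Discouraged: the lifetime is baked into the struct at construction time.
type schedulerWithStopCh struct {
	stopEverything <-chan struct{}
}

func (s *schedulerWithStopCh) Run() {
	<-s.stopEverything
	fmt.Println("stop-channel scheduler stopped")
}

// Preferred: the caller decides the lifetime per invocation.
type scheduler struct{}

func (s *scheduler) Run(ctx context.Context) {
	<-ctx.Done()
	fmt.Println("context-based scheduler stopped")
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	// Bridge the context to a stop channel for the legacy-style scheduler.
	stopCh := make(chan struct{})
	go func() {
		<-ctx.Done()
		close(stopCh)
	}()

	go (&schedulerWithStopCh{stopEverything: stopCh}).Run()
	(&scheduler{}).Run(ctx) // blocks until the context expires

	time.Sleep(50 * time.Millisecond) // give the goroutine time to print
}
```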

},
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
@@ -269,13 +267,12 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis
}

leaderElector.Run(ctx)

return fmt.Errorf("lost lease")
} else {
// Leader election is disabled, so runCommand inline until done.
sched.Run()
}

// Leader election is disabled, so runCommand inline until done.
run(ctx)
return fmt.Errorf("finished without leader elect")
Contributor: to be clear here: we change the return value to be in line with kcm and ccm.

return nil
}

// buildHandlerChain wraps the given handler with the standard filters.
3 changes: 2 additions & 1 deletion cmd/kube-scheduler/scheduler.go
@@ -23,6 +23,7 @@ import (

"github.com/spf13/pflag"

"k8s.io/apiserver/pkg/server"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/logs"
_ "k8s.io/component-base/metrics/prometheus/clientgo"
@@ -32,7 +33,7 @@
func main() {
rand.Seed(time.Now().UnixNano())

command := app.NewSchedulerCommand()
command := app.NewSchedulerCommand(server.SetupSignalHandler())

// TODO: once we switch everything over to Cobra commands, we can go back to calling
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
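The main functions above pass server.SetupSignalHandler() into the commands so that SIGTERM/SIGINT feed the stop channel. Below is a minimal sketch of what such a helper typically does; it is an illustrative re-implementation, not the code of k8s.io/apiserver/pkg/server:

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// setupSignalHandler returns a channel that is closed on the first
// SIGTERM/SIGINT and force-exits the process on the second.
func setupSignalHandler() <-chan struct{} {
	stopCh := make(chan struct{})
	c := make(chan os.Signal, 2)
	signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
	go func() {
		<-c
		close(stopCh) // first signal: request graceful shutdown
		<-c
		os.Exit(1) // second signal: exit immediately
	}()
	return stopCh
}

func main() {
	stopCh := setupSignalHandler()
	fmt.Println("running; press Ctrl+C to stop")
	<-stopCh
	fmt.Println("shutting down gracefully")
}
```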