Skip to content

Commit

Permalink
Merge pull request #110207 from wojtek-t/clean_shutdown_managers
Browse files Browse the repository at this point in the history
Clean shutdown of kcm, ccm and scheduler
  • Loading branch information
k8s-ci-robot committed May 26, 2022
2 parents f9f9e71 + fe3616c commit 029b1bb
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 44 deletions.
4 changes: 2 additions & 2 deletions cmd/kube-controller-manager/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type Config struct {
// the rest config for the master
Kubeconfig *restclient.Config

// the event sink
EventRecorder record.EventRecorder
EventBroadcaster record.EventBroadcaster
EventRecorder record.EventRecorder
}

type completedConfig struct {
Expand Down
19 changes: 13 additions & 6 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/informers"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
restclient "k8s.io/client-go/rest"
Expand Down Expand Up @@ -174,13 +175,18 @@ func ResyncPeriod(c *config.CompletedConfig) func() time.Duration {
}
}

// Run runs the KubeControllerManagerOptions. This should never exit.
// Run runs the KubeControllerManagerOptions.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())

klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

// Start events processing pipeline.
c.EventBroadcaster.StartStructuredLogging(0)
c.EventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.Client.CoreV1().Events("")})
defer c.EventBroadcaster.Shutdown()

if cfgz, err := configz.New(ConfigzName); err == nil {
cfgz.Set(c.ComponentConfig)
} else {
Expand Down Expand Up @@ -213,7 +219,6 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController

run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) {

controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
if err != nil {
klog.Fatalf("error building controller context: %v", err)
Expand All @@ -227,13 +232,14 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
close(controllerContext.InformersStarted)

select {}
<-ctx.Done()
}

// No leader election, run directly
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
run(context.TODO(), saTokenControllerInitFunc, NewControllerInitializers)
panic("unreachable")
ctx, _ := wait.ContextForChannel(stopCh)
run(ctx, saTokenControllerInitFunc, NewControllerInitializers)
return nil
}

id, err := os.Hostname()
Expand Down Expand Up @@ -311,7 +317,8 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
})
}

select {}
<-stopCh
return nil
}

// ControllerContext defines the context object for controller
Expand Down
18 changes: 6 additions & 12 deletions cmd/kube-controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
clientgokubescheme "k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -430,12 +429,14 @@ func (s KubeControllerManagerOptions) Config(allControllers []string, disabledBy
return nil, err
}

eventRecorder := createRecorder(client, KubeControllerManagerUserAgent)
eventBroadcaster := record.NewBroadcaster()
eventRecorder := eventBroadcaster.NewRecorder(clientgokubescheme.Scheme, v1.EventSource{Component: KubeControllerManagerUserAgent})

c := &kubecontrollerconfig.Config{
Client: client,
Kubeconfig: kubeconfig,
EventRecorder: eventRecorder,
Client: client,
Kubeconfig: kubeconfig,
EventBroadcaster: eventBroadcaster,
EventRecorder: eventRecorder,
}
if err := s.ApplyTo(c); err != nil {
return nil, err
Expand All @@ -444,10 +445,3 @@ func (s KubeControllerManagerOptions) Config(allControllers []string, disabledBy

return c, nil
}

func createRecorder(kubeClient clientset.Interface, userAgent string) record.EventRecorder {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
return eventBroadcaster.NewRecorder(clientgokubescheme.Scheme, v1.EventSource{Component: userAgent})
}
3 changes: 2 additions & 1 deletion cmd/kube-scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
return fmt.Errorf("unable to register configz: %s", err)
}

// Prepare the event broadcaster.
// Start events processing pipeline.
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
defer cc.EventBroadcaster.Shutdown()

// Setup healthz checks.
var checks []healthz.HealthChecker
Expand Down
16 changes: 15 additions & 1 deletion cmd/kube-scheduler/app/testing/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"k8s.io/kubernetes/cmd/kube-scheduler/app"
kubeschedulerconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
"k8s.io/kubernetes/cmd/kube-scheduler/app/options"

"k8s.io/klog/v2"
)

// TearDownFunc is to be called to tear down a test server.
Expand Down Expand Up @@ -61,8 +63,19 @@ type Logger interface {
// enough time to remove temporary files.
func StartTestServer(t Logger, customFlags []string) (result TestServer, err error) {
ctx, cancel := context.WithCancel(context.Background())

var errCh chan error
tearDown := func() {
cancel()

// If the scheduler was started, let's wait for it to
// shutdown clearly.
if errCh != nil {
err, ok := <-errCh
if ok && err != nil {
klog.ErrorS(err, "Failed to shutdown test server clearly")
}
}
if len(result.TmpDir) != 0 {
os.RemoveAll(result.TmpDir)
}
Expand Down Expand Up @@ -103,8 +116,9 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err
return result, fmt.Errorf("failed to create config from options: %v", err)
}

errCh := make(chan error)
errCh = make(chan error)
go func(ctx context.Context) {
defer close(errCh)
if err := app.Run(ctx, cc, sched); err != nil {
errCh <- err
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,16 @@ func New(client clientset.Interface,
// Run begins watching and scheduling. It starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
sched.SchedulingQueue.Run()
wait.UntilWithContext(ctx, sched.scheduleOne, 0)

// We need to start scheduleOne loop in a dedicated goroutine,
// because scheduleOne function hangs on getting the next item
// from the SchedulingQueue.
// If there are no new pods to schedule, it will be hanging there
// and if done in this goroutine it will be blocking closing
// SchedulingQueue, in effect causing a deadlock on shutdown.
go wait.UntilWithContext(ctx, sched.scheduleOne, 0)

<-ctx.Done()
sched.SchedulingQueue.Close()
}

Expand Down
10 changes: 5 additions & 5 deletions staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,13 @@ func (b *Backoff) Step() time.Duration {
return duration
}

// contextForChannel derives a child context from a parent channel.
// ContextForChannel derives a child context from a parent channel.
//
// The derived context's Done channel is closed when the returned cancel function
// is called or when the parent channel is closed, whichever happens first.
//
// Note the caller must *always* call the CancelFunc, otherwise resources may be leaked.
func contextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc) {
func ContextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())

go func() {
Expand Down Expand Up @@ -464,7 +464,7 @@ func PollWithContext(ctx context.Context, interval, timeout time.Duration, condi
// PollUntil always waits interval before the first run of 'condition'.
// 'condition' will always be invoked at least once.
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
ctx, cancel := contextForChannel(stopCh)
ctx, cancel := ContextForChannel(stopCh)
defer cancel()
return PollUntilWithContext(ctx, interval, condition.WithContext())
}
Expand Down Expand Up @@ -531,7 +531,7 @@ func PollImmediateWithContext(ctx context.Context, interval, timeout time.Durati
// PollImmediateUntil runs the 'condition' before waiting for the interval.
// 'condition' will always be invoked at least once.
func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
ctx, cancel := contextForChannel(stopCh)
ctx, cancel := ContextForChannel(stopCh)
defer cancel()
return PollImmediateUntilWithContext(ctx, interval, condition.WithContext())
}
Expand Down Expand Up @@ -629,7 +629,7 @@ type WaitWithContextFunc func(ctx context.Context) <-chan struct{}
// "uniform pseudo-random", the `fn` might still run one or multiple time,
// though eventually `WaitFor` will return.
func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
ctx, cancel := contextForChannel(done)
ctx, cancel := ContextForChannel(done)
defer cancel()
return WaitForWithContext(ctx, wait.WithContext(), fn.WithContext())
}
Expand Down
2 changes: 1 addition & 1 deletion staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ func TestContextForChannel(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
ctx, cancel := contextForChannel(parentCh)
ctx, cancel := ContextForChannel(parentCh)
defer cancel()
<-ctx.Done()
}()
Expand Down
5 changes: 4 additions & 1 deletion staging/src/k8s.io/cloud-provider/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ type Config struct {
// the rest config for the master
Kubeconfig *restclient.Config

// the event sink
// EventBroadcaster is broadcaster events to all sinks.
EventBroadcaster record.EventBroadcaster

// EventRecord is a sink for events.
EventRecorder record.EventRecorder

// ClientBuilder will provide a client for this controller to use
Expand Down
18 changes: 14 additions & 4 deletions staging/src/k8s.io/cloud-provider/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/apiserver/pkg/server/healthz"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/informers"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/restmapper"
Expand Down Expand Up @@ -142,6 +143,11 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())

// Start events processing pipeline.
c.EventBroadcaster.StartStructuredLogging(0)
c.EventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.Client.CoreV1().Events("")})
defer c.EventBroadcaster.Shutdown()

// setup /configz endpoint
if cz, err := configz.New(ConfigzName); err == nil {
cz.Set(c.ComponentConfig)
Expand Down Expand Up @@ -182,8 +188,10 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
}

if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
run(context.TODO(), controllerInitializers)
panic("unreachable")
ctx, _ := wait.ContextForChannel(stopCh)
run(ctx, controllerInitializers)
<-stopCh
return nil
}

// Identity used to distinguish between multiple cloud controller manager instances
Expand Down Expand Up @@ -251,7 +259,8 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface
})
}

select {}
<-stopCh
return nil
}

// startControllers starts the cloud specific controller loops.
Expand Down Expand Up @@ -304,7 +313,8 @@ func startControllers(ctx context.Context, cloud cloudprovider.Interface, contro
c.SharedInformers.Start(stopCh)
controllerContext.InformerFactory.Start(controllerContext.Stop)

select {}
<-stopCh
return nil
}

// InitCloudFunc is used to initialize cloud
Expand Down
11 changes: 2 additions & 9 deletions staging/src/k8s.io/cloud-provider/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -175,7 +174,8 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *config.Config, userAgent stri
return err
}

c.EventRecorder = createRecorder(c.Client, userAgent)
c.EventBroadcaster = record.NewBroadcaster()
c.EventRecorder = c.EventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: userAgent})

rootClientBuilder := clientbuilder.SimpleControllerClientBuilder{
ClientConfig: c.Kubeconfig,
Expand Down Expand Up @@ -241,10 +241,3 @@ func (o *CloudControllerManagerOptions) Config(allControllers, disabledByDefault

return c, nil
}

func createRecorder(kubeClient clientset.Interface, userAgent string) record.EventRecorder {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
return eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: userAgent})
}
5 changes: 4 additions & 1 deletion test/integration/scheduler/preemption/preemption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1444,7 +1444,10 @@ func initTestPreferNominatedNode(t *testing.T, nsPrefix string, opts ...schedule
f := testCtx.Scheduler.NextPod
testCtx.Scheduler.NextPod = func() (podInfo *framework.QueuedPodInfo) {
podInfo = f()
podInfo.Pod.Status.NominatedNodeName = "node-1"
// Scheduler.Next() may return nil when scheduler is shutting down.
if podInfo != nil {
podInfo.Pod.Status.NominatedNodeName = "node-1"
}
return podInfo
}
go testCtx.Scheduler.Run(testCtx.Ctx)
Expand Down

0 comments on commit 029b1bb

Please sign in to comment.