Skip to content

Commit

Permalink
UPSTREAM: <carry>: Release lock on KCM and KS termination
Browse files Browse the repository at this point in the history
UPSTREAM: <carry>: Force releasing the lock on exit for KS

squash with UPSTREAM: <carry>: Release lock on KCM and KS termination

OpenShift-Rebase-Source: fc91252
  • Loading branch information
tnozicka authored and bertinatto committed Mar 27, 2023
1 parent 44fd7d4 commit 8c7ad1f
Showing 1 changed file with 49 additions and 12 deletions.
61 changes: 49 additions & 12 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
genericfeatures "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/mux"
utilfeature "k8s.io/apiserver/pkg/util/feature"
Expand Down Expand Up @@ -154,7 +155,9 @@ controller, and serviceaccounts controller.`,

// add feature enablement metrics
utilfeature.DefaultMutableFeatureGate.AddMetrics()
return Run(context.Background(), c.Complete())

stopCh := server.SetupSignalHandler()
return Run(context.Background(), c.Complete(), stopCh)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
Expand Down Expand Up @@ -192,7 +195,7 @@ func ResyncPeriod(c *config.CompletedConfig) func() time.Duration {
}

// Run runs the KubeControllerManagerOptions.
func Run(ctx context.Context, c *config.CompletedConfig) error {
func Run(ctx context.Context, c *config.CompletedConfig, stopCh2 <-chan struct{}) error {
logger := klog.FromContext(ctx)
stopCh := ctx.Done()

Expand All @@ -217,8 +220,12 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
hmCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
<-stopCh
cancel()
select {
case <-stopCh:
cancel()
case <-stopCh2:
cancel()
}
}()
go c.OpenShiftContext.PreferredHostHealthMonitor.Run(hmCtx)
}
Expand Down Expand Up @@ -321,10 +328,22 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
run(ctx, startSATokenController, initializersFunc)
},
OnStoppedLeading: func() {
logger.Error(nil, "leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
select {
case <-stopCh:
// We were asked to terminate. Exit 0.
klog.Info("Requested to terminate. Exiting.")
os.Exit(0)
case <-stopCh2:
// We were asked to terminate. Exit 0.
klog.Info("Requested to terminate. Exiting.")
os.Exit(0)
default:
// We lost the lock.
logger.Error(nil, "leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
},
})
}, stopCh)

// If Leader Migration is enabled, proceed to attempt the migration lock.
if leaderMigrator != nil {
Expand All @@ -345,10 +364,22 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
run(ctx, nil, createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerMigrated))
},
OnStoppedLeading: func() {
logger.Error(nil, "migration leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
select {
case <-stopCh:
// We were asked to terminate. Exit 0.
klog.Info("Requested to terminate. Exiting.")
os.Exit(0)
case <-stopCh2:
// We were asked to terminate. Exit 0.
klog.Info("Requested to terminate. Exiting.")
os.Exit(0)
default:
// We lost the lock.
logger.Error(nil, "migration leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
},
})
}, stopCh)
}

<-stopCh
Expand Down Expand Up @@ -768,7 +799,7 @@ func createClientBuilders(logger klog.Logger, c *config.CompletedConfig) (client

// leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired.
// TODO: extract this function into staging/controller-manager
func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks, stopCh <-chan struct{}) {
logger := klog.FromContext(ctx)
rl, err := resourcelock.NewFromKubeconfig(resourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
Expand All @@ -784,7 +815,13 @@ func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdent
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
leCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
<-stopCh
cancel()
}()
leaderelection.RunOrDie(leCtx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
Expand Down

0 comments on commit 8c7ad1f

Please sign in to comment.