Skip to content

Commit

Permalink
UPSTREAM: <carry>: Release lock on KCM and KS termination
Browse files Browse the repository at this point in the history
UPSTREAM: <carry>: Force releasing the lock on exit for KS

squash with UPSTREAM: <carry>: Release lock on KCM and KS termination

OpenShift-Rebase-Source: fc91252

UPSTREAM: <carry>: Release lock on KCM and KS termination
  • Loading branch information
tnozicka authored and soltysh committed Dec 8, 2023
1 parent 8e695e7 commit f1b5339
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 12 deletions.
47 changes: 36 additions & 11 deletions cmd/kube-controller-manager/app/controllermanager.go
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/mux"
utilfeature "k8s.io/apiserver/pkg/util/feature"
Expand Down Expand Up @@ -154,7 +155,9 @@ controller, and serviceaccounts controller.`,

// add feature enablement metrics
utilfeature.DefaultMutableFeatureGate.AddMetrics()
return Run(context.Background(), c.Complete())

stopCh := server.SetupSignalHandler()
return Run(context.Background(), c.Complete(), stopCh)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
Expand Down Expand Up @@ -192,9 +195,9 @@ func ResyncPeriod(c *config.CompletedConfig) func() time.Duration {
}

// Run runs the KubeControllerManagerOptions.
func Run(ctx context.Context, c *config.CompletedConfig) error {
func Run(ctx context.Context, c *config.CompletedConfig, stopCh2 <-chan struct{}) error {
logger := klog.FromContext(ctx)
stopCh := ctx.Done()
stopCh := mergeCh(ctx.Done(), stopCh2)

// To help debugging, immediately log version
logger.Info("Starting", "version", version.Get())
Expand Down Expand Up @@ -323,10 +326,18 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
run(ctx, controllerDescriptors)
},
OnStoppedLeading: func() {
logger.Error(nil, "leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
select {
case <-stopCh:
// We were asked to terminate. Exit 0.
klog.Info("Requested to terminate. Exiting.")
os.Exit(0)
default:
// We lost the lock.
logger.Error(nil, "leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
},
})
}, stopCh)

// If Leader Migration is enabled, proceed to attempt the migration lock.
if leaderMigrator != nil {
Expand All @@ -350,10 +361,18 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
run(ctx, controllerDescriptors)
},
OnStoppedLeading: func() {
logger.Error(nil, "migration leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
select {
case <-stopCh:
// We were asked to terminate. Exit 0.
klog.Info("Requested to terminate. Exiting.")
os.Exit(0)
default:
// We lost the lock.
logger.Error(nil, "migration leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
},
})
}, stopCh)
}

<-stopCh
Expand Down Expand Up @@ -860,7 +879,7 @@ func createClientBuilders(logger klog.Logger, c *config.CompletedConfig) (client

// leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired.
// TODO: extract this function into staging/controller-manager
func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks, stopCh <-chan struct{}) {
logger := klog.FromContext(ctx)
rl, err := resourcelock.NewFromKubeconfig(resourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
Expand All @@ -876,7 +895,13 @@ func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdent
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
leCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
<-stopCh
cancel()
}()
leaderelection.RunOrDie(leCtx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
Expand Down
14 changes: 14 additions & 0 deletions cmd/kube-controller-manager/app/patch.go
Expand Up @@ -164,3 +164,17 @@ func (rt *rejectIfNotReadyHeaderRT) RoundTrip(r *http.Request) (*http.Response,
}
return rt.baseRT.RoundTrip(r)
}

// mergeCh takes two stop channels and return a single one that
// closes as soon as one of the inputs closes or receives data.
func mergeCh(stopCh1, stopCh2 <-chan struct{}) <-chan struct{} {
merged := make(chan struct{})
go func() {
defer close(merged)
select {
case <-stopCh1:
case <-stopCh2:
}
}()
return merged
}
53 changes: 53 additions & 0 deletions cmd/kube-controller-manager/app/patch_test.go
Expand Up @@ -72,3 +72,56 @@ type fakeRTFunc func(r *http.Request) (*http.Response, error)
func (rt fakeRTFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return rt(r)
}

func TestMergeCh(t *testing.T) {
testCases := []struct {
name string
chan1 chan struct{}
chan2 chan struct{}
closeFn func(chan struct{}, chan struct{})
}{
{
name: "chan1 gets closed",
chan1: make(chan struct{}),
chan2: make(chan struct{}),
closeFn: func(a, b chan struct{}) {
close(a)
},
},
{
name: "chan2 gets closed",
chan1: make(chan struct{}),
chan2: make(chan struct{}),
closeFn: func(a, b chan struct{}) {
close(b)
},
},
{
name: "both channels get closed",
chan1: make(chan struct{}),
chan2: make(chan struct{}),
closeFn: func(a, b chan struct{}) {
close(a)
close(b)
},
},
{
name: "channel receives data and returned channel is closed",
chan1: make(chan struct{}),
chan2: make(chan struct{}),
closeFn: func(a, b chan struct{}) {
a <- struct{}{}
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
go tc.closeFn(tc.chan1, tc.chan2)
merged := mergeCh(tc.chan1, tc.chan2)
if _, ok := <-merged; ok {
t.Fatalf("expected closed channel, got data")
}
})
}
}
3 changes: 2 additions & 1 deletion cmd/kube-controller-manager/app/testing/testserver.go
Expand Up @@ -122,7 +122,8 @@ func StartTestServer(ctx context.Context, customFlags []string) (result TestServ
go func(ctx context.Context) {
defer close(errCh)

if err := app.Run(ctx, config.Complete()); err != nil {
stopCh := make(chan struct{})
if err := app.Run(ctx, config.Complete(), stopCh); err != nil {
errCh <- err
}
}(ctx)
Expand Down

0 comments on commit f1b5339

Please sign in to comment.