operator: LeaderElectionReleaseOnCancel #556

Merged: 3 commits, May 2, 2024
deploy/operator.yaml (2 additions, 0 deletions)
@@ -41,6 +41,8 @@ spec:
image: $SRIOV_NETWORK_OPERATOR_IMAGE
command:
- sriov-network-operator
args:
- --leader-elect=$OPERATOR_LEADER_ELECTION_ENABLE
SchSeba marked this conversation as resolved.
resources:
requests:
cpu: 100m
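The new args entry templates $OPERATOR_LEADER_ELECTION_ENABLE (defaulted in hack/env.sh below) into a --leader-elect flag. The following is a minimal sketch of how such a flag is typically registered on the Go side; the flag name matches the manifest, but the surrounding wiring is illustrative rather than the operator's actual main.go:

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	// Hypothetical stand-in for the operator's real flag wiring: kubebuilder-style
	// main functions usually register a boolean "leader-elect" flag, which is what
	// the templated "--leader-elect=$OPERATOR_LEADER_ELECTION_ENABLE" argument above
	// sets once the deploy scripts substitute the environment variable.
	var enableLeaderElection bool
	flag.BoolVar(&enableLeaderElection, "leader-elect", false,
		"Enable leader election for the controller manager.")
	flag.Parse()

	fmt.Println("leader election enabled:", enableLeaderElection)
}
```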
deploy/role.yaml (6 additions, 0 deletions)
@@ -56,6 +56,12 @@ rules:
- get
- list
- watch
- apiGroups:
- 'coordination.k8s.io'
resources:
- 'leases'
verbs:
- '*'
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
deployment/sriov-network-operator/templates/role.yaml (6 additions, 0 deletions)
@@ -59,6 +59,12 @@ rules:
- get
- list
- watch
- apiGroups:
- 'coordination.k8s.io'
resources:
- 'leases'
verbs:
- '*'
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
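Both Role manifests gain the same rule because Kubernetes leader election stores its lock as a coordination.k8s.io/v1 Lease that the operator must be able to get, create, and update. Below is a small client-go sketch of that mechanism, not the operator's code: the namespace, identity, and timings are illustrative, while the Lease name matches the LeaderElectionID constant added later in this PR.

```go
package main

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		klog.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// The lock object is a coordination.k8s.io/v1 Lease; getting, creating and
	// updating it is exactly what the new RBAC rule above has to allow.
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      "a56def2a.openshift.io",      // matches consts.LeaderElectionID
			Namespace: "sriov-network-operator",      // illustrative namespace
		},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: "example-holder"},
	}

	leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true, // give up the Lease promptly when the context is cancelled
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { klog.Info("started leading") },
			OnStoppedLeading: func() { klog.Info("stopped leading") },
		},
	})
}
```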
hack/deploy-wait.sh (2 additions, 1 deletion)
@@ -20,6 +20,7 @@ done

if ! $ready; then
echo "Timed out waiting for features to be ready"
oc get nodes
kubectl get nodes
kubectl cluster-info dump -n ${NAMESPACE}
exit 1
fi
hack/env.sh (1 addition, 0 deletions)
@@ -30,3 +30,4 @@
export ADMISSION_CONTROLLERS_CERTIFICATES_OPERATOR_CA_CRT=${ADMISSION_CONTROLLERS_CERTIFICATES_OPERATOR_CA_CRT:-""}
export ADMISSION_CONTROLLERS_CERTIFICATES_INJECTOR_CA_CRT=${ADMISSION_CONTROLLERS_CERTIFICATES_INJECTOR_CA_CRT:-""}
export DEV_MODE=${DEV_MODE:-"FALSE"}
export OPERATOR_LEADER_ELECTION_ENABLE=${OPERATOR_LEADER_ELECTION_ENABLE:-"false"}
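For reference, a tiny Go equivalent of the ${VAR:-default} shell expansion used here; purely illustrative, with an assumed helper name:

```go
package main

import (
	"fmt"
	"os"
)

// getEnvOrDefault mirrors the shell's ${VAR:-default} expansion: it returns the
// environment variable's value if it is set and non-empty, otherwise the default.
func getEnvOrDefault(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}

func main() {
	// With no OPERATOR_LEADER_ELECTION_ENABLE in the environment this prints
	// "false", matching the default the script exports.
	fmt.Println(getEnvOrDefault("OPERATOR_LEADER_ELECTION_ENABLE", "false"))
}
```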
hack/run-e2e-conformance-virtual-ocp.sh (1 addition, 0 deletions)
@@ -189,6 +189,7 @@ export OPERATOR_EXEC=kubectl
export CLUSTER_TYPE=openshift
export DEV_MODE=TRUE
export CLUSTER_HAS_EMULATED_PF=TRUE
export OPERATOR_LEADER_ELECTION_ENABLE=true
SchSeba marked this conversation as resolved.

export SRIOV_NETWORK_OPERATOR_IMAGE="$registry/$NAMESPACE/sriov-network-operator:latest"
export SRIOV_NETWORK_CONFIG_DAEMON_IMAGE="$registry/$NAMESPACE/sriov-network-config-daemon:latest"
main.go (104 additions, 26 deletions)
@@ -49,6 +49,7 @@ import (
"github.com/k8snetworkplumbingwg/sriov-network-operator/controllers"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/leaderelection"

"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
snolog "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/log"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/platforms"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
@@ -100,22 +101,42 @@ func main() {

le := leaderelection.GetLeaderElectionConfig(kubeClient, enableLeaderElection)

leaderElectionMgr, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
HealthProbeBindAddress: probeAddr,
Metrics: server.Options{BindAddress: "0"},
LeaderElection: enableLeaderElection,
LeaseDuration: &le.LeaseDuration,
LeaderElectionReleaseOnCancel: true,
RenewDeadline: &le.RenewDeadline,
RetryPeriod: &le.RetryPeriod,
LeaderElectionID: consts.LeaderElectionID,
})
if err != nil {
setupLog.Error(err, "unable to start leader election manager")
os.Exit(1)
}

if err := leaderElectionMgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := leaderElectionMgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
os.Exit(1)
}

mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
Metrics: server.Options{BindAddress: metricsAddr},
WebhookServer: webhook.NewServer(webhook.Options{Port: 9443}),
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaseDuration: &le.LeaseDuration,
RenewDeadline: &le.RenewDeadline,
RetryPeriod: &le.RetryPeriod,
LeaderElectionID: "a56def2a.openshift.io",
Cache: cache.Options{DefaultNamespaces: map[string]cache.Config{vars.Namespace: {}}},
Scheme: scheme,
Metrics: server.Options{BindAddress: metricsAddr},
WebhookServer: webhook.NewServer(webhook.Options{Port: 9443}),
Cache: cache.Options{DefaultNamespaces: map[string]cache.Config{vars.Namespace: {}}},
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}

mgrGlobal, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
Metrics: server.Options{BindAddress: "0"},
@@ -225,29 +246,86 @@ func main() {
}
// +kubebuilder:scaffold:builder

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
leaderElectionErr := make(chan error)
leaderElectionContext, cancelLeaderElection := context.WithCancel(context.Background())
Contributor: I think it's better to create this WithCancel context using the stopCh context (created below) instead of a context.Background.

Member (author): That way leaderElectionContext would stop as soon as the stop context is Done, which is not what I'm trying to achieve with this PR. I tried using nested contexts here: https://go.dev/play/p/6dFfBQyXlW1

Contributor: By using the stop context as parent of this one, the leaderElectionMgr.Start function will be cancelled if a SIGINT or SIGTERM is received.

Member (author): Yes, that's what I don't want to happen. Here's the ending sequence:

  • SIGINT arrives
  • stopCh is Done
  • mgr and mgrGlobal finish their work and return
  • as mgr does not run in a goroutine, when it returns the function completes, triggering all the defers
  • the utils.Shutdown() defer goes first and does the cleanup (the leader election is still running at this point, so we still hold the lock lease)
  • the cancelLeaderElection() defer goes second, stopping the manager, which in turn releases the lock

Am I missing something here?

Contributor: Right, it would be the same as using the ReleaseOnCancel option. My concern here is that after calling cancelLeaderElection() (with the defer), the program exits, so nothing ensures that leaderElectionMgr really finishes. There is a race condition. The internal implementation of the ReleaseOnCancel option uses a channel to ensure that the routine has stopped before continuing with the shutdown. Perhaps you can do the same here.

Member (author): Added a wait group to ensure leaderElectionMgr is correctly stopped.

Contributor: It seems to me that this never exits. The leader election manager is stopped with a defer, but before that you are waiting for it to stop, so it won't happen?

Member (author): Fixed.

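The pattern the thread converges on, running the leader-election manager in a goroutine, reporting its exit on a channel, and blocking on that channel during shutdown so the process cannot exit before the lease is released, can be sketched in isolation. Names here are illustrative, not the operator's:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// fakeLeaderElectionManager stands in for a manager's Start method: it blocks
// until its context is cancelled and then takes a moment to release its lock.
func fakeLeaderElectionManager(ctx context.Context) error {
	<-ctx.Done()
	time.Sleep(200 * time.Millisecond) // e.g. the final Lease update that releases the lock
	fmt.Println("lease released")
	return nil
}

func main() {
	errCh := make(chan error, 1)
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		errCh <- fakeLeaderElectionManager(ctx)
	}()

	// ... do the rest of the shutdown work (stop managers, remove finalizers) ...

	cancel()       // ask the leader-election manager to stop
	err := <-errCh // and wait until it has actually returned before exiting
	fmt.Println("leader election manager stopped, exit:", err)
}
```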
go func() {
setupLog.Info("starting leader election manager")
leaderElectionErr <- leaderElectionMgr.Start(leaderElectionContext)
}()

select {
case <-leaderElectionMgr.Elected():
case err := <-leaderElectionErr:
setupLog.Error(err, "Leader Election Manager error")
os.Exit(1)
}

stopCh := ctrl.SetupSignalHandler()
setupLog.Info("acquired lease")

stopSignalCh := ctrl.SetupSignalHandler()

globalManagerErr := make(chan error)
globalManagerCtx, globalManagerCancel := context.WithCancel(context.Background())
go func() {
if err := mgrGlobal.Start(stopCh); err != nil {
setupLog.Error(err, "Manager Global exited non-zero")
os.Exit(1)
}
setupLog.Info("starting global manager")
globalManagerErr <- mgrGlobal.Start(globalManagerCtx)
}()

// Remove all finalizers after controller is shut down
defer utils.Shutdown()
namespacedManagerErr := make(chan error)
namespacedManagerCtx, namespacedManagerCancel := context.WithCancel(context.Background())
go func() {
setupLog.Info("starting namespaced manager")
namespacedManagerErr <- mgr.Start(namespacedManagerCtx)
}()

select {
// Wait for a stop signal
case <-stopSignalCh.Done():
setupLog.Info("Stop signal received")

globalManagerCancel()
namespacedManagerCancel()
<-globalManagerErr
<-namespacedManagerErr

utils.Shutdown()

cancelLeaderElection()
<-leaderElectionErr

case err := <-leaderElectionErr:
setupLog.Error(err, "Leader Election Manager error")
globalManagerCancel()
namespacedManagerCancel()
<-globalManagerErr
<-namespacedManagerErr

os.Exit(1)

case err := <-globalManagerErr:
Contributor: I think a shutdown should be included here as well, since the namespaced manager might be up, and perhaps also in the case above. Not sure about that one, since the operator won't be elected anymore in that case.

Member (author): I'm fine with including utils.Shutdown() in case of a globalManager or namespacedManager error. I wouldn't do it in case of a leader election error.

setupLog.Error(err, "Global Manager error")

namespacedManagerCancel()
<-namespacedManagerErr

utils.Shutdown()

cancelLeaderElection()
<-leaderElectionErr

os.Exit(1)

case err := <-namespacedManagerErr:
setupLog.Error(err, "Namsepaced Manager error")

globalManagerCancel()
<-globalManagerErr

utils.Shutdown()

cancelLeaderElection()
<-leaderElectionErr

setupLog.Info("starting manager")
if err := mgr.Start(stopCh); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
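Taken together, the new main.go runs leader election in its own manager with LeaderElectionReleaseOnCancel set, starts the working managers only after Elected() fires, and cancels the leader-election manager last, after cleanup, so the Lease is released immediately instead of waiting for it to expire. A condensed, self-contained sketch of that pattern follows; timings and error handling are illustrative:

```go
package main

import (
	"context"
	"os"
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

func main() {
	// Illustrative timings; the operator takes its real values from
	// leaderelection.GetLeaderElectionConfig.
	leaseDuration := 137 * time.Second
	renewDeadline := 107 * time.Second
	retryPeriod := 26 * time.Second

	// A manager whose only job is to hold the leader-election Lease.
	leMgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Metrics:                       server.Options{BindAddress: "0"}, // no metrics listener for this manager
		LeaderElection:                true,
		LeaderElectionID:              "a56def2a.openshift.io",
		LeaderElectionReleaseOnCancel: true, // step down (release the Lease) when the context is cancelled
		LeaseDuration:                 &leaseDuration,
		RenewDeadline:                 &renewDeadline,
		RetryPeriod:                   &retryPeriod,
	})
	if err != nil {
		os.Exit(1)
	}

	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error, 1)
	go func() { done <- leMgr.Start(ctx) }()

	<-leMgr.Elected() // block until this replica holds the Lease

	// ... start the global and namespaced managers, wait for a signal,
	// stop them, run cleanup (utils.Shutdown in the operator) ...

	cancel() // release the Lease so the next replica can take over quickly
	<-done   // make sure the release has actually happened before exiting
}
```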
pkg/consts/constants.go (1 addition, 0 deletions)
@@ -35,6 +35,7 @@ const (
ServiceAccount = "ServiceAccount"
DPConfigFileName = "config.json"
OVSHWOLMachineConfigNameSuffix = "ovs-hw-offload"
LeaderElectionID = "a56def2a.openshift.io"

LinkTypeEthernet = "ether"
LinkTypeInfiniband = "infiniband"
test/conformance/tests/test_sriov_operator.go (43 additions, 3 deletions)
@@ -28,6 +28,7 @@ import (
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"

sriovv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
"github.com/k8snetworkplumbingwg/sriov-network-operator/test/util/clean"
"github.com/k8snetworkplumbingwg/sriov-network-operator/test/util/cluster"
"github.com/k8snetworkplumbingwg/sriov-network-operator/test/util/discovery"
@@ -276,6 +277,35 @@ var _ = Describe("[sriov] operator", func() {
}, 3*time.Minute, 5*time.Second).Should(Succeed())
})
})

It("should gracefully restart quickly", func() {
// This test case ensures the leader election process runs smoothly when the operator's pod is restarted.
oldLease, err := clients.CoordinationV1Interface.Leases(operatorNamespace).Get(context.Background(), consts.LeaderElectionID, metav1.GetOptions{})
if k8serrors.IsNotFound(err) {
Skip("Leader Election is not enabled on the cluster. Skipping")
}
Expect(err).ToNot(HaveOccurred())

oldOperatorPod := getOperatorPod()

By("Delete the operator's pod")
deleteOperatorPod()

By("Wait the new operator's pod to start")
Eventually(func(g Gomega) {
newOperatorPod := getOperatorPod()
Expect(newOperatorPod.Name).ToNot(Equal(oldOperatorPod.Name))
Expect(newOperatorPod.Status.Phase).To(Equal(corev1.PodRunning))
}, 45*time.Second, 5*time.Second)

By("Assert the new operator's pod acquire the lease before 30 seconds")
Eventually(func(g Gomega) {
newLease, err := clients.CoordinationV1Interface.Leases(operatorNamespace).Get(context.Background(), consts.LeaderElectionID, metav1.GetOptions{})
g.Expect(err).ToNot(HaveOccurred())

g.Expect(newLease.Spec.HolderIdentity).ToNot(Equal(oldLease.Spec.HolderIdentity))
}, 30*time.Second, 5*time.Second).Should(Succeed())
})
})

Describe("Generic SriovNetworkNodePolicy", func() {
@@ -2627,14 +2657,17 @@ func getOperatorConfigLogLevel() int {
return cfg.Spec.LogLevel
}

func getOperatorLogs(since time.Time) []string {
func getOperatorPod() corev1.Pod {
podList, err := clients.Pods(operatorNamespace).List(context.Background(), metav1.ListOptions{
LabelSelector: "name=sriov-network-operator",
})
ExpectWithOffset(1, err).ToNot(HaveOccurred())
ExpectWithOffset(1, podList.Items).To(HaveLen(1), "One operator pod expected")
ExpectWithOffset(1, podList.Items).ToNot(HaveLen(0), "At least one operator pod expected")
return podList.Items[0]
}

pod := podList.Items[0]
func getOperatorLogs(since time.Time) []string {
pod := getOperatorPod()
logStart := metav1.NewTime(since)
rawLogs, err := clients.Pods(pod.Namespace).
GetLogs(pod.Name, &corev1.PodLogOptions{
@@ -2647,6 +2680,13 @@ func getOperatorLogs(since time.Time) []string {
return strings.Split(string(rawLogs), "\n")
}

func deleteOperatorPod() {
pod := getOperatorPod()

err := clients.Pods(operatorNamespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{})
ExpectWithOffset(1, err).ToNot(HaveOccurred())
}

func assertObjectIsNotFound(name string, obj runtimeclient.Object) {
Eventually(func() bool {
err := clients.Get(context.Background(), runtimeclient.ObjectKey{Name: name, Namespace: operatorNamespace}, obj)
test/util/client/clients.go (3 additions, 0 deletions)
@@ -11,6 +11,7 @@ import (
discovery "k8s.io/client-go/discovery"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
appsv1client "k8s.io/client-go/kubernetes/typed/apps/v1"
coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@@ -37,6 +38,7 @@ type ClientSet struct {
clientsriovv1.SriovnetworkV1Interface
Config *rest.Config
runtimeclient.Client
coordinationv1.CoordinationV1Interface
}

// New returns a *ClientSet built from the given kubeconfig.
@@ -67,6 +69,7 @@ func New(kubeconfig string) *ClientSet {
clientSet.AppsV1Interface = appsv1client.NewForConfigOrDie(config)
clientSet.DiscoveryInterface = discovery.NewDiscoveryClientForConfigOrDie(config)
clientSet.SriovnetworkV1Interface = clientsriovv1.NewForConfigOrDie(config)
clientSet.CoordinationV1Interface = coordinationv1.NewForConfigOrDie(config)
clientSet.Config = config

crScheme := runtime.NewScheme()
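Finally, a short usage sketch of the CoordinationV1Interface that this change adds to the test ClientSet, mirroring what the restart test above does. The package alias, kubeconfig handling, and namespace are assumptions; the Lease name comes from the consts.LeaderElectionID constant added in this PR.

```go
package main

import (
	"context"
	"fmt"
	"os"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
	testclient "github.com/k8snetworkplumbingwg/sriov-network-operator/test/util/client"
)

func main() {
	// Assumed invocation: the helper builds its typed clients from this kubeconfig path.
	clients := testclient.New(os.Getenv("KUBECONFIG"))

	// The operator namespace is illustrative; the e2e suite reads it from its own configuration.
	lease, err := clients.CoordinationV1Interface.
		Leases("sriov-network-operator").
		Get(context.Background(), consts.LeaderElectionID, metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	if lease.Spec.HolderIdentity != nil {
		fmt.Println("current leader:", *lease.Spec.HolderIdentity)
	}
}
```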