From b09f05849a0d786f299a62e301a305f5ed002056 Mon Sep 17 00:00:00 2001
From: Peng Liu
Date: Mon, 22 Mar 2021 22:55:59 -0400
Subject: [PATCH 1/3] Pause MCP before draining/rebooting node

The config daemon instance on each node needs to compete for the
'Draining' lock before it can drain/reboot its node. After a config
daemon instance gets the lock, it checks the conditions of its
MachineConfigPool. If the MCP is in the ready state, the daemon pauses
the MCP and proceeds; otherwise, it waits for the MCP to become ready.
The MCP is resumed and the lock released once the config daemon
finishes its work.
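Pausing and resuming is done with a JSON merge patch on the pool's
'spec.paused' field; the call the daemon issues is essentially (error
handling elided, names as in pkg/daemon/daemon.go):

    pausePatch := []byte(`{"spec":{"paused":true}}`)
    _, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(
        context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
    // The resume path sends {"spec":{"paused":false}} through the same call.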
---
 cmd/sriov-network-config-daemon/start.go |   6 +
 deploy/clusterrole.yaml                  |   3 +
 pkg/daemon/daemon.go                     | 163 ++++++++++++++++++++---
 pkg/plugins/generic/generic_plugin.go    |  20 +--
 4 files changed, 164 insertions(+), 28 deletions(-)

diff --git a/cmd/sriov-network-config-daemon/start.go b/cmd/sriov-network-config-daemon/start.go
index 8659a1489..8efc7a6b9 100644
--- a/cmd/sriov-network-config-daemon/start.go
+++ b/cmd/sriov-network-config-daemon/start.go
@@ -23,6 +23,9 @@ import (
     "k8s.io/client-go/rest"
     "k8s.io/client-go/tools/clientcmd"
     "k8s.io/client-go/util/connrotation"
+
+    mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
+    mcclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
 )
 
 var (
@@ -124,9 +127,11 @@ func runStartCmd(cmd *cobra.Command, args []string) {
     }
 
     sriovnetworkv1.AddToScheme(scheme.Scheme)
+    mcfgv1.AddToScheme(scheme.Scheme)
 
     snclient := snclientset.NewForConfigOrDie(config)
     kubeclient := kubernetes.NewForConfigOrDie(config)
+    mcclient := mcclientset.NewForConfigOrDie(config)
 
     config.Timeout = 5 * time.Second
     writerclient := snclientset.NewForConfigOrDie(config)
@@ -161,6 +166,7 @@ func runStartCmd(cmd *cobra.Command, args []string) {
         startOpts.nodeName,
         snclient,
         kubeclient,
+        mcclient,
         exitCh,
         stopCh,
         syncCh,
diff --git a/deploy/clusterrole.yaml b/deploy/clusterrole.yaml
index b7aba3e22..77869f52c 100644
--- a/deploy/clusterrole.yaml
+++ b/deploy/clusterrole.yaml
@@ -48,3 +48,6 @@ rules:
 - apiGroups: [""]
   resources: ["pods/eviction"]
   verbs: ["create"]
+- apiGroups: ["machineconfiguration.openshift.io"]
+  resources: ["*"]
+  verbs: ["*"]
diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go
index 12facee24..0fe09d748 100644
--- a/pkg/daemon/daemon.go
+++ b/pkg/daemon/daemon.go
@@ -34,12 +34,14 @@ import (
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/util/workqueue"
     "k8s.io/kubectl/pkg/drain"
-    // "k8s.io/client-go/kubernetes/scheme"
 
     sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
     snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
     sninformer "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/informers/externalversions"
     "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
+    mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
+    mcclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
+    mcfginformers "github.com/openshift/machine-config-operator/pkg/generated/informers/externalversions"
 )
 
 const (
@@ -69,6 +71,8 @@ type Daemon struct {
     // kubeClient allows interaction with Kubernetes, including the node we are running on.
     kubeClient *kubernetes.Clientset
 
+    mcClient *mcclientset.Clientset
+
     nodeState *sriovnetworkv1.SriovNetworkNodeState
 
     LoadedPlugins map[string]VendorPlugin
@@ -98,6 +102,8 @@ type Daemon struct {
     nodeLister listerv1.NodeLister
 
     workqueue workqueue.RateLimitingInterface
+
+    mcpName string
 }
 
 type workItem struct {
@@ -105,10 +111,11 @@ type workItem struct {
 }
 
 const (
-    scriptsPath  = "/bindata/scripts/enable-rdma.sh"
-    annoKey      = "sriovnetwork.openshift.io/state"
-    annoIdle     = "Idle"
-    annoDraining = "Draining"
+    scriptsPath   = "/bindata/scripts/enable-rdma.sh"
+    annoKey       = "sriovnetwork.openshift.io/state"
+    annoIdle      = "Idle"
+    annoDraining  = "Draining"
+    annoMcpPaused = "Draining_MCP_Paused"
 )
 
 var namespace = os.Getenv("NAMESPACE")
@@ -129,6 +136,7 @@ func New(
     nodeName string,
     client snclientset.Interface,
     kubeClient *kubernetes.Clientset,
+    mcClient *mcclientset.Clientset,
     exitCh chan<- error,
     stopCh <-chan struct{},
     syncCh <-chan struct{},
@@ -140,11 +148,11 @@ func New(
         platform:   platformType,
         client:     client,
         kubeClient: kubeClient,
+        mcClient:   mcClient,
         exitCh:     exitCh,
         stopCh:     stopCh,
         syncCh:     syncCh,
         refreshCh:  refreshCh,
-        drainable:  true,
         nodeState:  &sriovnetworkv1.SriovNetworkNodeState{},
         drainer: &drain.Helper{
             Client: kubeClient,
@@ -398,13 +406,11 @@ func (dn *Daemon) nodeUpdateHandler(old, new interface{}) {
         return
     }
     for _, node := range nodes {
-        if node.GetName() != dn.name && node.Annotations[annoKey] == annoDraining {
-            glog.V(2).Infof("nodeUpdateHandler(): node %s is draining", node.Name)
+        if node.GetName() != dn.name && (node.Annotations[annoKey] == annoDraining || node.Annotations[annoKey] == annoMcpPaused) {
             dn.drainable = false
             return
         }
     }
-    glog.V(2).Infof("nodeUpdateHandler(): no other node is draining")
     dn.drainable = true
 }
 
@@ -507,6 +513,10 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
         }
         return nil
     }
+    if err = dn.getNodeMachinePool(); err != nil {
+        return err
+    }
+
     if reqDrain && !dn.disableDrain {
         glog.Info("nodeStateSyncHandler(): drain node")
         if err := dn.drainNode(dn.name); err != nil {
@@ -538,7 +548,7 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
         }
     }
 
-    if anno, ok := dn.node.Annotations[annoKey]; ok && anno == annoDraining {
+    if anno, ok := dn.node.Annotations[annoKey]; ok && (anno == annoDraining || anno == annoMcpPaused) {
         if err := dn.completeDrain(); err != nil {
             glog.Errorf("nodeStateSyncHandler(): failed to complete draining: %v", err)
             return err
@@ -565,8 +575,17 @@ func (dn *Daemon) completeDrain() error {
         return err
     }
 
+    if utils.ClusterType == utils.ClusterTypeOpenshift {
+        glog.Infof("completeDrain(): resume MCP %s", dn.mcpName)
+        resumePatch := []byte("{\"spec\":{\"paused\":false}}")
+        if _, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, resumePatch, metav1.PatchOptions{}); err != nil {
+            glog.Errorf("completeDrain(): failed to resume MCP %s: %v", dn.mcpName, err)
+            return err
+        }
+    }
+
     if err := dn.annotateNode(dn.name, annoIdle); err != nil {
-        glog.Errorf("drainNode(): failed to annotate node: %v", err)
+        glog.Errorf("completeDrain(): failed to annotate node: %v", err)
         return err
     }
     return nil
@@ -730,21 +749,127 @@ func (dn *Daemon) annotateNode(node, value string) error {
     return nil
 }
 
+func (dn *Daemon) getNodeMachinePool() error {
+    mcpList, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().List(context.TODO(), metav1.ListOptions{})
+    if err != nil {
+        glog.Errorf("getNodeMachinePool(): Failed to list Machine Config Pools: %v", err)
Config Pools: %v", err) + return err + } + var mcp mcfgv1.MachineConfigPool + for _, mcp = range mcpList.Items { + selector, err := metav1.LabelSelectorAsSelector(mcp.Spec.NodeSelector) + if err != nil { + glog.Errorf("getNodeMachinePool(): Machine Config Pool %s invalid label selector: %v", mcp.GetName(), err) + return err + } + + if selector.Matches(labels.Set(dn.node.Labels)) { + dn.mcpName = mcp.GetName() + glog.Infof("getNodeMachinePool(): find node in MCP %s", dn.mcpName) + return nil + } + } + return fmt.Errorf("getNodeMachinePool(): Failed to find the MCP of the node") +} + func (dn *Daemon) drainNode(name string) error { glog.Info("drainNode(): Update prepared") var err error + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() // wait a random time to avoid all the nodes drain at the same time - wait.PollUntil(time.Duration(rand.Intn(15)+1)*time.Second, func() (bool, error) { + time.Sleep(wait.Jitter(3*time.Second, 3)) + wait.JitterUntil(func() { if !dn.drainable { - glog.Info("drainNode(): other node is draining, waiting...") + glog.V(2).Info("drainNode(): other node is draining") + return + } + glog.V(2).Info("drainNode(): no other node is draining") + err = dn.annotateNode(dn.name, annoDraining) + if err != nil { + glog.Errorf("drainNode(): Failed to annotate node: %v", err) + return + } + cancel() + }, 3*time.Second, 3, true, ctx.Done()) + + if utils.ClusterType == utils.ClusterTypeOpenshift { + mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.mcClient, + time.Second*30, + ) + mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer() + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + paused := dn.node.Annotations[annoKey] == annoMcpPaused + + mcpEventHandler := func(obj interface{}) { + mcp := obj.(*mcfgv1.MachineConfigPool) + if mcp.GetName() != dn.mcpName { + return + } + // Always get the latest object + newMcp, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{}) + if err != nil { + glog.V(2).Infof("drainNode(): Failed to get MCP %s: %v", dn.mcpName, err) + return + } + if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) && + mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) && + mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) { + glog.V(2).Infof("drainNode(): MCP %s is ready", dn.mcpName) + if paused { + glog.V(2).Info("drainNode(): stop MCP informer", dn.mcpName) + cancel() + return + } + if newMcp.Spec.Paused { + glog.V(2).Infof("drainNode(): MCP %s was paused by other, wait...", dn.mcpName) + return + } + glog.Infof("drainNode(): pause MCP %s", dn.mcpName) + pausePatch := []byte("{\"spec\":{\"paused\":true}}") + _, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}) + if err != nil { + glog.V(2).Infof("drainNode(): Failed to pause MCP %s: %v", dn.mcpName, err) + return + } + err = dn.annotateNode(dn.name, annoMcpPaused) + if err != nil { + glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err) + return + } + paused = true + return + } + if paused { + glog.Infof("drainNode(): MCP is processing, resume MCP %s", dn.mcpName) + pausePatch := []byte("{\"spec\":{\"paused\":false}}") + _, err = 
+                if err != nil {
+                    glog.V(2).Infof("drainNode(): Failed to resume MCP %s: %v", dn.mcpName, err)
+                    return
+                }
+                err = dn.annotateNode(dn.name, annoDraining)
+                if err != nil {
+                    glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err)
+                    return
+                }
+                paused = false
+            }
+            glog.Infof("drainNode(): MCP %s is not ready: %v, wait...", newMcp.GetName(), newMcp.Status.Conditions)
+        }
-        return dn.drainable, nil
-    }, dn.stopCh)
-    err = dn.annotateNode(dn.name, annoDraining)
-    if err != nil {
-        glog.Errorf("drainNode(): Failed to annotate node: %v", err)
-        return err
+
+        mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+            AddFunc: mcpEventHandler,
+            UpdateFunc: func(old, new interface{}) {
+                mcpEventHandler(new)
+            },
+        })
+        mcpInformerFactory.Start(ctx.Done())
+        mcpInformerFactory.WaitForCacheSync(ctx.Done())
+        <-ctx.Done()
     }
 
     backoff := wait.Backoff{
diff --git a/pkg/plugins/generic/generic_plugin.go b/pkg/plugins/generic/generic_plugin.go
index 1b51c6813..966fdc8e0 100644
--- a/pkg/plugins/generic/generic_plugin.go
+++ b/pkg/plugins/generic/generic_plugin.go
@@ -190,15 +190,17 @@ func needDrainNode(desired sriovnetworkv1.Interfaces, current sriovnetworkv1.Int
                 // ignore swichdev device
                 break
             }
-            if iface.NumVfs != ifaceStatus.NumVfs {
-                glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect NumVfs %v, current NumVfs %v", iface.NumVfs, ifaceStatus.NumVfs)
-                needDrain = true
-                return
-            }
-            if iface.Mtu != 0 && iface.Mtu != ifaceStatus.Mtu {
-                glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect MTU %v, current MTU %v", iface.Mtu, ifaceStatus.Mtu)
-                needDrain = true
-                return
+            if ifaceStatus.NumVfs != 0 {
+                if iface.NumVfs != ifaceStatus.NumVfs {
+                    glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect NumVfs %v, current NumVfs %v", iface.NumVfs, ifaceStatus.NumVfs)
+                    needDrain = true
+                    return
+                }
+                if iface.Mtu != 0 && iface.Mtu != ifaceStatus.Mtu {
+                    glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect MTU %v, current MTU %v", iface.Mtu, ifaceStatus.Mtu)
+                    needDrain = true
+                    return
+                }
             }
         }
     }
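The per-node annotation used as the drain lock in PATCH 1/3 moves
through three states; a condensed sketch of the transitions implemented
in pkg/daemon/daemon.go, restated here for review and not part of the
diffs:

    // Values of the sriovnetwork.openshift.io/state node annotation.
    const (
        annoIdle      = "Idle"                // lock free: any node's daemon may start a drain
        annoDraining  = "Draining"            // lock held: this node is being drained/rebooted
        annoMcpPaused = "Draining_MCP_Paused" // lock held and the node's MCP is paused
    )
    // Idle                -> Draining:            drainNode() wins the lock (no other node draining)
    // Draining            -> Draining_MCP_Paused: the MCP is ready and has been paused
    // Draining_MCP_Paused -> Draining:            the MCP started updating, so it was resumed
    // either locked state -> Idle:                completeDrain() resumes the MCP, releases the lock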
From fbdb84b70212385f7f1d84dd6453ea6eea9ef595 Mon Sep 17 00:00:00 2001
From: Peng Liu
Date: Fri, 7 May 2021 08:25:31 -0400
Subject: [PATCH 2/3] Fix setup-envtest.sh source in Makefile

---
 Makefile | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/Makefile b/Makefile
index 5ab313c8f..91e75ead7 100644
--- a/Makefile
+++ b/Makefile
@@ -75,7 +75,7 @@ image: ; $(info Building image...)
 ENVTEST_ASSETS_DIR=$(shell pwd)/testbin
 test: generate vet manifests
 	mkdir -p ${ENVTEST_ASSETS_DIR}
-	test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/master/hack/setup-envtest.sh
+	test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/v0.7.2/hack/setup-envtest.sh
 	source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); go test ./... -coverprofile cover.out -v
 
 # Build manager binary
@@ -201,12 +201,15 @@ test-e2e-conformance:
 
 test-e2e: generate vet manifests skopeo
 	mkdir -p ${ENVTEST_ASSETS_DIR}
-	test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/master/hack/setup-envtest.sh
-	source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); source hack/env.sh; go test ./test/e2e/... -coverprofile cover.out -v
+	test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/v0.7.2/hack/setup-envtest.sh
+	source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); source hack/env.sh; go test ./test/e2e/... -timeout 60m -coverprofile cover.out -v
+
+test-e2e-k8s: export NAMESPACE=sriov-network-operator
+test-e2e-k8s: test-e2e
 
 test-%: generate vet manifests
 	mkdir -p ${ENVTEST_ASSETS_DIR}
-	test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/master/hack/setup-envtest.sh
+	test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/v0.7.2/hack/setup-envtest.sh
 	source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); go test ./$*/... -coverprofile cover.out -v
 
 # deploy-setup-k8s: export NAMESPACE=sriov-network-operator

From f879032944116472bd2f58d2732ba778afb01826 Mon Sep 17 00:00:00 2001
From: Peng Liu
Date: Thu, 13 May 2021 05:07:27 -0400
Subject: [PATCH 3/3] Add machineconfiguration cluster rule to CSV manifest

---
 .../sriov-network-operator.v4.7.0.clusterserviceversion.yaml | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/manifests/4.7/sriov-network-operator.v4.7.0.clusterserviceversion.yaml b/manifests/4.7/sriov-network-operator.v4.7.0.clusterserviceversion.yaml
index ff2e2b32e..8b3288090 100644
--- a/manifests/4.7/sriov-network-operator.v4.7.0.clusterserviceversion.yaml
+++ b/manifests/4.7/sriov-network-operator.v4.7.0.clusterserviceversion.yaml
@@ -201,6 +201,9 @@ spec:
       - apiGroups: [""]
         resources: ["pods/eviction"]
         verbs: ["create"]
+      - apiGroups: ["machineconfiguration.openshift.io"]
+        resources: ["*"]
+        verbs: ["*"]
       serviceAccountName: sriov-network-config-daemon
     permissions:
     - rules: