Commit 6ee0cd3

Merge pull request #504 from pliurh/release-4.7

[release-4.7] Bug 1960103: Pause MCP before draining/rebooting node

openshift-merge-robot committed May 21, 2021
2 parents 199eef0 + f879032
Showing 6 changed files with 174 additions and 32 deletions.
11 changes: 7 additions & 4 deletions Makefile
@@ -75,7 +75,7 @@ image: ; $(info Building image...)
ENVTEST_ASSETS_DIR=$(shell pwd)/testbin
test: generate vet manifests
mkdir -p ${ENVTEST_ASSETS_DIR}
test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/master/hack/setup-envtest.sh
test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/v0.7.2/hack/setup-envtest.sh
source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); go test ./... -coverprofile cover.out -v

# Build manager binary
@@ -201,12 +201,15 @@ test-e2e-conformance:

test-e2e: generate vet manifests skopeo
mkdir -p ${ENVTEST_ASSETS_DIR}
test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/master/hack/setup-envtest.sh
source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); source hack/env.sh; go test ./test/e2e/... -coverprofile cover.out -v
test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/v0.7.2/hack/setup-envtest.sh
source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); source hack/env.sh; go test ./test/e2e/... -timeout 60m -coverprofile cover.out -v

test-e2e-k8s: export NAMESPACE=sriov-network-operator
test-e2e-k8s: test-e2e

test-%: generate vet manifests
mkdir -p ${ENVTEST_ASSETS_DIR}
test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/master/hack/setup-envtest.sh
test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/v0.7.2/hack/setup-envtest.sh
source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); go test ./$*/... -coverprofile cover.out -v

# deploy-setup-k8s: export NAMESPACE=sriov-network-operator
6 changes: 6 additions & 0 deletions cmd/sriov-network-config-daemon/start.go
@@ -23,6 +23,9 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/connrotation"

mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
mcclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
)

var (
@@ -124,9 +127,11 @@ func runStartCmd(cmd *cobra.Command, args []string) {
}

sriovnetworkv1.AddToScheme(scheme.Scheme)
mcfgv1.AddToScheme(scheme.Scheme)

snclient := snclientset.NewForConfigOrDie(config)
kubeclient := kubernetes.NewForConfigOrDie(config)
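// The MCO clientset lets the daemon pause and resume this node's
// MachineConfigPool around drains and reboots (see pkg/daemon/daemon.go).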
mcclient := mcclientset.NewForConfigOrDie(config)

config.Timeout = 5 * time.Second
writerclient := snclientset.NewForConfigOrDie(config)
@@ -161,6 +166,7 @@ func runStartCmd(cmd *cobra.Command, args []string) {
startOpts.nodeName,
snclient,
kubeclient,
mcclient,
exitCh,
stopCh,
syncCh,
3 changes: 3 additions & 0 deletions deploy/clusterrole.yaml
@@ -48,3 +48,6 @@ rules:
- apiGroups: [""]
resources: ["pods/eviction"]
verbs: ["create"]
- apiGroups: ["machineconfiguration.openshift.io"]
resources: ["*"]
verbs: ["*"]
@@ -201,6 +201,9 @@ spec:
- apiGroups: [""]
resources: ["pods/eviction"]
verbs: ["create"]
- apiGroups: ["machineconfiguration.openshift.io"]
resources: ["*"]
verbs: ["*"]
serviceAccountName: sriov-network-config-daemon
permissions:
- rules:
163 changes: 144 additions & 19 deletions pkg/daemon/daemon.go
@@ -34,12 +34,14 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubectl/pkg/drain"

// "k8s.io/client-go/kubernetes/scheme"
sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
sninformer "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/informers/externalversions"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
mcclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
mcfginformers "github.com/openshift/machine-config-operator/pkg/generated/informers/externalversions"
)

const (
@@ -69,6 +71,8 @@ type Daemon struct {
// kubeClient allows interaction with Kubernetes, including the node we are running on.
kubeClient *kubernetes.Clientset

mcClient *mcclientset.Clientset

nodeState *sriovnetworkv1.SriovNetworkNodeState

LoadedPlugins map[string]VendorPlugin
@@ -98,17 +102,20 @@ type Daemon struct {
nodeLister listerv1.NodeLister

workqueue workqueue.RateLimitingInterface

mcpName string
}

type workItem struct {
old, new *sriovnetworkv1.SriovNetworkNodeState
}

const (
scriptsPath = "/bindata/scripts/enable-rdma.sh"
annoKey = "sriovnetwork.openshift.io/state"
annoIdle = "Idle"
annoDraining = "Draining"
scriptsPath = "/bindata/scripts/enable-rdma.sh"
annoKey = "sriovnetwork.openshift.io/state"
annoIdle = "Idle"
annoDraining = "Draining"
annoMcpPaused = "Draining_MCP_Paused"
)

var namespace = os.Getenv("NAMESPACE")
@@ -129,6 +136,7 @@ func New(
nodeName string,
client snclientset.Interface,
kubeClient *kubernetes.Clientset,
mcClient *mcclientset.Clientset,
exitCh chan<- error,
stopCh <-chan struct{},
syncCh <-chan struct{},
@@ -140,11 +148,11 @@
platform: platformType,
client: client,
kubeClient: kubeClient,
mcClient: mcClient,
exitCh: exitCh,
stopCh: stopCh,
syncCh: syncCh,
refreshCh: refreshCh,
drainable: true,
nodeState: &sriovnetworkv1.SriovNetworkNodeState{},
drainer: &drain.Helper{
Client: kubeClient,
@@ -398,13 +406,11 @@ func (dn *Daemon) nodeUpdateHandler(old, new interface{}) {
return
}
for _, node := range nodes {
if node.GetName() != dn.name && node.Annotations[annoKey] == annoDraining {
glog.V(2).Infof("nodeUpdateHandler(): node %s is draining", node.Name)
if node.GetName() != dn.name && (node.Annotations[annoKey] == annoDraining || node.Annotations[annoKey] == annoMcpPaused) {
dn.drainable = false
return
}
}
glog.V(2).Infof("nodeUpdateHandler(): no other node is draining")
dn.drainable = true
}

@@ -507,6 +513,10 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
}
return nil
}
if err = dn.getNodeMachinePool(); err != nil {
return err
}

if reqDrain && !dn.disableDrain {
glog.Info("nodeStateSyncHandler(): drain node")
if err := dn.drainNode(dn.name); err != nil {
@@ -538,7 +548,7 @@ }
}
}

if anno, ok := dn.node.Annotations[annoKey]; ok && anno == annoDraining {
if anno, ok := dn.node.Annotations[annoKey]; ok && (anno == annoDraining || anno == annoMcpPaused) {
if err := dn.completeDrain(); err != nil {
glog.Errorf("nodeStateSyncHandler(): failed to complete draining: %v", err)
return err
@@ -565,8 +575,17 @@ func (dn *Daemon) completeDrain() error {
return err
}

if utils.ClusterType == utils.ClusterTypeOpenshift {
glog.Infof("completeDrain(): resume MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":false}}")
if _, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}); err != nil {
glog.Errorf("completeDrain(): failed to resume MCP %s: %v", dn.mcpName, err)
return err
}
}

if err := dn.annotateNode(dn.name, annoIdle); err != nil {
glog.Errorf("drainNode(): failed to annotate node: %v", err)
glog.Errorf("completeDrain(): failed to annotate node: %v", err)
return err
}
return nil
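The pause and resume toggles here and in drainNode() below are plain JSON merge patches against the MCP spec. A hypothetical helper, not part of this commit, sketching the shape shared by both call sites; it assumes the same mcclientset and apimachinery packages imported above:

// setMcpPaused is a sketch, not committed code: it flips
// MachineConfigPool.spec.paused via the same JSON merge patch that
// completeDrain() and drainNode() issue inline.
// Assumed imports: "context", "fmt",
//   metav1 "k8s.io/apimachinery/pkg/apis/meta/v1",
//   "k8s.io/apimachinery/pkg/types",
//   mcclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned".
func setMcpPaused(ctx context.Context, mc mcclientset.Interface, pool string, paused bool) error {
	patch := []byte(fmt.Sprintf(`{"spec":{"paused":%t}}`, paused))
	_, err := mc.MachineconfigurationV1().MachineConfigPools().Patch(
		ctx, pool, types.MergePatchType, patch, metav1.PatchOptions{})
	return err
}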
@@ -730,21 +749,127 @@ func (dn *Daemon) annotateNode(node, value string) error {
return nil
}

func (dn *Daemon) getNodeMachinePool() error {
mcpList, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().List(context.TODO(), metav1.ListOptions{})
if err != nil {
glog.Errorf("getNodeMachinePool(): Failed to list Machine Config Pools: %v", err)
return err
}
var mcp mcfgv1.MachineConfigPool
for _, mcp = range mcpList.Items {
selector, err := metav1.LabelSelectorAsSelector(mcp.Spec.NodeSelector)
if err != nil {
glog.Errorf("getNodeMachinePool(): Machine Config Pool %s invalid label selector: %v", mcp.GetName(), err)
return err
}

if selector.Matches(labels.Set(dn.node.Labels)) {
dn.mcpName = mcp.GetName()
glog.Infof("getNodeMachinePool(): find node in MCP %s", dn.mcpName)
return nil
}
}
return fmt.Errorf("getNodeMachinePool(): Failed to find the MCP of the node")
}

func (dn *Daemon) drainNode(name string) error {
glog.Info("drainNode(): Update prepared")
var err error

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
// wait a random time to avoid all the nodes draining at the same time
wait.PollUntil(time.Duration(rand.Intn(15)+1)*time.Second, func() (bool, error) {
time.Sleep(wait.Jitter(3*time.Second, 3))
wait.JitterUntil(func() {
if !dn.drainable {
glog.Info("drainNode(): other node is draining, waiting...")
glog.V(2).Info("drainNode(): other node is draining")
return
}
glog.V(2).Info("drainNode(): no other node is draining")
err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.Errorf("drainNode(): Failed to annotate node: %v", err)
return
}
cancel()
}, 3*time.Second, 3, true, ctx.Done())

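// On OpenShift, pause the node's MachineConfigPool before draining so the
// Machine Config Operator cannot start its own node update (and reboot)
// while the SR-IOV configuration is being applied.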
if utils.ClusterType == utils.ClusterTypeOpenshift {
mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.mcClient,
time.Second*30,
)
mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer()

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
paused := dn.node.Annotations[annoKey] == annoMcpPaused

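// The handler below is a small state machine driven by the MCP conditions:
// when the pool is ready (Updated, not Updating, not Degraded), pause it and
// annotate the node Draining_MCP_Paused; if the pool turns out to be
// mid-update while we hold the pause, lift the pause and fall back to the
// Draining annotation until the MCO finishes; once the pool is ready and
// paused, cancel() stops the informer and the drain proceeds.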
mcpEventHandler := func(obj interface{}) {
mcp := obj.(*mcfgv1.MachineConfigPool)
if mcp.GetName() != dn.mcpName {
return
}
// Always get the latest object
newMcp, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): Failed to get MCP %s: %v", dn.mcpName, err)
return
}
if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) &&
mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) &&
mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) {
glog.V(2).Infof("drainNode(): MCP %s is ready", dn.mcpName)
if paused {
glog.V(2).Info("drainNode(): stop MCP informer", dn.mcpName)
cancel()
return
}
if newMcp.Spec.Paused {
glog.V(2).Infof("drainNode(): MCP %s was paused by other, wait...", dn.mcpName)
return
}
glog.Infof("drainNode(): pause MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":true}}")
_, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): Failed to pause MCP %s: %v", dn.mcpName, err)
return
}
err = dn.annotateNode(dn.name, annoMcpPaused)
if err != nil {
glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err)
return
}
paused = true
return
}
if paused {
glog.Infof("drainNode(): MCP is processing, resume MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":false}}")
_, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): fail to resume MCP %s: %v", dn.mcpName, err)
return
}
err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err)
return
}
paused = false
}
glog.Infof("drainNode():MCP %s is not ready: %v, wait...", newMcp.GetName(), newMcp.Status.Conditions)
}
return dn.drainable, nil
}, dn.stopCh)

err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.Errorf("drainNode(): Failed to annotate node: %v", err)
return err
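// Register the handler for both add and update events, start the shared
// informer factory, and block on the context until the handler has paused
// a ready pool and called cancel().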
mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: mcpEventHandler,
UpdateFunc: func(old, new interface{}) {
mcpEventHandler(new)
},
})
mcpInformerFactory.Start(ctx.Done())
mcpInformerFactory.WaitForCacheSync(ctx.Done())
<-ctx.Done()
}

backoff := wait.Backoff{
20 changes: 11 additions & 9 deletions pkg/plugins/generic/generic_plugin.go
@@ -190,15 +190,17 @@ func needDrainNode(desired sriovnetworkv1.Interfaces, current sriovnetworkv1.Int
// ignore switchdev device
break
}
if iface.NumVfs != ifaceStatus.NumVfs {
glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect NumVfs %v, current NumVfs %v", iface.NumVfs, ifaceStatus.NumVfs)
needDrain = true
return
}
if iface.Mtu != 0 && iface.Mtu != ifaceStatus.Mtu {
glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect MTU %v, current MTU %v", iface.Mtu, ifaceStatus.Mtu)
needDrain = true
return
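// Only a NIC that already has VFs configured (NumVfs != 0) can have
// workloads attached to them; when no VFs exist yet, the desired config
// can be applied without draining the node.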
if ifaceStatus.NumVfs != 0 {
if iface.NumVfs != ifaceStatus.NumVfs {
glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect NumVfs %v, current NumVfs %v", iface.NumVfs, ifaceStatus.NumVfs)
needDrain = true
return
}
if iface.Mtu != 0 && iface.Mtu != ifaceStatus.Mtu {
glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect MTU %v, current MTU %v", iface.Mtu, ifaceStatus.Mtu)
needDrain = true
return
}
}
}
}
