[release-4.7] Bug 1960103: Pause MCP before draining/rebooting node #504

Merged · 3 commits · May 21, 2021
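This backport makes the SR-IOV config daemon pause the node's MachineConfigPool (MCP) before it drains and reboots the node for a configuration change, and resume the pool once the drain completes, so the machine-config operator does not start rolling out its own updates while the node is mid-reconfiguration. The pause/resume itself is a JSON merge patch of the pool's spec.paused field. Below is a minimal sketch of that call using the machine-config clientset this PR imports; the in-cluster client setup and the "worker" pool name are illustrative assumptions, not part of the diff.

package main

import (
	"context"
	"fmt"

	mcclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/rest"
)

// setPoolPaused flips spec.paused on a MachineConfigPool with a merge patch,
// mirroring the pause/resume calls added to pkg/daemon/daemon.go in this PR.
func setPoolPaused(mcClient mcclientset.Interface, mcpName string, paused bool) error {
	patch := []byte(fmt.Sprintf(`{"spec":{"paused":%t}}`, paused))
	_, err := mcClient.MachineconfigurationV1().MachineConfigPools().Patch(
		context.Background(), mcpName, types.MergePatchType, patch, metav1.PatchOptions{})
	return err
}

func main() {
	// Assumed setup: running in-cluster, operating on the "worker" pool.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	mcClient := mcclientset.NewForConfigOrDie(cfg)

	// Pause the pool before draining, resume it when done.
	if err := setPoolPaused(mcClient, "worker", true); err != nil {
		panic(err)
	}
	defer func() { _ = setPoolPaused(mcClient, "worker", false) }()

	// ... drain the node, apply the SR-IOV config, reboot if required ...
}

In the daemon itself, the pause is only applied once the pool is reported Updated and neither Degraded nor Updating, the node annotation is moved from Draining to Draining_MCP_Paused so a restarted daemon knows the pool is paused, and completeDrain() resumes the pool before re-annotating the node as Idle, as the daemon.go diff below shows.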
11 changes: 7 additions & 4 deletions Makefile
@@ -75,7 +75,7 @@ image: ; $(info Building image...)
ENVTEST_ASSETS_DIR=$(shell pwd)/testbin
test: generate vet manifests
mkdir -p ${ENVTEST_ASSETS_DIR}
test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/master/hack/setup-envtest.sh
test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/v0.7.2/hack/setup-envtest.sh
source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); go test ./... -coverprofile cover.out -v

# Build manager binary
@@ -201,12 +201,15 @@ test-e2e-conformance:

test-e2e: generate vet manifests skopeo
mkdir -p ${ENVTEST_ASSETS_DIR}
test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/master/hack/setup-envtest.sh
source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); source hack/env.sh; go test ./test/e2e/... -coverprofile cover.out -v
test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/v0.7.2/hack/setup-envtest.sh
source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); source hack/env.sh; go test ./test/e2e/... -timeout 60m -coverprofile cover.out -v

test-e2e-k8s: export NAMESPACE=sriov-network-operator
test-e2e-k8s: test-e2e

test-%: generate vet manifests
mkdir -p ${ENVTEST_ASSETS_DIR}
test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/master/hack/setup-envtest.sh
test -f ${ENVTEST_ASSETS_DIR}/setup-envtest.sh || curl -sSLo ${ENVTEST_ASSETS_DIR}/setup-envtest.sh https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/v0.7.2/hack/setup-envtest.sh
source ${ENVTEST_ASSETS_DIR}/setup-envtest.sh; fetch_envtest_tools $(ENVTEST_ASSETS_DIR); setup_envtest_env $(ENVTEST_ASSETS_DIR); go test ./$*/... -coverprofile cover.out -v
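Usage note: with these targets the e2e suite runs via make test-e2e (the go test invocation now carries a 60-minute timeout), and the pattern target runs any sub-tree's unit tests, e.g. make test-pkg expands to go test ./pkg/... (the directory name here is chosen only as an example).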

# deploy-setup-k8s: export NAMESPACE=sriov-network-operator
6 changes: 6 additions & 0 deletions cmd/sriov-network-config-daemon/start.go
@@ -23,6 +23,9 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/connrotation"

mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
mcclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
)

var (
@@ -124,9 +127,11 @@ func runStartCmd(cmd *cobra.Command, args []string) {
}

sriovnetworkv1.AddToScheme(scheme.Scheme)
mcfgv1.AddToScheme(scheme.Scheme)

snclient := snclientset.NewForConfigOrDie(config)
kubeclient := kubernetes.NewForConfigOrDie(config)
mcclient := mcclientset.NewForConfigOrDie(config)

config.Timeout = 5 * time.Second
writerclient := snclientset.NewForConfigOrDie(config)
@@ -161,6 +166,7 @@ func runStartCmd(cmd *cobra.Command, args []string) {
startOpts.nodeName,
snclient,
kubeclient,
mcclient,
exitCh,
stopCh,
syncCh,
3 changes: 3 additions & 0 deletions deploy/clusterrole.yaml
@@ -48,3 +48,6 @@ rules:
- apiGroups: [""]
resources: ["pods/eviction"]
verbs: ["create"]
- apiGroups: ["machineconfiguration.openshift.io"]
resources: ["*"]
verbs: ["*"]
@@ -201,6 +201,9 @@ spec:
- apiGroups: [""]
resources: ["pods/eviction"]
verbs: ["create"]
- apiGroups: ["machineconfiguration.openshift.io"]
resources: ["*"]
verbs: ["*"]
serviceAccountName: sriov-network-config-daemon
permissions:
- rules:
163 changes: 144 additions & 19 deletions pkg/daemon/daemon.go
@@ -34,12 +34,14 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubectl/pkg/drain"

// "k8s.io/client-go/kubernetes/scheme"
sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
sninformer "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/informers/externalversions"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
mcclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
mcfginformers "github.com/openshift/machine-config-operator/pkg/generated/informers/externalversions"
)

const (
@@ -69,6 +71,8 @@ type Daemon struct {
// kubeClient allows interaction with Kubernetes, including the node we are running on.
kubeClient *kubernetes.Clientset

mcClient *mcclientset.Clientset

nodeState *sriovnetworkv1.SriovNetworkNodeState

LoadedPlugins map[string]VendorPlugin
@@ -98,17 +102,20 @@ type Daemon struct {
nodeLister listerv1.NodeLister

workqueue workqueue.RateLimitingInterface

mcpName string
}

type workItem struct {
old, new *sriovnetworkv1.SriovNetworkNodeState
}

const (
scriptsPath = "/bindata/scripts/enable-rdma.sh"
annoKey = "sriovnetwork.openshift.io/state"
annoIdle = "Idle"
annoDraining = "Draining"
scriptsPath = "/bindata/scripts/enable-rdma.sh"
annoKey = "sriovnetwork.openshift.io/state"
annoIdle = "Idle"
annoDraining = "Draining"
annoMcpPaused = "Draining_MCP_Paused"
)

var namespace = os.Getenv("NAMESPACE")
@@ -129,6 +136,7 @@ func New(
nodeName string,
client snclientset.Interface,
kubeClient *kubernetes.Clientset,
mcClient *mcclientset.Clientset,
exitCh chan<- error,
stopCh <-chan struct{},
syncCh <-chan struct{},
@@ -140,11 +148,11 @@
platform: platformType,
client: client,
kubeClient: kubeClient,
mcClient: mcClient,
exitCh: exitCh,
stopCh: stopCh,
syncCh: syncCh,
refreshCh: refreshCh,
drainable: true,
nodeState: &sriovnetworkv1.SriovNetworkNodeState{},
drainer: &drain.Helper{
Client: kubeClient,
@@ -398,13 +406,11 @@ func (dn *Daemon) nodeUpdateHandler(old, new interface{}) {
return
}
for _, node := range nodes {
if node.GetName() != dn.name && node.Annotations[annoKey] == annoDraining {
glog.V(2).Infof("nodeUpdateHandler(): node %s is draining", node.Name)
if node.GetName() != dn.name && (node.Annotations[annoKey] == annoDraining || node.Annotations[annoKey] == annoMcpPaused) {
dn.drainable = false
return
}
}
glog.V(2).Infof("nodeUpdateHandler(): no other node is draining")
dn.drainable = true
}

@@ -507,6 +513,10 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
}
return nil
}
if err = dn.getNodeMachinePool(); err != nil {
return err
}

if reqDrain && !dn.disableDrain {
glog.Info("nodeStateSyncHandler(): drain node")
if err := dn.drainNode(dn.name); err != nil {
@@ -538,7 +548,7 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
}
}

if anno, ok := dn.node.Annotations[annoKey]; ok && anno == annoDraining {
if anno, ok := dn.node.Annotations[annoKey]; ok && (anno == annoDraining || anno == annoMcpPaused) {
if err := dn.completeDrain(); err != nil {
glog.Errorf("nodeStateSyncHandler(): failed to complete draining: %v", err)
return err
@@ -565,8 +575,17 @@ func (dn *Daemon) completeDrain() error {
return err
}

if utils.ClusterType == utils.ClusterTypeOpenshift {
glog.Infof("completeDrain(): resume MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":false}}")
if _, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}); err != nil {
glog.Errorf("completeDrain(): failed to resume MCP %s: %v", dn.mcpName, err)
return err
}
}

if err := dn.annotateNode(dn.name, annoIdle); err != nil {
glog.Errorf("drainNode(): failed to annotate node: %v", err)
glog.Errorf("completeDrain(): failed to annotate node: %v", err)
return err
}
return nil
@@ -730,21 +749,127 @@ func (dn *Daemon) annotateNode(node, value string) error {
return nil
}

func (dn *Daemon) getNodeMachinePool() error {
mcpList, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().List(context.TODO(), metav1.ListOptions{})
if err != nil {
glog.Errorf("getNodeMachinePool(): Failed to list Machine Config Pools: %v", err)
return err
}
var mcp mcfgv1.MachineConfigPool
for _, mcp = range mcpList.Items {
selector, err := metav1.LabelSelectorAsSelector(mcp.Spec.NodeSelector)
if err != nil {
glog.Errorf("getNodeMachinePool(): Machine Config Pool %s invalid label selector: %v", mcp.GetName(), err)
return err
}

if selector.Matches(labels.Set(dn.node.Labels)) {
dn.mcpName = mcp.GetName()
glog.Infof("getNodeMachinePool(): find node in MCP %s", dn.mcpName)
return nil
}
}
return fmt.Errorf("getNodeMachinePool(): Failed to find the MCP of the node")
}

func (dn *Daemon) drainNode(name string) error {
glog.Info("drainNode(): Update prepared")
var err error

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
// wait a random time to avoid all the nodes drain at the same time
wait.PollUntil(time.Duration(rand.Intn(15)+1)*time.Second, func() (bool, error) {
time.Sleep(wait.Jitter(3*time.Second, 3))
wait.JitterUntil(func() {
if !dn.drainable {
glog.Info("drainNode(): other node is draining, waiting...")
glog.V(2).Info("drainNode(): other node is draining")
return
}
glog.V(2).Info("drainNode(): no other node is draining")
err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.Errorf("drainNode(): Failed to annotate node: %v", err)
return
}
cancel()
}, 3*time.Second, 3, true, ctx.Done())

if utils.ClusterType == utils.ClusterTypeOpenshift {
mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.mcClient,
time.Second*30,
)
mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer()

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
paused := dn.node.Annotations[annoKey] == annoMcpPaused

mcpEventHandler := func(obj interface{}) {
mcp := obj.(*mcfgv1.MachineConfigPool)
if mcp.GetName() != dn.mcpName {
return
}
// Always get the latest object
newMcp, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): Failed to get MCP %s: %v", dn.mcpName, err)
return
}
if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) &&
mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) &&
mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) {
glog.V(2).Infof("drainNode(): MCP %s is ready", dn.mcpName)
if paused {
glog.V(2).Info("drainNode(): stop MCP informer", dn.mcpName)
cancel()
return
}
if newMcp.Spec.Paused {
glog.V(2).Infof("drainNode(): MCP %s was paused by other, wait...", dn.mcpName)
return
}
glog.Infof("drainNode(): pause MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":true}}")
_, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): Failed to pause MCP %s: %v", dn.mcpName, err)
return
}
err = dn.annotateNode(dn.name, annoMcpPaused)
if err != nil {
glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err)
return
}
paused = true
return
}
if paused {
glog.Infof("drainNode(): MCP is processing, resume MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":false}}")
_, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): fail to resume MCP %s: %v", dn.mcpName, err)
return
}
err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err)
return
}
paused = false
}
glog.Infof("drainNode():MCP %s is not ready: %v, wait...", newMcp.GetName(), newMcp.Status.Conditions)
}
return dn.drainable, nil
}, dn.stopCh)

err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.Errorf("drainNode(): Failed to annotate node: %v", err)
return err
mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: mcpEventHandler,
UpdateFunc: func(old, new interface{}) {
mcpEventHandler(new)
},
})
mcpInformerFactory.Start(ctx.Done())
mcpInformerFactory.WaitForCacheSync(ctx.Done())
<-ctx.Done()
}

backoff := wait.Backoff{
20 changes: 11 additions & 9 deletions pkg/plugins/generic/generic_plugin.go
@@ -190,15 +190,17 @@ func needDrainNode(desired sriovnetworkv1.Interfaces, current sriovnetworkv1.Int
// ignore switchdev device
break
}
if iface.NumVfs != ifaceStatus.NumVfs {
glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect NumVfs %v, current NumVfs %v", iface.NumVfs, ifaceStatus.NumVfs)
needDrain = true
return
}
if iface.Mtu != 0 && iface.Mtu != ifaceStatus.Mtu {
glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect MTU %v, current MTU %v", iface.Mtu, ifaceStatus.Mtu)
needDrain = true
return
if ifaceStatus.NumVfs != 0 {
if iface.NumVfs != ifaceStatus.NumVfs {
glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect NumVfs %v, current NumVfs %v", iface.NumVfs, ifaceStatus.NumVfs)
needDrain = true
return
}
if iface.Mtu != 0 && iface.Mtu != ifaceStatus.Mtu {
glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect MTU %v, current MTU %v", iface.Mtu, ifaceStatus.Mtu)
needDrain = true
return
}
}
}
}
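The effect of the new ifaceStatus.NumVfs != 0 guard above: an interface with no VFs currently configured can pick up its configuration without a node drain, and a drain is only requested when an interface that already carries VFs would change its VF count or MTU. A hedged restatement of that decision as a standalone helper, using plain ints instead of the operator's interface types, purely for illustration:

// wouldNeedDrain mirrors the drain decision in needDrainNode for a single interface.
func wouldNeedDrain(desiredNumVfs, desiredMtu, currentNumVfs, currentMtu int) bool {
	if currentNumVfs == 0 {
		// No VFs configured yet: the new config can be applied without a drain.
		return false
	}
	if desiredNumVfs != currentNumVfs {
		return true // changing the VF count on a configured PF requires a drain
	}
	if desiredMtu != 0 && desiredMtu != currentMtu {
		return true // changing the MTU on a configured PF requires a drain
	}
	return false
}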