Pause MCP before draining/rebooting node
The config daemon instance on each node needs to compete for the 'Draining' lock
before it can drain/reboot a node.

After a config daemon instance acquires the lock, it checks the conditions of its
MachineConfigPool. If the MCP is in a ready state, the daemon pauses the MCP and
proceeds; otherwise, it waits for the MCP to become ready. The MCP is resumed and
the lock released once the config daemon finishes its work.
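
For reference, a minimal sketch (not part of this commit) of the two MCP operations the daemon relies on: the readiness test and the spec.paused merge patch. The helper names mcpIsReady and setMcpPaused are illustrative; the clientset and condition helpers are the ones used in pkg/daemon/daemon.go below.

package sketch

import (
	"context"
	"fmt"

	mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
	mcclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// mcpIsReady mirrors the readiness check in drainNode(): the pool is not
// degraded, is updated, and is not currently updating.
func mcpIsReady(mcp *mcfgv1.MachineConfigPool) bool {
	return mcfgv1.IsMachineConfigPoolConditionFalse(mcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) &&
		mcfgv1.IsMachineConfigPoolConditionTrue(mcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) &&
		mcfgv1.IsMachineConfigPoolConditionFalse(mcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating)
}

// setMcpPaused toggles spec.paused on the named pool, using the same merge
// patch that drainNode() and completeDrain() send.
func setMcpPaused(ctx context.Context, mc mcclientset.Interface, name string, paused bool) error {
	patch := []byte(fmt.Sprintf(`{"spec":{"paused":%t}}`, paused))
	_, err := mc.MachineconfigurationV1().MachineConfigPools().Patch(
		ctx, name, types.MergePatchType, patch, metav1.PatchOptions{})
	return err
}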
pliurh committed May 13, 2021
1 parent 199eef0 commit 3b6aeed
Showing 4 changed files with 164 additions and 28 deletions.
6 changes: 6 additions & 0 deletions cmd/sriov-network-config-daemon/start.go
@@ -23,6 +23,9 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/connrotation"

mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
mcclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
)

var (
@@ -124,9 +127,11 @@ func runStartCmd(cmd *cobra.Command, args []string) {
}

sriovnetworkv1.AddToScheme(scheme.Scheme)
mcfgv1.AddToScheme(scheme.Scheme)

snclient := snclientset.NewForConfigOrDie(config)
kubeclient := kubernetes.NewForConfigOrDie(config)
mcclient := mcclientset.NewForConfigOrDie(config)

config.Timeout = 5 * time.Second
writerclient := snclientset.NewForConfigOrDie(config)
@@ -161,6 +166,7 @@ func runStartCmd(cmd *cobra.Command, args []string) {
startOpts.nodeName,
snclient,
kubeclient,
mcclient,
exitCh,
stopCh,
syncCh,
3 changes: 3 additions & 0 deletions deploy/clusterrole.yaml
@@ -48,3 +48,6 @@ rules:
- apiGroups: [""]
resources: ["pods/eviction"]
verbs: ["create"]
- apiGroups: ["machineconfiguration.openshift.io"]
resources: ["*"]
verbs: ["*"]
163 changes: 144 additions & 19 deletions pkg/daemon/daemon.go
Expand Up @@ -34,12 +34,14 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubectl/pkg/drain"

// "k8s.io/client-go/kubernetes/scheme"
sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
sninformer "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/informers/externalversions"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
mcclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
mcfginformers "github.com/openshift/machine-config-operator/pkg/generated/informers/externalversions"
)

const (
@@ -69,6 +71,8 @@ type Daemon struct {
// kubeClient allows interaction with Kubernetes, including the node we are running on.
kubeClient *kubernetes.Clientset

mcClient *mcclientset.Clientset

nodeState *sriovnetworkv1.SriovNetworkNodeState

LoadedPlugins map[string]VendorPlugin
@@ -98,17 +102,20 @@ type Daemon struct {
nodeLister listerv1.NodeLister

workqueue workqueue.RateLimitingInterface

mcpName string
}

type workItem struct {
old, new *sriovnetworkv1.SriovNetworkNodeState
}

const (
scriptsPath = "/bindata/scripts/enable-rdma.sh"
annoKey = "sriovnetwork.openshift.io/state"
annoIdle = "Idle"
annoDraining = "Draining"
scriptsPath = "/bindata/scripts/enable-rdma.sh"
annoKey = "sriovnetwork.openshift.io/state"
annoIdle = "Idle"
annoDraining = "Draining"
annoMcpPaused = "Draining_MCP_Paused"
)

var namespace = os.Getenv("NAMESPACE")
@@ -129,6 +136,7 @@ func New(
nodeName string,
client snclientset.Interface,
kubeClient *kubernetes.Clientset,
mcClient *mcclientset.Clientset,
exitCh chan<- error,
stopCh <-chan struct{},
syncCh <-chan struct{},
@@ -140,11 +148,11 @@
platform: platformType,
client: client,
kubeClient: kubeClient,
mcClient: mcClient,
exitCh: exitCh,
stopCh: stopCh,
syncCh: syncCh,
refreshCh: refreshCh,
drainable: true,
nodeState: &sriovnetworkv1.SriovNetworkNodeState{},
drainer: &drain.Helper{
Client: kubeClient,
@@ -398,13 +406,11 @@ func (dn *Daemon) nodeUpdateHandler(old, new interface{}) {
return
}
for _, node := range nodes {
if node.GetName() != dn.name && node.Annotations[annoKey] == annoDraining {
glog.V(2).Infof("nodeUpdateHandler(): node %s is draining", node.Name)
if node.GetName() != dn.name && (node.Annotations[annoKey] == annoDraining || node.Annotations[annoKey] == annoMcpPaused) {
dn.drainable = false
return
}
}
glog.V(2).Infof("nodeUpdateHandler(): no other node is draining")
dn.drainable = true
}

@@ -507,6 +513,10 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
}
return nil
}
if err = dn.getNodeMachinePool(); err != nil {
return err
}

if reqDrain && !dn.disableDrain {
glog.Info("nodeStateSyncHandler(): drain node")
if err := dn.drainNode(dn.name); err != nil {
@@ -538,7 +548,7 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
}
}

if anno, ok := dn.node.Annotations[annoKey]; ok && anno == annoDraining {
if anno, ok := dn.node.Annotations[annoKey]; ok && (anno == annoDraining || anno == annoMcpPaused) {
if err := dn.completeDrain(); err != nil {
glog.Errorf("nodeStateSyncHandler(): failed to complete draining: %v", err)
return err
@@ -565,8 +575,17 @@ func (dn *Daemon) completeDrain() error {
return err
}

if utils.ClusterType == utils.ClusterTypeOpenshift {
glog.Infof("completeDrain(): resume MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":false}}")
if _, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}); err != nil {
glog.Errorf("completeDrain(): failed to resume MCP %s: %v", dn.mcpName, err)
return err
}
}

if err := dn.annotateNode(dn.name, annoIdle); err != nil {
glog.Errorf("drainNode(): failed to annotate node: %v", err)
glog.Errorf("completeDrain(): failed to annotate node: %v", err)
return err
}
return nil
@@ -730,21 +749,127 @@ func (dn *Daemon) annotateNode(node, value string) error {
return nil
}

func (dn *Daemon) getNodeMachinePool() error {
mcpList, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().List(context.TODO(), metav1.ListOptions{})
if err != nil {
glog.Errorf("getNodeMachinePool(): Failed to list Machine Config Pools: %v", err)
return err
}
var mcp mcfgv1.MachineConfigPool
for _, mcp = range mcpList.Items {
selector, err := metav1.LabelSelectorAsSelector(mcp.Spec.NodeSelector)
if err != nil {
glog.Errorf("getNodeMachinePool(): Machine Config Pool %s invalid label selector: %v", mcp.GetName(), err)
return err
}

if selector.Matches(labels.Set(dn.node.Labels)) {
dn.mcpName = mcp.GetName()
glog.Infof("getNodeMachinePool(): find node in MCP %s", dn.mcpName)
return nil
}
}
return fmt.Errorf("getNodeMachinePool(): Failed to find the MCP of the node")
}

func (dn *Daemon) drainNode(name string) error {
glog.Info("drainNode(): Update prepared")
var err error

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
// wait a random time to avoid all the nodes drain at the same time
wait.PollUntil(time.Duration(rand.Intn(15)+1)*time.Second, func() (bool, error) {
time.Sleep(wait.Jitter(3*time.Second, 3))
wait.JitterUntil(func() {
if !dn.drainable {
glog.Info("drainNode(): other node is draining, waiting...")
glog.V(2).Info("drainNode(): other node is draining")
return
}
glog.V(2).Info("drainNode(): no other node is draining")
err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.Errorf("drainNode(): Failed to annotate node: %v", err)
return
}
cancel()
}, 3*time.Second, 3, true, ctx.Done())

if utils.ClusterType == utils.ClusterTypeOpenshift {
mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.mcClient,
time.Second*30,
)
mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer()

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
paused := dn.node.Annotations[annoKey] == annoMcpPaused

mcpEventHandler := func(obj interface{}) {
mcp := obj.(*mcfgv1.MachineConfigPool)
if mcp.GetName() != dn.mcpName {
return
}
// Always get the latest object
newMcp, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): Failed to get MCP %s: %v", dn.mcpName, err)
return
}
if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) &&
mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) &&
mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) {
glog.V(2).Infof("drainNode(): MCP %s is ready", dn.mcpName)
if paused {
glog.V(2).Info("drainNode(): stop MCP informer", dn.mcpName)
cancel()
return
}
if newMcp.Spec.Paused {
glog.V(2).Infof("drainNode(): MCP %s was paused by other, wait...", dn.mcpName)
return
}
glog.Infof("drainNode(): pause MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":true}}")
_, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): Failed to pause MCP %s: %v", dn.mcpName, err)
return
}
err = dn.annotateNode(dn.name, annoMcpPaused)
if err != nil {
glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err)
return
}
paused = true
return
}
if paused {
glog.Infof("drainNode(): MCP is processing, resume MCP %s", dn.mcpName)
pausePatch := []byte("{\"spec\":{\"paused\":false}}")
_, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
if err != nil {
glog.V(2).Infof("drainNode(): fail to resume MCP %s: %v", dn.mcpName, err)
return
}
err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err)
return
}
paused = false
}
glog.Infof("drainNode():MCP %s is not ready: %v, wait...", newMcp.GetName(), newMcp.Status.Conditions)
}
return dn.drainable, nil
}, dn.stopCh)

err = dn.annotateNode(dn.name, annoDraining)
if err != nil {
glog.Errorf("drainNode(): Failed to annotate node: %v", err)
return err
mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: mcpEventHandler,
UpdateFunc: func(old, new interface{}) {
mcpEventHandler(new)
},
})
mcpInformerFactory.Start(ctx.Done())
mcpInformerFactory.WaitForCacheSync(ctx.Done())
<-ctx.Done()
}

backoff := wait.Backoff{
20 changes: 11 additions & 9 deletions pkg/plugins/generic/generic_plugin.go
@@ -190,15 +190,17 @@ func needDrainNode(desired sriovnetworkv1.Interfaces, current sriovnetworkv1.Int
// ignore swichdev device
break
}
if iface.NumVfs != ifaceStatus.NumVfs {
glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect NumVfs %v, current NumVfs %v", iface.NumVfs, ifaceStatus.NumVfs)
needDrain = true
return
}
if iface.Mtu != 0 && iface.Mtu != ifaceStatus.Mtu {
glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect MTU %v, current MTU %v", iface.Mtu, ifaceStatus.Mtu)
needDrain = true
return
if ifaceStatus.NumVfs != 0 {
if iface.NumVfs != ifaceStatus.NumVfs {
glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect NumVfs %v, current NumVfs %v", iface.NumVfs, ifaceStatus.NumVfs)
needDrain = true
return
}
if iface.Mtu != 0 && iface.Mtu != ifaceStatus.Mtu {
glog.V(2).Infof("generic-plugin needDrainNode(): need drain, expect MTU %v, current MTU %v", iface.Mtu, ifaceStatus.Mtu)
needDrain = true
return
}
}
}
}