cordon and drain azuremachinepoolmachines prior to delete
devigned committed Jun 16, 2021
1 parent 43b8486 commit 7ba7d48
Showing 13 changed files with 574 additions and 27 deletions.
170 changes: 170 additions & 0 deletions azure/scope/machinepoolmachine.go
@@ -18,18 +18,26 @@ package scope

import (
"context"
"fmt"
"reflect"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
"k8s.io/utils/pointer"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
"sigs.k8s.io/cluster-api/controllers/noderefutil"
"sigs.k8s.io/cluster-api/controllers/remote"
capierrors "sigs.k8s.io/cluster-api/errors"
capiv1exp "sigs.k8s.io/cluster-api/exp/api/v1alpha4"
drain "sigs.k8s.io/cluster-api/third_party/kubernetes-drain"
"sigs.k8s.io/cluster-api/util/conditions"
utilkubeconfig "sigs.k8s.io/cluster-api/util/kubeconfig"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -40,6 +48,11 @@ import (
"sigs.k8s.io/cluster-api-provider-azure/util/tele"
)

const (
// MachinePoolMachineScopeName is the sourceName, or more specifically the UserAgent, of the client used in cordon and drain.
MachinePoolMachineScopeName = "azuremachinepoolmachine-scope"
)

type (
nodeGetter interface {
GetNodeByProviderID(ctx context.Context, providerID string) (*corev1.Node, error)
@@ -261,6 +274,152 @@ func (s *MachinePoolMachineScope) UpdateStatus(ctx context.Context) error {
return nil
}

// CordonAndDrain will cordon and drain the Kubernetes node associated with this AzureMachinePoolMachine
func (s *MachinePoolMachineScope) CordonAndDrain(ctx context.Context) error {
ctx, span := tele.Tracer().Start(ctx, "scope.MachinePoolMachineScope.CordonAndDrain")
defer span.End()

var (
nodeRef = s.AzureMachinePoolMachine.Status.NodeRef
node *corev1.Node
err error
)
if nodeRef == nil || nodeRef.Name == "" {
node, err = s.workloadNodeGetter.GetNodeByProviderID(ctx, s.ProviderID())
} else {
node, err = s.workloadNodeGetter.GetNodeByObjectReference(ctx, *nodeRef)
}

if err != nil && apierrors.IsNotFound(err) {
return nil // node was already gone, so no need to cordon and drain
} else if err != nil {
return errors.Wrap(err, "failed to find node")
}

// Drain node before deletion and issue a patch in order to make this operation visible to the users.
if s.isNodeDrainAllowed() {
patchHelper, err := patch.NewHelper(s.AzureMachinePoolMachine, s.client)
if err != nil {
return errors.Wrap(err, "failed to build a patchHelper when draining node")
}

s.V(4).Info("Draining node", "node", s.AzureMachinePoolMachine.Status.NodeRef.Name)
// The DrainingSucceededCondition never exists before the node is drained for the first time,
// so its transition time can be used to record the first time draining.
// This `if` condition prevents the transition time from being changed more than once.
if conditions.Get(s.AzureMachinePoolMachine, clusterv1.DrainingSucceededCondition) == nil {
conditions.MarkFalse(s.AzureMachinePoolMachine, clusterv1.DrainingSucceededCondition, clusterv1.DrainingReason, clusterv1.ConditionSeverityInfo, "Draining the node before deletion")
}

if err := patchHelper.Patch(ctx, s.AzureMachinePoolMachine); err != nil {
return errors.Wrap(err, "failed to patch AzureMachinePoolMachine")
}

if err := s.drainNode(ctx, node); err != nil {
conditions.MarkFalse(s.AzureMachinePoolMachine, clusterv1.DrainingSucceededCondition, clusterv1.DrainingFailedReason, clusterv1.ConditionSeverityWarning, err.Error())
return err
}

conditions.MarkTrue(s.AzureMachinePoolMachine, clusterv1.DrainingSucceededCondition)
}

return nil
}

func (s *MachinePoolMachineScope) drainNode(ctx context.Context, node *corev1.Node) error {
ctx, span := tele.Tracer().Start(ctx, "scope.MachinePoolMachineScope.drainNode")
defer span.End()

restConfig, err := remote.RESTConfig(ctx, MachinePoolMachineScopeName, s.client, client.ObjectKey{
Name: s.ClusterName(),
Namespace: s.AzureMachinePoolMachine.Namespace,
})

if err != nil {
s.Error(err, "Error creating a remote client while deleting Machine, won't retry")
return nil
}

kubeClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
s.Error(err, "Error creating a remote client while deleting Machine, won't retry")
return nil
}

drainer := &drain.Helper{
Client: kubeClient,
Force: true,
IgnoreAllDaemonSets: true,
DeleteLocalData: true,
GracePeriodSeconds: -1,
// If a pod is not evicted in 20 seconds, retry the eviction next time the
// machine gets reconciled again (to allow other machines to be reconciled).
Timeout: 20 * time.Second,
OnPodDeletedOrEvicted: func(pod *corev1.Pod, usingEviction bool) {
verbStr := "Deleted"
if usingEviction {
verbStr = "Evicted"
}
s.V(4).Info(fmt.Sprintf("%s pod from Node", verbStr),
"pod", fmt.Sprintf("%s/%s", pod.Name, pod.Namespace))
},
Out: writer{klog.Info},
ErrOut: writer{klog.Error},
DryRun: false,
}

if noderefutil.IsNodeUnreachable(node) {
// When the node is unreachable and some pods are not evicted for as long as this timeout, we ignore them.
drainer.SkipWaitForDeleteTimeoutSeconds = 60 * 5 // 5 minutes
}

if err := drain.RunCordonOrUncordon(ctx, drainer, node, true); err != nil {
// Machine will be re-reconciled after a cordon failure.
return azure.WithTransientError(errors.Errorf("unable to cordon node %s: %v", node.Name, err), 20*time.Second)
}

if err := drain.RunNodeDrain(ctx, drainer, node.Name); err != nil {
// Machine will be re-reconciled after a drain failure.
return azure.WithTransientError(errors.Wrap(err, "Drain failed, retry in 20s"), 20*time.Second)
}

s.V(4).Info("Drain successful")
return nil
}

// isNodeDrainAllowed returns false if the node is excluded from draining or if the NodeDrainTimeout has expired.
func (s *MachinePoolMachineScope) isNodeDrainAllowed() bool {
if _, exists := s.AzureMachinePoolMachine.ObjectMeta.Annotations[clusterv1.ExcludeNodeDrainingAnnotation]; exists {
return false
}

if s.nodeDrainTimeoutExceeded() {
return false
}

return true
}

// nodeDrainTimeoutExceeded reports whether the AzureMachinePool's NodeDrainTimeout has been exceeded for this
// AzureMachinePoolMachine.
func (s *MachinePoolMachineScope) nodeDrainTimeoutExceeded() bool {
// if the NodeDrainTimeout is not set by the user, the node can be drained without a time limit
pool := s.AzureMachinePool
if pool == nil || pool.Spec.NodeDrainTimeout == nil || pool.Spec.NodeDrainTimeout.Seconds() <= 0 {
return false
}

// if the draining succeeded condition does not exist
if conditions.Get(s.AzureMachinePoolMachine, clusterv1.DrainingSucceededCondition) == nil {
return false
}

now := time.Now()
firstTimeDrain := conditions.GetLastTransitionTime(s.AzureMachinePoolMachine, clusterv1.DrainingSucceededCondition)
diff := now.Sub(firstTimeDrain.Time)
return diff.Seconds() >= s.AzureMachinePool.Spec.NodeDrainTimeout.Seconds()
}

func (s *MachinePoolMachineScope) hasLatestModelApplied() (bool, error) {
if s.instance == nil {
return false, errors.New("instance must not be nil")
@@ -365,3 +524,14 @@ func getWorkloadClient(ctx context.Context, c client.Client, cluster client.Obje

return client.New(restConfig, client.Options{})
}

// writer implements the io.Writer interface as a pass-through for klog.
type writer struct {
logFunc func(args ...interface{})
}

// Write passes string(p) into writer's logFunc and always returns len(p).
func (w writer) Write(p []byte) (n int, err error) {
w.logFunc(string(p))
return len(p), nil
}
@@ -462,6 +462,9 @@ spec:
location:
description: Location is the Azure region location e.g. westus2
type: string
nodeDrainTimeout:
description: 'NodeDrainTimeout is the total amount of time that the controller will spend on draining a node. The default value is 0, meaning that the node can be drained without any time limitations. NOTE: NodeDrainTimeout is different from `kubectl drain --timeout`'
type: string
providerID:
description: ProviderID is the identification ID of the Virtual Machine Scale Set
type: string
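For reference, a minimal manifest fragment showing how a user could set the new field; this is a hypothetical sketch — the apiVersion, metadata, and location values are assumed for illustration, and only the nodeDrainTimeout key comes from the schema change above.

apiVersion: infrastructure.cluster.x-k8s.io/v1alpha4   # assumed group/version for this example
kind: AzureMachinePool
metadata:
  name: example-machine-pool                           # hypothetical name
spec:
  location: westus2                                    # other required fields omitted for brevity
  nodeDrainTimeout: 10m                                 # metav1.Duration string; unset or 0 means no time limit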
4 changes: 4 additions & 0 deletions exp/api/v1alpha3/azuremachinepool_conversion.go
@@ -56,6 +56,10 @@ func (src *AzureMachinePool) ConvertTo(dstRaw conversion.Hub) error { // nolint
dst.Spec.Strategy.RollingUpdate.DeletePolicy = restored.Spec.Strategy.RollingUpdate.DeletePolicy
}

if restored.Spec.NodeDrainTimeout != nil {
dst.Spec.NodeDrainTimeout = restored.Spec.NodeDrainTimeout
}

if restored.Status.Image != nil {
dst.Status.Image = restored.Status.Image
}
1 change: 1 addition & 0 deletions exp/api/v1alpha3/zz_generated.conversion.go

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions exp/api/v1alpha4/azuremachinepool_types.go
@@ -134,6 +134,12 @@ type (
// +optional
// +kubebuilder:default={type: "RollingUpdate", rollingUpdate: {maxSurge: 1, maxUnavailable: 0, deletePolicy: Oldest}}
Strategy AzureMachinePoolDeploymentStrategy `json:"strategy,omitempty"`

// NodeDrainTimeout is the total amount of time that the controller will spend on draining a node.
// The default value is 0, meaning that the node can be drained without any time limitations.
// NOTE: NodeDrainTimeout is different from `kubectl drain --timeout`
// +optional
NodeDrainTimeout *metav1.Duration `json:"nodeDrainTimeout,omitempty"`
}

// AzureMachinePoolDeploymentStrategyType is the type of deployment strategy employed to rollout a new version of
10 changes: 8 additions & 2 deletions exp/api/v1alpha4/zz_generated.deepcopy.go

Some generated files are not rendered by default.

8 changes: 6 additions & 2 deletions exp/controllers/azuremachinepoolmachine_controller.go
@@ -377,8 +377,12 @@ func (r *azureMachinePoolMachineReconciler) Delete(ctx context.Context) error {
}
}()

err := r.scalesetVMsService.Delete(ctx)
if err != nil {
// cordon and drain the node before deleting the scale set VM
if err := r.Scope.CordonAndDrain(ctx); err != nil {
return errors.Wrap(err, "failed to cordon and drain the AzureMachinePoolMachine's node")
}

if err := r.scalesetVMsService.Delete(ctx); err != nil {
return errors.Wrap(err, "failed to reconcile scalesetVMs")
}

2 changes: 1 addition & 1 deletion test/e2e/azure_lb.go
@@ -78,7 +78,7 @@ func AzureLBSpec(ctx context.Context, inputGetter func() AzureLBSpecInput) {
deploymentName = "web-windows" + util.RandomString(6)
}

webDeployment := deploymentBuilder.CreateDeployment("httpd", deploymentName, corev1.NamespaceDefault)
webDeployment := deploymentBuilder.Create("httpd", deploymentName, corev1.NamespaceDefault)
webDeployment.AddContainerPort("http", "http", 80, corev1.ProtocolTCP)

if input.Windows {
