
Commit

fix: improve the drain function
The critical bug (I believe) was that the drain code re-entered the eviction
loop even after waiting for the pod to be deleted had returned success,
effectively evicting the pod once again after it got rescheduled to a
different node.
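
As a rough sketch of the corrected control flow (not the exact Talos code), evictOnce and waitDeleted below are hypothetical helpers standing in for the Eviction API call and the deletion poll used in this diff:

package sketch

import (
	"context"
	"fmt"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// evictAndWait evicts a pod once, waits for it to be deleted, and stops.
// Returning after a successful wait is the fix: without it the loop would run
// again and evict the replacement pod that its controller may have already
// rescheduled onto another node.
func evictAndWait(ctx context.Context, namespace, name string,
	evictOnce func(context.Context) error,
	waitDeleted func(context.Context) error,
) error {
	for {
		err := evictOnce(ctx)

		switch {
		case apierrors.IsTooManyRequests(err):
			// Eviction temporarily blocked (e.g. by a PodDisruptionBudget); retry.
			time.Sleep(5 * time.Second)
		case err != nil:
			return fmt.Errorf("failed to evict pod %s/%s: %w", namespace, name, err)
		default:
			if err := waitDeleted(ctx); err != nil {
				return fmt.Errorf("failed waiting on pod %s/%s to be deleted: %w", namespace, name, err)
			}

			return nil // stop here instead of looping and evicting again
		}
	}
}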

Add a global timeout to prevent the drain code from running forever.
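
A minimal sketch of the global timeout, assuming the DrainTimeout constant introduced below; drainWithDeadline is a hypothetical wrapper, and every API call issued with the derived context inherits the deadline:

package sketch

import (
	"context"
	"time"
)

// DrainTimeout is the maximum time to wait for the node to be drained.
const DrainTimeout = 5 * time.Minute

// drainWithDeadline wraps a drain routine with a global deadline so a stuck
// eviction or wait cannot keep the sequence running forever.
func drainWithDeadline(ctx context.Context, drain func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, DrainTimeout)
	defer cancel()

	// Any List/Evict/Get made with this ctx fails once the deadline passes.
	return drain(ctx)
}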

Filter out more pod types which should never be drained.
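
The extra filtering can be summarized by a predicate like the sketch below, assuming the k8s.io/api and k8s.io/apimachinery packages already imported in this diff; shouldEvict is a hypothetical name, not part of the change itself:

package sketch

import (
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// shouldEvict reports whether a pod should be evicted during a drain.
func shouldEvict(p corev1.Pod) bool {
	// Mirror (static) pods are bound to the node and cannot be evicted.
	if _, ok := p.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; ok {
		return false
	}

	controllerRef := metav1.GetControllerOf(&p)

	// Unmanaged pods have no controller to reschedule them elsewhere.
	if controllerRef == nil {
		return false
	}

	// DaemonSet pods are recreated on the same node immediately.
	if controllerRef.Kind == appsv1.SchemeGroupVersion.WithKind("DaemonSet").Kind {
		return false
	}

	// Pods already terminating do not need another eviction.
	if !p.DeletionTimestamp.IsZero() {
		return false
	}

	return true
}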

Fixes #3124

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
smira authored and talos-bot committed Feb 25, 2021
1 parent f24c815 commit 779ac74
Showing 4 changed files with 58 additions and 37 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -68,7 +68,7 @@ require (
github.com/talos-systems/go-cmd v0.0.0-20210216164758-68eb0067e0f0
github.com/talos-systems/go-loadbalancer v0.1.0
github.com/talos-systems/go-procfs v0.0.0-20210108152626-8cbc42d3dc24
github.com/talos-systems/go-retry v0.2.0
github.com/talos-systems/go-retry v0.2.1-0.20210119124456-b9dc1a990133
github.com/talos-systems/go-smbios v0.0.0-20200807005123-80196199691e
github.com/talos-systems/grpc-proxy v0.2.0
github.com/talos-systems/net v0.2.1-0.20210204205549-52c750994376
2 changes: 2 additions & 0 deletions go.sum
@@ -869,6 +869,8 @@ github.com/talos-systems/go-retry v0.1.0/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lI
github.com/talos-systems/go-retry v0.1.1-0.20201113203059-8c63d290a688/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM=
github.com/talos-systems/go-retry v0.2.0 h1:YpQHmtTZ2k0i/bBYRIasdVmF0XaiISVJUOrmZ6FzgLU=
github.com/talos-systems/go-retry v0.2.0/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM=
github.com/talos-systems/go-retry v0.2.1-0.20210119124456-b9dc1a990133 h1:mHnKEViee9x2A6YbsUykwqh7L+tLpm5HTlos2QDlqts=
github.com/talos-systems/go-retry v0.2.1-0.20210119124456-b9dc1a990133/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM=
github.com/talos-systems/go-smbios v0.0.0-20200807005123-80196199691e h1:uCp8BfH4Ky2R1XkOKA5pSZpeMMyt0AbH29PIrkoBlaM=
github.com/talos-systems/go-smbios v0.0.0-20200807005123-80196199691e/go.mod h1:HxhrzAoTZ7ed5Z5VvtCvnCIrOxyXDS7V2B5hCetAMW8=
github.com/talos-systems/grpc-proxy v0.2.0 h1:DN75bLfaW4xfhq0r0mwFRnfGhSB+HPhK1LNzuMEs9Pw=
@@ -1242,7 +1242,7 @@ func CordonAndDrainNode(seq runtime.Sequence, data interface{}) (runtime.TaskExe
return err
}

if err = kubeHelper.CordonAndDrain(nodename); err != nil {
if err = kubeHelper.CordonAndDrain(ctx, nodename); err != nil {
return err
}

89 changes: 54 additions & 35 deletions pkg/kubernetes/kubernetes.go
@@ -11,11 +11,12 @@ import (
"fmt"
"log"
"net/url"
"sync"
"time"

"github.com/talos-systems/crypto/x509"
"github.com/talos-systems/go-retry/retry"
"golang.org/x/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -31,6 +32,11 @@ import (
"github.com/talos-systems/talos/pkg/machinery/constants"
)

const (
// DrainTimeout is maximum time to wait for the node to be drained.
DrainTimeout = 5 * time.Minute
)

// Client represents a set of helper methods for interacting with the
// Kubernetes API.
type Client struct {
@@ -279,18 +285,18 @@ func (h *Client) WaitUntilReady(name string) error {
}

// CordonAndDrain cordons and drains a node in one call.
func (h *Client) CordonAndDrain(node string) (err error) {
if err = h.Cordon(node); err != nil {
func (h *Client) CordonAndDrain(ctx context.Context, node string) (err error) {
if err = h.Cordon(ctx, node); err != nil {
return err
}

return h.Drain(node)
return h.Drain(ctx, node)
}

// Cordon marks a node as unschedulable.
func (h *Client) Cordon(name string) error {
err := retry.Exponential(30*time.Second, retry.WithUnits(250*time.Millisecond), retry.WithJitter(50*time.Millisecond)).Retry(func() error {
node, err := h.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
func (h *Client) Cordon(ctx context.Context, name string) error {
err := retry.Exponential(30*time.Second, retry.WithUnits(250*time.Millisecond), retry.WithJitter(50*time.Millisecond)).RetryWithContext(ctx, func(ctx context.Context) error {
node, err := h.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
if err != nil {
return retry.UnexpectedError(err)
}
@@ -302,7 +308,7 @@ func (h *Client) Cordon(name string) error {
node.Annotations[constants.AnnotationCordonedKey] = constants.AnnotationCordonedValue
node.Spec.Unschedulable = true

if _, err := h.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}); err != nil {
if _, err := h.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{}); err != nil {
return retry.ExpectedError(err)
}

@@ -352,58 +358,69 @@ func (h *Client) Uncordon(name string, force bool) error {
}

// Drain evicts all pods on a given node.
func (h *Client) Drain(node string) error {
func (h *Client) Drain(ctx context.Context, node string) error {
ctx, cancel := context.WithTimeout(ctx, DrainTimeout)
defer cancel()

opts := metav1.ListOptions{
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": node}).String(),
}

pods, err := h.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), opts)
pods, err := h.CoreV1().Pods(metav1.NamespaceAll).List(ctx, opts)
if err != nil {
return fmt.Errorf("cannot get pods for node %s: %w", node, err)
}

var wg sync.WaitGroup

wg.Add(len(pods.Items))
var eg errgroup.Group

// Evict each pod.

for _, pod := range pods.Items {
go func(p corev1.Pod) {
defer wg.Done()
p := pod

for _, ref := range p.ObjectMeta.OwnerReferences {
if ref.Kind == "DaemonSet" {
log.Printf("skipping DaemonSet pod %s\n", p.GetName())
eg.Go(func() error {
if _, ok := p.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; ok {
log.Printf("skipping mirror pod %s/%s\n", p.GetNamespace(), p.GetName())

return
}
return nil
}

if ref.Kind == "Node" {
log.Printf("skipping StaticPod pod %s\n", p.GetName())
controllerRef := metav1.GetControllerOf(&p)

return
}
if controllerRef == nil {
log.Printf("skipping unmanaged pod %s/%s\n", p.GetNamespace(), p.GetName())

return nil
}

if controllerRef.Kind == appsv1.SchemeGroupVersion.WithKind("DaemonSet").Kind {
log.Printf("skipping DaemonSet pod %s/%s\n", p.GetNamespace(), p.GetName())

return nil
}

if err := h.evict(p, int64(60)); err != nil {
if !p.DeletionTimestamp.IsZero() {
log.Printf("skipping deleted pod %s/%s\n", p.GetNamespace(), p.GetName())
}

if err := h.evict(ctx, p, int64(60)); err != nil {
log.Printf("WARNING: failed to evict pod: %v", err)
}
}(pod)
}

wg.Wait()
return nil
})
}

return nil
return eg.Wait()
}

func (h *Client) evict(p corev1.Pod, gracePeriod int64) error {
func (h *Client) evict(ctx context.Context, p corev1.Pod, gracePeriod int64) error {
for {
pol := &policy.Eviction{
ObjectMeta: metav1.ObjectMeta{Namespace: p.GetNamespace(), Name: p.GetName()},
DeleteOptions: &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod},
}
err := h.CoreV1().Pods(p.GetNamespace()).Evict(context.TODO(), pol)
err := h.CoreV1().Pods(p.GetNamespace()).Evict(ctx, pol)

switch {
case apierrors.IsTooManyRequests(err):
@@ -413,16 +430,18 @@ func (h *Client) evict(p corev1.Pod, gracePeriod int64) error {
case err != nil:
return fmt.Errorf("failed to evict pod %s/%s: %w", p.GetNamespace(), p.GetName(), err)
default:
if err = h.waitForPodDeleted(&p); err != nil {
if err = h.waitForPodDeleted(ctx, &p); err != nil {
return fmt.Errorf("failed waiting on pod %s/%s to be deleted: %w", p.GetNamespace(), p.GetName(), err)
}

return nil
}
}
}

func (h *Client) waitForPodDeleted(p *corev1.Pod) error {
return retry.Constant(time.Minute, retry.WithUnits(3*time.Second)).Retry(func() error {
pod, err := h.CoreV1().Pods(p.GetNamespace()).Get(context.TODO(), p.GetName(), metav1.GetOptions{})
func (h *Client) waitForPodDeleted(ctx context.Context, p *corev1.Pod) error {
return retry.Constant(time.Minute, retry.WithUnits(3*time.Second)).RetryWithContext(ctx, func(ctx context.Context) error {
pod, err := h.CoreV1().Pods(p.GetNamespace()).Get(ctx, p.GetName(), metav1.GetOptions{})
switch {
case apierrors.IsNotFound(err):
return nil
