-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
drain.go
63 lines (53 loc) · 1.68 KB
/
drain.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package nodegroup
import (
"context"
"time"
"k8s.io/client-go/kubernetes"
"github.com/kris-nova/logger"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"github.com/weaveworks/eksctl/pkg/drain"
"github.com/weaveworks/eksctl/pkg/eks"
)
// DrainInput holds the parameters for draining a set of nodegroups.
type DrainInput struct {
	// NodeGroups is the set of nodegroups whose nodes will be drained.
	NodeGroups []eks.KubeNodeGroup
	// Plan, when true, makes Drain a dry run that performs no draining.
	Plan bool
	// MaxGracePeriod is the maximum grace period granted to pods being evicted.
	MaxGracePeriod time.Duration
	// NodeDrainWaitPeriod is the pause between draining successive nodes.
	NodeDrainWaitPeriod time.Duration
	// PodEvictionWaitPeriod is the pause between retried pod evictions.
	PodEvictionWaitPeriod time.Duration
	// Undo, when true, uncordons the nodes instead of draining them.
	Undo bool
	// DisableEviction uses direct pod deletion instead of the eviction API.
	DisableEviction bool
	// Parallel is the maximum number of nodegroups drained concurrently.
	Parallel int
}
// A Drainer drains nodegroups.
type Drainer struct {
	// ClientSet is the Kubernetes client used to cordon nodes and evict pods.
	ClientSet kubernetes.Interface
}
// Drain drains the nodegroups in input concurrently, allowing at most
// input.Parallel drains in flight at once. It returns the first error
// encountered by any nodegroup drain (remaining drains are cancelled via
// the shared context). If input.Plan is true it is a no-op dry run.
func (d *Drainer) Drain(ctx context.Context, input *DrainInput) error {
	// Short-circuit a dry run before allocating the semaphore or logging,
	// so plan mode neither does work nor claims that draining started.
	if input.Plan {
		return nil
	}

	parallelLimit := int64(input.Parallel)
	sem := semaphore.NewWeighted(parallelLimit)
	logger.Info("starting parallel draining, max in-flight of %d", parallelLimit)

	g, ctx := errgroup.WithContext(ctx)
	for _, nodegroup := range input.NodeGroups {
		nodegroup := nodegroup // shadow for pre-Go 1.22 loop-variable capture
		g.Go(func() error {
			nodeGroupDrainer := drain.NewNodeGroupDrainer(d.ClientSet, nodegroup, input.MaxGracePeriod, input.NodeDrainWaitPeriod, input.PodEvictionWaitPeriod, input.Undo, input.DisableEviction, input.Parallel)
			return nodeGroupDrainer.Drain(ctx, sem)
		})
	}

	err := g.Wait()
	if err != nil {
		logger.Critical("Node group drain failed: %v", err)
	}
	// Drain routines may still hold semaphore slots after g.Wait returns;
	// acquiring the full weight guarantees they have all released before
	// we hand control back to the caller.
	waitForAllRoutinesToFinish(ctx, sem, parallelLimit)
	return err
}
// waitForAllRoutinesToFinish blocks until all in-flight drain routines have
// released their semaphore slots, by acquiring the semaphore's full weight.
// Acquire only fails if ctx is cancelled; that is logged but not returned,
// since the caller is already propagating the drain error.
func waitForAllRoutinesToFinish(ctx context.Context, sem *semaphore.Weighted, size int64) {
	if err := sem.Acquire(ctx, size); err != nil {
		// %v, not %w: the wrap verb is only meaningful to fmt.Errorf and
		// would render as "%!w(...)" in a plain log message.
		logger.Critical("failed to acquire semaphore while waiting for all routines to finish: %v", err)
	}
}