Skip to content

Commit

Permalink
Implement MaxUnavailable
Browse files Browse the repository at this point in the history
  • Loading branch information
johngmyers committed Jan 5, 2020
1 parent 0952374 commit 0c3651c
Show file tree
Hide file tree
Showing 3 changed files with 336 additions and 11 deletions.
1 change: 1 addition & 0 deletions pkg/instancegroups/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ go_test(
"//upup/pkg/fi/cloudup/awsup:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/autoscaling:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
Expand Down
85 changes: 75 additions & 10 deletions pkg/instancegroups/instancegroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func promptInteractive(upgradedHostId, upgradedHostName string) (stopPrompting b

// TODO: Temporarily increase size of ASG?
// TODO: Remove from ASG first so status is immediately updated?
// TODO: Batch termination, like a rolling-update

// RollingUpdate performs a rolling update on a list of ec2 instances.
func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpdateCluster, cluster *api.Cluster, isBastion bool, sleepAfterTerminate time.Duration, validationTimeout time.Duration) (err error) {
Expand All @@ -118,6 +117,8 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
return fmt.Errorf("rollingUpdate is missing a k8s client")
}

noneReady := len(r.CloudGroup.Ready) == 0
numInstances := len(r.CloudGroup.Ready) + len(r.CloudGroup.NeedUpdate)
update := r.CloudGroup.NeedUpdate
if rollingUpdateData.Force {
update = append(update, r.CloudGroup.Ready...)
Expand Down Expand Up @@ -148,15 +149,40 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
}
}

for _, u := range update {
err = r.drainTerminateAndWait(u, rollingUpdateData, isBastion, sleepAfterTerminate)
settings := resolveSettings(cluster, r.CloudGroup.InstanceGroup, numInstances)

concurrency := 0
maxConcurrency := 1

if r.CloudGroup.InstanceGroup.Spec.Role == api.InstanceGroupRoleNode && !rollingUpdateData.Interactive {
maxConcurrency = settings.MaxUnavailable.IntValue()
if maxConcurrency == 0 {
klog.Infof("Rolling updates for InstanceGroup %s are disabled", r.CloudGroup.InstanceGroup.Name)
return nil
}
}

terminateChan := make(chan error, maxConcurrency)

for uIdx, u := range update {
go r.drainTerminateAndWait(u, rollingUpdateData, terminateChan, isBastion, sleepAfterTerminate)
concurrency++

// If no instances are Ready yet, wait until the first node has been deleted and its replacement
// has validated before draining concurrently, in case the current spec does not result in usable nodes.
if concurrency < maxConcurrency && (!noneReady || uIdx > 0) {
continue
}

err = <-terminateChan
concurrency--
if err != nil {
return err
return waitForPendingBeforeReturningError(concurrency, terminateChan, err)
}

err = r.maybeValidate(rollingUpdateData, validationTimeout)
if err != nil {
return err
return waitForPendingBeforeReturningError(concurrency, terminateChan, err)
}

if rollingUpdateData.Interactive {
Expand All @@ -174,11 +200,47 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
rollingUpdateData.Interactive = false
}
}

sweep:
for concurrency > 0 {
select {
case err = <-terminateChan:
concurrency--
if err != nil {
return waitForPendingBeforeReturningError(concurrency, terminateChan, err)
}
default:
break sweep
}
}
}

if concurrency > 0 {
for concurrency > 0 {
err = <-terminateChan
concurrency--
if err != nil {
return waitForPendingBeforeReturningError(concurrency, terminateChan, err)
}
}

err = r.maybeValidate(rollingUpdateData, validationTimeout)
if err != nil {
return err
}
}

return nil
}

// waitForPendingBeforeReturningError receives and discards `concurrency`
// results from terminateChan, blocking until each one arrives, and then
// returns err unchanged.
//
// It is used on failure paths so that outstanding drain/terminate goroutines
// have all reported before the caller returns. Errors carried by the discarded
// results are intentionally dropped; only the err passed in is returned.
// A zero or negative concurrency receives nothing.
func waitForPendingBeforeReturningError(concurrency int, terminateChan chan error, err error) error {
	for pending := concurrency; pending > 0; pending-- {
		// Block for one outstanding result and ignore its value.
		<-terminateChan
	}
	return err
}

func (r *RollingUpdateInstanceGroup) taintAllNeedUpdate(update []*cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster) error {
var toTaint []*corev1.Node
for _, u := range update {
Expand Down Expand Up @@ -237,7 +299,7 @@ func (r *RollingUpdateInstanceGroup) patchTaint(rollingUpdateData *RollingUpdate
return err
}

func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster, isBastion bool, sleepAfterTerminate time.Duration) error {
func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster, terminateChan chan error, isBastion bool, sleepAfterTerminate time.Duration) {
instanceId := u.ID

nodeName := ""
Expand All @@ -258,7 +320,8 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo

if err := r.DrainNode(u, rollingUpdateData); err != nil {
if rollingUpdateData.FailOnDrainError {
return fmt.Errorf("failed to drain node %q: %v", nodeName, err)
terminateChan <- fmt.Errorf("failed to drain node %q: %v", nodeName, err)
return
}
klog.Infof("Ignoring error draining node %q: %v", nodeName, err)
}
Expand All @@ -275,21 +338,23 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo
} else {
klog.Infof("deleting node %q from kubernetes", nodeName)
if err := r.deleteNode(u.Node, rollingUpdateData); err != nil {
return fmt.Errorf("error deleting node %q: %v", nodeName, err)
terminateChan <- fmt.Errorf("error deleting node %q: %v", nodeName, err)
return
}
}
}

if err := r.DeleteInstance(u); err != nil {
klog.Errorf("error deleting instance %q, node %q: %v", instanceId, nodeName, err)
return err
terminateChan <- err
return
}

// Wait for the minimum interval
klog.Infof("waiting for %v after terminating instance", sleepAfterTerminate)
time.Sleep(sleepAfterTerminate)

return nil
terminateChan <- nil
}

func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpdateCluster, validationTimeout time.Duration) error {
Expand Down
Loading

0 comments on commit 0c3651c

Please sign in to comment.