Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Option to increase concurrency of rolling update within instancegroup #8271

Merged
merged 11 commits into from
Jan 28, 2020
1 change: 1 addition & 0 deletions pkg/instancegroups/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ go_test(
"//upup/pkg/fi/cloudup/awsup:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/autoscaling:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
Expand Down
85 changes: 75 additions & 10 deletions pkg/instancegroups/instancegroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func promptInteractive(upgradedHostId, upgradedHostName string) (stopPrompting b

// TODO: Temporarily increase size of ASG?
// TODO: Remove from ASG first so status is immediately updated?
// TODO: Batch termination, like a rolling-update

// RollingUpdate performs a rolling update on a list of ec2 instances.
func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpdateCluster, cluster *api.Cluster, isBastion bool, sleepAfterTerminate time.Duration, validationTimeout time.Duration) (err error) {
Expand All @@ -118,6 +117,8 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
return fmt.Errorf("rollingUpdate is missing a k8s client")
}

noneReady := len(r.CloudGroup.Ready) == 0
numInstances := len(r.CloudGroup.Ready) + len(r.CloudGroup.NeedUpdate)
update := r.CloudGroup.NeedUpdate
if rollingUpdateData.Force {
update = append(update, r.CloudGroup.Ready...)
Expand Down Expand Up @@ -148,15 +149,40 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
}
}

for _, u := range update {
err = r.drainTerminateAndWait(u, rollingUpdateData, isBastion, sleepAfterTerminate)
settings := resolveSettings(cluster, r.CloudGroup.InstanceGroup, numInstances)

concurrency := 0
johngmyers marked this conversation as resolved.
Show resolved Hide resolved
maxConcurrency := 1

if r.CloudGroup.InstanceGroup.Spec.Role == api.InstanceGroupRoleNode && !rollingUpdateData.Interactive {
maxConcurrency = settings.MaxUnavailable.IntValue()
if maxConcurrency == 0 {
johngmyers marked this conversation as resolved.
Show resolved Hide resolved
klog.Infof("Rolling updates for InstanceGroup %s are disabled", r.CloudGroup.InstanceGroup.Name)
return nil
}
}

terminateChan := make(chan error, maxConcurrency)

for uIdx, u := range update {
go r.drainTerminateAndWait(u, rollingUpdateData, terminateChan, isBastion, sleepAfterTerminate)
concurrency++

// Wait until after one node is deleted and its replacement validates before the concurrent draining
// in case the current spec does not result in usable nodes.
if concurrency < maxConcurrency && (!noneReady || uIdx > 0) {
continue
}

err = <-terminateChan
concurrency--
if err != nil {
return err
return waitForPendingBeforeReturningError(concurrency, terminateChan, err)
}

err = r.maybeValidate(rollingUpdateData, validationTimeout)
if err != nil {
return err
return waitForPendingBeforeReturningError(concurrency, terminateChan, err)
}

if rollingUpdateData.Interactive {
Expand All @@ -174,11 +200,47 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
rollingUpdateData.Interactive = false
}
}

sweep:
for concurrency > 0 {
johngmyers marked this conversation as resolved.
Show resolved Hide resolved
select {
case err = <-terminateChan:
concurrency--
if err != nil {
return waitForPendingBeforeReturningError(concurrency, terminateChan, err)
}
default:
break sweep
}
}
}

if concurrency > 0 {
for concurrency > 0 {
err = <-terminateChan
concurrency--
if err != nil {
return waitForPendingBeforeReturningError(concurrency, terminateChan, err)
}
}

err = r.maybeValidate(rollingUpdateData, validationTimeout)
if err != nil {
return err
}
}

return nil
}

func waitForPendingBeforeReturningError(concurrency int, terminateChan chan error, err error) error {
for concurrency > 0 {
<-terminateChan
concurrency--
}
return err
}

func (r *RollingUpdateInstanceGroup) taintAllNeedUpdate(update []*cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster) error {
var toTaint []*corev1.Node
for _, u := range update {
Expand Down Expand Up @@ -237,7 +299,7 @@ func (r *RollingUpdateInstanceGroup) patchTaint(rollingUpdateData *RollingUpdate
return err
}

func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster, isBastion bool, sleepAfterTerminate time.Duration) error {
func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster, terminateChan chan error, isBastion bool, sleepAfterTerminate time.Duration) {
instanceId := u.ID

nodeName := ""
Expand All @@ -258,7 +320,8 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo

if err := r.DrainNode(u, rollingUpdateData); err != nil {
if rollingUpdateData.FailOnDrainError {
return fmt.Errorf("failed to drain node %q: %v", nodeName, err)
terminateChan <- fmt.Errorf("failed to drain node %q: %v", nodeName, err)
return
}
klog.Infof("Ignoring error draining node %q: %v", nodeName, err)
}
Expand All @@ -275,21 +338,23 @@ func (r *RollingUpdateInstanceGroup) drainTerminateAndWait(u *cloudinstances.Clo
} else {
klog.Infof("deleting node %q from kubernetes", nodeName)
if err := r.deleteNode(u.Node, rollingUpdateData); err != nil {
return fmt.Errorf("error deleting node %q: %v", nodeName, err)
terminateChan <- fmt.Errorf("error deleting node %q: %v", nodeName, err)
return
}
}
}

if err := r.DeleteInstance(u); err != nil {
klog.Errorf("error deleting instance %q, node %q: %v", instanceId, nodeName, err)
return err
terminateChan <- err
return
}

// Wait for the minimum interval
klog.Infof("waiting for %v after terminating instance", sleepAfterTerminate)
time.Sleep(sleepAfterTerminate)

return nil
terminateChan <- nil
johngmyers marked this conversation as resolved.
Show resolved Hide resolved
}

func (r *RollingUpdateInstanceGroup) maybeValidate(rollingUpdateData *RollingUpdateCluster, validationTimeout time.Duration) error {
Expand Down
Loading