Skip to content

Commit

Permalink
Implement CordonAllNeedUpdate
Browse files Browse the repository at this point in the history
  • Loading branch information
johngmyers committed Nov 27, 2019
1 parent e914af1 commit b0258b5
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 8 deletions.
3 changes: 2 additions & 1 deletion pkg/drain/cordon.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type CordonHelper struct {
// NewCordonHelper returns a new CordonHelper
func NewCordonHelper(node *corev1.Node) *CordonHelper {
return &CordonHelper{
node: node,
node: node,
desired: true,
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/featureflag/featureflag.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ var (
VSphereCloudProvider = New("VSphereCloudProvider", Bool(false))
// SkipEtcdVersionCheck will bypass the check that etcd-manager is using a supported etcd version
SkipEtcdVersionCheck = New("SkipEtcdVersionCheck", Bool(false))
// ConfigurableRollingUpdate enables the RollingUpdate strategy configuration settings
ConfigurableRollingUpdate = New("ConfigurableRollingUpdate", Bool(false))
)

// FeatureFlag defines a feature flag
Expand Down
3 changes: 3 additions & 0 deletions pkg/instancegroups/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_test(
"//cloudmock/aws/mockautoscaling:go_default_library",
"//pkg/apis/kops:go_default_library",
"//pkg/cloudinstances:go_default_library",
"//pkg/featureflag:go_default_library",
"//pkg/validation:go_default_library",
"//upup/pkg/fi/cloudup/awsup:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
Expand All @@ -45,5 +46,7 @@ go_test(
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)
53 changes: 52 additions & 1 deletion pkg/instancegroups/instancegroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
return fmt.Errorf("rollingUpdate is missing a k8s client")
}

noneReady := len(r.CloudGroup.Ready) == 0
update := r.CloudGroup.NeedUpdate
if rollingUpdateData.Force {
update = append(update, r.CloudGroup.Ready...)
Expand All @@ -137,7 +138,16 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
}
}

for _, u := range update {
settings := resolveSettings(cluster, r.CloudGroup.InstanceGroup)

for uIdx, u := range update {
if featureflag.ConfigurableRollingUpdate.Enabled() && *settings.CordonAllNeedUpdate {
err := r.maybeCordonAllNeedUpdate(update, rollingUpdateData, noneReady, uIdx)
if err != nil {
return err
}
}

instanceId := u.ID

nodeName := ""
Expand Down Expand Up @@ -225,6 +235,47 @@ func (r *RollingUpdateInstanceGroup) RollingUpdate(rollingUpdateData *RollingUpd
return nil
}

func (r *RollingUpdateInstanceGroup) maybeCordonAllNeedUpdate(update []*cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster, noneReady bool, uIdx int) error {
if r.CloudGroup.InstanceGroup.Spec.Role != api.InstanceGroupRoleNode || rollingUpdateData.CloudOnly {
return nil
}

if noneReady {
// Wait until after one node is deleted and its replacement validates before the mass-cordoning
// in case the current spec does not result in usable nodes.
if uIdx != 1 || len(update) < 2 {
return nil
}
} else {
if uIdx != 0 || len(update) < 1 {
return nil
}
}

var toCordon []*corev1.Node
for _, u := range update {
if u.Node != nil && !u.Node.Spec.Unschedulable {
toCordon = append(toCordon, u.Node)
}
}
if len(toCordon) > 0 {
noun := "nodes"
if len(toCordon) == 1 {
noun = "node"
}
klog.Infof("Cordoning %d %s in %q instancegroup.", len(toCordon), noun, r.CloudGroup.InstanceGroup.Name)
for _, n := range toCordon {
if err, _ := drain.NewCordonHelper(n).PatchOrReplace(rollingUpdateData.K8sClient); err != nil {
if rollingUpdateData.FailOnDrainError {
return fmt.Errorf("failed to cordon node %q: %v", n, err)
}
klog.Infof("Ignoring error cordoning node %q: %v", n, err)
}
}
}
return nil
}

// validateClusterWithDuration runs validation.ValidateCluster until either we get positive result or the timeout expires
func (r *RollingUpdateInstanceGroup) validateClusterWithDuration(rollingUpdateData *RollingUpdateCluster, duration time.Duration) error {
// TODO should we expose this to the UI?
Expand Down
171 changes: 165 additions & 6 deletions pkg/instancegroups/rollingupdate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,25 @@ package instancegroups
import (
"errors"
"fmt"
"strings"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/stretchr/testify/assert"
"k8s.io/kops/pkg/featureflag"

v1 "k8s.io/api/core/v1"
v1meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
testingclient "k8s.io/client-go/testing"
"k8s.io/kops/cloudmock/aws/mockautoscaling"
kopsapi "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/validation"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
"k8s.io/utils/pointer"
)

func getTestSetup() (*RollingUpdateCluster, awsup.AWSCloud, *kopsapi.Cluster, map[string]*cloudinstances.CloudInstanceGroup) {
Expand Down Expand Up @@ -215,18 +219,17 @@ func getGroups(k8sClient *fake.Clientset) map[string]*cloudinstances.CloudInstan

func markNeedUpdate(group *cloudinstances.CloudInstanceGroup, nodeIds ...string) {
for _, nodeId := range nodeIds {
var newReady []*cloudinstances.CloudInstanceGroupMember
found := false
for _, member := range group.Ready {
if member.ID == nodeId {
group.NeedUpdate = append(group.NeedUpdate, &cloudinstances.CloudInstanceGroupMember{
ID: member.ID,
Node: member.Node,
CloudInstanceGroup: member.CloudInstanceGroup,
})
group.NeedUpdate = append(group.NeedUpdate, member)
found = true
break
} else {
newReady = append(newReady, member)
}
}
group.Ready = newReady
if !found {
panic(fmt.Sprintf("didn't find nodeId %s in ready list", nodeId))
}
Expand All @@ -243,10 +246,38 @@ func markAllNeedUpdate(groups map[string]*cloudinstances.CloudInstanceGroup) {
func TestRollingUpdateAllNeedUpdate(t *testing.T) {
c, cloud, cluster, groups := getTestSetup()

featureflag.ParseFlags("+ConfigurableRollingUpdate")
defer featureflag.ParseFlags("-ConfigurableRollingUpdate")

markAllNeedUpdate(groups)
err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
assert.NoError(t, err, "rolling update")

cordoned := ""
deleted := map[string]bool{}
for _, action := range c.K8sClient.(*fake.Clientset).Actions() {
switch a := action.(type) {
case testingclient.PatchAction: // cordon
assertCordon(t, a)
assert.Equal(t, "", cordoned, "at most one node cordoned at a time")
cordoned = a.GetName()
case testingclient.DeleteAction:
assert.Equal(t, "nodes", a.GetResource().Resource)
assert.Equal(t, cordoned, a.GetName(), "node was cordoned before delete")
assert.False(t, deleted[a.GetName()], "node", a.GetName(), "already deleted")
if !strings.HasPrefix(a.GetName(), "master-") {
assert.True(t, deleted["master-1a.local"], "master-1a was deleted before node", a.GetName())
assert.True(t, deleted["master-1b.local"], "master-1b was deleted before node", a.GetName())
}
deleted[a.GetName()] = true
cordoned = ""
case testingclient.ListAction:
// Don't care
default:
t.Errorf("unexpected action %v", a)
}
}

asgGroups, _ := cloud.Autoscaling().DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{})
for _, group := range asgGroups.AutoScalingGroups {
assert.Emptyf(t, group.Instances, "Not all instances terminated in group %s", group.AutoScalingGroupName)
Expand All @@ -263,6 +294,8 @@ func TestRollingUpdateAllNeedUpdateCloudonly(t *testing.T) {
err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
assert.NoError(t, err, "rolling update")

assert.Empty(t, c.K8sClient.(*fake.Clientset).Actions())

asgGroups, _ := cloud.Autoscaling().DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{})
for _, group := range asgGroups.AutoScalingGroups {
assert.Emptyf(t, group.Instances, "Not all instances terminated in group %s", group.AutoScalingGroupName)
Expand Down Expand Up @@ -291,6 +324,8 @@ func TestRollingUpdateNoneNeedUpdate(t *testing.T) {
err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
assert.NoError(t, err, "rolling update")

assert.Empty(t, c.K8sClient.(*fake.Clientset).Actions())

assertGroupInstanceCount(t, cloud, "node-1", 3)
assertGroupInstanceCount(t, cloud, "node-2", 3)
assertGroupInstanceCount(t, cloud, "master-1", 2)
Expand Down Expand Up @@ -492,6 +527,130 @@ func TestRollingUpdateClusterErrorsValidationAfterOneNode(t *testing.T) {
assertGroupInstanceCount(t, cloud, "node-1", 2)
}

func TestRollingUpdateCordonAllNeedUpdate(t *testing.T) {
c, cloud, cluster, groups := getTestSetup()

featureflag.ParseFlags("+ConfigurableRollingUpdate")
defer featureflag.ParseFlags("-ConfigurableRollingUpdate")

cluster.Spec.RollingUpdateDefault = &kopsapi.RollingUpdateDefault{
CordonAllNeedUpdate: pointer.BoolPtr(true),
}

markNeedUpdate(groups["node-1"], "node-1a", "node-1b", "node-1c")
err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
assert.NoError(t, err, "rolling update")

cordoned := map[string]bool{}
deleted := map[string]bool{}
for _, action := range c.K8sClient.(*fake.Clientset).Actions() {
switch a := action.(type) {
case testingclient.PatchAction: // cordon
assertCordon(t, a)
assert.LessOrEqual(t, len(deleted), 1, "not cordoning after more than one node deleted")
assert.False(t, cordoned[a.GetName()], "node", a.GetName(), "already cordoned")
cordoned[a.GetName()] = true
case testingclient.DeleteAction:
assert.Equal(t, "nodes", a.GetResource().Resource)
if len(deleted) == 0 {
assert.Len(t, cordoned, 1, "first node was deleted before the rest were cordoned")
assert.Contains(t, cordoned, a.GetName(), "node was cordoned before delete")
} else {
assert.Len(t, cordoned, 3, "all nodes cordoned before any subsequent delete")
}
assert.False(t, deleted[a.GetName()], "node", a.GetName(), "already deleted")
deleted[a.GetName()] = true
case testingclient.ListAction:
// Don't care
default:
t.Errorf("unexpected action %v", a)
}
}

assertGroupInstanceCount(t, cloud, "node-1", 0)
}

func TestRollingUpdateCordonAllButOneNeedUpdate(t *testing.T) {
c, cloud, cluster, groups := getTestSetup()

featureflag.ParseFlags("+ConfigurableRollingUpdate")
defer featureflag.ParseFlags("-ConfigurableRollingUpdate")

cluster.Spec.RollingUpdateDefault = &kopsapi.RollingUpdateDefault{
CordonAllNeedUpdate: pointer.BoolPtr(true),
}

markNeedUpdate(groups["node-1"], "node-1a", "node-1b")
err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
assert.NoError(t, err, "rolling update")

cordoned := map[string]bool{}
deleted := map[string]bool{}
for _, action := range c.K8sClient.(*fake.Clientset).Actions() {
switch a := action.(type) {
case testingclient.PatchAction: // cordon
assertCordon(t, a)
assert.Empty(t, deleted, "not cordoning after any node deleted")
assert.False(t, cordoned[a.GetName()], "node", a.GetName(), "already cordoned")
cordoned[a.GetName()] = true
case testingclient.DeleteAction:
assert.Equal(t, "nodes", a.GetResource().Resource)
assert.Len(t, cordoned, 2, "all nodes cordoned before any delete")
assert.False(t, deleted[a.GetName()], "node", a.GetName(), "already deleted")
deleted[a.GetName()] = true
case testingclient.ListAction:
// Don't care
default:
t.Errorf("unexpected action %v", a)
}
}

assertGroupInstanceCount(t, cloud, "node-1", 1)
}

func TestRollingUpdateCordonAllNeedUpdateIgnoredForMaster(t *testing.T) {
c, cloud, cluster, groups := getTestSetup()

featureflag.ParseFlags("+ConfigurableRollingUpdate")
defer featureflag.ParseFlags("-ConfigurableRollingUpdate")

cluster.Spec.RollingUpdateDefault = &kopsapi.RollingUpdateDefault{
CordonAllNeedUpdate: pointer.BoolPtr(true),
}

markNeedUpdate(groups["master-1"], "master-1a", "master-1b")
err := c.RollingUpdate(groups, cluster, &kopsapi.InstanceGroupList{})
assert.NoError(t, err, "rolling update")

cordoned := ""
deleted := map[string]bool{}
for _, action := range c.K8sClient.(*fake.Clientset).Actions() {
switch a := action.(type) {
case testingclient.PatchAction: // cordon
assertCordon(t, a)
assert.Equal(t, "", cordoned, "at most one node cordoned at a time")
cordoned = a.GetName()
case testingclient.DeleteAction:
assert.Equal(t, "nodes", a.GetResource().Resource)
assert.Equal(t, cordoned, a.GetName(), "node was cordoned before delete")
assert.False(t, deleted[a.GetName()], "node", a.GetName(), "already deleted")
deleted[a.GetName()] = true
cordoned = ""
case testingclient.ListAction:
// Don't care
default:
t.Errorf("unexpected action %v", a)
}
}

assertGroupInstanceCount(t, cloud, "master-1", 0)
}

func assertCordon(t *testing.T, action testingclient.PatchAction) {
assert.Equal(t, "nodes", action.GetResource().Resource)
assert.Equal(t, "{\"spec\":{\"unschedulable\":true}}", string(action.GetPatch()))
}

func assertGroupInstanceCount(t *testing.T, cloud awsup.AWSCloud, groupName string, expected int) {
asgGroups, _ := cloud.Autoscaling().DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: []*string{aws.String(groupName)},
Expand Down

0 comments on commit b0258b5

Please sign in to comment.