Bug 2085997: brings back the etcd scaling test #27176

Merged
191 changes: 131 additions & 60 deletions test/extended/etcd/helpers/helpers.go
@@ -16,9 +16,11 @@ import (
bmhelper "github.com/openshift/origin/test/extended/baremetal"
exutil "github.com/openshift/origin/test/extended/util"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
@@ -96,11 +98,11 @@ func EnsureMasterMachine(ctx context.Context, t TestingT, machineName string, ma

// EnsureInitialClusterState makes sure the cluster is in the expected state, that is, it has exactly 3 running machines and exactly 3 voting members,
// otherwise it attempts to recover the cluster by removing any excessive machines
func EnsureInitialClusterState(ctx context.Context, t TestingT, etcdClientFactory EtcdClientCreator, machineClient machinev1beta1client.MachineInterface, kubeClient kubernetes.Interface) error {
if err := recoverClusterToInitialStateIfNeeded(ctx, t, machineClient); err != nil {
return err
}
if err := EnsureVotingMembersCount(ctx, t, etcdClientFactory, kubeClient, 3); err != nil {
return err
}
return EnsureMasterMachinesAndCount(ctx, t, machineClient)
@@ -115,15 +117,7 @@ func EnsureMasterMachinesAndCount(ctx context.Context, t TestingT, machineClient
return wait.Poll(waitPollInterval, waitPollTimeout, func() (bool, error) {
machineList, err := machineClient.List(ctx, metav1.ListOptions{LabelSelector: masterMachineLabelSelector})
if err != nil {
return isTransientAPIError(t, err)
}

if len(machineList.Items) != 3 {
@@ -149,33 +143,43 @@ func EnsureMasterMachinesAndCount(ctx context.Context, t TestingT, machineClient
}

func recoverClusterToInitialStateIfNeeded(ctx context.Context, t TestingT, machineClient machinev1beta1client.MachineInterface) error {
waitPollInterval := 15 * time.Second
waitPollTimeout := 5 * time.Minute
t.Logf("Trying up to %s to recover the cluster to its initial state", waitPollTimeout.String())

return wait.Poll(waitPollInterval, waitPollTimeout, func() (bool, error) {
machineList, err := machineClient.List(ctx, metav1.ListOptions{LabelSelector: masterMachineLabelSelector})
if err != nil {
return isTransientAPIError(t, err)
}

var machineNames []string
for _, machine := range machineList.Items {
machineNames = append(machineNames, machine.Name)
}

t.Logf("checking if there are any excessive machines in the cluster (created by a previous test), expected cluster size is 3, found %v machines: %v", len(machineList.Items), machineNames)
for _, machine := range machineList.Items {
if strings.HasSuffix(machine.Name, "-clone") {
// first forcefully remove the hooks
machine.Spec.LifecycleHooks = machinev1beta1.LifecycleHooks{}
if _, err := machineClient.Update(ctx, &machine, metav1.UpdateOptions{}); err != nil {
return isTransientAPIError(t, err)
}
// then the machine
if err := machineClient.Delete(ctx, machine.Name, metav1.DeleteOptions{}); err != nil {
return isTransientAPIError(t, err)
}
t.Logf("successfully deleted an excessive machine %q from the API (perhaps, created by a previous test)", machine.Name)
}
}
return true, nil
})
}
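
// NOTE: editor's illustrative aside, not part of this PR. The helpers in this file
// lean on the wait.Poll contract from k8s.io/apimachinery/pkg/util/wait: the condition
// func runs every interval until the timeout expires; returning (false, nil) means
// "not done yet, retry", (true, nil) means "done", and a non-nil error aborts the poll
// immediately. A minimal standalone example of that pattern (examplePollUsage is a
// hypothetical name, not a helper from this package):
func examplePollUsage() error {
attempts := 0
return wait.Poll(15*time.Second, 5*time.Minute, func() (bool, error) {
attempts++
if attempts < 3 {
return false, nil // condition not met yet, poll again
}
return true, nil // condition met, stop polling
})
}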

// EnsureVotingMembersCount counts the number of voting etcd members; it doesn't evaluate health conditions or any other attributes (e.g. name) of individual members.
// This method won't fail immediately on errors; that is useful during a scale-down operation until the feature can guarantee the operation is graceful.
func EnsureVotingMembersCount(ctx context.Context, t TestingT, etcdClientFactory EtcdClientCreator, kubeClient kubernetes.Interface, expectedMembersCount int) error {
waitPollInterval := 15 * time.Second
waitPollTimeout := 10 * time.Minute
t.Logf("Waiting up to %s for the cluster to reach the expected member count of %v", waitPollTimeout.String(), expectedMembersCount)
@@ -188,7 +192,7 @@ func EnsureVotingMembersCount(t TestingT, etcdClientFactory EtcdClientCreator, e
}
defer closeFn()

ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
memberList, err := etcdClient.MemberList(ctx)
if err != nil {
@@ -206,32 +210,53 @@ func EnsureVotingMembersCount(t TestingT, etcdClientFactory EtcdClientCreator, e
t.Logf("unexpected number of voting etcd members, expected exactly %d, got: %v, current members are: %v", expectedMembersCount, len(votingMemberNames), votingMemberNames)
return false, nil
}

t.Logf("cluster has reached the expected number of %v voting members, the members are: %v", expectedMembersCount, votingMemberNames)

t.Logf("ensuring that the openshift-etcd/etcd-endpoints cm has the expected number of %v voting members", expectedMembersCount)
etcdEndpointsConfigMap, err := kubeClient.CoreV1().ConfigMaps("openshift-etcd").Get(ctx, "etcd-endpoints", metav1.GetOptions{})
if err != nil {
return false, err
}
currentVotingMemberIPListSet := sets.NewString()
for _, votingMemberIP := range etcdEndpointsConfigMap.Data {
currentVotingMemberIPListSet.Insert(votingMemberIP)
}
if currentVotingMemberIPListSet.Len() != expectedMembersCount {
t.Logf("unexpected number of voting members in the openshift-etcd/etcd-endpoints cm, expected exactly %d, got: %v, current members are: %v", expectedMembersCount, currentVotingMemberIPListSet.Len(), currentVotingMemberIPListSet.List())
return false, nil
}
return true, nil
})
}
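
// NOTE: editor's illustrative sketch, not part of this PR. The filtering that produces
// votingMemberNames above lives in a collapsed part of this diff; voting members are
// typically derived from the MemberList response by skipping learners (assuming the
// etcdserverpb member type from go.etcd.io/etcd/api/v3/etcdserverpb):
func votingMemberNamesFromSketch(members []*etcdserverpb.Member) []string {
var names []string
for _, member := range members {
if member.IsLearner {
continue // learners are not voting members
}
names = append(names, member.Name)
}
return names
}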

func EnsureMemberRemoved(t TestingT, etcdClientFactory EtcdClientCreator, memberName string) error {
waitPollInterval := 15 * time.Second
waitPollTimeout := 1 * time.Minute
t.Logf("Waiting up to %s for %v member to be removed from the cluster", waitPollTimeout.String(), memberName)

return wait.Poll(waitPollInterval, waitPollTimeout, func() (bool, error) {
etcdClient, closeFn, err := etcdClientFactory.NewEtcdClient()
if err != nil {
t.Logf("failed to get etcd client, will retry, err: %v", err)
return false, nil
}
defer closeFn()

ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
defer cancel()
rsp, err := etcdClient.MemberList(ctx)
if err != nil {
t.Logf("failed to get member list, will retry, err: %v", err)
return false, nil
}

for _, member := range rsp.Members {
if member.Name == memberName {
return false, fmt.Errorf("member %v hasn't been removed", spew.Sdump(member))
}
}
return true, nil
})
}

func EnsureHealthyMember(t TestingT, etcdClientFactory EtcdClientCreator, memberName string) error {
@@ -255,7 +280,9 @@ func EnsureHealthyMember(t TestingT, etcdClientFactory EtcdClientCreator, member

// MachineNameToEtcdMemberName finds an etcd member name that corresponds to the given machine name
// first it looks up a node that corresponds to the machine by comparing the ProviderID field
// next, it returns the node name as it is used to name an etcd member.
//
// In case the ProviderID is empty, it will try to find a node that matches one of the machine's internal IP addresses
//
// note:
// it will exit and report an error in case the node was not found
@@ -264,29 +291,51 @@ func MachineNameToEtcdMemberName(ctx context.Context, kubeClient kubernetes.Inte
if err != nil {
return "", err
}
masterNodes, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/master"})
if err != nil {
return "", err
}

machineProviderID := pointer.StringDeref(machine.Spec.ProviderID, "")
if len(machineProviderID) != 0 {
// case 1: find corresponding node, match on providerID
var nodeNames []string
for _, masterNode := range masterNodes.Items {
if masterNode.Spec.ProviderID == machineProviderID {
return masterNode.Name, nil
}
nodeNames = append(nodeNames, masterNode.Name)
}

return "", fmt.Errorf("unable to find a node for the corresponding %q machine on ProviderID: %v, checked: %v", machineName, machineProviderID, nodeNames)
}

// case 2: match on an internal ip address
machineIPListSet := sets.NewString()
for _, addr := range machine.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
machineIPListSet.Insert(addr.Address)
}
}

var nodeNames []string
for _, masterNode := range masterNodes.Items {
for _, addr := range masterNode.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
if machineIPListSet.Has(addr.Address) {
return masterNode.Name, nil
}
}
}
nodeNames = append(nodeNames, masterNode.Name)
}

return "", fmt.Errorf("unable to find a node for the corresponding %q machine on the following machine's IPs: %v, checked: %v", machineName, machineIPListSet.List(), nodeNames)
}

func InitPlatformSpecificConfiguration(oc *exutil.CLI) func() {
SkipIfUnsupportedPlatform(context.TODO(), oc)

infra, err := oc.AdminConfigClient().ConfigV1().Infrastructures().Get(context.Background(), "cluster", metav1.GetOptions{})
o.Expect(err).NotTo(o.HaveOccurred())

@@ -312,6 +361,7 @@ func SkipIfUnsupportedPlatform(ctx context.Context, oc *exutil.CLI) {
skipUnlessFunctionalMachineAPI(ctx, machineClient)
skipIfAzure(oc)
skipIfSingleNode(oc)
skipIfBareMetal(oc)
}

func skipUnlessFunctionalMachineAPI(ctx context.Context, machineClient machinev1beta1client.MachineInterface) {
@@ -358,6 +408,15 @@ func skipIfSingleNode(oc *exutil.CLI) {
}
}

func skipIfBareMetal(oc *exutil.CLI) {
infra, err := oc.AdminConfigClient().ConfigV1().Infrastructures().Get(context.Background(), "cluster", metav1.GetOptions{})
o.Expect(err).NotTo(o.HaveOccurred())

if infra.Status.PlatformStatus.Type == configv1.BareMetalPlatformType {
e2eskipper.Skipf("this test is currently broken on the metal platform and needs to be fixed")
}
}

func hasMachineDeletionHook(machine *machinev1beta1.Machine) bool {
for _, hook := range machine.Spec.LifecycleHooks.PreDrain {
if hook.Name == machineDeletionHookName && hook.Owner == machineDeletionHookOwner {
@@ -379,6 +438,18 @@ func transientAPIError(err error) bool {
}
}
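
// NOTE: editor's illustrative sketch, not part of this PR. The body of transientAPIError
// is collapsed in this diff view; a classifier of that kind typically combines the
// apimachinery helpers that are already imported above, roughly along these lines
// (the exact set of checks is an assumption, and the Sketch name is hypothetical):
func transientAPIErrorSketch(err error) bool {
switch {
case err == nil:
return false
case net.IsConnectionReset(err), net.IsProbableEOF(err), net.IsConnectionRefused(err):
return true // connection-level blips while the apiserver rolls out
case apierrors.IsServerTimeout(err), apierrors.IsTooManyRequests(err), apierrors.IsServiceUnavailable(err):
return true // the apiserver asked the client to back off and retry
case isClientConnectionLost(err):
return true // the client-side transport reported a lost connection
default:
return false
}
}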

func isTransientAPIError(t TestingT, err error) (bool, error) {
// we tolerate some disruption until https://bugzilla.redhat.com/show_bug.cgi?id=2082778
// is fixed and rely on the monitor for reporting (p99).
// this is okay since we observe disruption during the upgrade jobs too,
// the only difference is that during the upgrade job we don’t access the API except from the monitor.
if transientAPIError(err) {
t.Logf("ignoring %v for now, the error is considered a transient error (will retry)", err)
return false, nil
}
return false, err
}

func isClientConnectionLost(err error) bool {
return strings.Contains(err.Error(), "client connection lost")
}
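
// NOTE: editor's illustrative sketch, not part of this PR. The counterpart of the
// hasMachineDeletionHook check above, attaching the pre-drain deletion hook to a
// machine, would look roughly like this (machineDeletionHookName and
// machineDeletionHookOwner are package constants that are not visible in this diff,
// and the Sketch name is hypothetical):
func addMachineDeletionHookSketch(machine *machinev1beta1.Machine) {
machine.Spec.LifecycleHooks.PreDrain = append(machine.Spec.LifecycleHooks.PreDrain, machinev1beta1.LifecycleHook{
Name:  machineDeletionHookName,
Owner: machineDeletionHookOwner,
})
}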
96 changes: 96 additions & 0 deletions test/extended/etcd/vertical_scaling.go
@@ -0,0 +1,96 @@
package etcd

import (
"context"

g "github.com/onsi/ginkgo"
o "github.com/onsi/gomega"

machineclient "github.com/openshift/client-go/machine/clientset/versioned"
testlibraryapi "github.com/openshift/library-go/test/library/apiserver"
scalingtestinglibrary "github.com/openshift/origin/test/extended/etcd/helpers"
exutil "github.com/openshift/origin/test/extended/util"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/test/e2e/framework"
)

var _ = g.Describe("[sig-etcd][Serial] etcd", func() {
defer g.GinkgoRecover()
oc := exutil.NewCLIWithoutNamespace("etcd-scaling").AsAdmin()

cleanupPlatformSpecificConfiguration := func() { /*noop*/ }

g.BeforeEach(func() {
cleanupPlatformSpecificConfiguration = scalingtestinglibrary.InitPlatformSpecificConfiguration(oc)
})

g.AfterEach(func() {
cleanupPlatformSpecificConfiguration()
})

// The following test covers a basic vertical scaling scenario.
// It starts by adding a new master machine to the cluster,
// next it validates the size of the etcd cluster and makes sure the new member is healthy.
// The test ends by removing the newly added machine, validating the size of the cluster,
// and asserting the member was removed from the etcd cluster by contacting the MemberList API.
g.It("is able to vertically scale up and down with a single node", func() {
// set up
ctx := context.TODO()
etcdClientFactory := scalingtestinglibrary.NewEtcdClientFactory(oc.KubeClient())
machineClientSet, err := machineclient.NewForConfig(oc.KubeFramework().ClientConfig())
o.Expect(err).ToNot(o.HaveOccurred())
machineClient := machineClientSet.MachineV1beta1().Machines("openshift-machine-api")
kubeClient := oc.KubeClient()

// make sure it can be run on the current platform
scalingtestinglibrary.SkipIfUnsupportedPlatform(ctx, oc)

// assert the cluster state before we run the test
err = scalingtestinglibrary.EnsureInitialClusterState(ctx, g.GinkgoT(), etcdClientFactory, machineClient, kubeClient)
o.Expect(err).ToNot(o.HaveOccurred())

// step 0: ensure clean state after the test
defer func() {
// since the deletion triggers a new rollout
// we need to make sure that the API is stable after the test
// so that other e2e tests won't hit an API server that is undergoing termination (write requests might fail)
g.GinkgoT().Log("cleaning routine: ensuring initial cluster state and waiting for api servers to stabilize on the same revision")
err = scalingtestinglibrary.EnsureInitialClusterState(ctx, g.GinkgoT(), etcdClientFactory, machineClient, kubeClient)
o.Expect(err).ToNot(o.HaveOccurred())
err = testlibraryapi.WaitForAPIServerToStabilizeOnTheSameRevision(g.GinkgoT(), oc.KubeClient().CoreV1().Pods("openshift-kube-apiserver"))
o.Expect(err).ToNot(o.HaveOccurred())
}()

// step 1: add a new master node and wait until it is in Running state
machineName, err := scalingtestinglibrary.CreateNewMasterMachine(ctx, g.GinkgoT(), machineClient)
o.Expect(err).ToNot(o.HaveOccurred())
err = scalingtestinglibrary.EnsureMasterMachine(ctx, g.GinkgoT(), machineName, machineClient)
o.Expect(err).ToNot(o.HaveOccurred())

// step 2: wait until a new member shows up and check if it is healthy
// and until all kube-api servers have reached the same revision
// this additional step is a best-effort way of ensuring they
// have observed the new member before any disruption
err = scalingtestinglibrary.EnsureVotingMembersCount(ctx, g.GinkgoT(), etcdClientFactory, kubeClient, 4)
o.Expect(err).ToNot(o.HaveOccurred())
memberName, err := scalingtestinglibrary.MachineNameToEtcdMemberName(ctx, oc.KubeClient(), machineClient, machineName)
o.Expect(err).ToNot(o.HaveOccurred())
err = scalingtestinglibrary.EnsureHealthyMember(g.GinkgoT(), etcdClientFactory, memberName)
o.Expect(err).ToNot(o.HaveOccurred())
g.GinkgoT().Log("waiting for api servers to stabilize on the same revision")
err = testlibraryapi.WaitForAPIServerToStabilizeOnTheSameRevision(g.GinkgoT(), oc.KubeClient().CoreV1().Pods("openshift-kube-apiserver"))
o.Expect(err).ToNot(o.HaveOccurred())

// step 3: clean-up: delete the machine and wait until etcd member is removed from the etcd cluster
err = machineClient.Delete(ctx, machineName, metav1.DeleteOptions{})
o.Expect(err).ToNot(o.HaveOccurred())
framework.Logf("successfully deleted the machine %q from the API", machineName)
err = scalingtestinglibrary.EnsureVotingMembersCount(ctx, g.GinkgoT(), etcdClientFactory, kubeClient, 3)
o.Expect(err).ToNot(o.HaveOccurred())
err = scalingtestinglibrary.EnsureMemberRemoved(g.GinkgoT(), etcdClientFactory, memberName)
o.Expect(err).ToNot(o.HaveOccurred())
err = scalingtestinglibrary.EnsureMasterMachinesAndCount(ctx, g.GinkgoT(), machineClient)
o.Expect(err).ToNot(o.HaveOccurred())
})
})