[release-4.12] OCPBUGS-6841: Update the vertical scaling test to account for CPMSO (#27907)

* manual cherrypick of #27788

* bindata diff

* go1.19 diff
Elbehery committed May 23, 2023
1 parent a5a9705 commit 06127b0
Showing 3 changed files with 190 additions and 3 deletions.
124 changes: 123 additions & 1 deletion test/extended/etcd/helpers/helpers.go
@@ -3,15 +3,18 @@ package helpers
import (
"context"
"fmt"
"sort"
"strings"
"time"

"github.com/davecgh/go-spew/spew"
o "github.com/onsi/gomega"

configv1 "github.com/openshift/api/config/v1"
machinev1 "github.com/openshift/api/machine/v1"
machinev1beta1 "github.com/openshift/api/machine/v1beta1"
machineclient "github.com/openshift/client-go/machine/clientset/versioned"
machinev1client "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1"
machinev1beta1client "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1beta1"

bmhelper "github.com/openshift/origin/test/extended/baremetal"
@@ -25,13 +28,15 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
"k8s.io/utils/pointer"
)

const masterMachineLabelSelector = "machine.openshift.io/cluster-api-machine-role" + "=" + "master"
const machineDeletionHookName = "EtcdQuorumOperator"
const machineDeletionHookOwner = "clusteroperator/etcd"
const masterNodeRoleLabel = "node-role.kubernetes.io/master"

type TestingT interface {
Logf(format string, args ...interface{})
@@ -180,6 +185,123 @@ func recoverClusterToInitialStateIfNeeded(ctx context.Context, t TestingT, machi
})
}

func DeleteSingleMachine(ctx context.Context, t TestingT, machineClient machinev1beta1client.MachineInterface) (string, error) {
machineToDelete := ""
// list master machines
machineList, err := machineClient.List(ctx, metav1.ListOptions{LabelSelector: masterMachineLabelSelector})
if err != nil {
return "", fmt.Errorf("error listing master machines: '%w'", err)
}
// Machine names are suffixed with an index number (e.g. "ci-op-xlbdrkvl-6a467-qcbkh-master-0"),
// so we sort to pick the lowest index, e.g. master-0 in this example
machineNames := []string{}
for _, m := range machineList.Items {
machineNames = append(machineNames, m.Name)
}
sort.Strings(machineNames)
machineToDelete = machineNames[0]

t.Logf("attempting to delete machine '%q'", machineToDelete)
if err := machineClient.Delete(ctx, machineToDelete, metav1.DeleteOptions{}); err != nil {
if apierrors.IsNotFound(err) {
t.Logf("machine '%q' was listed but not found or already deleted", machineToDelete)
return "", nil
}
return "", err
}
t.Logf("successfully deleted machine '%q'", machineToDelete)
return machineToDelete, nil
}
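As an aside (not part of the commit), here is a minimal, self-contained sketch of the lexical-sort selection described in the comment above; the machine names are hypothetical examples following the "<cluster>-master-<index>" pattern:

package main

import (
	"fmt"
	"sort"
)

func main() {
	// Hypothetical machine names; only the "-master-<index>" suffix matters here.
	names := []string{
		"ci-op-xlbdrkvl-6a467-qcbkh-master-2",
		"ci-op-xlbdrkvl-6a467-qcbkh-master-0",
		"ci-op-xlbdrkvl-6a467-qcbkh-master-1",
	}
	sort.Strings(names)
	// The lexically smallest name carries the lowest index suffix.
	fmt.Println(names[0]) // prints "ci-op-xlbdrkvl-6a467-qcbkh-master-0"
}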

// IsCPMSActive returns true if the current platform has an active CPMS.
// Not all platforms are supported (as of 4.12, only AWS and Azure).
// See https://github.com/openshift/cluster-control-plane-machine-set-operator/tree/main/docs/user#supported-platforms
func IsCPMSActive(ctx context.Context, t TestingT, cpmsClient machinev1client.ControlPlaneMachineSetInterface) (bool, error) {
// The CPMS singleton in the "openshift-machine-api" namespace is named "cluster"
// https://github.com/openshift/cluster-control-plane-machine-set-operator/blob/bba395abab62fc12de4a9b9b030700546f4b822e/pkg/controllers/controlplanemachineset/controller.go#L50-L53
cpms, err := cpmsClient.Get(ctx, "cluster", metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}

// The CPMS state must be active in order for the platform to be supported
// See https://github.com/openshift/cluster-control-plane-machine-set-operator/blob/7961d1457c6aef26d3b1dafae962da2a2aba18ef/docs/user/installation.md#anatomy-of-a-controlplanemachineset
if cpms.Spec.State != machinev1.ControlPlaneMachineSetStateActive {
return false, nil
}

return true, nil
}

// EnsureReadyReplicasOnCPMS checks whether status.readyReplicas on the cluster CPMS equals the expected count.
// This effectively counts the number of control-plane machines whose provider state is Running.
func EnsureReadyReplicasOnCPMS(ctx context.Context, t TestingT, expectedReplicaCount int, cpmsClient machinev1client.ControlPlaneMachineSetInterface, nodeClient v1.NodeInterface) error {
waitPollInterval := 5 * time.Second
waitPollTimeout := 30 * time.Minute
t.Logf("Waiting up to %s for the CPMS to have status.readyReplicas = %v", waitPollTimeout.String(), expectedReplicaCount)

return wait.Poll(waitPollInterval, waitPollTimeout, func() (bool, error) {
cpms, err := cpmsClient.Get(ctx, "cluster", metav1.GetOptions{})
if err != nil {
return isTransientAPIError(t, err)
}

if cpms.Status.ReadyReplicas != int32(expectedReplicaCount) {
t.Logf("expected %d ready replicas on CPMS, got: %v,", expectedReplicaCount, cpms.Status.ReadyReplicas)
return false, nil
}
t.Logf("CPMS has reached the desired number of ready replicas: %v,", cpms.Status.ReadyReplicas)

err = EnsureReadyMasterNodes(ctx, expectedReplicaCount, nodeClient)
if err != nil {
t.Logf("expected number of master nodes is not ready yet: '%w'", err)
return false, nil
}

return true, nil
})
}

// EnsureReadyMasterNodes checks that the number of master nodes matches the expected replica count,
// and that all master nodes are Ready.
func EnsureReadyMasterNodes(ctx context.Context, expectedReplicaCount int, nodeClient v1.NodeInterface) error {
masterNodes, err := nodeClient.List(ctx, metav1.ListOptions{LabelSelector: masterNodeRoleLabel})
if err != nil {
return fmt.Errorf("failed to list master nodes:'%w'", err)
}

if len(masterNodes.Items) != expectedReplicaCount {
return fmt.Errorf("expected number of master nodes is '%d', but got '%d' instead", expectedReplicaCount, len(masterNodes.Items))
}

for _, node := range masterNodes.Items {
for _, condition := range node.Status.Conditions {
if condition.Type == corev1.NodeReady && condition.Status != corev1.ConditionTrue {
return fmt.Errorf("master node '%v' is not ready", node)
}
}
}

return nil
}
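For illustration only (not part of this commit): a minimal unit-test sketch exercising EnsureReadyMasterNodes against client-go's fake clientset. The test file, package name, and node names are hypothetical, and it assumes the generated fake Nodes client applies the label selector when listing:

package helpers_test

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes/fake"

	helpers "github.com/openshift/origin/test/extended/etcd/helpers"
)

func TestEnsureReadyMasterNodes(t *testing.T) {
	// Build three master nodes that all report Ready.
	var objs []runtime.Object
	for _, name := range []string{"master-0", "master-1", "master-2"} {
		objs = append(objs, &corev1.Node{
			ObjectMeta: metav1.ObjectMeta{
				Name:   name,
				Labels: map[string]string{"node-role.kubernetes.io/master": ""},
			},
			Status: corev1.NodeStatus{
				Conditions: []corev1.NodeCondition{{Type: corev1.NodeReady, Status: corev1.ConditionTrue}},
			},
		})
	}
	client := fake.NewSimpleClientset(objs...)
	if err := helpers.EnsureReadyMasterNodes(context.Background(), 3, client.CoreV1().Nodes()); err != nil {
		t.Fatalf("expected 3 ready master nodes, got error: %v", err)
	}
}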

// EnsureCPMSReplicasConverged returns an error if the number of expected master machines does not equal the number of actual master machines;
// otherwise it returns nil.
func EnsureCPMSReplicasConverged(ctx context.Context, cpmsClient machinev1client.ControlPlaneMachineSetInterface) error {
cpms, err := cpmsClient.Get(ctx, "cluster", metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get controlPlaneMachineSet object: '%w'", err)
}

if *cpms.Spec.Replicas != cpms.Status.ReadyReplicas {
return fmt.Errorf("CPMS replicas failed to converge, expected status.readyReplicas '%d' to be equal to spec.replicas '%v'", cpms.Status.ReadyReplicas, cpms.Spec.Replicas)
}
return nil
}

// EnsureVotingMembersCount counts the number of voting etcd members; it doesn't evaluate health conditions or any other attributes (e.g. name) of individual members.
// This method won't fail immediately on errors; this is useful during the scale-down operation until the feature can ensure that the operation is graceful.
func EnsureVotingMembersCount(ctx context.Context, t TestingT, etcdClientFactory EtcdClientCreator, kubeClient kubernetes.Interface, expectedMembersCount int) error {
@@ -295,7 +417,7 @@ func MachineNameToEtcdMemberName(ctx context.Context, kubeClient kubernetes.Inte
return "", err
}

masterNodes, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/master"})
masterNodes, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: masterNodeRoleLabel})
if err != nil {
return "", err
}
67 changes: 66 additions & 1 deletion test/extended/etcd/vertical_scaling.go
@@ -18,7 +18,7 @@ import (
"k8s.io/kubernetes/test/e2e/framework"
)

var _ = g.Describe("[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd/scaling] etcd [apigroup:config.openshift.io]", func() {
var _ = g.Describe("[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd/scaling] etcd", func() {
defer g.GinkgoRecover()
oc := exutil.NewCLIWithoutNamespace("etcd-scaling").AsAdmin()

@@ -44,6 +44,8 @@ var _ = g.Describe("[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd
machineClientSet, err := machineclient.NewForConfig(oc.KubeFramework().ClientConfig())
o.Expect(err).ToNot(o.HaveOccurred())
machineClient := machineClientSet.MachineV1beta1().Machines("openshift-machine-api")
nodeClient := oc.KubeClient().CoreV1().Nodes()
cpmsClient := machineClientSet.MachineV1().ControlPlaneMachineSets("openshift-machine-api")
kubeClient := oc.KubeClient()

// make sure it can be run on the current platform
@@ -54,6 +56,69 @@ var _ = g.Describe("[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd
err = errors.Wrap(err, "pre-test: timed out waiting for initial cluster state to have 3 running machines and 3 voting members")
o.Expect(err).ToNot(o.HaveOccurred())

cpmsActive, err := scalingtestinglibrary.IsCPMSActive(ctx, g.GinkgoT(), cpmsClient)
err = errors.Wrap(err, "pre-test: failed to determine if ControlPlaneMachineSet is present and active")
o.Expect(err).ToNot(o.HaveOccurred())

if cpmsActive {
// TODO: Add cleanup step to recover back to 3 running machines and members if the test fails

framework.Logf("CPMS is active. Relying on CPMSO to replace the machine during vertical scaling")

// step 1: delete a running machine to trigger the CPMSO to create a new one to replace it
machineName, err := scalingtestinglibrary.DeleteSingleMachine(ctx, g.GinkgoT(), machineClient)
o.Expect(err).ToNot(o.HaveOccurred())
framework.Logf("Waiting for machine %q pending deletion to be replaced", machineName)

memberName, err := scalingtestinglibrary.MachineNameToEtcdMemberName(ctx, oc.KubeClient(), machineClient, machineName)
err = errors.Wrapf(err, "failed to get etcd member name for deleted machine: %v", machineName)
o.Expect(err).ToNot(o.HaveOccurred())

// step 2: wait until the CPMSO scales up by creating a new machine
// We need to check the CPMS' status.readyReplicas because the phase of one machine will always be Deleting,
// so we can't use EnsureMasterMachinesAndCount(), since that only counts machines that aren't pending deletion
err = scalingtestinglibrary.EnsureReadyReplicasOnCPMS(ctx, g.GinkgoT(), 4, cpmsClient, nodeClient)
err = errors.Wrap(err, "scale-up: timed out waiting for CPMS to show 4 ready replicas")
o.Expect(err).ToNot(o.HaveOccurred())

// We can't check for 4 members here as the clustermemberremoval controller will race to
// remove the old member (from the machine pending deletion) as soon as the new machine's member
// is promoted to a voting member.
// Instead we just wait until the CPMS shows 3 replicas again, which indicates that the new member was added
// successfully.

// step 3: wait for automatic scale-down as the replica count goes back down to 3
err = scalingtestinglibrary.EnsureReadyReplicasOnCPMS(ctx, g.GinkgoT(), 3, cpmsClient, nodeClient)
err = errors.Wrap(err, "scale-down: timed out waiting for CPMS to show 3 ready replicas")
o.Expect(err).ToNot(o.HaveOccurred())

err = scalingtestinglibrary.EnsureVotingMembersCount(ctx, g.GinkgoT(), etcdClientFactory, kubeClient, 3)
err = errors.Wrap(err, "scale-down: timed out waiting for 3 voting members in the etcd cluster and etcd-endpoints configmap")
o.Expect(err).ToNot(o.HaveOccurred())

err = scalingtestinglibrary.EnsureMemberRemoved(g.GinkgoT(), etcdClientFactory, memberName)
err = errors.Wrapf(err, "scale-down: timed out waiting for member (%v) to be removed", memberName)
o.Expect(err).ToNot(o.HaveOccurred())

err = scalingtestinglibrary.EnsureMasterMachinesAndCount(ctx, g.GinkgoT(), machineClient)
err = errors.Wrap(err, "scale-down: timed out waiting for only 3 Running master machines")
o.Expect(err).ToNot(o.HaveOccurred())

// step 4: Wait for apiserver revision rollout to stabilize
g.GinkgoT().Log("waiting for api servers to stabilize on the same revision")
err = testlibraryapi.WaitForAPIServerToStabilizeOnTheSameRevision(g.GinkgoT(), oc.KubeClient().CoreV1().Pods("openshift-kube-apiserver"))
err = errors.Wrap(err, "scale-up: timed out waiting for APIServer pods to stabilize on the same revision")
o.Expect(err).ToNot(o.HaveOccurred())

err = scalingtestinglibrary.EnsureCPMSReplicasConverged(ctx, cpmsClient)
o.Expect(err).ToNot(o.HaveOccurred())

return
}

// For a platform without CPMS support, the test resorts to manually creating and deleting a machine
framework.Logf("CPMS is inactive. The test will manually add and remove a machine for vertical scaling")

// step 0: ensure clean state after the test
defer func() {
// since the deletion triggers a new rollout

Some generated files are not rendered by default.
