-
Notifications
You must be signed in to change notification settings - Fork 157
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
wip: e2e for testing the fallback scenario
- Loading branch information
1 parent
92984b3
commit d9c6e3b
Showing
2 changed files
with
282 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package e2e_sno_disruptive | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
|
||
configv1client "github.com/openshift/client-go/config/clientset/versioned" | ||
configv1 "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1" | ||
operatorv1client "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1" | ||
libgotest "github.com/openshift/library-go/test/library" | ||
|
||
"k8s.io/client-go/kubernetes" | ||
) | ||
|
||
// clientSet bundles the typed API clients used throughout the fallback e2e tests.
type clientSet struct {
	Infra    configv1.InfrastructureInterface        // cluster infrastructure config, used for topology detection
	Operator operatorv1client.KubeAPIServerInterface // kubeapiservers.operator.openshift.io/cluster resource
	Kube     kubernetes.Interface                    // core Kubernetes API (pods, namespaces, ...)
}
|
||
func getClients(t testing.TB) clientSet { | ||
t.Helper() | ||
|
||
kubeConfig, err := libgotest.NewClientConfigForTest() | ||
require.NoError(t, err) | ||
kubeClient := kubernetes.NewForConfigOrDie(kubeConfig) | ||
|
||
operatorClient, err := operatorv1client.NewForConfig(kubeConfig) | ||
require.NoError(t, err) | ||
|
||
configClient, err := configv1client.NewForConfig(kubeConfig) | ||
require.NoError(t, err) | ||
|
||
return clientSet{Infra: configClient.ConfigV1().Infrastructures(), Operator: operatorClient.KubeAPIServers(), Kube: kubeClient} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,250 @@ | ||
package e2e_sno_disruptive | ||
|
||
import "testing" | ||
import ( | ||
"context" | ||
"encoding/json" | ||
"strconv" | ||
"testing" | ||
"time" | ||
|
||
func TestFallback(t *testing.T) { | ||
t.Log("implement me") | ||
"github.com/davecgh/go-spew/spew" | ||
"github.com/stretchr/testify/require" | ||
|
||
configv1 "github.com/openshift/api/config/v1" | ||
"github.com/openshift/library-go/pkg/operator/staticpod/startupmonitor/annotations" | ||
"github.com/openshift/library-go/pkg/operator/v1helpers" | ||
commontesthelpers "github.com/openshift/library-go/test/library/encryption" | ||
|
||
corev1 "k8s.io/api/core/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/util/wait" | ||
"k8s.io/client-go/util/retry" | ||
) | ||
|
||
// TestFallback deliberately breaks the kube-apiserver by injecting an invalid
// flag through unsupportedConfigOverrides and verifies that the startup monitor
// falls back to the previous good revision and reports it on the operator status.
func TestFallback(tt *testing.T) {
	t := commontesthelpers.NewE(tt)
	cs := getClients(t)
	t.Log("Starting the fallback test")

	clusterStateWaitPollTimeout, clusterMustBeReadyFor, waitForFallbackDegradedConditionTimeout := fallbackTimeoutsForCurrentPlatform(t, cs)

	// before starting a new test make sure the current state of the cluster is good
	ensureClusterInGoodState(t, cs, clusterStateWaitPollTimeout, clusterMustBeReadyFor)

	// cause a disruption: an unknown apiserver argument makes the new revision fail to start
	cfg := getDefaultUnsupportedConfigForCurrentPlatform(t, cs)
	cfg["apiServerArguments"] = map[string][]string{"non-existing-flag": {"true"}}
	setUnsupportedConfig(t, cs, cfg)

	// validate if the fallback condition is reported and the cluster is stable
	waitForFallbackDegradedCondition(t, cs, waitForFallbackDegradedConditionTimeout)
	nodeName, failedRevision := assertFallbackOnNodeStatus(t, cs)
	assertKasPodAnnotatedOnNode(t, cs, failedRevision, "*", "*", nodeName)
	// TODO: try kas - get pod/ns ?
	// TODO: check if pod is ready ?

	// clean up: restore the default overrides and require the cluster to settle again
	setUnsupportedConfig(t, cs, getDefaultUnsupportedConfigForCurrentPlatform(t, cs))
	if err := clusterInGoodState(t, cs, clusterStateWaitPollTimeout, clusterMustBeReadyFor); err != nil {
		t.Fatal(err)
	}
}
|
||
// ensureClusterInGoodState makes sure the cluster is stable: is not progressing and doesn't have a failed revision for mustBeReadyFor period | ||
// otherwise after waitPollTimeout the unsupportedConfigOverrides will be reset to bring the cluster into a stable state. | ||
func ensureClusterInGoodState(t testing.TB, cs clientSet, waitPollTimeout, mustBeReadyFor time.Duration) { | ||
if err := clusterInGoodState(t, cs, waitPollTimeout, mustBeReadyFor); err != nil { | ||
t.Logf("Trying to bring the cluster back to normal state by removing all unsupportedConfigOverrides entries, err: %v", err) | ||
setUnsupportedConfig(t, cs, getDefaultUnsupportedConfigForCurrentPlatform(t, cs)) | ||
err := clusterInGoodState(t, cs, waitPollTimeout, mustBeReadyFor) | ||
require.NoError(t, err) | ||
} | ||
} | ||
|
||
// clusterInGoodState checks if the cluster is stable: is not progressing and doesn't have a failed revision for mustBeReadyFor period | ||
func clusterInGoodState(t testing.TB, cs clientSet, waitPollTimeout, mustBeReadyFor time.Duration) error { | ||
t.Helper() | ||
|
||
startTs := time.Now() | ||
t.Logf("Waiting %s for the cluster to be in a good condition, interval = 10s, timeout %v", mustBeReadyFor.String(), waitPollTimeout) | ||
|
||
return wait.Poll(10*time.Second, waitPollTimeout, func() (bool, error) { | ||
ckaso, err := cs.Operator.Get(context.TODO(), "cluster", metav1.GetOptions{}) | ||
require.NoError(t, err) | ||
|
||
for _, ns := range ckaso.Status.NodeStatuses { | ||
if ckaso.Status.LatestAvailableRevision != ns.CurrentRevision || ns.LastFallbackCount > 0 || ns.LastFailedRevision > 0 || ns.TargetRevision > 0 { | ||
t.Logf("Node %s is either progressing or has a failed revision, latestAvailableRevision: %v, nodeStatus: %v", ns.NodeName, ckaso.Status.LatestAvailableRevision, spew.Sdump(ns)) | ||
return false, nil /*retry*/ | ||
} | ||
} | ||
|
||
if time.Since(startTs) > mustBeReadyFor { | ||
t.Logf("The cluster has been in good condition for %s", mustBeReadyFor.String()) | ||
return true, nil /*done*/ | ||
} | ||
return false, nil /*wait a bit more*/ | ||
}) | ||
} | ||
|
||
// setUnsupportedConfig simply sets UnsupportedConfigOverrides config to the provided cfg | ||
func setUnsupportedConfig(t testing.TB, cs clientSet, cfg map[string]interface{}) { | ||
t.Helper() | ||
|
||
t.Logf("Setting UnsupportedConfigOverrides to %v", cfg) | ||
err := retry.OnError(retry.DefaultRetry, func(error) bool { return true }, func() error { | ||
raw, err := json.Marshal(cfg) | ||
if err != nil { | ||
t.Log(err) | ||
return err | ||
} | ||
ckaso, err := cs.Operator.Get(context.TODO(), "cluster", metav1.GetOptions{}) | ||
if err != nil { | ||
t.Log(err) | ||
return err | ||
} | ||
ckaso.Spec.UnsupportedConfigOverrides.Raw = raw | ||
_, err = cs.Operator.Update(context.TODO(), ckaso, metav1.UpdateOptions{}) | ||
if err != nil { | ||
t.Log(err) | ||
} | ||
return err | ||
}) | ||
require.NoError(t, err) | ||
} | ||
|
||
// waitForFallbackDegradedCondition waits until StaticPodFallbackRevisionDegraded condition is set to true | ||
func waitForFallbackDegradedCondition(t testing.TB, cs clientSet, waitPollTimeout time.Duration) { | ||
t.Helper() | ||
|
||
t.Log("Waiting for StaticPodFallbackRevisionDegraded condition, interval = 20s, timeout = 6min") | ||
err := wait.Poll(20*time.Second, waitPollTimeout, func() (bool, error) { | ||
ckaso, err := cs.Operator.Get(context.TODO(), "cluster", metav1.GetOptions{}) | ||
if err != nil { | ||
t.Logf("unable to get kube-apiserver-operator resource: %v", err) | ||
return false, nil /*retry*/ | ||
} | ||
|
||
if v1helpers.IsOperatorConditionFalse(ckaso.Status.Conditions, "StaticPodFallbackRevisionDegraded") { | ||
t.Logf("StaticPodFallbackRevisionDegraded condition hasn't been set yet") | ||
return false, nil /*retry*/ | ||
} | ||
|
||
return true, nil /*done*/ | ||
}) | ||
require.NoError(t, err) | ||
} | ||
|
||
func assertFallbackOnNodeStatus(t testing.TB, cs clientSet) (string, int32) { | ||
t.Helper() | ||
|
||
t.Log("Checking if a NodeStatus has been updated to report the fallback condition") | ||
|
||
ckaso, err := cs.Operator.Get(context.TODO(), "cluster", metav1.GetOptions{}) | ||
require.NoError(t, err) | ||
|
||
var nodeName string | ||
var failedRevision int32 | ||
for _, ns := range ckaso.Status.NodeStatuses { | ||
if ns.LastFallbackCount != 0 && len(nodeName) > 0 { | ||
t.Fatalf("multiple node statuses report the fallback, previously on node %v, revision %v, currently on node %v, revision %v", nodeName, failedRevision, ns.NodeName, ns.LastFailedRevision) | ||
} | ||
if ns.LastFallbackCount != 0 { | ||
nodeName = ns.NodeName | ||
failedRevision = ns.LastFailedRevision | ||
} | ||
} | ||
|
||
t.Logf("The fallback has been reported on node %v, failed revision is %v", nodeName, failedRevision) | ||
return nodeName, failedRevision | ||
} | ||
|
||
func assertKasPodAnnotatedOnNode(t testing.TB, cs clientSet, expectedFailedRevision int32, expectedFallbackReason, expectedFallbackMessage, nodeName string) { | ||
t.Helper() | ||
t.Logf("Verifying if a kube-apiserver pod has been annotated with rev: %v, reason: %v, message: %v on node: %v", expectedFailedRevision, expectedFallbackReason, expectedFallbackMessage, nodeName) | ||
|
||
kasPods, err := cs.Kube.CoreV1().Pods("openshift-kube-apiserver").List(context.TODO(), metav1.ListOptions{LabelSelector: "apiserver=true"}) | ||
require.NoError(t, err) | ||
|
||
var kasPod corev1.Pod | ||
filteredKasPods := filterByNodeName(kasPods.Items, nodeName) | ||
switch len(filteredKasPods) { | ||
case 0: | ||
t.Fatalf("expected to find the kube-apiserver static pod on node %s but haven't found any", nodeName) | ||
case 1: | ||
kasPod = filteredKasPods[0] | ||
default: | ||
// this should never happen for static pod as they are uniquely named for each node | ||
podsOnCurrentNode := []string{} | ||
for _, filteredKasPod := range filteredKasPods { | ||
podsOnCurrentNode = append(podsOnCurrentNode, filteredKasPod.Name) | ||
} | ||
t.Fatalf("multiple kube-apiserver static pods for node %s found: %v", nodeName, podsOnCurrentNode) | ||
} | ||
|
||
if fallbackFor, ok := kasPod.Annotations[annotations.FallbackForRevision]; ok { | ||
if len(fallbackFor) == 0 { | ||
t.Fatalf("empty fallback revision label: %v on %s pod", annotations.FallbackForRevision, kasPod.Name) | ||
} | ||
revision, err := strconv.Atoi(fallbackFor) | ||
if err != nil || revision < 0 { | ||
t.Fatalf("invalid fallback revision: %v on pod: %s", fallbackFor, kasPod.Name) | ||
} | ||
|
||
reason, message := "", "" | ||
if s, ok := kasPod.Annotations[annotations.FallbackReason]; ok { | ||
reason = s | ||
} | ||
if s, ok := kasPod.Annotations[annotations.FallbackMessage]; ok { | ||
message = s | ||
} | ||
|
||
if expectedFallbackReason != "*" && expectedFallbackReason != reason { | ||
t.Fatalf("unexpected fallback reason: %v, expected: %v", reason, expectedFallbackReason) | ||
} else if len(reason) == 0 { | ||
t.Fatal("empty fallback reason") | ||
} | ||
if expectedFallbackMessage != "*" && expectedFallbackMessage != message { | ||
t.Fatalf("unexpected fallback message: %v, expected: %v", message, expectedFallbackMessage) | ||
} else if len(message) == 0 { | ||
t.Fatal("empty fallback message") | ||
} | ||
return | ||
} | ||
|
||
t.Fatalf("kube-apiserver %v on node %v hasn't been annotated with %v", kasPod.Name, nodeName, annotations.FallbackForRevision) | ||
} | ||
|
||
func filterByNodeName(kasPods []corev1.Pod, currentNodeName string) []corev1.Pod { | ||
filteredKasPods := []corev1.Pod{} | ||
|
||
for _, potentialKasPods := range kasPods { | ||
if potentialKasPods.Spec.NodeName == currentNodeName { | ||
filteredKasPods = append(filteredKasPods, potentialKasPods) | ||
} | ||
} | ||
|
||
return filteredKasPods | ||
} | ||
|
||
// getDefaultUnsupportedConfigForCurrentPlatform returns the baseline
// unsupportedConfigOverrides for the cluster under test.
//
// NOTE(review): on non-SNO topologies this explicitly enables the startup
// monitor ("startupMonitor": true) — presumably because it is only on by
// default for single-node clusters; confirm against the operator's config
// observer. For SNO the default overrides are empty.
func getDefaultUnsupportedConfigForCurrentPlatform(t testing.TB, cs clientSet) map[string]interface{} {
	t.Helper()

	infraConfiguration, err := cs.Infra.Get(context.TODO(), "cluster", metav1.GetOptions{})
	require.NoError(t, err)

	// not single-node: force the startup monitor on via the overrides
	if infraConfiguration.Status.ControlPlaneTopology != configv1.SingleReplicaTopologyMode {
		return map[string]interface{}{"startupMonitor": true}
	}

	return map[string]interface{}{}
}
|
||
// fallbackTimeoutsForCurrentPlatform provides various timeouts that are tailored for the current platform.
// It returns, in order:
//   - clusterStateWaitPollTimeout: the max time to wait before the cluster is considered not ready
//   - clusterMustBeReadyFor: the time the cluster must stay stable
//   - waitForFallbackDegradedConditionTimeout: 6 min because the static pod monitor has 5 min to fall back to the previous revision
//
// TODO: add timeouts for AWS and GCP
func fallbackTimeoutsForCurrentPlatform(t testing.TB, cs clientSet) (time.Duration, time.Duration, time.Duration) {
	// defaults for SNO cluster
	return 6 * time.Minute,
		1 * time.Minute,
		6 * time.Minute
}