-
Notifications
You must be signed in to change notification settings - Fork 157
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
wip: e2e for testing the fallback scenario
- Loading branch information
1 parent
ac9fc0b
commit b19d13c
Showing
2 changed files
with
270 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package e2e_sno_disruptive | ||
|
||
import ( | ||
"github.com/stretchr/testify/require" | ||
"testing" | ||
|
||
operatorv1client "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1" | ||
libgotest "github.com/openshift/library-go/test/library" | ||
|
||
"k8s.io/client-go/kubernetes" | ||
) | ||
|
||
// clientSet groups the typed clients the e2e tests use to talk to the
// cluster: the kube-apiserver operator API and the core Kubernetes API.
type clientSet struct {
	Operator operatorv1client.KubeAPIServerInterface // client for the kubeapiservers.operator.openshift.io resource
	Kube     kubernetes.Interface                    // core Kubernetes client
}
|
||
func getClients(t testing.TB) clientSet { | ||
t.Helper() | ||
|
||
kubeConfig, err := libgotest.NewClientConfigForTest() | ||
require.NoError(t, err) | ||
kubeClient := kubernetes.NewForConfigOrDie(kubeConfig) | ||
|
||
operatorClient, err := operatorv1client.NewForConfig(kubeConfig) | ||
require.NoError(t, err) | ||
|
||
return clientSet{Operator: operatorClient.KubeAPIServers(), Kube: kubeClient} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,245 @@ | ||
package e2e_sno_disruptive | ||
|
||
import "testing" | ||
import ( | ||
"context" | ||
"encoding/json" | ||
"strconv" | ||
"testing" | ||
"time" | ||
|
||
func TestFallback(t *testing.T) { | ||
t.Log("implement me") | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/openshift/library-go/pkg/operator/staticpod/startupmonitor/annotations" | ||
"github.com/openshift/library-go/pkg/operator/v1helpers" | ||
commontesthelpers "github.com/openshift/library-go/test/library/encryption" | ||
|
||
corev1 "k8s.io/api/core/v1" | ||
"k8s.io/apimachinery/pkg/api/errors" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/util/wait" | ||
"k8s.io/client-go/util/retry" | ||
) | ||
|
||
func TestFallback(tt *testing.T) { | ||
t := commontesthelpers.NewE(tt) | ||
cs := getClients(t) | ||
t.Log("Starting the fallback test") | ||
|
||
// before starting a new test make sure the current state of the cluster is good | ||
ensureClusterInGoodState(t, cs, 10*time.Second, 6*time.Minute, 1*time.Minute) | ||
|
||
// cause a disruption | ||
data := map[string]map[string]string{} | ||
setUnsupportedConfig(t, cs, data) | ||
|
||
// validate if the fallback condition is reported and the cluster is stable | ||
waitForFallbackDegradedCondition(t, cs) | ||
nodeName, failedRevision := assertNodeStatus(t, cs) | ||
assertKasPodAnnotatedOnNode(t, cs, failedRevision, "*", "*", nodeName) | ||
// TODO: try kas - get pod/ns ? | ||
// TODO: check if pod is ready ? | ||
|
||
// clean up | ||
setUnsupportedConfig(t, cs, map[string]map[string]string{}) | ||
if err := clusterInGoodState(t, cs, 10*time.Second, 6*time.Minute, 1*time.Minute); err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
|
||
func ensureClusterInGoodState(t testing.TB, cs clientSet, waitPollInterval, waitPollTimeout, mustBeReadyFor time.Duration) { | ||
if err := clusterInGoodState(t, cs, waitPollInterval, waitPollTimeout, mustBeReadyFor); err != nil { | ||
t.Logf("Trying to bring the cluster back to normal state by removing all unsupportedConfigOverrides entries, err: %v", err) | ||
setUnsupportedConfig(t, cs, map[string]map[string]string{}) | ||
err := clusterInGoodState(t, cs, waitPollInterval, waitPollTimeout, mustBeReadyFor) | ||
require.NoError(t, err) | ||
} | ||
} | ||
|
||
// clusterInGoodState polls the cluster every waitPollInterval (up to
// waitPollTimeout) and returns nil once a healthy state is observed at least
// mustBeReadyFor after the check started. Healthy means: no startup-monitor
// pod running, at least one kube-apiserver static pod present, no pod marked
// as a fallback, and no StaticPodFallbackRevisionDegraded condition.
// Note: the mustBeReadyFor timer is measured from the first poll and is not
// reset by transient unhealthy observations.
func clusterInGoodState(t testing.TB, cs clientSet, waitPollInterval, waitPollTimeout, mustBeReadyFor time.Duration) error {
	t.Helper()

	startTs := time.Now()
	t.Logf("Waiting %s for the cluster to be in a good condition, interval = %v, timeout %v", mustBeReadyFor.String(), waitPollInterval, waitPollTimeout)

	return wait.Poll(waitPollInterval, waitPollTimeout, func() (bool, error) {
		// the startup-monitor pod exists only while a new revision is being
		// validated, so its presence means a rollout is still in flight
		monitorPod, err := cs.Kube.CoreV1().Pods("openshift-kube-apiserver").Get(context.TODO(), "kube-apiserver-startup-monitor", metav1.GetOptions{})
		if err == nil {
			t.Logf("the static pod monitor: %v is running since: %v", monitorPod.Name, monitorPod.CreationTimestamp)
			return false, nil /*retry*/
		} else if !errors.IsNotFound(err) {
			t.Logf("error while getting kube-apiserver-startup-monitor: %v", err)
			return false, nil /*retry*/
		}

		// require at least one kube-apiserver static pod, none of which may
		// currently be running as a fallback for a failed revision
		kasPods, err := cs.Kube.CoreV1().Pods("openshift-kube-apiserver").List(context.TODO(), metav1.ListOptions{LabelSelector: "apiserver=true"})
		if err != nil {
			t.Logf("error while getting kube-apiserver pods: %v", err)
			return false, nil /*retry*/
		}
		if len(kasPods.Items) == 0 {
			t.Log("expected to find the kube-apiserver static pod but haven't found any")
			return false, nil /*retry*/
		}
		for _, kasPod := range kasPods.Items {
			if fallbackFor, ok := kasPod.Annotations[annotations.FallbackForRevision]; ok {
				t.Logf("%v pod has %v annotation set to: %v", kasPod.Name, annotations.FallbackForRevision, fallbackFor)
				return false, nil /*retry*/
			}
		}
		// TODO: on an HA cluster we could also check if pods are on the same revision

		ckaso, err := cs.Operator.Get(context.TODO(), "cluster", metav1.GetOptions{})
		if err != nil {
			t.Logf("unable to get kube-apiserver-operator resource: %v", err)
			return false, nil /*retry*/
		}
		// NOTE(review): IsOperatorConditionFalse presumably also returns
		// false when the condition is absent entirely, which would keep us
		// retrying here even though the log claims it "is set" — confirm
		// that is intended
		if !v1helpers.IsOperatorConditionFalse(ckaso.Status.Conditions, "StaticPodFallbackRevisionDegraded") {
			t.Logf("StaticPodFallbackRevisionDegraded condition is set")
			return false, nil /*retry*/
		}

		// only report success once the full mustBeReadyFor window (measured
		// from the first poll) has elapsed with this poll seeing good state
		if time.Since(startTs) > mustBeReadyFor {
			t.Logf("The cluster has been in good condition for %s", mustBeReadyFor.String())
			return true, nil /*done*/
		}
		return false, nil /*wait a bit more*/
	})
}
|
||
func setUnsupportedConfig(t testing.TB, cs clientSet, cfg map[string]map[string]string) { | ||
t.Helper() | ||
|
||
t.Logf("Setting UnsupportedConfigOverrides to %v", cfg) | ||
err := retry.OnError(retry.DefaultRetry, func(error) bool { return true }, func() error { | ||
raw, err := json.Marshal(cfg) | ||
if err != nil { | ||
t.Log(err) | ||
return err | ||
} | ||
ckaso, err := cs.Operator.Get(context.TODO(), "cluster", metav1.GetOptions{}) | ||
if err != nil { | ||
t.Log(err) | ||
return err | ||
} | ||
ckaso.Spec.UnsupportedConfigOverrides.Raw = raw | ||
_, err = cs.Operator.Update(context.TODO(), ckaso, metav1.UpdateOptions{}) | ||
if err != nil { | ||
t.Log(err) | ||
} | ||
return err | ||
}) | ||
require.NoError(t, err) | ||
} | ||
|
||
func waitForFallbackDegradedCondition(t testing.TB, cs clientSet) { | ||
t.Helper() | ||
|
||
t.Log("Waiting for StaticPodFallbackRevisionDegraded condition, interval = 20s, timeout = 6min") | ||
err := wait.Poll(20*time.Second, 6*time.Minute, func() (bool, error) { | ||
ckaso, err := cs.Operator.Get(context.TODO(), "cluster", metav1.GetOptions{}) | ||
if err != nil { | ||
t.Logf("unable to get kube-apiserver-operator resource: %v", err) | ||
return false, nil /*retry*/ | ||
} | ||
|
||
if v1helpers.IsOperatorConditionFalse(ckaso.Status.Conditions, "StaticPodFallbackRevisionDegraded") { | ||
t.Logf("StaticPodFallbackRevisionDegraded condition hasn't been set yet") | ||
return false, nil /*retry*/ | ||
} | ||
|
||
return true, nil /*done*/ | ||
}) | ||
require.NoError(t, err) | ||
} | ||
|
||
func assertNodeStatus(t testing.TB, cs clientSet) (string, int32) { | ||
t.Helper() | ||
|
||
t.Log("Checking if a NodeStatus has been updated to report the fallback condition") | ||
|
||
ckaso, err := cs.Operator.Get(context.TODO(), "cluster", metav1.GetOptions{}) | ||
require.NoError(t, err) | ||
|
||
var nodeName string | ||
var failedRevision int32 | ||
for _, ns := range ckaso.Status.NodeStatuses { | ||
if ns.LastFallbackCount != 0 && len(nodeName) > 0 { | ||
t.Fatalf("multiple node statuses report the fallback, previously on node %v, revision %v, currently on node %v, revision %v", nodeName, failedRevision, ns.NodeName, ns.LastFailedRevision) | ||
} | ||
if ns.LastFallbackCount != 0 { | ||
nodeName = ns.NodeName | ||
failedRevision = ns.LastFailedRevision | ||
} | ||
} | ||
|
||
t.Logf("The fallback has been reported on node %v, failed revision is %v", nodeName, failedRevision) | ||
return nodeName, failedRevision | ||
} | ||
|
||
func assertKasPodAnnotatedOnNode(t testing.TB, cs clientSet, expectedFailedRevision int32, expectedFallbackReason, expectedFallbackMessage, nodeName string) { | ||
t.Helper() | ||
t.Logf("Verifying if a kube-apiserver pod has been annotated with rev: %v, reason: %v, message: %v on node: %v", expectedFailedRevision, expectedFallbackReason, expectedFallbackMessage, nodeName) | ||
|
||
kasPods, err := cs.Kube.CoreV1().Pods("openshift-kube-apiserver").List(context.TODO(), metav1.ListOptions{LabelSelector: "apiserver=true"}) | ||
require.NoError(t, err) | ||
|
||
var kasPod corev1.Pod | ||
filteredKasPods := filterByNodeName(kasPods.Items, nodeName) | ||
switch len(filteredKasPods) { | ||
case 0: | ||
t.Fatalf("expected to find the kube-apiserver static pod on node %s but haven't found any", nodeName) | ||
case 1: | ||
kasPod = filteredKasPods[0] | ||
default: | ||
// this should never happen for static pod as they are uniquely named for each node | ||
podsOnCurrentNode := []string{} | ||
for _, filteredKasPod := range filteredKasPods { | ||
podsOnCurrentNode = append(podsOnCurrentNode, filteredKasPod.Name) | ||
} | ||
t.Fatalf("multiple kube-apiserver static pods for node %s found: %v", nodeName, podsOnCurrentNode) | ||
} | ||
|
||
if fallbackFor, ok := kasPod.Annotations[annotations.FallbackForRevision]; ok { | ||
if len(fallbackFor) == 0 { | ||
t.Fatalf("empty fallback revision label: %v on %s pod", annotations.FallbackForRevision, kasPod.Name) | ||
} | ||
revision, err := strconv.Atoi(fallbackFor) | ||
if err != nil || revision < 0 { | ||
t.Fatalf("invalid fallback revision: %v on pod: %s", fallbackFor, kasPod.Name) | ||
} | ||
|
||
reason, message := "", "" | ||
if s, ok := kasPod.Annotations[annotations.FallbackReason]; ok { | ||
reason = s | ||
} | ||
if s, ok := kasPod.Annotations[annotations.FallbackMessage]; ok { | ||
message = s | ||
} | ||
|
||
if expectedFallbackReason != "*" && expectedFallbackReason != reason { | ||
t.Fatalf("unexpected fallback reason: %v, expected: %v", reason, expectedFallbackReason) | ||
} else if len(reason) == 0 { | ||
t.Fatal("empty fallback reason") | ||
} | ||
if expectedFallbackMessage != "*" && expectedFallbackMessage != message { | ||
t.Fatalf("unexpected fallback message: %v, expected: %v", message, expectedFallbackMessage) | ||
} else if len(message) == 0 { | ||
t.Fatal("empty fallback message") | ||
} | ||
return | ||
} | ||
|
||
t.Fatalf("kube-apiserver %v on node %v hasn't been annotated with %v", kasPod.Name, nodeName, annotations.FallbackForRevision) | ||
} | ||
|
||
func filterByNodeName(kasPods []corev1.Pod, currentNodeName string) []corev1.Pod { | ||
filteredKasPods := []corev1.Pod{} | ||
|
||
for _, potentialKasPods := range kasPods { | ||
if potentialKasPods.Spec.NodeName == currentNodeName { | ||
filteredKasPods = append(filteredKasPods, potentialKasPods) | ||
} | ||
} | ||
|
||
return filteredKasPods | ||
} |