OCPBUGS-26498: Add test for UpgradeValidation contention
Add "The HAProxy router converges when multiple routers are
writing conflicting upgrade validation status" test which validates
router converge when writing conflicting status in a scenario that uses
multiple conditions.

Previously, we tested conflicting status fields (hostname), but don't
have a test for conflicting status. This test add logic that exercises
new logic in the router for the Upgrade Validation plugin.
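
For context, a minimal, illustrative Go sketch (not part of this commit)
of the status condition the two router deployments contend over. It
assumes only the openshift/api route/v1 and k8s.io/api core/v1 types
already imported by the test below; the Reason value is a made-up
placeholder:

package main

import (
	"fmt"

	routev1 "github.com/openshift/api/route/v1"
	corev1 "k8s.io/api/core/v1"
)

func main() {
	// One router force-adds this condition to each route's ingress status,
	// while the other force-removes it; the router's contention tracker
	// must detect the conflict and stop the write loop.
	cond := routev1.RouteIngressCondition{
		Type:   routev1.RouteUnservableInFutureVersions,
		Status: corev1.ConditionTrue,
		Reason: "UpgradeValidation", // hypothetical reason string, for illustration only
	}
	fmt.Printf("condition %s=%s\n", cond.Type, cond.Status)
}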
gcs278 committed Apr 23, 2024
1 parent 7379790 commit 4fd7700
Showing 2 changed files with 172 additions and 89 deletions.
259 changes: 170 additions & 89 deletions test/extended/router/stress.go
@@ -26,6 +26,7 @@ import (

routev1 "github.com/openshift/api/route/v1"
routeclientset "github.com/openshift/client-go/route/clientset/versioned"
v1 "github.com/openshift/client-go/route/clientset/versioned/typed/route/v1"
exutil "github.com/openshift/origin/test/extended/util"
)

@@ -82,6 +83,7 @@ var _ = g.Describe("[sig-network][Feature:Router][apigroup:route.openshift.io]",
rs, err := oc.KubeClient().AppsV1().ReplicaSets(ns).Create(
context.Background(),
scaledRouter(
"router",
routerImage,
[]string{
"-v=4",
@@ -99,20 +101,7 @@ var _ = g.Describe("[sig-network][Feature:Router][apigroup:route.openshift.io]",
g.By("creating multiple routes")
client := routeclientset.NewForConfigOrDie(oc.AdminConfig()).RouteV1().Routes(ns)
var rv string
for i := 0; i < 10; i++ {
_, err := client.Create(context.Background(), &routev1.Route{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%d", i),
},
Spec: routev1.RouteSpec{
To: routev1.RouteTargetReference{Name: "test"},
Port: &routev1.RoutePort{
TargetPort: intstr.FromInt(8080),
},
},
}, metav1.CreateOptions{})
o.Expect(err).NotTo(o.HaveOccurred())
}
createTestRoutes(client, 10)

g.By("waiting for all routes to have a status")
err = wait.Poll(time.Second, 2*time.Minute, func() (bool, error) {
@@ -139,23 +128,7 @@ var _ = g.Describe("[sig-network][Feature:Router][apigroup:route.openshift.io]",
o.Expect(err).NotTo(o.HaveOccurred())

g.By("verifying that we don't continue to write")
writes := 0
w, err := client.Watch(context.Background(), metav1.ListOptions{Watch: true, ResourceVersion: rv})
o.Expect(err).NotTo(o.HaveOccurred())
defer w.Stop()
timer := time.NewTimer(10 * time.Second)
ch := w.ResultChan()
Wait:
for i := 0; ; i++ {
select {
case _, ok := <-ch:
writes++
o.Expect(ok).To(o.BeTrue())
case <-timer.C:
break Wait
}
}
o.Expect(writes).To(o.BeNumerically("<=", 10))
verifyConflictingWrites(client, rv, 10*time.Second, 10, "")

verifyCommandEquivalent(oc.KubeClient(), rs, "md5sum /var/lib/haproxy/conf/*")
})
@@ -166,6 +139,7 @@ var _ = g.Describe("[sig-network][Feature:Router][apigroup:route.openshift.io]",
rs, err := oc.KubeClient().AppsV1().ReplicaSets(ns).Create(
context.Background(),
scaledRouter(
"router",
routerImage,
[]string{
"-v=4",
@@ -187,20 +161,7 @@ var _ = g.Describe("[sig-network][Feature:Router][apigroup:route.openshift.io]",
g.By("creating multiple routes")
client := routeclientset.NewForConfigOrDie(oc.AdminConfig()).RouteV1().Routes(ns)
var rv string
for i := 0; i < 20; i++ {
_, err := client.Create(context.Background(), &routev1.Route{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%d", i),
},
Spec: routev1.RouteSpec{
To: routev1.RouteTargetReference{Name: "test"},
Port: &routev1.RoutePort{
TargetPort: intstr.FromInt(8080),
},
},
}, metav1.CreateOptions{})
o.Expect(err).NotTo(o.HaveOccurred())
}
createTestRoutes(client, 20)

g.By("waiting for sufficient routes to have a status")
err = wait.Poll(time.Second, 2*time.Minute, func() (bool, error) {
@@ -241,26 +202,7 @@ var _ = g.Describe("[sig-network][Feature:Router][apigroup:route.openshift.io]",
o.Expect(err).NotTo(o.HaveOccurred())

g.By("verifying that we stop writing conflicts rapidly")
writes := 0
w, err := client.Watch(context.Background(), metav1.ListOptions{Watch: true, ResourceVersion: rv})
o.Expect(err).NotTo(o.HaveOccurred())
func() {
defer w.Stop()
timer := time.NewTimer(15 * time.Second)
ch := w.ResultChan()
Wait:
for i := 0; ; i++ {
select {
case _, ok := <-ch:
writes++
o.Expect(ok).To(o.BeTrue())
case <-timer.C:
break Wait
}
}
// we expect to see no more than 10 writes per router (we should hit the hard limit) (3 replicas and 1 master)
o.Expect(writes).To(o.BeNumerically("<=", 50))
}()
verifyConflictingWrites(client, rv, 15*time.Second, 50, "")

// the os_http_be.map file will vary, so only check the haproxy config
verifyCommandEquivalent(oc.KubeClient(), rs, "md5sum /var/lib/haproxy/conf/haproxy.config")
@@ -270,34 +212,162 @@ var _ = g.Describe("[sig-network][Feature:Router][apigroup:route.openshift.io]",
o.Expect(err).NotTo(o.HaveOccurred())

g.By("verifying that only get a few updates")
writes = 0
w, err = client.Watch(context.Background(), metav1.ListOptions{Watch: true, ResourceVersion: route.ResourceVersion})
o.Expect(err).NotTo(o.HaveOccurred())
func() {
defer w.Stop()
timer := time.NewTimer(10 * time.Second)
ch := w.ResultChan()
Wait:
for i := 0; ; i++ {
select {
case obj, ok := <-ch:
o.Expect(ok).To(o.BeTrue())
if r, ok := obj.Object.(*routev1.Route); ok {
if r == nil || r.Name != "9" {
continue
}
}
writes++
case <-timer.C:
break Wait
}
}
o.Expect(writes).To(o.BeNumerically("<", 5))
}()
verifyConflictingWrites(client, route.ResourceVersion, 10*time.Second, 5, "9")
})

g.It("converges when multiple routers are writing conflicting upgrade validation status", func() {
g.By("deploying a scaled out namespace scoped router that adds the UnservableInFutureVersions condition")

rsAdd, err := oc.KubeClient().AppsV1().ReplicaSets(ns).Create(
context.Background(),
scaledRouter(
"router-add-condition",
routerImage,
[]string{
"-v=5",
fmt.Sprintf("--namespace=%s", ns),
// the contention tracker is resync / 10, so this will give us 2 minutes of contention tracking
"--resync-interval=20m",
"--name=conflicting",
"--debug-upgrade-validation-force-add-condition",
},
),
metav1.CreateOptions{},
)
o.Expect(err).NotTo(o.HaveOccurred())
err = waitForReadyReplicaSet(oc.KubeClient(), ns, rsAdd.Name)
o.Expect(err).NotTo(o.HaveOccurred())

g.By("creating multiple routes")
client := routeclientset.NewForConfigOrDie(oc.AdminConfig()).RouteV1().Routes(ns)
var rv string
createTestRoutes(client, 20)

g.By("waiting for sufficient routes to have a UnservableInFutureVersions status condition")
err = wait.Poll(time.Second, 2*time.Minute, func() (bool, error) {
routes, err := client.List(context.Background(), metav1.ListOptions{})
if err != nil {
return false, err
}
o.Expect(routes.Items).To(o.HaveLen(20))
other := 0
unservableCondition := 0
for _, route := range routes.Items {
ingress := findIngress(&route, "conflicting")
if ingress == nil {
if len(route.Status.Ingress) > 0 {
other++
}
continue
}
if len(route.Status.Ingress) > 1 {
other++
}
// Find UnservableInFutureVersions condition.
if cond := findIngressCondition(ingress, routev1.RouteUnservableInFutureVersions); cond != nil {
unservableCondition++
o.Expect(ingress.Host).NotTo(o.BeEmpty())
o.Expect(ingress.Conditions).NotTo(o.BeEmpty())
o.Expect(cond.LastTransitionTime).NotTo(o.BeNil())
o.Expect(cond.Status).To(o.Equal(corev1.ConditionTrue))
}
}
o.Expect(writes).To(o.BeNumerically("<", 5))
}()
// if other routers are writing status, wait until we get a complete
// set since we don't have a way to tell other routers to ignore us
if unservableCondition < 3 && other%20 != 0 {
return false, nil
}
outputIngress(routes.Items...)
rv = routes.ResourceVersion
return true, nil
})
o.Expect(err).NotTo(o.HaveOccurred())

g.By("deploying a scaled out namespace scoped router that removes the UnservableInFutureVersions condition")
rsRemove, err := oc.KubeClient().AppsV1().ReplicaSets(ns).Create(
context.Background(),
scaledRouter(
"router-remove-condition",
routerImage,
[]string{
"-v=5",
fmt.Sprintf("--namespace=%s", ns),
// the contention tracker is resync / 10, so this will give us 2 minutes of contention tracking
"--resync-interval=20m",
"--name=conflicting",
"--debug-upgrade-validation-force-remove-condition",
},
),
metav1.CreateOptions{},
)
o.Expect(err).NotTo(o.HaveOccurred())
err = waitForReadyReplicaSet(oc.KubeClient(), ns, rsRemove.Name)
o.Expect(err).NotTo(o.HaveOccurred())

g.By("verifying that we stop writing conflicts rapidly")
verifyConflictingWrites(client, rv, 15*time.Second, 50, "")

g.By("clearing a single route's status")
route, err := client.Patch(context.Background(), "9", types.MergePatchType, []byte(`{"status":{"ingress":[]}}`), metav1.PatchOptions{}, "status")
o.Expect(err).NotTo(o.HaveOccurred())

g.By("verifying that only get a few updates")
verifyConflictingWrites(client, route.ResourceVersion, 10*time.Second, 5, "9")
})
})
})

// verifyConflictingWrites watches routes starting at the given resource
// version and verifies that the routers stop writing within the given
// window: no more than writeLimit write events may be observed during
// observeTime. If routeNameMatch is non-empty, only events for the route
// with that name are counted.
func verifyConflictingWrites(client v1.RouteInterface, rv string, observeTime time.Duration, writeLimit int, routeNameMatch string) {
writes := 0
w, err := client.Watch(context.Background(), metav1.ListOptions{Watch: true, ResourceVersion: rv})
o.Expect(err).NotTo(o.HaveOccurred())
func() {
defer w.Stop()
timer := time.NewTimer(observeTime)
ch := w.ResultChan()
Wait:
for i := 0; ; i++ {
select {
case obj, ok := <-ch:
o.Expect(ok).To(o.BeTrue())
if routeNameMatch != "" {
if r, ok := obj.Object.(*routev1.Route); ok {
if r == nil || r.Name != routeNameMatch {
continue
}
}
}

writes++
case <-timer.C:
break Wait
}
}
e2e.Logf("wrote %d times", writes)
// callers size writeLimit for no more than ~10 writes per router before the hard limit kicks in (e.g. 50 for 3 replicas and 1 master)
o.Expect(writes).To(o.BeNumerically("<=", writeLimit))
}()
}

// createTestRoutes creates numOfRoutes test routes, each named with its index number.
func createTestRoutes(client v1.RouteInterface, numOfRoutes int) {
for i := 0; i < numOfRoutes; i++ {
_, err := client.Create(context.Background(), &routev1.Route{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%d", i),
},
Spec: routev1.RouteSpec{
To: routev1.RouteTargetReference{Name: "test"},
Port: &routev1.RoutePort{
TargetPort: intstr.FromInt(8080),
},
},
}, metav1.CreateOptions{})
o.Expect(err).NotTo(o.HaveOccurred())
}
}

func findIngress(route *routev1.Route, name string) *routev1.RouteIngress {
for i, ingress := range route.Status.Ingress {
if ingress.RouterName == name {
@@ -307,21 +377,32 @@ func findIngress(route *routev1.Route, name string) *routev1.RouteIngress {
return nil
}

func scaledRouter(image string, args []string) *appsv1.ReplicaSet {
// findIngressCondition locates the first condition that corresponds to the requested type.
func findIngressCondition(ingress *routev1.RouteIngress, t routev1.RouteIngressConditionType) (_ *routev1.RouteIngressCondition) {
for i, existing := range ingress.Conditions {
if existing.Type != t {
continue
}
return &ingress.Conditions[i]
}
return nil
}

func scaledRouter(name, image string, args []string) *appsv1.ReplicaSet {
one := int64(1)
scale := int32(3)
return &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: "router",
Name: name,
},
Spec: appsv1.ReplicaSetSpec{
Replicas: &scale,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": "router"},
MatchLabels: map[string]string{"app": name},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": "router"},
Labels: map[string]string{"app": name},
},
Spec: corev1.PodSpec{
TerminationGracePeriodSeconds: &one,

Some generated files are not rendered by default.
