Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release 4.14] OCPBUGS-16267: Fix controller reboot bug #155

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions e2etest/bgptests/bgp.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ var _ = ginkgo.Describe("BGP", func() {

allNodes, err := cs.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
framework.ExpectNoError(err)
validateDesiredLB(svc)
testservice.ValidateDesiredLB(svc)

for _, c := range FRRContainers {
validateService(svc, allNodes.Items, c)
Expand Down Expand Up @@ -156,7 +156,7 @@ var _ = ginkgo.Describe("BGP", func() {
testservice.TrafficPolicyCluster(svc)
})
defer testservice.Delete(cs, svc)
validateDesiredLB(svc)
testservice.ValidateDesiredLB(svc)

for _, c := range FRRContainers {
validateService(svc, allNodes.Items, c)
Expand Down Expand Up @@ -193,7 +193,7 @@ var _ = ginkgo.Describe("BGP", func() {
})
defer testservice.Delete(cs, svc)

validateDesiredLB(svc)
testservice.ValidateDesiredLB(svc)

err := jig.Scale(2)
framework.ExpectNoError(err)
Expand Down Expand Up @@ -280,8 +280,8 @@ var _ = ginkgo.Describe("BGP", func() {
})
defer testservice.Delete(cs, svc1)

validateDesiredLB(svc)
validateDesiredLB(svc1)
testservice.ValidateDesiredLB(svc)
testservice.ValidateDesiredLB(svc1)

for _, c := range FRRContainers {
validateService(svc, allNodes.Items, c)
Expand Down
18 changes: 1 addition & 17 deletions e2etest/bgptests/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
"go.universe.tf/e2etest/pkg/executor"
"go.universe.tf/e2etest/pkg/frr"
frrcontainer "go.universe.tf/e2etest/pkg/frr/container"
Expand All @@ -21,6 +20,7 @@ import (
"go.universe.tf/e2etest/pkg/metallb"
"go.universe.tf/e2etest/pkg/routes"
"go.universe.tf/e2etest/pkg/wget"
metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -168,14 +168,6 @@ func checkBFDConfigPropagated(nodeConfig metallbv1beta1.BFDProfile, peerConfig f
return nil
}

// validateDesiredLB asserts that, when the service carries the
// "metallb.universe.tf/loadBalancerIPs" annotation, its value equals the
// comma-joined list of IPs found in the service's load balancer ingress status.
// Services without the annotation (or with an empty value) are not checked.
func validateDesiredLB(svc *corev1.Service) {
	const annotation = "metallb.universe.tf/loadBalancerIPs"

	requested := svc.Annotations[annotation]
	if requested != "" {
		assigned := strings.Join(getIngressIPs(svc.Status.LoadBalancer.Ingress), ",")
		framework.ExpectEqual(requested, assigned)
	}
}

func checkServiceOnlyOnNodes(svc *corev1.Service, expectedNodes []corev1.Node, ipFamily ipfamily.Family) {
if len(expectedNodes) == 0 {
return
Expand Down Expand Up @@ -286,14 +278,6 @@ OUTER:
return nonSelectedNodes
}

// getIngressIPs collects the IP field of every load balancer ingress entry,
// in order. It returns a nil slice when there are no entries.
func getIngressIPs(ingresses []corev1.LoadBalancerIngress) []string {
	var collected []string
	for i := range ingresses {
		collected = append(collected, ingresses[i].IP)
	}
	return collected
}

func validateServiceNotAdvertised(svc *corev1.Service, frrContainers []*frrcontainer.FRR, advertised string, ipFamily ipfamily.Family) {
for _, c := range frrContainers {
if c.Name != advertised {
Expand Down
73 changes: 72 additions & 1 deletion e2etest/l2tests/assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ import (

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"

"go.universe.tf/e2etest/pkg/config"
"go.universe.tf/e2etest/pkg/k8s"
"go.universe.tf/e2etest/pkg/metallb"

"go.universe.tf/e2etest/pkg/service"
metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -145,6 +148,74 @@ var _ = ginkgo.Describe("IP Assignment", func() {
err := cs.CoreV1().Services(svc.Namespace).Delete(context.Background(), svc.Name, metav1.DeleteOptions{})
return err
}))

// Checks that services "service-a" and "service-b" still report the expected
// external IPs after the controller is restarted numOfRestarts times.
// NOTE(review): "preseve" in the description is a typo for "preserve"; it is left
// untouched here because the spec name may be matched by focus/skip filters.
ginkgo.It("should preseve the same external ip after controller restart", func() {
	const numOfRestarts = 5
	// The pool holds exactly two single-address ranges, i.e. fewer addresses
	// than the four services created below.
	resources := config.Resources{
		Pools: []metallbv1beta1.IPAddressPool{
			{
				ObjectMeta: metav1.ObjectMeta{
					Name: "assignment-controller-reset-test-pool",
				},
				Spec: metallbv1beta1.IPAddressPoolSpec{
					Addresses: []string{"192.168.10.100/32", "192.168.20.200/32"},
				},
			},
		},
	}
	err := ConfigUpdater.Update(resources)
	framework.ExpectNoError(err)

	ginkgo.By("creating 4 LB services")
	// service-a and service-b are created with the load-balancer-waiting helper
	// (30s timeout) — presumably they end up holding the two pool addresses.
	jig := e2eservice.NewTestJig(cs, f.Namespace.Name, "service-a")
	serviceA, err := jig.CreateLoadBalancerService(30*time.Second, nil)
	framework.ExpectNoError(err)
	defer service.Delete(cs, serviceA)
	service.ValidateDesiredLB(serviceA)

	jig = e2eservice.NewTestJig(cs, f.Namespace.Name, "service-b")
	serviceB, err := jig.CreateLoadBalancerService(30*time.Second, nil)
	framework.ExpectNoError(err)
	defer service.Delete(cs, serviceB)
	service.ValidateDesiredLB(serviceB)

	// service-c and service-d are created without waiting for a load balancer IP
	// — presumably they stay unassigned because the pool is exhausted and act as
	// competitors for the addresses after a restart; TODO confirm.
	jig = e2eservice.NewTestJig(cs, f.Namespace.Name, "service-c")
	serviceC, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(nil)
	framework.ExpectNoError(err)
	defer service.Delete(cs, serviceC)

	jig = e2eservice.NewTestJig(cs, f.Namespace.Name, "service-d")
	serviceD, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(nil)
	framework.ExpectNoError(err)
	defer service.Delete(cs, serviceD)

	// restartAndAssert restarts the controller and then, for 10 seconds at
	// 2 second intervals, re-fetches service-a and service-b and requires that
	// they are assigned 192.168.10.100 and 192.168.20.200 respectively.
	// NOTE(review): this assumes service-a received the first pool address and
	// service-b the second — confirm the allocation order is guaranteed.
	restartAndAssert := func() {
		metallb.RestartController(cs)
		gomega.Consistently(func() error {
			// The closure overwrites the outer serviceA/serviceB/err variables on every poll.
			serviceA, err = cs.CoreV1().Services(serviceA.Namespace).Get(context.TODO(), serviceA.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)

			err = service.ValidateAssignedWith(serviceA, "192.168.10.100")
			if err != nil {
				return err
			}
			serviceB, err = cs.CoreV1().Services(serviceB.Namespace).Get(context.TODO(), serviceB.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)

			err = service.ValidateAssignedWith(serviceB, "192.168.20.200")
			if err != nil {
				return err
			}

			return nil
		}, 10*time.Second, 2*time.Second).ShouldNot(gomega.HaveOccurred())
	}

	ginkgo.By("restarting the controller and validating that the service keeps the same ip")
	for i := 0; i < numOfRestarts; i++ {
		restartAndAssert()
	}
})
})

ginkgo.Context("IPV4 - Validate service allocation in address pools", func() {
Expand Down
20 changes: 20 additions & 0 deletions e2etest/pkg/k8s/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,23 @@ func PodLogs(cs clientset.Interface, pod *corev1.Pod, podLogOpts corev1.PodLogOp
str := buf.String()
return str, nil
}

// PodIsReady reports whether the given pod's PodReady condition has status
// ConditionTrue. A nil pod, or a pod without that condition, is not ready.
func PodIsReady(p *corev1.Pod) bool {
	readiness := podConditionStatus(p, corev1.PodReady)
	return readiness == corev1.ConditionTrue
}

// podConditionStatus looks up the first entry of the pod's status conditions
// whose type matches condition and returns its status. It returns
// ConditionUnknown when the pod is nil or no such condition is present.
func podConditionStatus(p *corev1.Pod, condition corev1.PodConditionType) corev1.ConditionStatus {
	if p != nil {
		for i := range p.Status.Conditions {
			entry := &p.Status.Conditions[i]
			if entry.Type == condition {
				return entry.Status
			}
		}
	}

	return corev1.ConditionUnknown
}
29 changes: 28 additions & 1 deletion e2etest/pkg/metallb/metallb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import (
"context"
"fmt"
"os"
"time"

"github.com/pkg/errors"
"go.universe.tf/e2etest/pkg/k8s"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
)

var (
Expand Down Expand Up @@ -53,7 +57,7 @@ func ControllerPod(cs clientset.Interface) (*corev1.Pod, error) {
LabelSelector: ControllerLabelSelector,
})
if err != nil {
return nil, errors.Wrap(err, "failed to fetch controller pods")
framework.ExpectNoError(err, "failed to fetch controller pods")
}
if len(pods.Items) != 1 {
return nil, fmt.Errorf("expected one controller pod, found %d", len(pods.Items))
Expand All @@ -74,3 +78,26 @@ func SpeakerPodInNode(cs clientset.Interface, node string) (*corev1.Pod, error)
}
return nil, errors.Errorf("no speaker pod run in the node %s", node)
}

// RestartController deletes metallb's current controller pod and then polls
// (every 5 seconds, for up to 3 minutes) until a controller pod with a
// different name is in the Running phase and reports the PodReady condition.
// Any failure along the way is reported through framework.ExpectNoError.
func RestartController(cs clientset.Interface) {
	oldPod, err := ControllerPod(cs)
	framework.ExpectNoError(err)

	podClient := cs.CoreV1().Pods(oldPod.Namespace)
	err = podClient.Delete(context.TODO(), oldPod.Name, metav1.DeleteOptions{})
	framework.ExpectNoError(err)

	// replacementReady is the poll condition: it never returns an error, so
	// lookup failures simply cause another polling round.
	replacementReady := func() (bool, error) {
		current, lookupErr := ControllerPod(cs)
		switch {
		case lookupErr != nil:
			return false, nil
		case current.Name == oldPod.Name:
			// Still seeing the pod that was deleted.
			return false, nil
		case current.Status.Phase != corev1.PodRunning:
			return false, nil
		}

		return k8s.PodIsReady(current), nil
	}

	err = wait.PollImmediate(5*time.Second, 3*time.Minute, replacementReady)
	framework.ExpectNoError(err)
}
34 changes: 34 additions & 0 deletions e2etest/pkg/service/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"fmt"
"net"
"strconv"
"strings"

"go.universe.tf/e2etest/pkg/executor"
"go.universe.tf/e2etest/pkg/wget"
corev1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/test/e2e/framework"
e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
)

Expand All @@ -25,3 +27,35 @@ func ValidateL2(svc *corev1.Service) error {
}
return nil
}

// ValidateDesiredLB asserts that, when the service carries the
// "metallb.universe.tf/loadBalancerIPs" annotation, its value equals the
// comma-joined list of IPs found in the service's load balancer ingress status.
// Services without the annotation (or with an empty value) are not checked.
func ValidateDesiredLB(svc *corev1.Service) {
	const annotation = "metallb.universe.tf/loadBalancerIPs"

	requested := svc.Annotations[annotation]
	if requested != "" {
		assigned := strings.Join(getIngressIPs(svc.Status.LoadBalancer.Ingress), ",")
		framework.ExpectEqual(requested, assigned)
	}
}

// ValidateAssignedWith checks that ip appears among the IPs of the service's
// load balancer ingress status. It returns nil when ip is empty or when a
// matching ingress IP is found, and an error naming the service otherwise.
func ValidateAssignedWith(svc *corev1.Service, ip string) error {
	if ip == "" {
		return nil
	}

	found := false
	for _, candidate := range getIngressIPs(svc.Status.LoadBalancer.Ingress) {
		if candidate == ip {
			found = true
			break
		}
	}
	if found {
		return nil
	}

	return fmt.Errorf("validation failed: ip %s is not assigned to service %s", ip, svc.Name)
}

// getIngressIPs collects the IP field of every load balancer ingress entry,
// in order. It returns a nil slice when there are no entries.
func getIngressIPs(ingresses []corev1.LoadBalancerIngress) []string {
	var collected []string
	for i := range ingresses {
		collected = append(collected, ingresses[i].IP)
	}
	return collected
}
9 changes: 9 additions & 0 deletions internal/k8s/controllers/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type ServiceReconciler struct {
Endpoints NeedEndPoints
LoadBalancerClass string
Reload chan event.GenericEvent
// initialLoadPerformed is set once reprocessAll has completed without requesting a retry.
// Until then, single-service reconciles are skipped, so that the first load of the services
// handles those with an already assigned IP first and the unassigned ones afterwards.
// This prevents services that already hold an IP from having it stolen by other services.
initialLoadPerformed bool
}

func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand All @@ -63,6 +67,11 @@ func (r *ServiceReconciler) reconcileService(ctx context.Context, req ctrl.Reque

var service *v1.Service

if !r.initialLoadPerformed {
level.Debug(r.Logger).Log("controller", "ServiceReconciler", "message", "filtered service, still waiting for the initial load to be performed")
return ctrl.Result{}, nil
}

service, err := r.serviceFor(ctx, req.NamespacedName)
if err != nil {
level.Error(r.Logger).Log("controller", "ServiceReconciler", "message", "failed to get service", "service", req.NamespacedName, "error", err)
Expand Down
11 changes: 10 additions & 1 deletion internal/k8s/controllers/service_controller_reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"sort"

"github.com/go-kit/log/level"

Expand Down Expand Up @@ -67,8 +68,14 @@ func (r *ServiceReconciler) reprocessAll(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

// Make it process the already assigned services first
sortedServices := services.Items
sort.Slice(sortedServices, func(i, j int) bool {
return len(sortedServices[i].Status.LoadBalancer.Ingress) > len(sortedServices[j].Status.LoadBalancer.Ingress)
})

retry := false
for _, service := range services.Items {
for _, service := range sortedServices {
service := service // so we can use &service
if filterByLoadBalancerClass(&service, r.LoadBalancerClass) {
level.Debug(r.Logger).Log("controller", "ServiceReconciler", "filtered service", req.NamespacedName)
Expand Down Expand Up @@ -101,6 +108,8 @@ func (r *ServiceReconciler) reprocessAll(ctx context.Context, req ctrl.Request)
level.Info(r.Logger).Log("controller", "ServiceReconciler - reprocessAll", "event", "force service reload")
return ctrl.Result{}, errRetry
}
r.initialLoadPerformed = true

return ctrl.Result{}, nil
}

Expand Down
Loading