diff --git a/e2etest/bgptests/bgp.go b/e2etest/bgptests/bgp.go
index 6723a33acbe..27676da326b 100644
--- a/e2etest/bgptests/bgp.go
+++ b/e2etest/bgptests/bgp.go
@@ -123,7 +123,7 @@ var _ = ginkgo.Describe("BGP", func() {
 		allNodes, err := cs.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
 		framework.ExpectNoError(err)
 
-		validateDesiredLB(svc)
+		testservice.ValidateDesiredLB(svc)
 
 		for _, c := range FRRContainers {
 			validateService(svc, allNodes.Items, c)
@@ -156,7 +156,7 @@ var _ = ginkgo.Describe("BGP", func() {
 				testservice.TrafficPolicyCluster(svc)
 			})
 			defer testservice.Delete(cs, svc)
-			validateDesiredLB(svc)
+			testservice.ValidateDesiredLB(svc)
 
 			for _, c := range FRRContainers {
 				validateService(svc, allNodes.Items, c)
@@ -193,7 +193,7 @@ var _ = ginkgo.Describe("BGP", func() {
 			})
 			defer testservice.Delete(cs, svc)
 
-			validateDesiredLB(svc)
+			testservice.ValidateDesiredLB(svc)
 
 			err := jig.Scale(2)
 			framework.ExpectNoError(err)
@@ -280,8 +280,8 @@ var _ = ginkgo.Describe("BGP", func() {
 			})
 			defer testservice.Delete(cs, svc1)
 
-			validateDesiredLB(svc)
-			validateDesiredLB(svc1)
+			testservice.ValidateDesiredLB(svc)
+			testservice.ValidateDesiredLB(svc1)
 
 			for _, c := range FRRContainers {
 				validateService(svc, allNodes.Items, c)
diff --git a/e2etest/bgptests/validate.go b/e2etest/bgptests/validate.go
index b9f38a1b4c5..725770e0a77 100644
--- a/e2etest/bgptests/validate.go
+++ b/e2etest/bgptests/validate.go
@@ -12,7 +12,6 @@ import (
 	"github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 
-	metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
 	"go.universe.tf/e2etest/pkg/executor"
 	"go.universe.tf/e2etest/pkg/frr"
 	frrcontainer "go.universe.tf/e2etest/pkg/frr/container"
@@ -21,6 +20,7 @@ import (
 	"go.universe.tf/e2etest/pkg/metallb"
 	"go.universe.tf/e2etest/pkg/routes"
 	"go.universe.tf/e2etest/pkg/wget"
+	metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	clientset "k8s.io/client-go/kubernetes"
@@ -168,14 +168,6 @@ func checkBFDConfigPropagated(nodeConfig metallbv1beta1.BFDProfile, peerConfig f
 	return nil
 }
 
-func validateDesiredLB(svc *corev1.Service) {
-	desiredLbIPs := svc.Annotations["metallb.universe.tf/loadBalancerIPs"]
-	if desiredLbIPs == "" {
-		return
-	}
-	framework.ExpectEqual(desiredLbIPs, strings.Join(getIngressIPs(svc.Status.LoadBalancer.Ingress), ","))
-}
-
 func checkServiceOnlyOnNodes(svc *corev1.Service, expectedNodes []corev1.Node, ipFamily ipfamily.Family) {
 	if len(expectedNodes) == 0 {
 		return
@@ -286,14 +278,6 @@ OUTER:
 	return nonSelectedNodes
 }
 
-func getIngressIPs(ingresses []corev1.LoadBalancerIngress) []string {
-	var ips []string
-	for _, ingress := range ingresses {
-		ips = append(ips, ingress.IP)
-	}
-	return ips
-}
-
 func validateServiceNotAdvertised(svc *corev1.Service, frrContainers []*frrcontainer.FRR, advertised string, ipFamily ipfamily.Family) {
 	for _, c := range frrContainers {
 		if c.Name != advertised {
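Note: `validateDesiredLB` moves out of the BGP suite into the shared `e2etest/pkg/service` package as `ValidateDesiredLB` (see the `e2etest/pkg/service/validate.go` hunk further down), so the L2 assignment tests can reuse it. A minimal usage sketch, assuming a pool that contains the pinned address — the IP and the tweak closure are illustrative, not part of this change:

```go
// Pin the service to a desired address via annotation, then assert that the
// assigned ingress IPs match it. ValidateDesiredLB is a no-op when the
// annotation is absent.
svc, err := jig.CreateLoadBalancerService(30*time.Second, func(svc *corev1.Service) {
	svc.Annotations = map[string]string{
		"metallb.universe.tf/loadBalancerIPs": "192.168.10.100", // assumed pool member
	}
})
framework.ExpectNoError(err)
testservice.ValidateDesiredLB(svc)
```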
diff --git a/e2etest/l2tests/assignment.go b/e2etest/l2tests/assignment.go
index d7ec631efc5..130bcc10c4a 100644
--- a/e2etest/l2tests/assignment.go
+++ b/e2etest/l2tests/assignment.go
@@ -9,10 +9,13 @@ import (
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
 
-	metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
+	"go.universe.tf/e2etest/pkg/config"
 	"go.universe.tf/e2etest/pkg/k8s"
+	"go.universe.tf/e2etest/pkg/metallb"
+	"go.universe.tf/e2etest/pkg/service"
+	metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
 
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -145,6 +148,74 @@ var _ = ginkgo.Describe("IP Assignment", func() {
 				err := cs.CoreV1().Services(svc.Namespace).Delete(context.Background(), svc.Name, metav1.DeleteOptions{})
 				return err
 			}))
+
+		ginkgo.It("should preserve the same external ip after controller restart", func() {
+			const numOfRestarts = 5
+			resources := config.Resources{
+				Pools: []metallbv1beta1.IPAddressPool{
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "assignment-controller-reset-test-pool",
+						},
+						Spec: metallbv1beta1.IPAddressPoolSpec{
+							Addresses: []string{"192.168.10.100/32", "192.168.20.200/32"},
+						},
+					},
+				},
+			}
+			err := ConfigUpdater.Update(resources)
+			framework.ExpectNoError(err)
+
+			ginkgo.By("creating 4 LB services")
+			jig := e2eservice.NewTestJig(cs, f.Namespace.Name, "service-a")
+			serviceA, err := jig.CreateLoadBalancerService(30*time.Second, nil)
+			framework.ExpectNoError(err)
+			defer service.Delete(cs, serviceA)
+			service.ValidateDesiredLB(serviceA)
+
+			jig = e2eservice.NewTestJig(cs, f.Namespace.Name, "service-b")
+			serviceB, err := jig.CreateLoadBalancerService(30*time.Second, nil)
+			framework.ExpectNoError(err)
+			defer service.Delete(cs, serviceB)
+			service.ValidateDesiredLB(serviceB)
+
+			jig = e2eservice.NewTestJig(cs, f.Namespace.Name, "service-c")
+			serviceC, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(nil)
+			framework.ExpectNoError(err)
+			defer service.Delete(cs, serviceC)
+
+			jig = e2eservice.NewTestJig(cs, f.Namespace.Name, "service-d")
+			serviceD, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(nil)
+			framework.ExpectNoError(err)
+			defer service.Delete(cs, serviceD)
+
+			restartAndAssert := func() {
+				metallb.RestartController(cs)
+				gomega.Consistently(func() error {
+					serviceA, err = cs.CoreV1().Services(serviceA.Namespace).Get(context.TODO(), serviceA.Name, metav1.GetOptions{})
+					framework.ExpectNoError(err)
+
+					err = service.ValidateAssignedWith(serviceA, "192.168.10.100")
+					if err != nil {
+						return err
+					}
+
+					serviceB, err = cs.CoreV1().Services(serviceB.Namespace).Get(context.TODO(), serviceB.Name, metav1.GetOptions{})
+					framework.ExpectNoError(err)
+
+					err = service.ValidateAssignedWith(serviceB, "192.168.20.200")
+					if err != nil {
+						return err
+					}
+
+					return nil
+				}, 10*time.Second, 2*time.Second).ShouldNot(gomega.HaveOccurred())
+			}
+
+			ginkgo.By("restarting the controller and validating that the service keeps the same ip")
+			for i := 0; i < numOfRestarts; i++ {
+				restartAndAssert()
+			}
+		})
 	})
 
 	ginkgo.Context("IPV4 - Validate service allocation in address pools", func() {
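Note: the restart assertion deliberately uses `gomega.Consistently` rather than `Eventually`: the polled function must keep returning nil at every 2-second sample for the whole 10-second window, so a service that briefly keeps its IP and then has it reassigned still fails the test. A standalone sketch of the same pattern, with an assumed namespace, service name, and expected IP:

```go
// The condition must hold at every sample for the full window, not just once.
gomega.Consistently(func() error {
	refreshed, err := cs.CoreV1().Services(ns).Get(context.TODO(), "service-a", metav1.GetOptions{})
	if err != nil {
		return err
	}
	return service.ValidateAssignedWith(refreshed, "192.168.10.100") // assumed expected IP
}, 10*time.Second, 2*time.Second).ShouldNot(gomega.HaveOccurred())
```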
diff --git a/e2etest/pkg/k8s/pods.go b/e2etest/pkg/k8s/pods.go
index 59a1e508d80..4006b23372b 100644
--- a/e2etest/pkg/k8s/pods.go
+++ b/e2etest/pkg/k8s/pods.go
@@ -37,3 +37,23 @@ func PodLogs(cs clientset.Interface, pod *corev1.Pod, podLogOpts corev1.PodLogOp
 	str := buf.String()
 	return str, nil
 }
+
+// PodIsReady returns true if the given pod's PodReady condition is true.
+func PodIsReady(p *corev1.Pod) bool {
+	return podConditionStatus(p, corev1.PodReady) == corev1.ConditionTrue
+}
+
+// podConditionStatus returns the status of the condition for a given pod.
+func podConditionStatus(p *corev1.Pod, condition corev1.PodConditionType) corev1.ConditionStatus {
+	if p == nil {
+		return corev1.ConditionUnknown
+	}
+
+	for _, c := range p.Status.Conditions {
+		if c.Type == condition {
+			return c.Status
+		}
+	}
+
+	return corev1.ConditionUnknown
+}
diff --git a/e2etest/pkg/metallb/metallb.go b/e2etest/pkg/metallb/metallb.go
index 3dd27b1b9a4..846b0d3d4e2 100644
--- a/e2etest/pkg/metallb/metallb.go
+++ b/e2etest/pkg/metallb/metallb.go
@@ -6,11 +6,15 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"time"
 
 	"github.com/pkg/errors"
+	"go.universe.tf/e2etest/pkg/k8s"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/kubernetes/test/e2e/framework"
 )
 
 var (
@@ -53,7 +57,7 @@ func ControllerPod(cs clientset.Interface) (*corev1.Pod, error) {
 		LabelSelector: ControllerLabelSelector,
 	})
 	if err != nil {
-		return nil, errors.Wrap(err, "failed to fetch controller pods")
+		framework.ExpectNoError(err, "failed to fetch controller pods")
 	}
 	if len(pods.Items) != 1 {
 		return nil, fmt.Errorf("expected one controller pod, found %d", len(pods.Items))
@@ -74,3 +78,26 @@ func SpeakerPodInNode(cs clientset.Interface, node string) (*corev1.Pod, error)
 	}
 	return nil, errors.Errorf("no speaker pod run in the node %s", node)
 }
+
+// RestartController restarts metallb's controller pod and waits for it to be running and ready.
+func RestartController(cs clientset.Interface) {
+	controllerPod, err := ControllerPod(cs)
+	framework.ExpectNoError(err)
+
+	err = cs.CoreV1().Pods(controllerPod.Namespace).Delete(context.TODO(), controllerPod.Name, metav1.DeleteOptions{})
+	framework.ExpectNoError(err)
+
+	err = wait.PollImmediate(5*time.Second, 3*time.Minute, func() (bool, error) {
+		pod, err := ControllerPod(cs)
+		if err != nil {
+			return false, nil
+		}
+		if controllerPod.Name == pod.Name {
+			return false, nil
+		}
+		isReady := (pod.Status.Phase == corev1.PodRunning) && (k8s.PodIsReady(pod))
+
+		return isReady, nil
+	})
+	framework.ExpectNoError(err)
+}
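Note: `RestartController` treats the restart as complete only when `ControllerPod` returns a pod whose name differs from the deleted one (deployment replicas get fresh generated names) and that pod is both Running and Ready. `wait.PollImmediate` evaluates the condition immediately and then every 5 seconds for up to 3 minutes; returning `(false, nil)` swallows lookup errors while the old pod is still terminating, which is intentional. A hedged sketch of how a test consumes it — `expectedIP` is a placeholder:

```go
// Blocks until a new Running+Ready controller pod replaces the deleted one,
// then verifies that an existing service kept its address.
metallb.RestartController(cs)

refreshed, err := cs.CoreV1().Services(svc.Namespace).Get(context.TODO(), svc.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
framework.ExpectNoError(service.ValidateAssignedWith(refreshed, expectedIP))
```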
diff --git a/e2etest/pkg/service/validate.go b/e2etest/pkg/service/validate.go
index 5ed70258966..9f957090aeb 100644
--- a/e2etest/pkg/service/validate.go
+++ b/e2etest/pkg/service/validate.go
@@ -6,10 +6,12 @@ import (
 	"fmt"
 	"net"
 	"strconv"
+	"strings"
 
 	"go.universe.tf/e2etest/pkg/executor"
 	"go.universe.tf/e2etest/pkg/wget"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/kubernetes/test/e2e/framework"
 	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
 )
 
@@ -25,3 +27,35 @@ func ValidateL2(svc *corev1.Service) error {
 	}
 	return nil
 }
+
+func ValidateDesiredLB(svc *corev1.Service) {
+	desiredLbIPs := svc.Annotations["metallb.universe.tf/loadBalancerIPs"]
+	if desiredLbIPs == "" {
+		return
+	}
+	framework.ExpectEqual(desiredLbIPs, strings.Join(getIngressIPs(svc.Status.LoadBalancer.Ingress), ","))
+}
+
+// ValidateAssignedWith validates that the service was assigned the given ip.
+func ValidateAssignedWith(svc *corev1.Service, ip string) error {
+	if ip == "" {
+		return nil
+	}
+
+	ingressIPs := getIngressIPs(svc.Status.LoadBalancer.Ingress)
+	for _, ingressIP := range ingressIPs {
+		if ingressIP == ip {
+			return nil
+		}
+	}
+
+	return fmt.Errorf("validation failed: ip %s is not assigned to service %s", ip, svc.Name)
+}
+
+func getIngressIPs(ingresses []corev1.LoadBalancerIngress) []string {
+	var ips []string
+	for _, ingress := range ingresses {
+		ips = append(ips, ingress.IP)
+	}
+	return ips
+}
diff --git a/internal/k8s/controllers/service_controller.go b/internal/k8s/controllers/service_controller.go
index 744d9a336e3..7fa83bbfdb8 100644
--- a/internal/k8s/controllers/service_controller.go
+++ b/internal/k8s/controllers/service_controller.go
@@ -47,6 +47,10 @@ type ServiceReconciler struct {
 	Endpoints         NeedEndPoints
 	LoadBalancerClass string
 	Reload            chan event.GenericEvent
+	// initialLoadPerformed is set after the first time we call reprocessAll.
+	// This is required because the first load of the services must process the already-assigned
+	// services first, so that a service that already holds an IP cannot have it stolen by another service.
+	initialLoadPerformed bool
 }
 
 func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
@@ -63,6 +67,11 @@ func (r *ServiceReconciler) reconcileService(ctx context.Context, req ctrl.Reque
 
 	var service *v1.Service
 
+	if !r.initialLoadPerformed {
+		level.Debug(r.Logger).Log("controller", "ServiceReconciler", "message", "filtered service, still waiting for the initial load to be performed")
+		return ctrl.Result{}, nil
+	}
+
 	service, err := r.serviceFor(ctx, req.NamespacedName)
 	if err != nil {
 		level.Error(r.Logger).Log("controller", "ServiceReconciler", "message", "failed to get service", "service", req.NamespacedName, "error", err)
diff --git a/internal/k8s/controllers/service_controller_reload.go b/internal/k8s/controllers/service_controller_reload.go
index 7c2aeb9a6ee..e679e85aaaa 100644
--- a/internal/k8s/controllers/service_controller_reload.go
+++ b/internal/k8s/controllers/service_controller_reload.go
@@ -18,6 +18,7 @@ package controllers
 
 import (
 	"context"
+	"sort"
 
 	"github.com/go-kit/log/level"
 
@@ -67,8 +68,14 @@ func (r *ServiceReconciler) reprocessAll(ctx context.Context, req ctrl.Request)
 		return ctrl.Result{}, err
 	}
 
+	// Process the services that already have an assigned IP first.
+	sortedServices := services.Items
+	sort.Slice(sortedServices, func(i, j int) bool {
+		return len(sortedServices[i].Status.LoadBalancer.Ingress) > len(sortedServices[j].Status.LoadBalancer.Ingress)
+	})
+
 	retry := false
-	for _, service := range services.Items {
+	for _, service := range sortedServices {
 		service := service // so we can use &service
 		if filterByLoadBalancerClass(&service, r.LoadBalancerClass) {
 			level.Debug(r.Logger).Log("controller", "ServiceReconciler", "filtered service", req.NamespacedName)
@@ -101,6 +108,8 @@ func (r *ServiceReconciler) reprocessAll(ctx context.Context, req ctrl.Request)
 		level.Info(r.Logger).Log("controller", "ServiceReconciler - reprocessAll", "event", "force service reload")
 		return ctrl.Result{}, errRetry
 	}
+	r.initialLoadPerformed = true
+
 	return ctrl.Result{}, nil
 }
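Note: the comparator orders only by ingress count, so every service that already holds at least one ingress IP sorts ahead of the unassigned ones; the relative order within each group is irrelevant here, since only the assigned/unassigned split protects existing allocations. A self-contained illustration of the ordering (not part of the change):

```go
package main

import (
	"fmt"
	"sort"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	// One unassigned service and one that already holds an IP.
	svcs := []corev1.Service{
		{}, // no ingress yet
		{Status: corev1.ServiceStatus{LoadBalancer: corev1.LoadBalancerStatus{
			Ingress: []corev1.LoadBalancerIngress{{IP: "192.168.10.100"}},
		}}},
	}
	// Same comparator as reprocessAll: assigned services first.
	sort.Slice(svcs, func(i, j int) bool {
		return len(svcs[i].Status.LoadBalancer.Ingress) > len(svcs[j].Status.LoadBalancer.Ingress)
	})
	fmt.Println(svcs[0].Status.LoadBalancer.Ingress[0].IP) // 192.168.10.100
}
```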
diff --git a/internal/k8s/controllers/service_controller_test.go b/internal/k8s/controllers/service_controller_test.go
index bdc60c7f25b..fc6e6e94798 100644
--- a/internal/k8s/controllers/service_controller_test.go
+++ b/internal/k8s/controllers/service_controller_test.go
@@ -70,6 +70,7 @@ func TestServiceController(t *testing.T) {
 		shouldReprocessAll      bool
 		expectReconcileFails    bool
 		expectForceReloadCalled bool
+		initialLoadPerformed    bool
 	}{
 		{
 			desc:                    "call reconcileService, handler returns SyncStateSuccess",
@@ -79,6 +80,7 @@ func TestServiceController(t *testing.T) {
 			shouldReprocessAll:      false,
 			expectReconcileFails:    false,
 			expectForceReloadCalled: false,
+			initialLoadPerformed:    true,
 		},
 		{
 			desc:                    "call reconcileService, handler returns SyncStateSuccess - with endpoints",
@@ -88,6 +90,7 @@ func TestServiceController(t *testing.T) {
 			shouldReprocessAll:      false,
 			expectReconcileFails:    false,
 			expectForceReloadCalled: false,
+			initialLoadPerformed:    true,
 		},
 		{
 			desc:                    "call reconcileService, handler returns SyncStateSuccess - with endpointSlices",
@@ -97,6 +100,7 @@ func TestServiceController(t *testing.T) {
 			shouldReprocessAll:      false,
 			expectReconcileFails:    false,
 			expectForceReloadCalled: false,
+			initialLoadPerformed:    true,
 		},
 		{
 			desc:                    "call reconcileService, handler returns SyncStateError",
@@ -106,6 +110,7 @@ func TestServiceController(t *testing.T) {
 			shouldReprocessAll:      false,
 			expectReconcileFails:    true,
 			expectForceReloadCalled: false,
+			initialLoadPerformed:    true,
 		},
 		{
 			desc:                    "call reconcileService, handler returns SyncStateErrorNoRetry",
@@ -115,6 +120,7 @@ func TestServiceController(t *testing.T) {
 			shouldReprocessAll:      false,
 			expectReconcileFails:    false,
 			expectForceReloadCalled: false,
+			initialLoadPerformed:    true,
 		},
 		{
 			desc:                    "call reconcileService, handler returns SyncStateReprocessAll",
@@ -124,6 +130,17 @@ func TestServiceController(t *testing.T) {
 			shouldReprocessAll:      false,
 			expectReconcileFails:    false,
 			expectForceReloadCalled: true,
+			initialLoadPerformed:    true,
+		},
+		{
+			desc:                    "call reconcileService, initialLoadPerformed initialized to false",
+			handlerRes:              SyncStateReprocessAll,
+			needEndPoints:           NoNeed,
+			initObjects:             []client.Object{testService},
+			shouldReprocessAll:      false,
+			expectReconcileFails:    false,
+			expectForceReloadCalled: false,
+			initialLoadPerformed:    false,
 		},
 		{
 			desc:                    "call reprocessAll, handler returns SyncStateSuccess",
@@ -133,6 +150,7 @@ func TestServiceController(t *testing.T) {
 			shouldReprocessAll:      true,
 			expectReconcileFails:    false,
 			expectForceReloadCalled: false,
+			initialLoadPerformed:    true,
 		},
 		{
 			desc:                    "call reprocessAll, handler returns SyncStateSuccess - with endpoints",
@@ -142,6 +160,7 @@ func TestServiceController(t *testing.T) {
 			shouldReprocessAll:      true,
 			expectReconcileFails:    false,
 			expectForceReloadCalled: false,
+			initialLoadPerformed:    true,
 		},
 		{
 			desc:                    "call reprocessAll, handler returns SyncStateSuccess - with endpointSlices",
@@ -151,6 +170,7 @@ func TestServiceController(t *testing.T) {
 			shouldReprocessAll:      true,
 			expectReconcileFails:    false,
 			expectForceReloadCalled: false,
+			initialLoadPerformed:    true,
 		},
 		{
 			desc:                    "call reprocessAll, handler returns SyncStateError",
@@ -160,6 +180,7 @@ func TestServiceController(t *testing.T) {
 			shouldReprocessAll:      true,
 			expectReconcileFails:    true,
 			expectForceReloadCalled: false,
+			initialLoadPerformed:    true,
 		},
 		{
 			desc:                    "call reprocessAll, handler returns SyncStateErrorNoRetry",
@@ -169,6 +190,7 @@ func TestServiceController(t *testing.T) {
 			shouldReprocessAll:      true,
 			expectReconcileFails:    false,
 			expectForceReloadCalled: false,
+			initialLoadPerformed:    true,
 		},
 		{
 			desc:                    "call reprocessAll, handler returns SyncStateReprocessAll",
@@ -178,6 +200,17 @@ func TestServiceController(t *testing.T) {
 			shouldReprocessAll:      true,
 			expectReconcileFails:    true,
 			expectForceReloadCalled: false,
+			initialLoadPerformed:    true,
+		},
+		{
+			desc:                    "call reprocessAll, initialLoadPerformed initialized to false",
+			handlerRes:              SyncStateSuccess,
+			needEndPoints:           NoNeed,
+			initObjects:             []client.Object{testService},
+			shouldReprocessAll:      true,
+			expectReconcileFails:    false,
+			expectForceReloadCalled: false,
+			initialLoadPerformed:    false,
 		},
 	}
 	for _, test := range tests {
@@ -207,15 +240,16 @@ func TestServiceController(t *testing.T) {
 			mockReload := make(chan event.GenericEvent, 1)
 
 			r := &ServiceReconciler{
-				Client:    fakeClient,
-				Logger:    log.NewNopLogger(),
-				Scheme:    scheme,
-				Namespace: testNamespace,
-				Handler:   mockHandler,
-				Endpoints: test.needEndPoints,
-				Reload:    mockReload,
+				Client:               fakeClient,
+				Logger:               log.NewNopLogger(),
+				Scheme:               scheme,
+				Namespace:            testNamespace,
+				Handler:              mockHandler,
+				Endpoints:            test.needEndPoints,
+				Reload:               mockReload,
+				initialLoadPerformed: false,
 			}
-
+			r.initialLoadPerformed = test.initialLoadPerformed
 			var req reconcile.Request
 			if test.shouldReprocessAll {
 				req = reconcile.Request{
@@ -254,6 +288,9 @@ func TestServiceController(t *testing.T) {
 				t.Errorf("test %s failed: call force reload expected: %v, got: %v", test.desc, test.expectForceReloadCalled, calledForceReload)
 			}
+			if test.shouldReprocessAll && !r.initialLoadPerformed {
+				t.Errorf("test %s failed: reconciler's initialLoadPerformed flag didn't change to true", test.desc)
+			}
 	}
 }
diff --git a/internal/k8s/nodes/nodes.go b/internal/k8s/nodes/nodes.go
index 5d335f47f82..415eda0625b 100644
--- a/internal/k8s/nodes/nodes.go
+++ b/internal/k8s/nodes/nodes.go
@@ -6,8 +6,13 @@ import (
 	corev1 "k8s.io/api/core/v1"
 )
 
-// ConditionStatus returns the status of the condition for a given node.
-func ConditionStatus(n *corev1.Node, ct corev1.NodeConditionType) corev1.ConditionStatus {
+// IsNetworkUnavailable returns true if the given node's NodeNetworkUnavailable condition status is true.
+func IsNetworkUnavailable(n *corev1.Node) bool {
+	return conditionStatus(n, corev1.NodeNetworkUnavailable) == corev1.ConditionTrue
+}
+
+// conditionStatus returns the status of the condition for a given node.
+func conditionStatus(n *corev1.Node, ct corev1.NodeConditionType) corev1.ConditionStatus {
 	if n == nil {
 		return corev1.ConditionUnknown
 	}
@@ -20,8 +25,3 @@ func ConditionStatus(n *corev1.Node, ct corev1.NodeConditionType) corev1.Conditi
 
 	return corev1.ConditionUnknown
 }
-
-// IsNetworkUnavailable returns true if the given node NodeNetworkUnavailable condition status is true.
-func IsNetworkUnavailable(n *corev1.Node) bool {
-	return ConditionStatus(n, corev1.NodeNetworkUnavailable) == corev1.ConditionTrue
-}
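Note: `ConditionStatus` becomes package-private (`conditionStatus`) because `IsNetworkUnavailable` was its only caller outside the package; the new `podConditionStatus` in `e2etest/pkg/k8s/pods.go` mirrors the same shape for pods. A short sketch of the remaining exported surface — `nodeList` is a placeholder:

```go
// Skip nodes whose NodeNetworkUnavailable condition is True before expecting
// them to carry traffic for a service.
for i := range nodeList.Items {
	if nodes.IsNetworkUnavailable(&nodeList.Items[i]) {
		continue
	}
	// node is network-ready here
}
```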