From 5e0c305f6cd1cdd5db17e14c4a11c2c85db92b90 Mon Sep 17 00:00:00 2001 From: changluyi <47097611+changluyi@users.noreply.github.com> Date: Mon, 27 Feb 2023 09:00:54 +0800 Subject: [PATCH] fix service dual stack add/del cluster ips not change ovn nb (#2367) * fix service dual stack add/del cluster ips not change ovn nb * add e2e case --- pkg/controller/service.go | 44 +++++++++++--- test/e2e/framework/service.go | 29 +++++++++ test/e2e/kube-ovn/e2e_test.go | 1 + test/e2e/kube-ovn/service/service.go | 88 ++++++++++++++++++++++++++++ 4 files changed, 154 insertions(+), 8 deletions(-) create mode 100644 test/e2e/kube-ovn/service/service.go diff --git a/pkg/controller/service.go b/pkg/controller/service.go index 2a0d76d0c66..41f52f0d8bb 100644 --- a/pkg/controller/service.go +++ b/pkg/controller/service.go @@ -108,7 +108,22 @@ func (c *Controller) enqueueUpdateService(old, new interface{}) { utilruntime.HandleError(err) return } + + oldClusterIps := getVipIps(oldSvc) + newClusterIps := getVipIps(newSvc) + var ipsToDel []string + for _, oldClusterIp := range oldClusterIps { + if !util.ContainsString(newClusterIps, oldClusterIp) { + ipsToDel = append(ipsToDel, oldClusterIp) + } + } + klog.V(3).Infof("enqueue update service %s", key) + if len(ipsToDel) != 0 { + ipsToDelStr := strings.Join(ipsToDel, ",") + key = strings.Join([]string{key, ipsToDelStr}, "#") + } + c.updateServiceQueue.Add(key) } @@ -269,6 +284,14 @@ func (c *Controller) handleDeleteService(service *vpcService) error { } func (c *Controller) handleUpdateService(key string) error { + keys := strings.Split(key, "#") + key = keys[0] + var ipsToDel []string + if len(keys) == 2 { + ipsToDelStr := keys[1] + ipsToDel = strings.Split(ipsToDelStr, ",") + } + namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) @@ -283,12 +306,7 @@ func (c *Controller) handleUpdateService(key string) error { return err } - var ips []string - if vip, ok := svc.Annotations[util.SwitchLBRuleVipsAnnotation]; ok { - ips = strings.Split(vip, ",") - } else { - ips = util.ServiceClusterIPs(*svc) - } + ips := getVipIps(svc) vpcName := svc.Annotations[util.VpcAnnotation] if vpcName == "" { @@ -340,7 +358,7 @@ func (c *Controller) handleUpdateService(key string) error { } } for vip := range vips { - if ip := parseVipAddr(vip); util.ContainsString(ips, ip) && !util.IsStringIn(vip, svcVips) { + if ip := parseVipAddr(vip); (util.ContainsString(ips, ip) && !util.IsStringIn(vip, svcVips)) || util.ContainsString(ipsToDel, ip) { klog.Infof("remove stale vip %s from LB %s", vip, lb) if err = c.ovnLegacyClient.DeleteLoadBalancerVip(vip, lb); err != nil { klog.Errorf("failed to delete vip %s from LB %s: %v", vip, lb, err) @@ -354,7 +372,7 @@ func (c *Controller) handleUpdateService(key string) error { } klog.V(3).Infof("existing vips of LB %s: %v", oLb, vips) for vip := range vips { - if ip := parseVipAddr(vip); util.ContainsString(ips, ip) { + if ip := parseVipAddr(vip); util.ContainsString(ips, ip) || util.ContainsString(ipsToDel, ip) { klog.Infof("remove stale vip %s from LB %s", vip, oLb) if err = c.ovnLegacyClient.DeleteLoadBalancerVip(vip, oLb); err != nil { klog.Errorf("failed to delete vip %s from LB %s: %v", vip, oLb, err) @@ -473,3 +491,13 @@ func (c *Controller) handleAddService(key string) error { return nil } + +func getVipIps(svc *v1.Service) []string { + var ips []string + if vip, ok := svc.Annotations[util.SwitchLBRuleVipsAnnotation]; ok { + ips = strings.Split(vip, ",") + } else { + ips = util.ServiceClusterIPs(*svc) + } + return ips +} diff --git a/test/e2e/framework/service.go b/test/e2e/framework/service.go index 7670f7e9804..fa559118a1d 100644 --- a/test/e2e/framework/service.go +++ b/test/e2e/framework/service.go @@ -9,9 +9,11 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "github.com/kubeovn/kube-ovn/pkg/util" "github.com/onsi/gomega" ) @@ -49,6 +51,33 @@ func (c *ServiceClient) CreateSync(service *corev1.Service) *corev1.Service { return c.Get(s.Name).DeepCopy() } +// Patch patches the service +func (c *ServiceClient) Patch(original, modified *corev1.Service) *corev1.Service { + + patch, err := util.GenerateMergePatchPayload(original, modified) + ExpectNoError(err) + + var patchedService *corev1.Service + err = wait.PollImmediate(2*time.Second, timeout, func() (bool, error) { + s, err := c.ServiceInterface.Patch(context.TODO(), original.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "") + if err != nil { + return handleWaitingAPIError(err, false, "patch service %q", original.Name) + } + patchedService = s + return true, nil + }) + if err == nil { + return patchedService.DeepCopy() + } + + if IsTimeout(err) { + Failf("timed out while retrying to patch service %s", original.Name) + } + ExpectNoError(maybeTimeoutError(err, "patching service %s", original.Name)) + + return nil +} + // Delete deletes a service if the service exists func (c *ServiceClient) Delete(name string) { err := c.ServiceInterface.Delete(context.TODO(), name, metav1.DeleteOptions{}) diff --git a/test/e2e/kube-ovn/e2e_test.go b/test/e2e/kube-ovn/e2e_test.go index 8651d81d775..5925218b0fd 100644 --- a/test/e2e/kube-ovn/e2e_test.go +++ b/test/e2e/kube-ovn/e2e_test.go @@ -20,6 +20,7 @@ import ( _ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/node" _ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/pod" _ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/qos" + _ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/service" _ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/subnet" _ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/underlay" ) diff --git a/test/e2e/kube-ovn/service/service.go b/test/e2e/kube-ovn/service/service.go new file mode 100644 index 00000000000..a0aee6cf655 --- /dev/null +++ b/test/e2e/kube-ovn/service/service.go @@ -0,0 +1,88 @@ +package service + +import ( + "os/exec" + "strings" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/kubeovn/kube-ovn/test/e2e/framework" + "github.com/onsi/ginkgo/v2" +) + +var _ = framework.Describe("[group:service]", func() { + f := framework.NewDefaultFramework("service") + + var serviceClient *framework.ServiceClient + var podClient *framework.PodClient + var namespaceName, serviceName, podName string + + ginkgo.BeforeEach(func() { + serviceClient = f.ServiceClient() + podClient = f.PodClient() + namespaceName = f.Namespace.Name + serviceName = "service-" + framework.RandomSuffix() + podName = "pod-" + framework.RandomSuffix() + }) + ginkgo.AfterEach(func() { + ginkgo.By("Deleting pod " + podName) + podClient.DeleteSync(podName) + + ginkgo.By("Deleting service " + serviceName) + serviceClient.DeleteSync(serviceName) + }) + + framework.ConformanceIt("should ovn nb change vip when dual-stack service removes the cluster ip ", func() { + if f.ClusterIpFamily != "dual" { + ginkgo.Skip("this case only support dual mode") + } + + ginkgo.By("Creating service " + serviceName) + ports := []corev1.ServicePort{{ + Name: "tcp", + Protocol: corev1.ProtocolTCP, + Port: 80, + TargetPort: intstr.FromInt(80), + }} + + selector := map[string]string{"app": "svc-dual"} + service := framework.MakeService(serviceName, corev1.ServiceTypeClusterIP, nil, selector, ports, corev1.ServiceAffinityNone) + service.Namespace = namespaceName + service.Spec.IPFamilyPolicy = new(corev1.IPFamilyPolicy) + *service.Spec.IPFamilyPolicy = corev1.IPFamilyPolicyPreferDualStack + service = serviceClient.CreateSync(service) + v6ClusterIp := service.Spec.ClusterIPs[1] + originService := service.DeepCopy() + + podBackend := framework.MakePod(namespaceName, podName, selector, nil, framework.PauseImage, nil, nil) + _ = podClient.CreateSync(podBackend) + + execCmd := "kubectl ko nbctl --format=csv --data=bare --no-heading --columns=vips find Load_Balancer name=cluster-tcp-loadbalancer" + output, err := exec.Command("bash", "-c", execCmd).CombinedOutput() + framework.ExpectNoError(err) + framework.ExpectTrue(strings.Contains(string(output), v6ClusterIp), "should contains v6 cluster ip") + + ginkgo.By("change service from dual stack to single stack ") + modifyService := service.DeepCopy() + *modifyService.Spec.IPFamilyPolicy = corev1.IPFamilyPolicySingleStack + modifyService.Spec.IPFamilies = []corev1.IPFamily{corev1.IPv4Protocol} + modifyService.Spec.ClusterIPs = []string{service.Spec.ClusterIP} + service = serviceClient.Patch(service, modifyService) + + output, err = exec.Command("bash", "-c", execCmd).CombinedOutput() + framework.ExpectNoError(err) + framework.ExpectFalse(strings.Contains(string(output), v6ClusterIp), "should not contains v6 cluster ip") + + ginkgo.By("recover service from single stack to dual stack ") + recoverService := service.DeepCopy() + *recoverService.Spec.IPFamilyPolicy = *originService.Spec.IPFamilyPolicy + recoverService.Spec.IPFamilies = originService.Spec.IPFamilies + recoverService.Spec.ClusterIPs = originService.Spec.ClusterIPs + _ = serviceClient.Patch(service, recoverService) + + output, err = exec.Command("bash", "-c", execCmd).CombinedOutput() + framework.ExpectNoError(err) + framework.ExpectTrue(strings.Contains(string(output), v6ClusterIp), "should contains v6 cluster ip") + }) +})