Skip to content

Commit

Permalink
fix service dual stack add/del cluster ips not change ovn nb (#2367)
Browse files Browse the repository at this point in the history
* fix service dual stack add/del cluster ips not change ovn nb

* add e2e case
  • Loading branch information
changluyi committed Feb 27, 2023
1 parent ff83611 commit 5e0c305
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 8 deletions.
44 changes: 36 additions & 8 deletions pkg/controller/service.go
Expand Up @@ -108,7 +108,22 @@ func (c *Controller) enqueueUpdateService(old, new interface{}) {
utilruntime.HandleError(err)
return
}

oldClusterIps := getVipIps(oldSvc)
newClusterIps := getVipIps(newSvc)
var ipsToDel []string
for _, oldClusterIp := range oldClusterIps {
if !util.ContainsString(newClusterIps, oldClusterIp) {
ipsToDel = append(ipsToDel, oldClusterIp)
}
}

klog.V(3).Infof("enqueue update service %s", key)
if len(ipsToDel) != 0 {
ipsToDelStr := strings.Join(ipsToDel, ",")
key = strings.Join([]string{key, ipsToDelStr}, "#")
}

c.updateServiceQueue.Add(key)
}

Expand Down Expand Up @@ -269,6 +284,14 @@ func (c *Controller) handleDeleteService(service *vpcService) error {
}

func (c *Controller) handleUpdateService(key string) error {
keys := strings.Split(key, "#")
key = keys[0]
var ipsToDel []string
if len(keys) == 2 {
ipsToDelStr := keys[1]
ipsToDel = strings.Split(ipsToDelStr, ",")
}

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
Expand All @@ -283,12 +306,7 @@ func (c *Controller) handleUpdateService(key string) error {
return err
}

var ips []string
if vip, ok := svc.Annotations[util.SwitchLBRuleVipsAnnotation]; ok {
ips = strings.Split(vip, ",")
} else {
ips = util.ServiceClusterIPs(*svc)
}
ips := getVipIps(svc)

vpcName := svc.Annotations[util.VpcAnnotation]
if vpcName == "" {
Expand Down Expand Up @@ -340,7 +358,7 @@ func (c *Controller) handleUpdateService(key string) error {
}
}
for vip := range vips {
if ip := parseVipAddr(vip); util.ContainsString(ips, ip) && !util.IsStringIn(vip, svcVips) {
if ip := parseVipAddr(vip); (util.ContainsString(ips, ip) && !util.IsStringIn(vip, svcVips)) || util.ContainsString(ipsToDel, ip) {
klog.Infof("remove stale vip %s from LB %s", vip, lb)
if err = c.ovnLegacyClient.DeleteLoadBalancerVip(vip, lb); err != nil {
klog.Errorf("failed to delete vip %s from LB %s: %v", vip, lb, err)
Expand All @@ -354,7 +372,7 @@ func (c *Controller) handleUpdateService(key string) error {
}
klog.V(3).Infof("existing vips of LB %s: %v", oLb, vips)
for vip := range vips {
if ip := parseVipAddr(vip); util.ContainsString(ips, ip) {
if ip := parseVipAddr(vip); util.ContainsString(ips, ip) || util.ContainsString(ipsToDel, ip) {
klog.Infof("remove stale vip %s from LB %s", vip, oLb)
if err = c.ovnLegacyClient.DeleteLoadBalancerVip(vip, oLb); err != nil {
klog.Errorf("failed to delete vip %s from LB %s: %v", vip, oLb, err)
Expand Down Expand Up @@ -473,3 +491,13 @@ func (c *Controller) handleAddService(key string) error {

return nil
}

func getVipIps(svc *v1.Service) []string {
var ips []string
if vip, ok := svc.Annotations[util.SwitchLBRuleVipsAnnotation]; ok {
ips = strings.Split(vip, ",")
} else {
ips = util.ServiceClusterIPs(*svc)
}
return ips
}
29 changes: 29 additions & 0 deletions test/e2e/framework/service.go
Expand Up @@ -9,9 +9,11 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"

"github.com/kubeovn/kube-ovn/pkg/util"
"github.com/onsi/gomega"
)

Expand Down Expand Up @@ -49,6 +51,33 @@ func (c *ServiceClient) CreateSync(service *corev1.Service) *corev1.Service {
return c.Get(s.Name).DeepCopy()
}

// Patch patches the service
func (c *ServiceClient) Patch(original, modified *corev1.Service) *corev1.Service {

patch, err := util.GenerateMergePatchPayload(original, modified)
ExpectNoError(err)

var patchedService *corev1.Service
err = wait.PollImmediate(2*time.Second, timeout, func() (bool, error) {
s, err := c.ServiceInterface.Patch(context.TODO(), original.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "")
if err != nil {
return handleWaitingAPIError(err, false, "patch service %q", original.Name)
}
patchedService = s
return true, nil
})
if err == nil {
return patchedService.DeepCopy()
}

if IsTimeout(err) {
Failf("timed out while retrying to patch service %s", original.Name)
}
ExpectNoError(maybeTimeoutError(err, "patching service %s", original.Name))

return nil
}

// Delete deletes a service if the service exists
func (c *ServiceClient) Delete(name string) {
err := c.ServiceInterface.Delete(context.TODO(), name, metav1.DeleteOptions{})
Expand Down
1 change: 1 addition & 0 deletions test/e2e/kube-ovn/e2e_test.go
Expand Up @@ -20,6 +20,7 @@ import (
_ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/node"
_ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/pod"
_ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/qos"
_ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/service"
_ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/subnet"
_ "github.com/kubeovn/kube-ovn/test/e2e/kube-ovn/underlay"
)
Expand Down
88 changes: 88 additions & 0 deletions test/e2e/kube-ovn/service/service.go
@@ -0,0 +1,88 @@
package service

import (
"os/exec"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/kubeovn/kube-ovn/test/e2e/framework"
"github.com/onsi/ginkgo/v2"
)

var _ = framework.Describe("[group:service]", func() {
f := framework.NewDefaultFramework("service")

var serviceClient *framework.ServiceClient
var podClient *framework.PodClient
var namespaceName, serviceName, podName string

ginkgo.BeforeEach(func() {
serviceClient = f.ServiceClient()
podClient = f.PodClient()
namespaceName = f.Namespace.Name
serviceName = "service-" + framework.RandomSuffix()
podName = "pod-" + framework.RandomSuffix()
})
ginkgo.AfterEach(func() {
ginkgo.By("Deleting pod " + podName)
podClient.DeleteSync(podName)

ginkgo.By("Deleting service " + serviceName)
serviceClient.DeleteSync(serviceName)
})

framework.ConformanceIt("should ovn nb change vip when dual-stack service removes the cluster ip ", func() {
if f.ClusterIpFamily != "dual" {
ginkgo.Skip("this case only support dual mode")
}

ginkgo.By("Creating service " + serviceName)
ports := []corev1.ServicePort{{
Name: "tcp",
Protocol: corev1.ProtocolTCP,
Port: 80,
TargetPort: intstr.FromInt(80),
}}

selector := map[string]string{"app": "svc-dual"}
service := framework.MakeService(serviceName, corev1.ServiceTypeClusterIP, nil, selector, ports, corev1.ServiceAffinityNone)
service.Namespace = namespaceName
service.Spec.IPFamilyPolicy = new(corev1.IPFamilyPolicy)
*service.Spec.IPFamilyPolicy = corev1.IPFamilyPolicyPreferDualStack
service = serviceClient.CreateSync(service)
v6ClusterIp := service.Spec.ClusterIPs[1]
originService := service.DeepCopy()

podBackend := framework.MakePod(namespaceName, podName, selector, nil, framework.PauseImage, nil, nil)
_ = podClient.CreateSync(podBackend)

execCmd := "kubectl ko nbctl --format=csv --data=bare --no-heading --columns=vips find Load_Balancer name=cluster-tcp-loadbalancer"
output, err := exec.Command("bash", "-c", execCmd).CombinedOutput()
framework.ExpectNoError(err)
framework.ExpectTrue(strings.Contains(string(output), v6ClusterIp), "should contains v6 cluster ip")

ginkgo.By("change service from dual stack to single stack ")
modifyService := service.DeepCopy()
*modifyService.Spec.IPFamilyPolicy = corev1.IPFamilyPolicySingleStack
modifyService.Spec.IPFamilies = []corev1.IPFamily{corev1.IPv4Protocol}
modifyService.Spec.ClusterIPs = []string{service.Spec.ClusterIP}
service = serviceClient.Patch(service, modifyService)

output, err = exec.Command("bash", "-c", execCmd).CombinedOutput()
framework.ExpectNoError(err)
framework.ExpectFalse(strings.Contains(string(output), v6ClusterIp), "should not contains v6 cluster ip")

ginkgo.By("recover service from single stack to dual stack ")
recoverService := service.DeepCopy()
*recoverService.Spec.IPFamilyPolicy = *originService.Spec.IPFamilyPolicy
recoverService.Spec.IPFamilies = originService.Spec.IPFamilies
recoverService.Spec.ClusterIPs = originService.Spec.ClusterIPs
_ = serviceClient.Patch(service, recoverService)

output, err = exec.Command("bash", "-c", execCmd).CombinedOutput()
framework.ExpectNoError(err)
framework.ExpectTrue(strings.Contains(string(output), v6ClusterIp), "should contains v6 cluster ip")
})
})

0 comments on commit 5e0c305

Please sign in to comment.