diff --git a/pkg/daemon/config.go b/pkg/daemon/config.go index 653697d44fb..97636c2a611 100644 --- a/pkg/daemon/config.go +++ b/pkg/daemon/config.go @@ -43,6 +43,7 @@ type Configuration struct { KubeOvnClient clientset.Interface NodeName string ServiceClusterIPRange string + NodeSwitch string NodeLocalDnsIP string EncapChecksum bool EnablePprof bool @@ -73,6 +74,7 @@ func ParseFlags() *Configuration { argOvsSocket = pflag.String("ovs-socket", "", "The socket to local ovs-server") argKubeConfigFile = pflag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information. If not set use the inCluster token.") argServiceClusterIPRange = pflag.String("service-cluster-ip-range", "10.96.0.0/12", "The kubernetes service cluster ip range") + argNodeSwitch = pflag.String("node-switch", "join", "The name of node gateway switch which help node to access pod network") argNodeLocalDnsIP = pflag.String("node-local-dns-ip", "", "If use nodelocaldns the local dns server ip should be set here.") argEncapChecksum = pflag.Bool("encap-checksum", true, "Enable checksum") argEnablePprof = pflag.Bool("enable-pprof", false, "Enable pprof") @@ -125,6 +127,7 @@ func ParseFlags() *Configuration { MacLearningFallback: *argMacLearningFallback, NodeName: strings.ToLower(*argNodeName), ServiceClusterIPRange: *argServiceClusterIPRange, + NodeSwitch: *argNodeSwitch, NodeLocalDnsIP: *argNodeLocalDnsIP, EncapChecksum: *argEncapChecksum, NetworkType: *argsNetworkType, diff --git a/pkg/daemon/controller_linux.go b/pkg/daemon/controller_linux.go index db44e253e8b..eb4cc0bbf94 100644 --- a/pkg/daemon/controller_linux.go +++ b/pkg/daemon/controller_linux.go @@ -128,6 +128,7 @@ func (c *Controller) reconcileRouters(event subnetEvent) error { } nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node) + joinCIDR := make([]string, 0, 2) cidrs := make([]string, 0, len(subnets)*2) for _, subnet := range subnets { if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != util.DefaultVpc || !subnet.Status.IsReady() { @@ -145,6 +146,9 @@ func (c *Controller) reconcileRouters(event subnetEvent) error { continue } cidrs = append(cidrs, ipNet.String()) + if subnet.Name == c.config.NodeSwitch { + joinCIDR = append(joinCIDR, ipNet.String()) + } } } } @@ -165,23 +169,17 @@ func (c *Controller) reconcileRouters(event subnetEvent) error { return err } - toAdd, toDel := routeDiff(existRoutes, cidrs) + toAdd, toDel := routeDiff(existRoutes, cidrs, joinCIDR, gateway, net.ParseIP(nodeIPv4), net.ParseIP(nodeIPv6)) for _, r := range toDel { - _, cidr, _ := net.ParseCIDR(r) - if err = netlink.RouteDel(&netlink.Route{Dst: cidr}); err != nil { + if err = netlink.RouteDel(&netlink.Route{Dst: r.Dst}); err != nil { klog.Errorf("failed to del route %v", err) } } for _, r := range toAdd { - _, cidr, _ := net.ParseCIDR(r) - for _, gw := range strings.Split(gateway, ",") { - if util.CheckProtocol(gw) != util.CheckProtocol(r) { - continue - } - if err = netlink.RouteReplace(&netlink.Route{Dst: cidr, LinkIndex: nic.Attrs().Index, Scope: netlink.SCOPE_UNIVERSE, Gw: net.ParseIP(gw)}); err != nil { - klog.Errorf("failed to add route %v", err) - } + r.LinkIndex = nic.Attrs().Index + if err = netlink.RouteReplace(&r); err != nil { + klog.Errorf("failed to replace route %v: %v", r, err) } } @@ -216,7 +214,7 @@ func getNicExistRoutes(nic netlink.Link, gateway string) ([]netlink.Route, error return existRoutes, nil } -func routeDiff(existRoutes []netlink.Route, cidrs []string) (toAdd []string, toDel []string) { +func routeDiff(existRoutes []netlink.Route, cidrs, joinCIDR []string, gateway string, srcIPv4, srcIPv6 net.IP) (toAdd, toDel []netlink.Route) { for _, route := range existRoutes { if route.Scope == netlink.SCOPE_LINK { continue @@ -230,23 +228,51 @@ func routeDiff(existRoutes []netlink.Route, cidrs []string) (toAdd []string, toD } } if !found { - toDel = append(toDel, route.Dst.String()) + toDel = append(toDel, route) } } if len(toDel) > 0 { klog.Infof("route to del %v", toDel) } + ipv4, ipv6 := util.SplitStringIP(gateway) + gwV4, gwV6 := net.ParseIP(ipv4), net.ParseIP(ipv6) for _, c := range cidrs { + if util.ContainsString(joinCIDR, c) { + continue + } + + var src, gw net.IP + switch util.CheckProtocol(c) { + case kubeovnv1.ProtocolIPv4: + src, gw = srcIPv4, gwV4 + case kubeovnv1.ProtocolIPv6: + src, gw = srcIPv6, gwV6 + } + found := false for _, r := range existRoutes { - if r.Dst.String() == c { + if r.Dst == nil || r.Dst.String() != c { + continue + } + if src == nil { + if r.Src == nil { + found = true + break + } + } else if src.Equal(r.Src) { found = true break } } if !found { - toAdd = append(toAdd, c) + _, cidr, _ := net.ParseCIDR(c) + toAdd = append(toAdd, netlink.Route{ + Dst: cidr, + Src: src, + Gw: gw, + Scope: netlink.SCOPE_UNIVERSE, + }) } } if len(toAdd) > 0 { diff --git a/pkg/daemon/gateway_linux.go b/pkg/daemon/gateway_linux.go index aadfca47694..dd6542ccfc2 100644 --- a/pkg/daemon/gateway_linux.go +++ b/pkg/daemon/gateway_linux.go @@ -309,7 +309,7 @@ func (c *Controller) createIptablesRule(protocol string, rule util.IPTableRule) s := strings.Join(rule.Rule, " ") if exists { - klog.V(3).Infof(`iptables rule "%s" already exists`, s, exists) + klog.V(3).Infof(`iptables rule %q already exists`, s) return nil } @@ -527,14 +527,14 @@ func (c *Controller) setIptables() error { continue } - var kubeProxyIpsetProtocol, matchset string + var kubeProxyIpsetProtocol, matchset, svcMatchset string var abandonedRules, iptablesRules []util.IPTableRule if protocol == kubeovnv1.ProtocolIPv4 { iptablesRules, abandonedRules = v4Rules, v4AbandonedRules - matchset = "ovn40subnets" + matchset, svcMatchset = "ovn40subnets", "ovn40services" } else { iptablesRules, abandonedRules = v6Rules, v6AbandonedRules - kubeProxyIpsetProtocol, matchset = "6-", "ovn60subnets" + kubeProxyIpsetProtocol, matchset, svcMatchset = "6-", "ovn60subnets", "ovn60services" } if nodeIP := nodeIPs[protocol]; nodeIP != "" { @@ -544,6 +544,16 @@ func (c *Controller) setIptables() error { util.IPTableRule{Table: NAT, Chain: Postrouting, Rule: strings.Fields(fmt.Sprintf(`! -s %s -m set ! --match-set %s src -m set --match-set %s dst -j MASQUERADE`, nodeIP, matchset, matchset))}, ) + rules := make([]util.IPTableRule, len(iptablesRules)+1) + copy(rules, iptablesRules[:1]) + copy(rules[2:], iptablesRules[1:]) + rules[1] = util.IPTableRule{ + Table: NAT, + Chain: OvnPostrouting, + Rule: strings.Fields(fmt.Sprintf(`-m set --match-set %s src -m set --match-set %s dst -m mark --mark 0x4000/0x4000 -j SNAT --to-source %s`, svcMatchset, matchset, nodeIP)), + } + iptablesRules = rules + for _, p := range [...]string{"tcp", "udp"} { ipset := fmt.Sprintf("KUBE-%sNODE-PORT-LOCAL-%s", kubeProxyIpsetProtocol, strings.ToUpper(p)) ipsetExists, err := ipsetExists(ipset) diff --git a/test/e2e/kube-ovn/node/node.go b/test/e2e/kube-ovn/node/node.go index d3df826caab..e9fd8ab57e7 100644 --- a/test/e2e/kube-ovn/node/node.go +++ b/test/e2e/kube-ovn/node/node.go @@ -1,24 +1,64 @@ package node import ( + "fmt" + "math/rand" + "net" + "strconv" + "strings" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" clientset "k8s.io/client-go/kubernetes" e2enode "k8s.io/kubernetes/test/e2e/framework/node" + e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output" "github.com/onsi/ginkgo/v2" + apiv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" "github.com/kubeovn/kube-ovn/pkg/util" "github.com/kubeovn/kube-ovn/test/e2e/framework" + "github.com/kubeovn/kube-ovn/test/e2e/framework/iproute" ) -var _ = framework.Describe("[group:node]", func() { +var _ = framework.OrderedDescribe("[group:node]", func() { f := framework.NewDefaultFramework("node") - f.SkipNamespaceCreation = true + var subnet *apiv1.Subnet var cs clientset.Interface + var podClient *framework.PodClient + var serviceClient *framework.ServiceClient var subnetClient *framework.SubnetClient + var podName, hostPodName, serviceName, namespaceName, subnetName, image string + var cidr string ginkgo.BeforeEach(func() { cs = f.ClientSet + podClient = f.PodClient() + serviceClient = f.ServiceClient() subnetClient = f.SubnetClient() + namespaceName = f.Namespace.Name + podName = "pod-" + framework.RandomSuffix() + hostPodName = "pod-" + framework.RandomSuffix() + serviceName = "service-" + framework.RandomSuffix() + subnetName = "subnet-" + framework.RandomSuffix() + cidr = framework.RandomCIDR(f.ClusterIpFamily) + + if image == "" { + image = framework.GetKubeOvnImage(cs) + } + }) + ginkgo.AfterEach(func() { + ginkgo.By("Deleting service " + serviceName) + serviceClient.DeleteSync(serviceName) + + ginkgo.By("Deleting pod " + podName) + podClient.DeleteSync(podName) + + ginkgo.By("Deleting pod " + hostPodName) + podClient.DeleteSync(hostPodName) + + ginkgo.By("Deleting subnet " + subnetName) + subnetClient.DeleteSync(subnetName) }) framework.ConformanceIt("should allocate ip in join subnet to node", func() { @@ -40,7 +80,117 @@ var _ = framework.Describe("[group:node]", func() { framework.ExpectMAC(node.Annotations[util.MacAddressAnnotation]) framework.ExpectHaveKeyWithValue(node.Annotations, util.PortNameAnnotation, "node-"+node.Name) - // TODO: check IP/route on ovn0 + podName := "pod-" + framework.RandomSuffix() + ginkgo.By("Creating pod " + podName + " with host network") + cmd := []string{"sh", "-c", "sleep infinity"} + pod := framework.MakePod(namespaceName, podName, nil, nil, image, cmd, nil) + pod.Spec.NodeName = node.Name + pod.Spec.HostNetwork = true + pod = podClient.CreateSync(pod) + + ginkgo.By("Checking ip addresses on ovn0") + links, err := iproute.AddressShow("ovn0", func(cmd ...string) ([]byte, []byte, error) { + return framework.KubectlExec(pod.Namespace, pod.Name, cmd...) + }) + framework.ExpectNoError(err) + framework.ExpectHaveLen(links, 1) + framework.Logf(util.GetIpAddrWithMask(node.Annotations[util.IpAddressAnnotation], join.Spec.CIDRBlock)) + ips := strings.Split(util.GetIpAddrWithMask(node.Annotations[util.IpAddressAnnotation], join.Spec.CIDRBlock), ",") + framework.ExpectConsistOf(links[0].NonLinkLocalAddresses(), ips) + } + }) + + framework.ConformanceIt("should access overlay pods using node ip", func() { + f.SkipVersionPriorTo(1, 12, "This feature was introduce in v1.12") + + ginkgo.By("Creating subnet " + subnetName) + subnet = framework.MakeSubnet(subnetName, "", cidr, "", nil, nil, nil) + subnet = subnetClient.CreateSync(subnet) + + ginkgo.By("Creating pod " + podName) + annotations := map[string]string{ + util.LogicalSwitchAnnotation: subnetName, + } + port := strconv.Itoa(8000 + rand.Intn(1000)) + args := []string{"netexec", "--http-port", port} + pod := framework.MakePod(namespaceName, podName, nil, annotations, framework.AgnhostImage, nil, args) + pod = podClient.CreateSync(pod) + + ginkgo.By("Creating pod " + hostPodName + " with host network") + cmd := []string{"sh", "-c", "sleep infinity"} + hostPod := framework.MakePod(namespaceName, hostPodName, nil, nil, image, cmd, nil) + hostPod.Spec.HostNetwork = true + hostPod = podClient.CreateSync(hostPod) + + ginkgo.By("Validating client ip") + nodeIPs := make([]string, 0, len(hostPod.Status.PodIPs)) + for _, podIP := range hostPod.Status.PodIPs { + nodeIPs = append(nodeIPs, podIP.IP) + } + for _, podIP := range pod.Status.PodIPs { + ip := podIP.IP + protocol := strings.ToLower(util.CheckProtocol(ip)) + ginkgo.By("Checking connection from " + hostPodName + " to " + podName + " via " + protocol) + cmd := fmt.Sprintf("curl -q -s --connect-timeout 5 %s/clientip", net.JoinHostPort(ip, port)) + ginkgo.By(fmt.Sprintf(`Executing %q in pod %s/%s`, cmd, namespaceName, hostPodName)) + output := e2epodoutput.RunHostCmdOrDie(namespaceName, hostPodName, cmd) + client, _, err := net.SplitHostPort(strings.TrimSpace(output)) + framework.ExpectNoError(err) + framework.ExpectContainElement(nodeIPs, client) + } + }) + + framework.ConformanceIt("should access overlay services using node ip", func() { + f.SkipVersionPriorTo(1, 12, "This feature was introduce in v1.12") + + ginkgo.By("Creating subnet " + subnetName) + subnet = framework.MakeSubnet(subnetName, "", cidr, "", nil, nil, nil) + subnet = subnetClient.CreateSync(subnet) + + ginkgo.By("Creating pod " + podName) + podLabels := map[string]string{"app": podName} + annotations := map[string]string{ + util.LogicalSwitchAnnotation: subnetName, + } + port := 8000 + rand.Intn(1000) + portStr := strconv.Itoa(port) + args := []string{"netexec", "--http-port", portStr} + pod := framework.MakePod(namespaceName, podName, podLabels, annotations, framework.AgnhostImage, nil, args) + _ = podClient.CreateSync(pod) + + ginkgo.By("Creating service " + serviceName) + ports := []corev1.ServicePort{{ + Name: "tcp", + Protocol: corev1.ProtocolTCP, + Port: int32(port), + TargetPort: intstr.FromInt(port), + }} + service := framework.MakeService(serviceName, "", nil, podLabels, ports, "") + service.Spec.IPFamilyPolicy = new(corev1.IPFamilyPolicy) + *service.Spec.IPFamilyPolicy = corev1.IPFamilyPolicyPreferDualStack + _ = serviceClient.CreateSync(service) + + ginkgo.By("Creating pod " + hostPodName + " with host network") + cmd := []string{"sh", "-c", "sleep infinity"} + hostPod := framework.MakePod(namespaceName, hostPodName, nil, nil, image, cmd, nil) + hostPod.Spec.HostNetwork = true + hostPod = podClient.CreateSync(hostPod) + + ginkgo.By("Validating client ip") + nodeIPs := make([]string, 0, len(hostPod.Status.PodIPs)) + for _, podIP := range hostPod.Status.PodIPs { + nodeIPs = append(nodeIPs, podIP.IP) + } + service = serviceClient.Get(serviceName) + for _, ip := range service.Spec.ClusterIPs { + protocol := strings.ToLower(util.CheckProtocol(ip)) + ginkgo.By("Checking connection from " + hostPodName + " to " + serviceName + " via " + protocol) + cmd := fmt.Sprintf("curl -q -s --connect-timeout 5 %s/clientip", net.JoinHostPort(ip, portStr)) + ginkgo.By(fmt.Sprintf(`Executing %q in pod %s/%s`, cmd, namespaceName, hostPodName)) + output := e2epodoutput.RunHostCmdOrDie(namespaceName, hostPodName, cmd) + client, _, err := net.SplitHostPort(strings.TrimSpace(output)) + framework.ExpectNoError(err) + framework.ExpectContainElement(nodeIPs, client) } }) })