Skip to content

Commit

Permalink
use node ip instead of ovn0 ip when accessing overlay pod/svc from ho…
Browse files Browse the repository at this point in the history
…st network (#2243)
  • Loading branch information
zhangzujian committed Jan 28, 2023
1 parent ea8523c commit 281242e
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 22 deletions.
3 changes: 3 additions & 0 deletions pkg/daemon/config.go
Expand Up @@ -43,6 +43,7 @@ type Configuration struct {
KubeOvnClient clientset.Interface
NodeName string
ServiceClusterIPRange string
NodeSwitch string
NodeLocalDnsIP string
EncapChecksum bool
EnablePprof bool
Expand Down Expand Up @@ -73,6 +74,7 @@ func ParseFlags() *Configuration {
argOvsSocket = pflag.String("ovs-socket", "", "The socket to local ovs-server")
argKubeConfigFile = pflag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information. If not set use the inCluster token.")
argServiceClusterIPRange = pflag.String("service-cluster-ip-range", "10.96.0.0/12", "The kubernetes service cluster ip range")
argNodeSwitch = pflag.String("node-switch", "join", "The name of node gateway switch which help node to access pod network")
argNodeLocalDnsIP = pflag.String("node-local-dns-ip", "", "If use nodelocaldns the local dns server ip should be set here.")
argEncapChecksum = pflag.Bool("encap-checksum", true, "Enable checksum")
argEnablePprof = pflag.Bool("enable-pprof", false, "Enable pprof")
Expand Down Expand Up @@ -125,6 +127,7 @@ func ParseFlags() *Configuration {
MacLearningFallback: *argMacLearningFallback,
NodeName: strings.ToLower(*argNodeName),
ServiceClusterIPRange: *argServiceClusterIPRange,
NodeSwitch: *argNodeSwitch,
NodeLocalDnsIP: *argNodeLocalDnsIP,
EncapChecksum: *argEncapChecksum,
NetworkType: *argsNetworkType,
Expand Down
56 changes: 41 additions & 15 deletions pkg/daemon/controller_linux.go
Expand Up @@ -128,6 +128,7 @@ func (c *Controller) reconcileRouters(event subnetEvent) error {
}
nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node)

joinCIDR := make([]string, 0, 2)
cidrs := make([]string, 0, len(subnets)*2)
for _, subnet := range subnets {
if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) || subnet.Spec.Vpc != util.DefaultVpc || !subnet.Status.IsReady() {
Expand All @@ -145,6 +146,9 @@ func (c *Controller) reconcileRouters(event subnetEvent) error {
continue
}
cidrs = append(cidrs, ipNet.String())
if subnet.Name == c.config.NodeSwitch {
joinCIDR = append(joinCIDR, ipNet.String())
}
}
}
}
Expand All @@ -165,23 +169,17 @@ func (c *Controller) reconcileRouters(event subnetEvent) error {
return err
}

toAdd, toDel := routeDiff(existRoutes, cidrs)
toAdd, toDel := routeDiff(existRoutes, cidrs, joinCIDR, gateway, net.ParseIP(nodeIPv4), net.ParseIP(nodeIPv6))
for _, r := range toDel {
_, cidr, _ := net.ParseCIDR(r)
if err = netlink.RouteDel(&netlink.Route{Dst: cidr}); err != nil {
if err = netlink.RouteDel(&netlink.Route{Dst: r.Dst}); err != nil {
klog.Errorf("failed to del route %v", err)
}
}

for _, r := range toAdd {
_, cidr, _ := net.ParseCIDR(r)
for _, gw := range strings.Split(gateway, ",") {
if util.CheckProtocol(gw) != util.CheckProtocol(r) {
continue
}
if err = netlink.RouteReplace(&netlink.Route{Dst: cidr, LinkIndex: nic.Attrs().Index, Scope: netlink.SCOPE_UNIVERSE, Gw: net.ParseIP(gw)}); err != nil {
klog.Errorf("failed to add route %v", err)
}
r.LinkIndex = nic.Attrs().Index
if err = netlink.RouteReplace(&r); err != nil {
klog.Errorf("failed to replace route %v: %v", r, err)
}
}

Expand Down Expand Up @@ -216,7 +214,7 @@ func getNicExistRoutes(nic netlink.Link, gateway string) ([]netlink.Route, error
return existRoutes, nil
}

func routeDiff(existRoutes []netlink.Route, cidrs []string) (toAdd []string, toDel []string) {
func routeDiff(existRoutes []netlink.Route, cidrs, joinCIDR []string, gateway string, srcIPv4, srcIPv6 net.IP) (toAdd, toDel []netlink.Route) {
for _, route := range existRoutes {
if route.Scope == netlink.SCOPE_LINK {
continue
Expand All @@ -230,23 +228,51 @@ func routeDiff(existRoutes []netlink.Route, cidrs []string) (toAdd []string, toD
}
}
if !found {
toDel = append(toDel, route.Dst.String())
toDel = append(toDel, route)
}
}
if len(toDel) > 0 {
klog.Infof("route to del %v", toDel)
}

ipv4, ipv6 := util.SplitStringIP(gateway)
gwV4, gwV6 := net.ParseIP(ipv4), net.ParseIP(ipv6)
for _, c := range cidrs {
if util.ContainsString(joinCIDR, c) {
continue
}

var src, gw net.IP
switch util.CheckProtocol(c) {
case kubeovnv1.ProtocolIPv4:
src, gw = srcIPv4, gwV4
case kubeovnv1.ProtocolIPv6:
src, gw = srcIPv6, gwV6
}

found := false
for _, r := range existRoutes {
if r.Dst.String() == c {
if r.Dst == nil || r.Dst.String() != c {
continue
}
if src == nil {
if r.Src == nil {
found = true
break
}
} else if src.Equal(r.Src) {
found = true
break
}
}
if !found {
toAdd = append(toAdd, c)
_, cidr, _ := net.ParseCIDR(c)
toAdd = append(toAdd, netlink.Route{
Dst: cidr,
Src: src,
Gw: gw,
Scope: netlink.SCOPE_UNIVERSE,
})
}
}
if len(toAdd) > 0 {
Expand Down
18 changes: 14 additions & 4 deletions pkg/daemon/gateway_linux.go
Expand Up @@ -309,7 +309,7 @@ func (c *Controller) createIptablesRule(protocol string, rule util.IPTableRule)

s := strings.Join(rule.Rule, " ")
if exists {
klog.V(3).Infof(`iptables rule "%s" already exists`, s, exists)
klog.V(3).Infof(`iptables rule %q already exists`, s)
return nil
}

Expand Down Expand Up @@ -527,14 +527,14 @@ func (c *Controller) setIptables() error {
continue
}

var kubeProxyIpsetProtocol, matchset string
var kubeProxyIpsetProtocol, matchset, svcMatchset string
var abandonedRules, iptablesRules []util.IPTableRule
if protocol == kubeovnv1.ProtocolIPv4 {
iptablesRules, abandonedRules = v4Rules, v4AbandonedRules
matchset = "ovn40subnets"
matchset, svcMatchset = "ovn40subnets", "ovn40services"
} else {
iptablesRules, abandonedRules = v6Rules, v6AbandonedRules
kubeProxyIpsetProtocol, matchset = "6-", "ovn60subnets"
kubeProxyIpsetProtocol, matchset, svcMatchset = "6-", "ovn60subnets", "ovn60services"
}

if nodeIP := nodeIPs[protocol]; nodeIP != "" {
Expand All @@ -544,6 +544,16 @@ func (c *Controller) setIptables() error {
util.IPTableRule{Table: NAT, Chain: Postrouting, Rule: strings.Fields(fmt.Sprintf(`! -s %s -m set ! --match-set %s src -m set --match-set %s dst -j MASQUERADE`, nodeIP, matchset, matchset))},
)

rules := make([]util.IPTableRule, len(iptablesRules)+1)
copy(rules, iptablesRules[:1])
copy(rules[2:], iptablesRules[1:])
rules[1] = util.IPTableRule{
Table: NAT,
Chain: OvnPostrouting,
Rule: strings.Fields(fmt.Sprintf(`-m set --match-set %s src -m set --match-set %s dst -m mark --mark 0x4000/0x4000 -j SNAT --to-source %s`, svcMatchset, matchset, nodeIP)),
}
iptablesRules = rules

for _, p := range [...]string{"tcp", "udp"} {
ipset := fmt.Sprintf("KUBE-%sNODE-PORT-LOCAL-%s", kubeProxyIpsetProtocol, strings.ToUpper(p))
ipsetExists, err := ipsetExists(ipset)
Expand Down
156 changes: 153 additions & 3 deletions test/e2e/kube-ovn/node/node.go
@@ -1,24 +1,64 @@
package node

import (
"fmt"
"math/rand"
"net"
"strconv"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
clientset "k8s.io/client-go/kubernetes"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"

"github.com/onsi/ginkgo/v2"

apiv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/util"
"github.com/kubeovn/kube-ovn/test/e2e/framework"
"github.com/kubeovn/kube-ovn/test/e2e/framework/iproute"
)

var _ = framework.Describe("[group:node]", func() {
var _ = framework.OrderedDescribe("[group:node]", func() {
f := framework.NewDefaultFramework("node")
f.SkipNamespaceCreation = true

var subnet *apiv1.Subnet
var cs clientset.Interface
var podClient *framework.PodClient
var serviceClient *framework.ServiceClient
var subnetClient *framework.SubnetClient
var podName, hostPodName, serviceName, namespaceName, subnetName, image string
var cidr string
ginkgo.BeforeEach(func() {
cs = f.ClientSet
podClient = f.PodClient()
serviceClient = f.ServiceClient()
subnetClient = f.SubnetClient()
namespaceName = f.Namespace.Name
podName = "pod-" + framework.RandomSuffix()
hostPodName = "pod-" + framework.RandomSuffix()
serviceName = "service-" + framework.RandomSuffix()
subnetName = "subnet-" + framework.RandomSuffix()
cidr = framework.RandomCIDR(f.ClusterIpFamily)

if image == "" {
image = framework.GetKubeOvnImage(cs)
}
})
ginkgo.AfterEach(func() {
ginkgo.By("Deleting service " + serviceName)
serviceClient.DeleteSync(serviceName)

ginkgo.By("Deleting pod " + podName)
podClient.DeleteSync(podName)

ginkgo.By("Deleting pod " + hostPodName)
podClient.DeleteSync(hostPodName)

ginkgo.By("Deleting subnet " + subnetName)
subnetClient.DeleteSync(subnetName)
})

framework.ConformanceIt("should allocate ip in join subnet to node", func() {
Expand All @@ -40,7 +80,117 @@ var _ = framework.Describe("[group:node]", func() {
framework.ExpectMAC(node.Annotations[util.MacAddressAnnotation])
framework.ExpectHaveKeyWithValue(node.Annotations, util.PortNameAnnotation, "node-"+node.Name)

// TODO: check IP/route on ovn0
podName := "pod-" + framework.RandomSuffix()
ginkgo.By("Creating pod " + podName + " with host network")
cmd := []string{"sh", "-c", "sleep infinity"}
pod := framework.MakePod(namespaceName, podName, nil, nil, image, cmd, nil)
pod.Spec.NodeName = node.Name
pod.Spec.HostNetwork = true
pod = podClient.CreateSync(pod)

ginkgo.By("Checking ip addresses on ovn0")
links, err := iproute.AddressShow("ovn0", func(cmd ...string) ([]byte, []byte, error) {
return framework.KubectlExec(pod.Namespace, pod.Name, cmd...)
})
framework.ExpectNoError(err)
framework.ExpectHaveLen(links, 1)
framework.Logf(util.GetIpAddrWithMask(node.Annotations[util.IpAddressAnnotation], join.Spec.CIDRBlock))
ips := strings.Split(util.GetIpAddrWithMask(node.Annotations[util.IpAddressAnnotation], join.Spec.CIDRBlock), ",")
framework.ExpectConsistOf(links[0].NonLinkLocalAddresses(), ips)
}
})

framework.ConformanceIt("should access overlay pods using node ip", func() {
f.SkipVersionPriorTo(1, 12, "This feature was introduce in v1.12")

ginkgo.By("Creating subnet " + subnetName)
subnet = framework.MakeSubnet(subnetName, "", cidr, "", nil, nil, nil)
subnet = subnetClient.CreateSync(subnet)

ginkgo.By("Creating pod " + podName)
annotations := map[string]string{
util.LogicalSwitchAnnotation: subnetName,
}
port := strconv.Itoa(8000 + rand.Intn(1000))
args := []string{"netexec", "--http-port", port}
pod := framework.MakePod(namespaceName, podName, nil, annotations, framework.AgnhostImage, nil, args)
pod = podClient.CreateSync(pod)

ginkgo.By("Creating pod " + hostPodName + " with host network")
cmd := []string{"sh", "-c", "sleep infinity"}
hostPod := framework.MakePod(namespaceName, hostPodName, nil, nil, image, cmd, nil)
hostPod.Spec.HostNetwork = true
hostPod = podClient.CreateSync(hostPod)

ginkgo.By("Validating client ip")
nodeIPs := make([]string, 0, len(hostPod.Status.PodIPs))
for _, podIP := range hostPod.Status.PodIPs {
nodeIPs = append(nodeIPs, podIP.IP)
}
for _, podIP := range pod.Status.PodIPs {
ip := podIP.IP
protocol := strings.ToLower(util.CheckProtocol(ip))
ginkgo.By("Checking connection from " + hostPodName + " to " + podName + " via " + protocol)
cmd := fmt.Sprintf("curl -q -s --connect-timeout 5 %s/clientip", net.JoinHostPort(ip, port))
ginkgo.By(fmt.Sprintf(`Executing %q in pod %s/%s`, cmd, namespaceName, hostPodName))
output := e2epodoutput.RunHostCmdOrDie(namespaceName, hostPodName, cmd)
client, _, err := net.SplitHostPort(strings.TrimSpace(output))
framework.ExpectNoError(err)
framework.ExpectContainElement(nodeIPs, client)
}
})

framework.ConformanceIt("should access overlay services using node ip", func() {
f.SkipVersionPriorTo(1, 12, "This feature was introduce in v1.12")

ginkgo.By("Creating subnet " + subnetName)
subnet = framework.MakeSubnet(subnetName, "", cidr, "", nil, nil, nil)
subnet = subnetClient.CreateSync(subnet)

ginkgo.By("Creating pod " + podName)
podLabels := map[string]string{"app": podName}
annotations := map[string]string{
util.LogicalSwitchAnnotation: subnetName,
}
port := 8000 + rand.Intn(1000)
portStr := strconv.Itoa(port)
args := []string{"netexec", "--http-port", portStr}
pod := framework.MakePod(namespaceName, podName, podLabels, annotations, framework.AgnhostImage, nil, args)
_ = podClient.CreateSync(pod)

ginkgo.By("Creating service " + serviceName)
ports := []corev1.ServicePort{{
Name: "tcp",
Protocol: corev1.ProtocolTCP,
Port: int32(port),
TargetPort: intstr.FromInt(port),
}}
service := framework.MakeService(serviceName, "", nil, podLabels, ports, "")
service.Spec.IPFamilyPolicy = new(corev1.IPFamilyPolicy)
*service.Spec.IPFamilyPolicy = corev1.IPFamilyPolicyPreferDualStack
_ = serviceClient.CreateSync(service)

ginkgo.By("Creating pod " + hostPodName + " with host network")
cmd := []string{"sh", "-c", "sleep infinity"}
hostPod := framework.MakePod(namespaceName, hostPodName, nil, nil, image, cmd, nil)
hostPod.Spec.HostNetwork = true
hostPod = podClient.CreateSync(hostPod)

ginkgo.By("Validating client ip")
nodeIPs := make([]string, 0, len(hostPod.Status.PodIPs))
for _, podIP := range hostPod.Status.PodIPs {
nodeIPs = append(nodeIPs, podIP.IP)
}
service = serviceClient.Get(serviceName)
for _, ip := range service.Spec.ClusterIPs {
protocol := strings.ToLower(util.CheckProtocol(ip))
ginkgo.By("Checking connection from " + hostPodName + " to " + serviceName + " via " + protocol)
cmd := fmt.Sprintf("curl -q -s --connect-timeout 5 %s/clientip", net.JoinHostPort(ip, portStr))
ginkgo.By(fmt.Sprintf(`Executing %q in pod %s/%s`, cmd, namespaceName, hostPodName))
output := e2epodoutput.RunHostCmdOrDie(namespaceName, hostPodName, cmd)
client, _, err := net.SplitHostPort(strings.TrimSpace(output))
framework.ExpectNoError(err)
framework.ExpectContainElement(nodeIPs, client)
}
})
})

0 comments on commit 281242e

Please sign in to comment.