Skip to content

Commit

Permalink
fix nat-outgoing/policy-routing on pod startup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzujian committed Sep 8, 2021
1 parent fd3fdf2 commit ea723d6
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 107 deletions.
6 changes: 4 additions & 2 deletions pkg/daemon/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -62,8 +63,9 @@ type Controller struct {

recorder record.EventRecorder

iptable map[string]*iptables.IPTables
ipset map[string]*ipsets.IPSets
iptable map[string]*iptables.IPTables
ipset map[string]*ipsets.IPSets
ipsetLock sync.Mutex

protocol string
}
Expand Down
103 changes: 78 additions & 25 deletions pkg/daemon/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func (c *Controller) runGateway() {
}

func (c *Controller) setIPSet() error {
c.ipsetLock.Lock()
defer c.ipsetLock.Unlock()

protocols := make([]string, 2)
if c.protocol == kubeovnv1.ProtocolDual {
protocols[0] = kubeovnv1.ProtocolIPv4
Expand Down Expand Up @@ -226,6 +229,9 @@ func (c *Controller) removeEgressConfig(subnet, ip string) error {
}

func (c *Controller) addIPSetMembers(setID, protocol string, ips []string) {
c.ipsetLock.Lock()
defer c.ipsetLock.Unlock()

if protocol == kubeovnv1.ProtocolDual {
if c.ipset[kubeovnv1.ProtocolIPv4] != nil {
c.ipset[kubeovnv1.ProtocolIPv4].AddMembers(setID, ips[:1])
Expand All @@ -242,6 +248,9 @@ func (c *Controller) addIPSetMembers(setID, protocol string, ips []string) {
}

func (c *Controller) removeIPSetMembers(setID, protocol string, ips []string) {
c.ipsetLock.Lock()
defer c.ipsetLock.Unlock()

if protocol == kubeovnv1.ProtocolDual {
if c.ipset[kubeovnv1.ProtocolIPv4] != nil {
c.ipset[kubeovnv1.ProtocolIPv4].RemoveMembers(setID, ips[:1])
Expand Down Expand Up @@ -669,8 +678,9 @@ func (c *Controller) getLocalPodIPsNeedNAT(protocol string) ([]string, error) {
}
for _, pod := range allPods {
if pod.Spec.HostNetwork ||
pod.Status.PodIP == "" ||
pod.Annotations[util.LogicalSwitchAnnotation] == "" {
pod.DeletionTimestamp != nil ||
pod.Annotations[util.LogicalSwitchAnnotation] == "" ||
pod.Annotations[util.IpAddressAnnotation] == "" {
continue
}
subnet, err := c.subnetsLister.Get(pod.Annotations[util.LogicalSwitchAnnotation])
Expand All @@ -679,16 +689,33 @@ func (c *Controller) getLocalPodIPsNeedNAT(protocol string) ([]string, error) {
continue
}

nsGWType := subnet.Spec.GatewayType
nsGWNat := subnet.Spec.NatOutgoing
if nsGWNat &&
if subnet.Spec.NatOutgoing &&
subnet.Spec.Vpc == util.DefaultVpc &&
nsGWType == kubeovnv1.GWDistributedType &&
subnet.Spec.GatewayType == kubeovnv1.GWDistributedType &&
pod.Spec.NodeName == hostname {
if len(pod.Status.PodIPs) == 2 && protocol == kubeovnv1.ProtocolIPv6 {
localPodIPs = append(localPodIPs, pod.Status.PodIPs[1].IP)
} else if util.CheckProtocol(pod.Status.PodIP) == protocol {
localPodIPs = append(localPodIPs, pod.Status.PodIP)
if pod.Status.Phase == v1.PodPending {
var containerCreating bool
for _, s := range pod.Status.ContainerStatuses {
if s.State.Waiting != nil && s.State.Waiting.Reason == "ContainerCreating" {
containerCreating = true
break
}
}
if containerCreating {
ipv4, ipv6 := util.SplitStringIP(pod.Annotations[util.IpAddressAnnotation])
if ipv4 != "" && protocol == kubeovnv1.ProtocolIPv4 {
localPodIPs = append(localPodIPs, ipv4)
}
if ipv6 != "" && protocol == kubeovnv1.ProtocolIPv6 {
localPodIPs = append(localPodIPs, ipv6)
}
}
} else if len(pod.Status.PodIPs) != 0 {
if len(pod.Status.PodIPs) == 2 && protocol == kubeovnv1.ProtocolIPv6 {
localPodIPs = append(localPodIPs, pod.Status.PodIPs[1].IP)
} else if util.CheckProtocol(pod.Status.PodIP) == protocol {
localPodIPs = append(localPodIPs, pod.Status.PodIP)
}
}
}
}
Expand All @@ -709,8 +736,8 @@ func (c *Controller) getLocalPodIPsNeedPR(protocol string) (map[policyRouteMeta]
for _, pod := range allPods {
if pod.Spec.HostNetwork ||
pod.DeletionTimestamp != nil ||
pod.Status.PodIP == "" ||
pod.Annotations[util.LogicalSwitchAnnotation] == "" {
pod.Annotations[util.LogicalSwitchAnnotation] == "" ||
pod.Annotations[util.IpAddressAnnotation] == "" {
continue
}

Expand All @@ -724,22 +751,48 @@ func (c *Controller) getLocalPodIPsNeedPR(protocol string) (map[policyRouteMeta]
subnet.Spec.Vpc == util.DefaultVpc &&
subnet.Spec.GatewayType == kubeovnv1.GWDistributedType &&
pod.Spec.NodeName == hostname {
meta := policyRouteMeta{
priority: subnet.Spec.PolicyRoutingPriority,
tableID: subnet.Spec.PolicyRoutingTableID,
ips := make([]string, 0, 2)
if pod.Status.Phase == v1.PodPending {
var containerCreating bool
for _, s := range pod.Status.ContainerStatuses {
if s.State.Waiting != nil && s.State.Waiting.Reason == "ContainerCreating" {
containerCreating = true
break
}
}
if containerCreating {
ipv4, ipv6 := util.SplitStringIP(pod.Annotations[util.IpAddressAnnotation])
if ipv4 != "" && protocol == kubeovnv1.ProtocolIPv4 {
ips = append(ips, ipv4)
}
if ipv6 != "" && protocol == kubeovnv1.ProtocolIPv6 {
ips = append(ips, ipv6)
}
}
} else if len(pod.Status.PodIPs) != 0 {
for _, ip := range pod.Status.PodIPs {
ips = append(ips, ip.IP)
}
}

egw := strings.Split(subnet.Spec.ExternalEgressGateway, ",")
if util.CheckProtocol(egw[0]) == protocol {
meta.gateway = egw[0]
if util.CheckProtocol(pod.Status.PodIPs[0].IP) == protocol {
localPodIPs[meta] = append(localPodIPs[meta], pod.Status.PodIPs[0].IP)
} else if len(pod.Status.PodIPs) == 2 {
localPodIPs[meta] = append(localPodIPs[meta], pod.Status.PodIPs[1].IP)
if len(ips) != 0 {
meta := policyRouteMeta{
priority: subnet.Spec.PolicyRoutingPriority,
tableID: subnet.Spec.PolicyRoutingTableID,
}

egw := strings.Split(subnet.Spec.ExternalEgressGateway, ",")
if util.CheckProtocol(egw[0]) == protocol {
meta.gateway = egw[0]
if util.CheckProtocol(ips[0]) == protocol {
localPodIPs[meta] = append(localPodIPs[meta], ips[0])
} else {
localPodIPs[meta] = append(localPodIPs[meta], ips[1])
}
} else if len(egw) == 2 && len(ips) == 2 {
meta.gateway = egw[1]
localPodIPs[meta] = append(localPodIPs[meta], ips[1])
}
} else if len(egw) == 2 && len(pod.Status.PodIPs) == 2 {
meta.gateway = egw[1]
localPodIPs[meta] = append(localPodIPs[meta], pod.Status.PodIPs[1].IP)
}
}
}
Expand Down
91 changes: 37 additions & 54 deletions test/e2e/subnet/normal.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,6 @@ var _ = Describe("[Subnet]", func() {
Expect(subnet.Spec.ExternalEgressGateway).To(Equal(egw))
Expect(subnet.Spec.PolicyRoutingPriority).To(Equal(priority))
Expect(subnet.Spec.PolicyRoutingTableID).To(Equal(tableID))
time.Sleep(5 * time.Second)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -535,14 +534,8 @@ var _ = Describe("[Subnet]", func() {
_, err = f.KubeClientSet.CoreV1().Pods(namespace).Create(context.Background(), pod, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())

_, err = f.WaitPodReady(name, namespace)
Expect(err).NotTo(HaveOccurred())

pod, err = f.KubeClientSet.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{})
pod, err = f.WaitPodReady(name, namespace)
Expect(err).NotTo(HaveOccurred())
Expect(pod.Annotations[util.AllocatedAnnotation]).To(Equal("true"))
Expect(pod.Annotations[util.RoutedAnnotation]).To(Equal("true"))
time.Sleep(1 * time.Second)

ovsPods, err := f.KubeClientSet.CoreV1().Pods("kube-system").List(context.Background(), metav1.ListOptions{LabelSelector: "app=ovs"})
Expect(err).NotTo(HaveOccurred())
Expand All @@ -551,74 +544,64 @@ var _ = Describe("[Subnet]", func() {
ruleSuffix := fmt.Sprintf("from %s lookup %d", pod.Status.PodIP, tableID)
routePrefix := fmt.Sprintf("default via %s ", egw)

for _, ovsPod := range ovsPods.Items {
if ovsPod.Status.HostIP != nodeIPv4 && ovsPod.Status.HostIP != nodeIPv6 {
continue
var ovsPod *corev1.Pod
for i := range ovsPods.Items {
if ovsPods.Items[i].Spec.NodeName == selectedNode.Name {
ovsPod = &ovsPods.Items[i]
break
}
}
Expect(ovsPod).NotTo(BeNil())

stdout, _, err := f.ExecToPodThroughAPI(fmt.Sprintf("ip -%d rule show", af), "openvswitch", ovsPod.Name, ovsPod.Namespace, nil)
Expect(err).NotTo(HaveOccurred())
stdout, _, err := f.ExecToPodThroughAPI(fmt.Sprintf("ip -%d rule show", af), "openvswitch", ovsPod.Name, ovsPod.Namespace, nil)
Expect(err).NotTo(HaveOccurred())

var found bool
rules := strings.Split(stdout, "\n")
for _, rule := range rules {
if strings.HasPrefix(rule, rulePrefix) && strings.HasSuffix(rule, ruleSuffix) {
found = true
break
}
var found bool
rules := strings.Split(stdout, "\n")
for _, rule := range rules {
if strings.HasPrefix(rule, rulePrefix) && strings.HasSuffix(rule, ruleSuffix) {
found = true
break
}
Expect(found).To(BeTrue())

stdout, _, err = f.ExecToPodThroughAPI(fmt.Sprintf("ip -%d route show table %d", af, tableID), "openvswitch", ovsPod.Name, ovsPod.Namespace, nil)
Expect(err).NotTo(HaveOccurred())
Expect(stdout).To(HavePrefix(routePrefix))
}
Expect(found).To(BeTrue())

stdout, _, err = f.ExecToPodThroughAPI(fmt.Sprintf("ip -%d route show table %d", af, tableID), "openvswitch", ovsPod.Name, ovsPod.Namespace, nil)
Expect(err).NotTo(HaveOccurred())
Expect(stdout).To(HavePrefix(routePrefix))

By("delete pod")
err = f.KubeClientSet.CoreV1().Pods(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())

err = f.WaitPodDeleted(name, namespace)
Expect(err).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)

for _, ovsPod := range ovsPods.Items {
if ovsPod.Status.HostIP != nodeIPv4 && ovsPod.Status.HostIP != nodeIPv6 {
continue
}

stdout, _, err := f.ExecToPodThroughAPI(fmt.Sprintf("ip -%d rule show", af), "openvswitch", ovsPod.Name, ovsPod.Namespace, nil)
Expect(err).NotTo(HaveOccurred())
stdout, _, err = f.ExecToPodThroughAPI(fmt.Sprintf("ip -%d rule show", af), "openvswitch", ovsPod.Name, ovsPod.Namespace, nil)
Expect(err).NotTo(HaveOccurred())

var found bool
rules := strings.Split(stdout, "\n")
for _, rule := range rules {
if strings.HasPrefix(rule, rulePrefix) && strings.HasSuffix(rule, ruleSuffix) {
found = true
break
}
found = false
rules = strings.Split(stdout, "\n")
for _, rule := range rules {
if strings.HasPrefix(rule, rulePrefix) && strings.HasSuffix(rule, ruleSuffix) {
found = true
break
}
Expect(found).NotTo(BeTrue())

stdout, _, err = f.ExecToPodThroughAPI(fmt.Sprintf("ip -%d route show table %d", af, tableID), "openvswitch", ovsPod.Name, ovsPod.Namespace, nil)
Expect(err).NotTo(HaveOccurred())
Expect(stdout).To(HavePrefix(routePrefix))
}
Expect(found).NotTo(BeTrue())

stdout, _, err = f.ExecToPodThroughAPI(fmt.Sprintf("ip -%d route show table %d", af, tableID), "openvswitch", ovsPod.Name, ovsPod.Namespace, nil)
Expect(err).NotTo(HaveOccurred())
Expect(stdout).To(HavePrefix(routePrefix))

By("delete subnet")
err = f.OvnClientSet.KubeovnV1().Subnets().Delete(context.Background(), name, metav1.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
time.Sleep(5 * time.Second)

for _, ovsPod := range ovsPods.Items {
if ovsPod.Status.HostIP != nodeIPv4 && ovsPod.Status.HostIP != nodeIPv6 {
continue
}

stdout, _, err := f.ExecToPodThroughAPI(fmt.Sprintf("ip -%d route show table %d", af, tableID), "openvswitch", ovsPod.Name, ovsPod.Namespace, nil)
Expect(err).NotTo(HaveOccurred())
Expect(stdout).NotTo(HavePrefix(routePrefix))
}
stdout, _, err = f.ExecToPodThroughAPI(fmt.Sprintf("ip -%d route show table %d", af, tableID), "openvswitch", ovsPod.Name, ovsPod.Namespace, nil)
Expect(err).NotTo(HaveOccurred())
Expect(stdout).NotTo(HavePrefix(routePrefix))
})
})

Expand Down
38 changes: 12 additions & 26 deletions test/e2e/underlay/underlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,9 @@ var _ = Describe("[Underlay]", func() {
Expect(err).NotTo(HaveOccurred())
Expect(ovsPods).NotTo(BeNil())
for _, node := range nodes.Items {
var hostIP string
for _, addr := range node.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
hostIP = addr.Address
break
}
}
Expect(hostIP).NotTo(BeEmpty())

var ovsPod *corev1.Pod
for _, pod := range ovsPods.Items {
if pod.Status.HostIP == hostIP {
if pod.Spec.NodeName == node.Name {
ovsPod = &pod
break
}
Expand Down Expand Up @@ -196,18 +187,9 @@ var _ = Describe("[Underlay]", func() {

cniPods = make(map[string]corev1.Pod)
for _, node := range nodeList.Items {
var nodeIP string
for _, addr := range node.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
nodeIP = addr.Address
break
}
}
Expect(nodeIP).NotTo(BeEmpty())

var cniPod *corev1.Pod
for _, pod := range podList.Items {
if pod.Status.HostIP == nodeIP {
if pod.Spec.NodeName == node.Name {
cniPod = &pod
break
}
Expand Down Expand Up @@ -566,6 +548,10 @@ var _ = Describe("[Underlay]", func() {
})

It("o2u", func() {
if strings.EqualFold(os.Getenv("IPV6"), "true") {
return
}

By("create underlay pod")
var autoMount bool
upod := &corev1.Pod{
Expand Down Expand Up @@ -611,12 +597,12 @@ var _ = Describe("[Underlay]", func() {
}
_, err = f.KubeClientSet.CoreV1().Pods(opod.Namespace).Create(context.Background(), opod, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
opod, err = f.WaitPodReady(opod.Name, upod.Namespace)
opod, err = f.WaitPodReady(opod.Name, opod.Namespace)
Expect(err).NotTo(HaveOccurred())

By("get underlay pod's netns")
cniPod := cniPods[upod.Spec.NodeName]
cmd := fmt.Sprintf("ovs-vsctl --no-heading --columns=external_ids find interface external-ids:pod_name=%s external-ids:pod_namespace=%s", upod.Name, upod.Namespace)
By("get overlay pod's netns")
cniPod := cniPods[opod.Spec.NodeName]
cmd := fmt.Sprintf("ovs-vsctl --no-heading --columns=external_ids find interface external-ids:pod_name=%s external-ids:pod_namespace=%s", opod.Name, opod.Namespace)
stdout, _, err := f.ExecToPodThroughAPI(cmd, "cni-server", cniPod.Name, cniPod.Namespace, nil)
Expect(err).NotTo(HaveOccurred())
var netns string
Expand All @@ -629,8 +615,8 @@ var _ = Describe("[Underlay]", func() {
}
Expect(netns).NotTo(BeEmpty())

By("ping overlay pod")
cmd = fmt.Sprintf("nsenter --net=%s ping -c1 -W1 %s", netns, opod.Status.PodIP)
By("ping underlay pod")
cmd = fmt.Sprintf("nsenter --net=%s ping -c1 -W1 %s", netns, upod.Status.PodIP)
stdout, _, err = f.ExecToPodThroughAPI(cmd, "cni-server", cniPod.Name, cniPod.Namespace, nil)
Expect(err).NotTo(HaveOccurred())
Expect(stdout).To(ContainSubstring(" 0% packet loss"))
Expand Down

0 comments on commit ea723d6

Please sign in to comment.