Skip to content

Commit

Permalink
fix SNAT/PR on Pod startup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzujian committed Mar 7, 2022
1 parent 8124772 commit a3618bc
Showing 1 changed file with 83 additions and 96 deletions.
179 changes: 83 additions & 96 deletions pkg/daemon/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (c *Controller) setPolicyRouting() error {
}

func (c *Controller) addEgressConfig(subnet *kubeovnv1.Subnet, ip string) error {
if subnet.Spec.Vlan != "" ||
if (subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) ||
subnet.Spec.GatewayType != kubeovnv1.GWDistributedType ||
subnet.Spec.Vpc != util.DefaultVpc {
return nil
Expand Down Expand Up @@ -212,7 +212,7 @@ func (c *Controller) removeEgressConfig(subnet, ip string) error {
return err
}

if podSubnet.Spec.Vlan != "" ||
if (podSubnet.Spec.Vlan != "" && !podSubnet.Spec.LogicalGateway) ||
podSubnet.Spec.GatewayType != kubeovnv1.GWDistributedType ||
podSubnet.Spec.Vpc != util.DefaultVpc {
return nil
Expand Down Expand Up @@ -674,7 +674,7 @@ func (c *Controller) setExGateway() error {

func (c *Controller) getLocalPodIPsNeedNAT(protocol string) ([]string, error) {
var localPodIPs []string
hostname := os.Getenv(util.HostnameEnv)
nodeName := os.Getenv(util.HostnameEnv)
allPods, err := c.podsLister.List(labels.Everything())
if err != nil {
klog.Errorf("list pods failed, %+v", err)
Expand All @@ -683,6 +683,7 @@ func (c *Controller) getLocalPodIPsNeedNAT(protocol string) ([]string, error) {
for _, pod := range allPods {
if pod.Spec.HostNetwork ||
pod.DeletionTimestamp != nil ||
pod.Spec.NodeName != nodeName ||
pod.Annotations[util.LogicalSwitchAnnotation] == "" ||
pod.Annotations[util.IpAddressAnnotation] == "" {
continue
Expand All @@ -693,36 +694,31 @@ func (c *Controller) getLocalPodIPsNeedNAT(protocol string) ([]string, error) {
continue
}

if subnet.Spec.NatOutgoing &&
subnet.Spec.Vpc == util.DefaultVpc &&
subnet.Spec.GatewayType == kubeovnv1.GWDistributedType &&
pod.Spec.NodeName == hostname {
if pod.Status.Phase == v1.PodPending {
var containerCreating bool
for _, s := range pod.Status.ContainerStatuses {
if s.State.Waiting != nil && s.State.Waiting.Reason == "ContainerCreating" {
containerCreating = true
break
}
}
if containerCreating {
ipv4, ipv6 := util.SplitStringIP(pod.Annotations[util.IpAddressAnnotation])
if ipv4 != "" && protocol == kubeovnv1.ProtocolIPv4 {
localPodIPs = append(localPodIPs, ipv4)
}
if ipv6 != "" && protocol == kubeovnv1.ProtocolIPv6 {
localPodIPs = append(localPodIPs, ipv6)
}
}
} else if len(pod.Status.PodIPs) != 0 {
if len(pod.Status.PodIPs) == 2 && protocol == kubeovnv1.ProtocolIPv6 {
localPodIPs = append(localPodIPs, pod.Status.PodIPs[1].IP)
} else if util.CheckProtocol(pod.Status.PodIP) == protocol {
localPodIPs = append(localPodIPs, pod.Status.PodIP)
}
if !subnet.Spec.NatOutgoing ||
subnet.Spec.Vpc != util.DefaultVpc ||
subnet.Spec.GatewayType != kubeovnv1.GWDistributedType {
continue
}
if subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway {
continue
}

if len(pod.Status.PodIPs) != 0 {
if len(pod.Status.PodIPs) == 2 && protocol == kubeovnv1.ProtocolIPv6 {
localPodIPs = append(localPodIPs, pod.Status.PodIPs[1].IP)
} else if util.CheckProtocol(pod.Status.PodIP) == protocol {
localPodIPs = append(localPodIPs, pod.Status.PodIP)
}
} else {
ipv4, ipv6 := util.SplitStringIP(pod.Annotations[util.IpAddressAnnotation])
if ipv4 != "" && protocol == kubeovnv1.ProtocolIPv4 {
localPodIPs = append(localPodIPs, ipv4)
}
if ipv6 != "" && protocol == kubeovnv1.ProtocolIPv6 {
localPodIPs = append(localPodIPs, ipv6)
}
}
attachIps, err := c.getAttachmentLocalPodIPsNeedNAT(pod, hostname, protocol)
attachIps, err := c.getAttachmentLocalPodIPsNeedNAT(pod, nodeName, protocol)
if len(attachIps) != 0 && err == nil {
localPodIPs = append(localPodIPs, attachIps...)
}
Expand All @@ -739,11 +735,12 @@ func (c *Controller) getLocalPodIPsNeedPR(protocol string) (map[policyRouteMeta]
return nil, err
}

hostname := os.Getenv(util.HostnameEnv)
nodeName := os.Getenv(util.HostnameEnv)
localPodIPs := make(map[policyRouteMeta][]string)
for _, pod := range allPods {
if pod.Spec.HostNetwork ||
pod.DeletionTimestamp != nil ||
pod.Spec.NodeName != nodeName ||
pod.Annotations[util.LogicalSwitchAnnotation] == "" ||
pod.Annotations[util.IpAddressAnnotation] == "" {
continue
Expand All @@ -755,52 +752,49 @@ func (c *Controller) getLocalPodIPsNeedPR(protocol string) (map[policyRouteMeta]
continue
}

if subnet.Spec.ExternalEgressGateway != "" &&
subnet.Spec.Vpc == util.DefaultVpc &&
subnet.Spec.GatewayType == kubeovnv1.GWDistributedType &&
pod.Spec.NodeName == hostname {
ips := make([]string, 0, 2)
if pod.Status.Phase == v1.PodPending {
var containerCreating bool
for _, s := range pod.Status.ContainerStatuses {
if s.State.Waiting != nil && s.State.Waiting.Reason == "ContainerCreating" {
containerCreating = true
break
}
}
if containerCreating {
ipv4, ipv6 := util.SplitStringIP(pod.Annotations[util.IpAddressAnnotation])
if ipv4 != "" && protocol == kubeovnv1.ProtocolIPv4 {
ips = append(ips, ipv4)
}
if ipv6 != "" && protocol == kubeovnv1.ProtocolIPv6 {
ips = append(ips, ipv6)
}
}
} else if len(pod.Status.PodIPs) != 0 {
for _, ip := range pod.Status.PodIPs {
ips = append(ips, ip.IP)
}
if !subnet.Spec.NatOutgoing ||
subnet.Spec.Vpc != util.DefaultVpc ||
subnet.Spec.GatewayType != kubeovnv1.GWDistributedType {
continue
}
if subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway {
continue
}

ips := make([]string, 0, 2)
if len(pod.Status.PodIPs) != 0 {
if len(pod.Status.PodIPs) == 2 && protocol == kubeovnv1.ProtocolIPv6 {
ips = append(ips, pod.Status.PodIPs[1].IP)
} else if util.CheckProtocol(pod.Status.PodIP) == protocol {
ips = append(ips, pod.Status.PodIP)
}
} else {
ipv4, ipv6 := util.SplitStringIP(pod.Annotations[util.IpAddressAnnotation])
if ipv4 != "" && protocol == kubeovnv1.ProtocolIPv4 {
ips = append(ips, ipv4)
}
if ipv6 != "" && protocol == kubeovnv1.ProtocolIPv6 {
ips = append(ips, ipv6)
}
}

if len(ips) != 0 {
meta := policyRouteMeta{
priority: subnet.Spec.PolicyRoutingPriority,
tableID: subnet.Spec.PolicyRoutingTableID,
}
if len(ips) != 0 {
meta := policyRouteMeta{
priority: subnet.Spec.PolicyRoutingPriority,
tableID: subnet.Spec.PolicyRoutingTableID,
}

egw := strings.Split(subnet.Spec.ExternalEgressGateway, ",")
if util.CheckProtocol(egw[0]) == protocol {
meta.gateway = egw[0]
if util.CheckProtocol(ips[0]) == protocol {
localPodIPs[meta] = append(localPodIPs[meta], ips[0])
} else {
localPodIPs[meta] = append(localPodIPs[meta], ips[1])
}
} else if len(egw) == 2 && len(ips) == 2 {
meta.gateway = egw[1]
egw := strings.Split(subnet.Spec.ExternalEgressGateway, ",")
if util.CheckProtocol(egw[0]) == protocol {
meta.gateway = egw[0]
if util.CheckProtocol(ips[0]) == protocol {
localPodIPs[meta] = append(localPodIPs[meta], ips[0])
} else {
localPodIPs[meta] = append(localPodIPs[meta], ips[1])
}
} else if len(egw) == 2 && len(ips) == 2 {
meta.gateway = egw[1]
localPodIPs[meta] = append(localPodIPs[meta], ips[1])
}
}
}
Expand All @@ -817,24 +811,13 @@ func (c *Controller) getSubnetsNeedNAT(protocol string) ([]string, error) {
}

for _, subnet := range subnets {
if subnet.Spec.Vpc == util.DefaultVpc &&
if subnet.DeletionTimestamp == nil &&
subnet.Spec.NatOutgoing &&
(subnet.Spec.Vlan == "" || subnet.Spec.LogicalGateway) &&
subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType &&
util.GatewayContains(subnet.Spec.GatewayNode, c.config.NodeName) &&
(subnet.Spec.Protocol == kubeovnv1.ProtocolDual || subnet.Spec.Protocol == protocol) &&
subnet.Spec.NatOutgoing &&
subnet.Spec.Vlan == "" {
// centralized subnet with gatewayNode assigned designative ip processed seperately
found := false
for _, gw := range strings.Split(subnet.Spec.GatewayNode, ",") {
if strings.Contains(gw, ":") && util.GatewayContains(gw, c.config.NodeName) {
found = true
break
}
}
if found {
continue
}

subnet.Spec.Vpc == util.DefaultVpc &&
(subnet.Spec.Protocol == kubeovnv1.ProtocolDual || subnet.Spec.Protocol == protocol) {
cidrBlock := getCidrByProtocol(subnet.Spec.CIDRBlock, protocol)
subnetsNeedNat = append(subnetsNeedNat, cidrBlock)
}
Expand All @@ -852,12 +835,12 @@ func (c *Controller) getSubnetsNeedPR(protocol string) (map[policyRouteMeta]stri

for _, subnet := range subnets {
if subnet.DeletionTimestamp == nil &&
subnet.Spec.Vpc == util.DefaultVpc &&
subnet.Spec.ExternalEgressGateway != "" &&
(subnet.Spec.Vlan == "" || subnet.Spec.LogicalGateway) &&
subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType &&
util.GatewayContains(subnet.Spec.GatewayNode, c.config.NodeName) &&
(subnet.Spec.Protocol == kubeovnv1.ProtocolDual || subnet.Spec.Protocol == protocol) &&
subnet.Spec.ExternalEgressGateway != "" &&
subnet.Spec.Vlan == "" {
subnet.Spec.Vpc == util.DefaultVpc &&
(subnet.Spec.Protocol == kubeovnv1.ProtocolDual || subnet.Spec.Protocol == protocol) {
meta := policyRouteMeta{
priority: subnet.Spec.PolicyRoutingPriority,
tableID: subnet.Spec.PolicyRoutingTableID,
Expand Down Expand Up @@ -1002,7 +985,11 @@ func (c *Controller) getEgressNatIpByNode(nodeName string) (map[string]string, e
}

for _, subnet := range subnetList {
if subnet.Spec.Vlan != "" || subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType || !subnet.Spec.NatOutgoing || subnet.Spec.GatewayNode == "" || !util.GatewayContains(subnet.Spec.GatewayNode, nodeName) {
if !subnet.Spec.NatOutgoing ||
(subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway) ||
subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType ||
!util.GatewayContains(subnet.Spec.GatewayNode, nodeName) ||
subnet.Spec.Vpc != util.DefaultVpc {
continue
}

Expand Down Expand Up @@ -1085,7 +1072,7 @@ func getIptablesRuleNum(table, chain, rule, dstNatIp string) (string, error) {
return num, nil
}

func (c *Controller) getAttachmentLocalPodIPsNeedNAT(pod *v1.Pod, hostname, protocol string) ([]string, error) {
func (c *Controller) getAttachmentLocalPodIPsNeedNAT(pod *v1.Pod, nodeName, protocol string) ([]string, error) {
var attachPodIPs []string

attachNets, err := util.ParsePodNetworkAnnotation(pod.Annotations[util.AttachmentNetworkAnnotation], pod.Namespace)
Expand All @@ -1105,7 +1092,7 @@ func (c *Controller) getAttachmentLocalPodIPsNeedNAT(pod *v1.Pod, hostname, prot
if subnet.Spec.NatOutgoing &&
subnet.Spec.Vpc == util.DefaultVpc &&
subnet.Spec.GatewayType == kubeovnv1.GWDistributedType &&
pod.Spec.NodeName == hostname {
pod.Spec.NodeName == nodeName {
ipv4, ipv6 := util.SplitStringIP(pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, provider)])
if ipv4 != "" && protocol == kubeovnv1.ProtocolIPv4 {
attachPodIPs = append(attachPodIPs, ipv4)
Expand Down

0 comments on commit a3618bc

Please sign in to comment.