Skip to content

Commit

Permalink
add designative nat ip process for centralized subnet
Browse files Browse the repository at this point in the history
  • Loading branch information
hongzhen-ma committed Jul 19, 2021
1 parent a118b4d commit 2427a4b
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 5 deletions.
1 change: 1 addition & 0 deletions docs/subnet.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ For a centralized gateway, outgoing traffic from Pods within the OVN network to
- `gatewayNode`: when `gatewayType` is `centralized`, use this field to specify which nodes act as the namespace gateway. This field can be a comma-separated string, like `node1,node2`.
Before kube-ovn v1.6.3, kube-ovn will automatically apply an active-backup failover strategy.
Since kube-ovn v1.7.0, kube-ovn supports ECMP routes, and outgoing traffic can go through the multiple gateways specified.
Since kube-ovn v1.8.0, kube-ovn supports using a designated egress IP on each gateway node; the format of `gatewayNode` can be like 'kube-ovn-worker:172.18.0.2, kube-ovn-control-plane:172.18.0.3'.
- `natOutgoing`: `true` or `false`, whether pod ip need to be masqueraded when go through gateway. When `false`, pod ip will be exposed to external network directly, default `false`.

## Advance Options
Expand Down
20 changes: 16 additions & 4 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,6 @@ func (c *Controller) reconcileGateway(subnet *kubeovnv1.Subnet) error {
}
return c.deleteStaticRoute(subnet.Spec.CIDRBlock, c.config.ClusterRouter, subnet)
} else {
klog.Infof("start to init centralized gateway for subnet %s", subnet.Name)
if subnet.Spec.GatewayNode == "" {
klog.Errorf("subnet %s Spec.GatewayNode field must be specified for centralized gateway type", subnet.Name)
subnet.Status.NotReady("NoReadyGateway", "")
Expand All @@ -953,7 +952,13 @@ func (c *Controller) reconcileGateway(subnet *kubeovnv1.Subnet) error {

nodeIPs := make([]string, len(strings.Split(subnet.Spec.GatewayNode, ",")))
for _, gw := range strings.Split(subnet.Spec.GatewayNode, ",") {
gw = strings.TrimSpace(gw)
// the format of gatewayNodeStr can be like 'kube-ovn-worker:172.18.0.2, kube-ovn-control-plane:172.18.0.3', which consists of node name and designative egress ip
if strings.Contains(gw, ":") {
gw = strings.TrimSpace(strings.Split(gw, ":")[0])
} else {
gw = strings.TrimSpace(gw)
}

node, err := c.nodesLister.Get(gw)
if err == nil && nodeReady(node) {
nodeTunlIP := strings.TrimSpace(node.Annotations[util.IpAddressAnnotation])
Expand All @@ -968,7 +973,7 @@ func (c *Controller) reconcileGateway(subnet *kubeovnv1.Subnet) error {
if err != nil {
klog.Errorf("filter ecmp static route for subnet %v, error %v", subnet.Name, err)
}
klog.Infof("subnet %s uses centralized gw %v", subnet.Name, nodeIPs)
klog.Infof("subnet %s adds centralized gw %v", subnet.Name, nodeIPs)

for _, nextHop := range nodeIPs {
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, subnet.Spec.CIDRBlock, nextHop, c.config.ClusterRouter, util.EcmpRouteType); err != nil {
Expand Down Expand Up @@ -1242,7 +1247,14 @@ func (c *Controller) filterRepeatEcmpRoutes(nodeIps []string, cidrBlock string)
func (c *Controller) checkGwNodeExists(gatewayNode string) bool {
found := false
for _, gwName := range strings.Split(gatewayNode, ",") {
gwNode, err := c.nodesLister.Get(strings.TrimSpace(gwName))
// the format of gatewayNode can be like 'kube-ovn-worker:172.18.0.2, kube-ovn-control-plane:172.18.0.3', which consists of node name and designative egress ip
if strings.Contains(gwName, ":") {
gwName = strings.TrimSpace(strings.Split(gwName, ":")[0])
} else {
gwName = strings.TrimSpace(gwName)
}

gwNode, err := c.nodesLister.Get(gwName)
if err != nil {
if k8serrors.IsNotFound(err) {
klog.Errorf("gw node %s does not exist, %v", gwName, err)
Expand Down
135 changes: 135 additions & 0 deletions pkg/daemon/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,12 @@ func (c *Controller) setIptables() error {
}

hostIP := util.GetNodeInternalIP(*node)
subnetNatips, err := c.getEgressNatIpByNode(c.config.NodeName)
if err != nil {
klog.Errorf("failed to get centralized subnets nat ips on node %s, %v", c.config.NodeName, err)
return err
}
klog.V(3).Infof("centralized subnets nat ips %v", subnetNatips)

var (
v4Rules = []util.IPTableRule{
Expand Down Expand Up @@ -434,12 +440,36 @@ func (c *Controller) setIptables() error {
if c.iptable[protocol] == nil {
continue
}
// delete unused iptable rule when nat gw with designative ip has been changed in centralize subnet
if err = c.deleteUnusedIptablesRule(protocol, "nat", "POSTROUTING", subnetNatips); err != nil {
klog.Errorf("failed to delete iptable rule on node %s, maybe can delete manually, %v", c.config.NodeName, err)
return err
}

var matchset string
var iptableRules []util.IPTableRule
if protocol == kubeovnv1.ProtocolIPv4 {
iptableRules = v4Rules
matchset = "ovn40subnets"
} else {
iptableRules = v6Rules
matchset = "ovn60subnets"
}
// add iptable rule for nat gw with designative ip in centralize subnet
for cidr, natip := range subnetNatips {
if util.CheckProtocol(cidr) != protocol {
continue
}

ruleval := fmt.Sprintf("-s %v -m set ! --match-set %s dst -j SNAT --to-source %v", cidr, matchset, natip)
rule := util.IPTableRule{
Table: "nat",
Chain: "POSTROUTING",
Rule: strings.Split(ruleval, " "),
}
iptableRules = append(iptableRules, rule)
}

iptableRules[0], iptableRules[1], iptableRules[3], iptableRules[4] =
iptableRules[4], iptableRules[3], iptableRules[1], iptableRules[0]
for _, iptRule := range iptableRules {
Expand All @@ -460,6 +490,7 @@ func (c *Controller) setIptables() error {
return err
}
}
klog.V(3).Infof("iptables rules %v, exists %v", strings.Join(iptRule.Rule, " "), exists)
}
}
return nil
Expand Down Expand Up @@ -659,6 +690,18 @@ func (c *Controller) getSubnetsNeedNAT(protocol string) ([]string, error) {
(subnet.Spec.Protocol == kubeovnv1.ProtocolDual || subnet.Spec.Protocol == protocol) &&
subnet.Spec.NatOutgoing &&
subnet.Spec.Vlan == "" {
// centralized subnets whose gatewayNode entry designates an egress ip for this node are processed separately
found := false
for _, gw := range strings.Split(subnet.Spec.GatewayNode, ",") {
if strings.Contains(gw, ":") && util.GatewayContains(gw, c.config.NodeName) {
found = true
break
}
}
if found {
continue
}

cidrBlock := getCidrByProtocol(subnet.Spec.CIDRBlock, protocol)
subnetsNeedNat = append(subnetsNeedNat, cidrBlock)
}
Expand Down Expand Up @@ -811,3 +854,95 @@ func getCidrByProtocol(cidr, protocol string) string {
}
return cidrStr
}

// getEgressNatIpByNode returns the designated egress (SNAT) IPs that
// centralized subnets assign to the gateway node nodeName, keyed by subnet
// CIDR.
//
// Only gatewayNode entries of the form "node:ip" are considered, e.g.
// 'kube-ovn-worker:172.18.0.2, kube-ovn-control-plane:172.18.0.3'. Each CIDR
// of a subnet's CIDRBlock is paired with the first entry for this node whose
// IP has the same protocol as the CIDR; CIDRs without such an entry are not
// added to the result.
//
// If listing subnets fails, the error is returned together with an empty map.
func (c *Controller) getEgressNatIpByNode(nodeName string) (map[string]string, error) {
	subnetsNatIp := make(map[string]string)
	subnetList, err := c.subnetsLister.List(labels.Everything())
	if err != nil {
		klog.Errorf("failed to list subnets %v", err)
		return subnetsNatIp, err
	}

	for _, subnet := range subnetList {
		// Only overlay, centralized subnets that list this node as a gateway matter here.
		if subnet.Spec.UnderlayGateway || subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType || subnet.Spec.GatewayNode == "" || !util.GatewayContains(subnet.Spec.GatewayNode, nodeName) {
			continue
		}

		for _, cidr := range strings.Split(subnet.Spec.CIDRBlock, ",") {
			// Trim so that the map key is a clean CIDR: callers format it
			// into an iptables rule, where stray spaces would corrupt the rule.
			cidr = strings.TrimSpace(cidr)
			if cidr == "" {
				continue
			}

			for _, gw := range strings.Split(subnet.Spec.GatewayNode, ",") {
				if !strings.Contains(gw, ":") || !util.GatewayContains(gw, nodeName) {
					continue
				}

				// Split on the first ':' only. An IPv6 egress IP contains
				// colons itself, so splitting on every ':' would truncate
				// "node:fd00::2" to "fd00".
				natIP := strings.TrimSpace(strings.SplitN(gw, ":", 2)[1])
				if natIP == "" {
					// "node:" designates nothing to SNAT to.
					continue
				}

				if util.CheckProtocol(cidr) == util.CheckProtocol(natIP) {
					subnetsNatIp[cidr] = natIP
					break
				}
			}
		}
	}
	return subnetsNatIp, nil
}

// deleteUnusedIptablesRule removes SNAT rules from the given table/chain whose
// "--to-source" address is no longer one of the designated egress IPs in
// subnetsNatIps (a map of subnet CIDR to NAT IP) for the given protocol.
//
// Rules are listed and deleted through the per-protocol handle
// c.iptable[protocol], and each stale rule is deleted by its exact rule
// specification rather than by position, so a concurrent change to the chain
// cannot cause a different rule to be removed.
//
// It returns the first list or delete error encountered.
//
// NOTE(review): every rule in the chain that carries "--to-source" is a
// candidate, not only rules created by this controller — confirm that no other
// component installs SNAT rules in the same chain.
func (c *Controller) deleteUnusedIptablesRule(protocol, table, chain string, subnetsNatIps map[string]string) error {
	rules, err := c.iptable[protocol].List(table, chain)
	if err != nil {
		klog.Errorf("failed to list iptable rules in table %v chain %v, %+v", table, chain, err)
		return err
	}

	// NAT IPs still in use for this protocol; built once instead of
	// re-scanning the whole map for every listed rule.
	wanted := make(map[string]struct{}, len(subnetsNatIps))
	for cidr, natip := range subnetsNatIps {
		if util.CheckProtocol(cidr) == protocol {
			wanted[natip] = struct{}{}
		}
	}

	rulePrefix := "-A " + chain + " "
	for _, rule := range rules {
		if !strings.Contains(rule, "--to-source") {
			continue
		}
		// "-A POSTROUTING -s 100.168.10.0/24 -m set ! --match-set ovn40subnets dst -j SNAT --to-source 172.17.0.3"
		rule = strings.TrimPrefix(rule, rulePrefix)

		// A quoted argument (e.g. a comment with spaces) cannot be rebuilt by
		// splitting on whitespace, so deleting by spec would not match the rule.
		if strings.Contains(rule, `"`) {
			klog.Warningf("skip iptable rule %s in table %v chain %v, it contains quoted arguments, please check manually", rule, table, chain)
			continue
		}
		spec := strings.Fields(rule)

		// Take the argument that follows --to-source instead of assuming it
		// is the last token of the rule.
		var dstNatIp string
		for i := 0; i+1 < len(spec); i++ {
			if spec[i] == "--to-source" {
				dstNatIp = spec[i+1]
				break
			}
		}
		if dstNatIp == "" {
			continue
		}

		if _, ok := wanted[dstNatIp]; ok {
			continue
		}

		klog.Infof("iptable rule %v %v %s should be deleted because nat gw has been changed", table, chain, rule)
		if err := c.iptable[protocol].Delete(table, chain, spec...); err != nil {
			klog.Errorf("delete iptable rule %s failed, %+v", rule, err)
			return err
		}
	}
	return nil
}

// getIptableRuleNum returns the line number, as printed by
// "<iptables> -t table -n -L chain --line-numbers", of the first rule in the
// chain whose SNAT target is exactly dstNatIp.
//
// rule is only used for logging. ip6tables is used when dstNatIp is an IPv6
// address (contains ':'), iptables otherwise.
//
// An error is returned when the command fails or when no rule matches, so a
// caller never receives an empty rule number together with a nil error.
func getIptableRuleNum(table, chain, rule, dstNatIp string) (string, error) {
	binary := "iptables"
	if strings.Contains(dstNatIp, ":") {
		// An IPv6 NAT address can only appear in the IPv6 tables.
		binary = "ip6tables"
	}

	// Run the binary directly rather than through "sh -c" so table and chain
	// are passed as plain arguments. -n keeps the listing numeric and avoids
	// reverse DNS lookups.
	output, err := exec.Command(binary, "-t", table, "-n", "-L", chain, "--line-numbers").CombinedOutput()
	if err != nil {
		return "", fmt.Errorf("failed to get iptable rule num: %w, output %q", err, output)
	}

	// The SNAT target is listed as "to:<ip>". Comparing the whole field keeps
	// 172.17.0.3 from matching 172.17.0.30 or a source/destination column.
	target := "to:" + dstNatIp
	for _, line := range strings.Split(string(output), "\n") {
		fields := strings.Fields(line)
		if len(fields) < 2 {
			continue
		}
		for _, field := range fields[1:] {
			if field == target {
				num := fields[0]
				klog.Infof("get iptable rule %v num %v", rule, num)
				return num, nil
			}
		}
	}
	return "", fmt.Errorf("no rule with nat ip %s found in table %s chain %s", dstNatIp, table, chain)
}
7 changes: 6 additions & 1 deletion pkg/util/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,13 @@ func CountIpNums(excludeIPs []string) float64 {
}

func GatewayContains(gatewayNodeStr, gateway string) bool {
// the format of gatewayNodeStr can be like 'kube-ovn-worker:172.18.0.2, kube-ovn-control-plane:172.18.0.3', which consists of node name and designative egress ip
for _, gw := range strings.Split(gatewayNodeStr, ",") {
gw = strings.TrimSpace(gw)
if strings.Contains(gw, ":") {
gw = strings.TrimSpace(strings.Split(gw, ":")[0])
} else {
gw = strings.TrimSpace(gw)
}
if gw == gateway {
return true
}
Expand Down

0 comments on commit 2427a4b

Please sign in to comment.