Skip to content

Commit

Permalink
[unnumbered-ptp] Allow nodeports to work when routed to local pods (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
lbernail authored and paulnivin committed Dec 17, 2018
1 parent bb2757d commit 22a14f6
Showing 1 changed file with 77 additions and 3 deletions.
80 changes: 77 additions & 3 deletions plugin/unnumbered-ptp/unnumbered-ptp.go
Expand Up @@ -26,6 +26,7 @@ import (
"os"
"runtime"
"sort"
"strconv"
"time"

"github.com/containernetworking/cni/pkg/skel"
Expand All @@ -35,14 +36,20 @@ import (
"github.com/containernetworking/plugins/pkg/ip"
"github.com/containernetworking/plugins/pkg/ns"
"github.com/containernetworking/plugins/pkg/utils"
"github.com/containernetworking/plugins/pkg/utils/sysctl"
"github.com/coreos/go-iptables/iptables"
"github.com/j-keck/arping"
"github.com/vishvananda/netlink"
)

// constants for full jitter backoff in milliseconds
const maxSleep = 10000 // 10.00s
const baseSleep = 20 // 0.02s
// Bounds for the full jitter backoff, expressed in milliseconds.
const (
	// maxSleep is the upper bound of the backoff.
	maxSleep = 10000 // 10.00s
	// baseSleep is the base interval of the backoff.
	baseSleep = 20 // 0.02s
)

// Settings used when wiring up nodeport routing.
const (
	// RPFilterTemplate is the format string for the per-interface
	// reverse-path filter sysctl key; %s is the interface name.
	RPFilterTemplate = "net.ipv4.conf.%s.rp_filter"
	// podRulePriority is the priority given to the per-veth policy rule.
	podRulePriority = 1024
	// nodePortRulePriority is the priority given to the mark-based nodeport
	// policy rule. It is numerically lower than podRulePriority.
	nodePortRulePriority = 512
)

func init() {
// this ensures that main runs only on main thread (thread group leader).
Expand Down Expand Up @@ -72,6 +79,8 @@ type PluginConf struct {
ContainerInterface string `json:"containerInterface"`
MTU int `json:"mtu"`
TableStart int `json:"routeTableStart"`
NodePortMark int `json:"nodePortMark"`
NodePorts string `json:"nodePorts"`
}

// parseConfig parses the supplied configuration (and prevResult) from stdin.
Expand Down Expand Up @@ -108,6 +117,14 @@ func parseConfig(stdin []byte) (*PluginConf, error) {
return nil, fmt.Errorf("containerInterface must be specified")
}

if conf.NodePorts == "" {
conf.NodePorts = "30000:32767"
}

if conf.NodePortMark == 0 {
conf.NodePortMark = 0x2000
}

// start using tables by default at 256
if conf.TableStart == 0 {
conf.TableStart = 256
Expand Down Expand Up @@ -215,6 +232,8 @@ func addPolicyRules(veth *net.Interface, ipc *current.IPConfig, routes []*types.
rule := netlink.NewRule()
rule.IifName = veth.Name
rule.Table = table
rule.Priority = podRulePriority

err := netlink.RuleAdd(rule)
if err != nil {
return fmt.Errorf("failed to add policy rule %v: %v", rule, err)
Expand All @@ -223,6 +242,57 @@ func addPolicyRules(veth *net.Interface, ipc *current.IPConfig, routes []*types.
return nil
}

// setupNodePortRule prepares the host so that nodeport traffic received on
// ifName can be handled by pods local to this node.
//
// It performs three steps, each of which is safe to repeat:
//   - appends (if missing) mangle/PREROUTING rules that set the connection mark
//     nodePortMark on TCP and UDP traffic entering ifName with a destination
//     port in nodePorts, and a rule that restores the connection mark onto
//     packets entering from any veth+ interface;
//   - sets the rp_filter sysctl of ifName to 2 (loose);
//   - adds an IPv4 policy rule sending traffic carrying nodePortMark to the
//     main routing table, unless an equivalent rule already exists.
//
// nodePorts is passed verbatim to the iptables --dport match (for example
// "30000:32767"). An error is returned as soon as any step fails.
func setupNodePortRule(ifName string, nodePorts string, nodePortMark int) error {
	ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
	if err != nil {
		return fmt.Errorf("failed to locate iptables: %v", err)
	}

	// Create iptables rules to ensure that nodeport traffic is marked. The
	// same rule is needed for each transport protocol.
	mark := strconv.Itoa(nodePortMark)
	for _, proto := range []string{"tcp", "udp"} {
		if err := ipt.AppendUnique("mangle", "PREROUTING", "-i", ifName, "-p", proto, "--dport", nodePorts, "-j", "CONNMARK", "--set-mark", mark, "-m", "comment", "--comment", "NodePort Mark"); err != nil {
			return fmt.Errorf("failed to add %s nodeport mark rule for interface %q: %v", proto, ifName, err)
		}
	}

	// Copy the connection mark back onto packets coming from veth interfaces
	// so the mark-based policy rule below can match them.
	if err := ipt.AppendUnique("mangle", "PREROUTING", "-i", "veth+", "-j", "CONNMARK", "--restore-mark", "-m", "comment", "--comment", "NodePort Mark"); err != nil {
		return fmt.Errorf("failed to add nodeport restore-mark rule: %v", err)
	}

	// Use loose RP filter on host interface (RP filter does not take mark-based rules into account)
	if _, err := sysctl.Sysctl(fmt.Sprintf(RPFilterTemplate, ifName), "2"); err != nil {
		return fmt.Errorf("failed to set RP filter to loose for interface %q: %v", ifName, err)
	}

	// Policy rule routing traffic marked as nodeport through the main table.
	rule := netlink.NewRule()
	rule.Mark = nodePortMark
	rule.Table = 254 // main table
	rule.Priority = nodePortRulePriority

	rules, err := netlink.RuleList(netlink.FAMILY_V4)
	if err != nil {
		return fmt.Errorf("failed to retrieve IP rules: %v", err)
	}

	// The rule is shared by every invocation; nothing to do if it is already
	// installed.
	for _, r := range rules {
		if r.Table == rule.Table && r.Mark == rule.Mark && r.Priority == rule.Priority {
			return nil
		}
	}

	if err := netlink.RuleAdd(rule); err != nil {
		return fmt.Errorf("failed to add policy rule %v: %v", rule, err)
	}

	return nil
}

func setupContainerVeth(netns ns.NetNS, ifName string, mtu int, hostAddrs []netlink.Addr, masq, containerIPV4, containerIPV6 bool, k8sIfName string, pr *current.Result) (*current.Interface, *current.Interface, error) {
hostInterface := &current.Interface{}
containerInterface := &current.Interface{}
Expand Down Expand Up @@ -450,6 +520,10 @@ func cmdAdd(args *skel.CmdArgs) error {
}
}

if err = setupNodePortRule(conf.HostInterface, conf.NodePorts, conf.NodePortMark); err != nil {
return err
}

// Pass through the result for the next plugin
return types.PrintResult(conf.PrevResult, conf.CNIVersion)
}
Expand Down

0 comments on commit 22a14f6

Please sign in to comment.