Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use direct server return in east-west overlay load balancing #2270

Merged
merged 2 commits into from
Oct 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,9 @@ func (c *controller) RegisterDriver(networkType string, driver driverapi.Driver,
return nil
}

// XXX This should be made driver agnostic. See comment below.
const overlayDSROptionString = "dsr"

// NewNetwork creates a new network of the specified network type. The options
// are network specific and modeled in a generic way.
func (c *controller) NewNetwork(networkType, name string, id string, options ...NetworkOption) (Network, error) {
Expand All @@ -723,15 +726,16 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ...
defaultIpam := defaultIpamForNetworkType(networkType)
// Construct the network object
network := &network{
name: name,
networkType: networkType,
generic: map[string]interface{}{netlabel.GenericData: make(map[string]string)},
ipamType: defaultIpam,
id: id,
created: time.Now(),
ctrlr: c,
persist: true,
drvOnce: &sync.Once{},
name: name,
networkType: networkType,
generic: map[string]interface{}{netlabel.GenericData: make(map[string]string)},
ipamType: defaultIpam,
id: id,
created: time.Now(),
ctrlr: c,
persist: true,
drvOnce: &sync.Once{},
loadBalancerMode: loadBalancerModeDefault,
}

network.processOptions(options...)
Expand Down Expand Up @@ -829,6 +833,21 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ...
}
}()

// XXX If the driver type is "overlay" check the options for DSR
// being set. If so, set the network's load balancing mode to DSR.
// This should really be done in a network option, but due to
// time pressure to get this in without adding changes to moby,
// swarm and CLI, it is being implemented as a driver-specific
// option. Unfortunately, drivers can't influence the core
// "libnetwork.network" data type. Hence we need this hack code
// to implement in this manner.
if gval, ok := network.generic[netlabel.GenericData]; ok && network.networkType == "overlay" {
optMap := gval.(map[string]string)
if _, ok := optMap[overlayDSROptionString]; ok {
network.loadBalancerMode = loadBalancerModeDSR
}
}

addToStore:
// First store the endpoint count, then the network. To avoid to
// end up with a datastore containing a network and not an epCnt,
Expand Down
20 changes: 20 additions & 0 deletions ipvs/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,23 @@ const (
// addresses.
SourceHashing = "sh"
)

const (
// ConnFwdMask is a mask for the fwd methods
ConnFwdMask = 0x0007

// ConnFwdMasq denotes forwarding via masquerading/NAT
ConnFwdMasq = 0x0000

// ConnFwdLocalNode denotes forwarding to a local node
ConnFwdLocalNode = 0x0001

// ConnFwdTunnel denotes forwarding via a tunnel
ConnFwdTunnel = 0x0002

// ConnFwdDirectRoute denotes forwarding via direct routing
ConnFwdDirectRoute = 0x0003

// ConnFwdBypass denotes forwarding while bypassing the cache
ConnFwdBypass = 0x0004
)
81 changes: 47 additions & 34 deletions network.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,43 +199,50 @@ func (i *IpamInfo) UnmarshalJSON(data []byte) error {
}

type network struct {
ctrlr *controller
name string
networkType string
id string
created time.Time
scope string // network data scope
labels map[string]string
ipamType string
ipamOptions map[string]string
addrSpace string
ipamV4Config []*IpamConf
ipamV6Config []*IpamConf
ipamV4Info []*IpamInfo
ipamV6Info []*IpamInfo
enableIPv6 bool
postIPv6 bool
epCnt *endpointCnt
generic options.Generic
dbIndex uint64
dbExists bool
persist bool
stopWatchCh chan struct{}
drvOnce *sync.Once
resolverOnce sync.Once
resolver []Resolver
internal bool
attachable bool
inDelete bool
ingress bool
driverTables []networkDBTable
dynamic bool
configOnly bool
configFrom string
loadBalancerIP net.IP
ctrlr *controller
name string
networkType string
id string
created time.Time
scope string // network data scope
labels map[string]string
ipamType string
ipamOptions map[string]string
addrSpace string
ipamV4Config []*IpamConf
ipamV6Config []*IpamConf
ipamV4Info []*IpamInfo
ipamV6Info []*IpamInfo
enableIPv6 bool
postIPv6 bool
epCnt *endpointCnt
generic options.Generic
dbIndex uint64
dbExists bool
persist bool
stopWatchCh chan struct{}
drvOnce *sync.Once
resolverOnce sync.Once
resolver []Resolver
internal bool
attachable bool
inDelete bool
ingress bool
driverTables []networkDBTable
dynamic bool
configOnly bool
configFrom string
loadBalancerIP net.IP
loadBalancerMode string
sync.Mutex
}

const (
loadBalancerModeNAT = "NAT"
loadBalancerModeDSR = "DSR"
loadBalancerModeDefault = loadBalancerModeNAT
)

func (n *network) Name() string {
n.Lock()
defer n.Unlock()
Expand Down Expand Up @@ -475,6 +482,7 @@ func (n *network) CopyTo(o datastore.KVObject) error {
dstN.configOnly = n.configOnly
dstN.configFrom = n.configFrom
dstN.loadBalancerIP = n.loadBalancerIP
dstN.loadBalancerMode = n.loadBalancerMode

// copy labels
if dstN.labels == nil {
Expand Down Expand Up @@ -592,6 +600,7 @@ func (n *network) MarshalJSON() ([]byte, error) {
netMap["configOnly"] = n.configOnly
netMap["configFrom"] = n.configFrom
netMap["loadBalancerIP"] = n.loadBalancerIP
netMap["loadBalancerMode"] = n.loadBalancerMode
return json.Marshal(netMap)
}

Expand Down Expand Up @@ -705,6 +714,10 @@ func (n *network) UnmarshalJSON(b []byte) (err error) {
if v, ok := netMap["loadBalancerIP"]; ok {
n.loadBalancerIP = net.ParseIP(v.(string))
}
n.loadBalancerMode = loadBalancerModeDefault
if v, ok := netMap["loadBalancerMode"]; ok {
n.loadBalancerMode = v.(string)
}
// Reconcile old networks with the recently added `--ipv6` flag
if !n.enableIPv6 {
n.enableIPv6 = len(n.ipamV6Info) > 0
Expand Down
30 changes: 30 additions & 0 deletions osl/namespace_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,36 @@ func (n *networkNamespace) RemoveAliasIP(ifName string, ip *net.IPNet) error {
return n.nlHandle.AddrDel(iface, &netlink.Addr{IPNet: ip})
}

func (n *networkNamespace) DisableARPForVIP(srcName string) (Err error) {
dstName := ""
for _, i := range n.Interfaces() {
if i.SrcName() == srcName {
dstName = i.DstName()
break
}
}
if dstName == "" {
return fmt.Errorf("failed to find interface %s in sandbox", srcName)
}

err := n.InvokeFunc(func() {
path := filepath.Join("/proc/sys/net/ipv4/conf", dstName, "arp_ignore")
if err := ioutil.WriteFile(path, []byte{'1', '\n'}, 0644); err != nil {
Err = fmt.Errorf("Failed to set %s to 1: %v", path, err)
return
}
path = filepath.Join("/proc/sys/net/ipv4/conf", dstName, "arp_announce")
if err := ioutil.WriteFile(path, []byte{'2', '\n'}, 0644); err != nil {
Err = fmt.Errorf("Failed to set %s to 2: %v", path, err)
return
}
})
if err != nil {
return err
}
return
}

func (n *networkNamespace) InvokeFunc(f func()) error {
return nsInvoke(n.nsPath(), func(nsFD int) error { return nil }, func(callerFD int) error {
f()
Expand Down
4 changes: 4 additions & 0 deletions osl/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type Sandbox interface {
// RemoveAliasIP removes the passed IP address from the named interface
RemoveAliasIP(ifName string, ip *net.IPNet) error

// DisableARPForVIP disables ARP replies and requests for VIP addresses
// on a particular interface
DisableARPForVIP(ifName string) error

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better DisableARPForIfc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrmm.. Don't object to renaming, but the suggestion is less accurate. The function doesn't disable ARP for all addresses. It just disables it for addresses assigned to a different interface. We really only care that the VIP addresses have ARP suppressed for purposes of this API. But the operation does change behavior slightly. Say you had:

                            +---------------+
       eth0(10.1.0.2) ------+   container   +------ eth1(10.2.0.7)
                            +---------------+

The way containers are currently configured, if you ARPed 10.2.0.7 on eth0, you'd get a response back indicating that that address was on that interface. The ARP configurations that this function performs would prevent that. (the contaienr would ignore incoming ARPs for 10.2.0.7 on eth0) Ideally, we would be configuring this behavior ONLY for the VIP addresses (which get assigned to lo). Unfortunately, there isn't a nice way to do that without using arptables as far as I know. And requiring arptables and the like on docker installations is not feasible.

As far as ARPing across interfaces goes, my personal opinion is that this behavior should be the restriction that this function puts in place should be the default. But the Linux devs disagree. In any case I don't think that the more relaxed behavior is something an application developer should rely on. (see https://lwn.net/Articles/45373/) So the restriction should be ok. It would also be a property of the network, in any case. (i.e. only applies to endpoints that are on a "dsr"-option-enabled "overlay" network).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the point, it's difficult to express with a concise name. nvm for the moment then


// Add a static route to the sandbox.
AddStaticRoute(*types.StaticRoute) error

Expand Down
22 changes: 22 additions & 0 deletions sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,8 +742,17 @@ func releaseOSSboxResources(osSbox osl.Sandbox, ep *endpoint) {

ep.Lock()
joinInfo := ep.joinInfo
vip := ep.virtualIP
lbModeIsDSR := ep.network.loadBalancerMode == loadBalancerModeDSR
ep.Unlock()

if len(vip) > 0 && lbModeIsDSR {
ipNet := &net.IPNet{IP: vip, Mask: net.CIDRMask(32, 32)}
if err := osSbox.RemoveAliasIP(osSbox.GetLoopbackIfaceName(), ipNet); err != nil {
logrus.WithError(err).Debugf("failed to remove virtual ip %v to loopback", ipNet)
}
}

if joinInfo == nil {
return
}
Expand Down Expand Up @@ -831,6 +840,7 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
ep.Lock()
joinInfo := ep.joinInfo
i := ep.iface
lbModeIsDSR := ep.network.loadBalancerMode == loadBalancerModeDSR
ep.Unlock()

if ep.needResolver() {
Expand All @@ -854,6 +864,18 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
if err := sb.osSbox.AddInterface(i.srcName, i.dstPrefix, ifaceOptions...); err != nil {
return fmt.Errorf("failed to add interface %s to sandbox: %v", i.srcName, err)
}

if len(ep.virtualIP) > 0 && lbModeIsDSR {
if sb.loadBalancerNID == "" {
if err := sb.osSbox.DisableARPForVIP(i.srcName); err != nil {
return fmt.Errorf("failed disable ARP for VIP: %v", err)
}
}
ipNet := &net.IPNet{IP: ep.virtualIP, Mask: net.CIDRMask(32, 32)}
if err := sb.osSbox.AddAliasIP(sb.osSbox.GetLoopbackIfaceName(), ipNet); err != nil {
return fmt.Errorf("failed to add virtual ip %v to loopback: %v", ipNet, err)
}
}
}

if joinInfo != nil {
Expand Down
19 changes: 13 additions & 6 deletions service_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (n *network) addLBBackend(ip net.IP, lb *loadBalancer) {
}

logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v in sbox %.7s (%.7s)", lb.vip, lb.fwMark, lb.service.ingressPorts, sb.ID(), sb.ContainerID())
if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, false); err != nil {
if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, false, n.loadBalancerMode); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %.7s (%.7s): %v", sb.ID(), sb.ContainerID(), err)
return
}
Expand All @@ -158,6 +158,9 @@ func (n *network) addLBBackend(ip net.IP, lb *loadBalancer) {
Address: ip,
Weight: 1,
}
if n.loadBalancerMode == loadBalancerModeDSR {
d.ConnectionFlags = ipvs.ConnFwdDirectRoute
}

// Remove the sched name before using the service to add
// destination.
Expand Down Expand Up @@ -203,6 +206,9 @@ func (n *network) rmLBBackend(ip net.IP, lb *loadBalancer, rmService bool, fullR
Address: ip,
Weight: 1,
}
if n.loadBalancerMode == loadBalancerModeDSR {
d.ConnectionFlags = ipvs.ConnFwdDirectRoute
}

if fullRemove {
if err := i.DelDestination(s, d); err != nil && err != syscall.ENOENT {
Expand Down Expand Up @@ -231,7 +237,7 @@ func (n *network) rmLBBackend(ip net.IP, lb *loadBalancer, rmService bool, fullR
}
}

if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, true); err != nil {
if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, true, n.loadBalancerMode); err != nil {
logrus.Errorf("Failed to delete firewall mark rule in sbox %.7s (%.7s): %v", sb.ID(), sb.ContainerID(), err)
}

Expand Down Expand Up @@ -566,7 +572,7 @@ func readPortsFromFile(fileName string) ([]*PortConfig, error) {

// Invoke fwmarker reexec routine to mark vip destined packets with
// the passed firewall mark.
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool, lbMode string) error {
var ingressPortsFile string

if len(ingressPorts) != 0 {
Expand All @@ -586,7 +592,7 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*Port

cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String()),
Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String(), lbMode),
Stdout: os.Stdout,
Stderr: os.Stderr,
}
Expand All @@ -603,7 +609,7 @@ func fwMarker() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()

if len(os.Args) < 7 {
if len(os.Args) < 8 {
logrus.Error("invalid number of arguments..")
os.Exit(1)
}
Expand Down Expand Up @@ -645,7 +651,8 @@ func fwMarker() {
os.Exit(5)
}

if addDelOpt == "-A" {
lbMode := os.Args[7]
if addDelOpt == "-A" && lbMode == loadBalancerModeNAT {
eIP, subnet, err := net.ParseCIDR(os.Args[6])
if err != nil {
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err)
Expand Down