Skip to content

Commit

Permalink
neigh: Support multi device neighbor discovery
Browse files Browse the repository at this point in the history
[ upstream commit 6d208e7 ]

Currently, the neighbor discovery only supports a single device specified
as the directRoutingDevice.  That causes packet drops due to a "FIB lookup
failed" error in a multi-device environment.  The BPF Nodeport
implementation no longer relies on the directRoutingDevice since cilium#18585.

The following example illustrates the issue which occurs in the
multi-device environment. Each node has two devices connected to a
different L3 network (10.69.0.64/26 and 10.69.0.128/26), and global scope
addresses each (10.69.0.1/26 and 10.69.0.2/26). A nexthop from node1 to
node2 is either 10.69.0.66 dev eno1 or 10.69.0.130 dev eno2.

+--------------+     +--------------+
|    node1     |     |    node2     |
| 10.69.0.1/26 |     | 10.69.0.2/26 |
|          eno1+-----+eno1          |
|          |   |     |   |          |
| 10.69.0.65/26|     |10.69.0.66/26 |
|              |     |              |
|          eno2+-----+eno2          |
|          |   |     | |            |
|10.69.0.129/26|     |10.69.0.130/26|
+--------------+     +--------------+

(On node1)
$ ip route show
10.69.0.2
        nexthop via 10.69.0.66 dev eno1 weight 1
        nexthop via 10.69.0.130 dev eno2 weight 1

Assuming the directRoutingDevice is eno1, if BPF Nodeport implementation
selects 10.69.0.130 dev eno2 as the nexthop when it redirects a packet to
the selected backend on an intermediate node, the redirect will fail with a
"FIB lookup failed" error because there is no neighbor entry for eno2, which
is not the directRoutingDevice. This PR fixes the issue by populating L2 addresses
with all target devices.

Fixes: cilium#19908

Signed-off-by: Yusuke Suzuki <yusuke-suzuki@cybozu.co.jp>
Signed-off-by: Paul Chaignon <paul@cilium.io>
  • Loading branch information
ysksuzuki authored and pchaigno committed Jun 28, 2022
1 parent fb22627 commit 628e9dd
Show file tree
Hide file tree
Showing 3 changed files with 1,287 additions and 493 deletions.
186 changes: 127 additions & 59 deletions pkg/datapath/linux/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
"reflect"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -60,9 +60,9 @@ type linuxNodeHandler struct {
nodes map[nodeTypes.Identity]*nodeTypes.Node
enableNeighDiscovery bool
neighLock lock.Mutex // protects neigh* fields below
neighDiscoveryLink netlink.Link
neighNextHopByNode4 map[nodeTypes.Identity]string // val = string(net.IP)
neighNextHopByNode6 map[nodeTypes.Identity]string // val = string(net.IP)
neighDiscoveryLinks []netlink.Link
neighNextHopByNode4 map[nodeTypes.Identity]map[string]string // val = (key=link, value=string(net.IP))
neighNextHopByNode6 map[nodeTypes.Identity]map[string]string // val = (key=link, value=string(net.IP))
// All three mappings below hold both IPv4 and IPv6 entries.
neighNextHopRefCount counter.StringCounter
neighByNextHop map[string]*netlink.Neigh // key = string(net.IP)
Expand All @@ -79,8 +79,8 @@ func NewNodeHandler(datapathConfig DatapathConfiguration, nodeAddressing types.N
nodeAddressing: nodeAddressing,
datapathConfig: datapathConfig,
nodes: map[nodeTypes.Identity]*nodeTypes.Node{},
neighNextHopByNode4: map[nodeTypes.Identity]string{},
neighNextHopByNode6: map[nodeTypes.Identity]string{},
neighNextHopByNode4: map[nodeTypes.Identity]map[string]string{},
neighNextHopByNode6: map[nodeTypes.Identity]map[string]string{},
neighNextHopRefCount: counter.StringCounter{},
neighByNextHop: map[string]*netlink.Neigh{},
neighLastPingByNextHop: map[string]time.Time{},
Expand Down Expand Up @@ -656,9 +656,9 @@ func (n *linuxNodeHandler) encryptNode(newNode *nodeTypes.Node) {

}

func getNextHopIP(nodeIP net.IP) (nextHopIP net.IP, err error) {
func getNextHopIP(nodeIP net.IP, link netlink.Link) (nextHopIP net.IP, err error) {
// Figure out whether nodeIP is directly reachable (i.e. in the same L2)
routes, err := netlink.RouteGet(nodeIP)
routes, err := netlink.RouteGetWithOptions(nodeIP, &netlink.RouteGetOptions{Oif: link.Attrs().Name})
if err != nil {
return nil, fmt.Errorf("failed to retrieve route for remote node IP: %w", err)
}
Expand Down Expand Up @@ -775,7 +775,7 @@ func (n *linuxNodeHandler) insertNeighbor4(ctx context.Context, newNode *nodeTyp
logfields.IPAddr: newNodeIP,
})

nextHopIPv4, err := getNextHopIP(nextHopIPv4)
nextHopIPv4, err := getNextHopIP(nextHopIPv4, link)
if err != nil {
scopedLog.WithError(err).Info("Unable to determine next hop address")
return
Expand All @@ -786,8 +786,14 @@ func (n *linuxNodeHandler) insertNeighbor4(ctx context.Context, newNode *nodeTyp
n.neighLock.Lock()
defer n.neighLock.Unlock()

nextHopByLink, found := n.neighNextHopByNode4[newNode.Identity()]
if !found {
nextHopByLink = make(map[string]string)
n.neighNextHopByNode4[newNode.Identity()] = nextHopByLink
}

nextHopIsNew := false
if existingNextHopStr, found := n.neighNextHopByNode4[newNode.Identity()]; found {
if existingNextHopStr, found := nextHopByLink[link.Attrs().Name]; found {
if existingNextHopStr != nextHopStr {
if n.neighNextHopRefCount.Delete(existingNextHopStr) {
neigh, found := n.neighByNextHop[existingNextHopStr]
Expand Down Expand Up @@ -820,7 +826,7 @@ func (n *linuxNodeHandler) insertNeighbor4(ctx context.Context, newNode *nodeTyp
nextHopIsNew = n.neighNextHopRefCount.Add(nextHopStr)
}

n.neighNextHopByNode4[newNode.Identity()] = nextHopStr
n.neighNextHopByNode4[newNode.Identity()][link.Attrs().Name] = nextHopStr
nh := NextHop{
Name: nextHopStr,
IP: nextHopIPv4,
Expand All @@ -840,7 +846,7 @@ func (n *linuxNodeHandler) insertNeighbor6(ctx context.Context, newNode *nodeTyp
logfields.IPAddr: newNodeIP,
})

nextHopIPv6, err := getNextHopIP(nextHopIPv6)
nextHopIPv6, err := getNextHopIP(nextHopIPv6, link)
if err != nil {
scopedLog.WithError(err).Info("Unable to determine next hop address")
return
Expand All @@ -851,8 +857,14 @@ func (n *linuxNodeHandler) insertNeighbor6(ctx context.Context, newNode *nodeTyp
n.neighLock.Lock()
defer n.neighLock.Unlock()

nextHopByLink, found := n.neighNextHopByNode6[newNode.Identity()]
if !found {
nextHopByLink = make(map[string]string)
n.neighNextHopByNode6[newNode.Identity()] = nextHopByLink
}

nextHopIsNew := false
if existingNextHopStr, found := n.neighNextHopByNode6[newNode.Identity()]; found {
if existingNextHopStr, found := nextHopByLink[link.Attrs().Name]; found {
if existingNextHopStr != nextHopStr {
if n.neighNextHopRefCount.Delete(existingNextHopStr) {
// nextHop has changed and nobody else is using it, so remove the old one.
Expand Down Expand Up @@ -886,7 +898,7 @@ func (n *linuxNodeHandler) insertNeighbor6(ctx context.Context, newNode *nodeTyp
nextHopIsNew = n.neighNextHopRefCount.Add(nextHopStr)
}

n.neighNextHopByNode6[newNode.Identity()] = nextHopStr
n.neighNextHopByNode6[newNode.Identity()][link.Attrs().Name] = nextHopStr
nh := NextHop{
Name: nextHopStr,
IP: nextHopIPv6,
Expand All @@ -904,22 +916,26 @@ func (n *linuxNodeHandler) insertNeighbor6(ctx context.Context, newNode *nodeTyp
// which tries to update neighbor entries previously inserted by insertNeighbor().
// In this case the kernel refreshes the entry via NTF_USE.
func (n *linuxNodeHandler) insertNeighbor(ctx context.Context, newNode *nodeTypes.Node, refresh bool) {
var link netlink.Link
var links []netlink.Link

n.neighLock.Lock()
if n.neighDiscoveryLink == nil || reflect.ValueOf(n.neighDiscoveryLink).IsNil() {
if n.neighDiscoveryLinks == nil || len(n.neighDiscoveryLinks) == 0 {
n.neighLock.Unlock()
// Nothing to do - the discovery link was not set yet
return
}
link = n.neighDiscoveryLink
links = n.neighDiscoveryLinks
n.neighLock.Unlock()

if newNode.GetNodeIP(false).To4() != nil {
n.insertNeighbor4(ctx, newNode, link, refresh)
for _, l := range links {
n.insertNeighbor4(ctx, newNode, l, refresh)
}
}
if newNode.GetNodeIP(true).To16() != nil {
n.insertNeighbor6(ctx, newNode, link, refresh)
for _, l := range links {
n.insertNeighbor6(ctx, newNode, l, refresh)
}
}
}

Expand Down Expand Up @@ -951,23 +967,27 @@ func (n *linuxNodeHandler) deleteNeighborCommon(nextHopStr string) {
// deleteNeighbor4 removes the IPv4 next-hop state tracked for oldNode on
// every discovery link and drops the per-node map entry afterwards.
func (n *linuxNodeHandler) deleteNeighbor4(oldNode *nodeTypes.Node) {
	n.neighLock.Lock()
	defer n.neighLock.Unlock()
	nextHopByLink, found := n.neighNextHopByNode4[oldNode.Identity()]
	if !found {
		return
	}
	// Remove the per-node map only after every per-link next hop was
	// released below.
	defer func() { delete(n.neighNextHopByNode4, oldNode.Identity()) }()
	for _, nextHopStr := range nextHopByLink {
		n.deleteNeighborCommon(nextHopStr)
	}
}

// deleteNeighbor6 removes the IPv6 next-hop state tracked for oldNode on
// every discovery link and drops the per-node map entry afterwards.
func (n *linuxNodeHandler) deleteNeighbor6(oldNode *nodeTypes.Node) {
	n.neighLock.Lock()
	defer n.neighLock.Unlock()
	nextHopByLink, found := n.neighNextHopByNode6[oldNode.Identity()]
	if !found {
		return
	}
	// Remove the per-node map only after every per-link next hop was
	// released below.
	defer func() { delete(n.neighNextHopByNode6, oldNode.Identity()) }()
	for _, nextHopStr := range nextHopByLink {
		n.deleteNeighborCommon(nextHopStr)
	}
}

func (n *linuxNodeHandler) deleteNeighbor(oldNode *nodeTypes.Node) {
Expand Down Expand Up @@ -1476,7 +1496,7 @@ func (n *linuxNodeHandler) NodeConfigurationChanged(newConfig datapath.LocalNode
n.nodeConfig = newConfig

if n.nodeConfig.EnableIPv4 || n.nodeConfig.EnableIPv6 {
ifaceName := ""
var ifaceNames []string
switch {
case !option.Config.EnableL2NeighDiscovery:
n.enableNeighDiscovery = false
Expand All @@ -1485,34 +1505,42 @@ func (n *linuxNodeHandler) NodeConfigurationChanged(newConfig datapath.LocalNode
return fmt.Errorf("direct routing device is required, but not defined")
}

mac, err := link.GetHardwareAddr(option.Config.DirectRoutingDevice)
var targetDevices []string
targetDevices = append(targetDevices, option.Config.DirectRoutingDevice)
targetDevices = append(targetDevices, option.Config.GetDevices()...)

var err error
ifaceNames, err = filterL2Devices(targetDevices)
if err != nil {
return err
}
ifaceName = option.Config.DirectRoutingDevice
n.enableNeighDiscovery = mac != nil // No need to arping for L2-less devices
n.enableNeighDiscovery = len(ifaceNames) != 0 // No need to arping for L2-less devices
case n.nodeConfig.EnableIPSec &&
option.Config.Tunnel == option.TunnelDisabled &&
len(option.Config.EncryptInterface) != 0:
// When FIB lookup is not supported we need to pick an
// interface so pick first interface in the list. On
// kernels with FIB lookup helpers we do a lookup from
// the datapath side and ignore this value.
ifaceName = option.Config.EncryptInterface[0]
ifaceNames = append(ifaceNames, option.Config.EncryptInterface[0])
n.enableNeighDiscovery = true
}

if n.enableNeighDiscovery {
link, err := netlink.LinkByName(ifaceName)
if err != nil {
return fmt.Errorf("cannot find link by name %s for neighbor discovery: %w",
ifaceName, err)
var neighDiscoveryLinks []netlink.Link
for _, ifaceName := range ifaceNames {
l, err := netlink.LinkByName(ifaceName)
if err != nil {
return fmt.Errorf("cannot find link by name %s for neighbor discovery: %w",
ifaceName, err)
}
neighDiscoveryLinks = append(neighDiscoveryLinks, l)
}

// Store neighDiscoveryLink so that we can remove the ARP
// PERM entries when cilium-agent starts with neigh discovery
// disabled next time.
err = storeNeighLink(option.Config.StateDir, ifaceName)
err := storeNeighLink(option.Config.StateDir, ifaceNames)
if err != nil {
log.WithError(err).Warning("Unable to store neighbor discovery iface." +
" Removing PERM neighbor entries upon cilium-agent init when neighbor" +
Expand All @@ -1522,7 +1550,7 @@ func (n *linuxNodeHandler) NodeConfigurationChanged(newConfig datapath.LocalNode
// neighDiscoveryLink can be accessed by a concurrent insertNeighbor
// goroutine.
n.neighLock.Lock()
n.neighDiscoveryLink = link
n.neighDiscoveryLinks = neighDiscoveryLinks
n.neighLock.Unlock()
}
}
Expand Down Expand Up @@ -1578,6 +1606,26 @@ func (n *linuxNodeHandler) NodeConfigurationChanged(newConfig datapath.LocalNode
return nil
}

// filterL2Devices returns the subset of devices that have a hardware (MAC)
// address, i.e. L2 devices, deduplicated and in first-seen order. L2-less
// devices are dropped since there is nothing to arping on them.
//
// Iterating a map (as the previous version did) made the result order
// nondeterministic across runs; preserving input order keeps the output
// reproducible while remaining backward-compatible for callers that treat
// the result as a set.
func filterL2Devices(devices []string) ([]string, error) {
	seen := make(map[string]struct{}, len(devices))
	var l2devices []string
	for _, device := range devices {
		if _, dup := seen[device]; dup {
			continue
		}
		seen[device] = struct{}{}
		mac, err := link.GetHardwareAddr(device)
		if err != nil {
			return nil, err
		}
		if mac != nil {
			l2devices = append(l2devices, device)
		}
	}
	return l2devices, nil
}

// NodeValidateImplementation is called to validate the implementation of the
// node in the datapath
func (n *linuxNodeHandler) NodeValidateImplementation(nodeToValidate nodeTypes.Node) error {
Expand Down Expand Up @@ -1733,13 +1781,13 @@ func (n *linuxNodeHandler) NodeCleanNeighborsLink(l netlink.Link, migrateOnly bo
// deleted (and the new agent instance did not see the delete event during the
// down/up cycle).
func (n *linuxNodeHandler) NodeCleanNeighbors(migrateOnly bool) {
linkName, err := loadNeighLink(option.Config.StateDir)
linkNames, err := loadNeighLink(option.Config.StateDir)
if err != nil {
log.WithError(err).Error("Unable to load neighbor discovery iface name" +
" for removing PERM neighbor entries")
return
}
if len(linkName) == 0 {
if len(linkNames) == 0 {
return
}

Expand All @@ -1752,54 +1800,74 @@ func (n *linuxNodeHandler) NodeCleanNeighbors(migrateOnly bool) {
}
}()

l, err := netlink.LinkByName(linkName)
if err != nil {
// If the link is not found we don't need to keep retrying cleaning
// up the neihbor entries so we can keep successClean=true
if _, ok := err.(netlink.LinkNotFoundError); !ok {
log.WithError(err).WithFields(logrus.Fields{
logfields.Device: linkName,
}).Error("Unable to remove PERM neighbor entries of network device")
successClean = false
for _, linkName := range linkNames {
l, err := netlink.LinkByName(linkName)
if err != nil {
// If the link is not found we don't need to keep retrying cleaning
// up the neihbor entries so we can keep successClean=true
if _, ok := err.(netlink.LinkNotFoundError); !ok {
log.WithError(err).WithFields(logrus.Fields{
logfields.Device: linkName,
}).Error("Unable to remove PERM neighbor entries of network device")
successClean = false
}
continue
}
return
}

successClean = n.NodeCleanNeighborsLink(l, migrateOnly)
successClean = n.NodeCleanNeighborsLink(l, migrateOnly)
}
}

func storeNeighLink(dir string, name string) error {
func storeNeighLink(dir string, names []string) error {
configFileName := filepath.Join(dir, neighFileName)
f, err := os.Create(configFileName)
if err != nil {
return fmt.Errorf("unable to create '%s': %w", configFileName, err)
}
defer f.Close()
nl := NeighLink{Name: name}
err = json.NewEncoder(f).Encode(nl)

var nls []NeighLink
for _, name := range names {
nls = append(nls, NeighLink{Name: name})
}
err = json.NewEncoder(f).Encode(nls)
if err != nil {
return fmt.Errorf("unable to encode '%+v': %w", nl, err)
return fmt.Errorf("unable to encode '%+v': %w", nls, err)
}
return nil
}

func loadNeighLink(dir string) (string, error) {
func loadNeighLink(dir string) ([]string, error) {
configFileName := filepath.Join(dir, neighFileName)
f, err := os.Open(configFileName)
if err != nil {
if os.IsNotExist(err) {
return "", nil
return nil, nil
}
return "", fmt.Errorf("unable to open '%s': %w", configFileName, err)
return nil, fmt.Errorf("unable to open '%s': %w", configFileName, err)
}
defer f.Close()

// Ensure backward compatibility
var nl NeighLink
if err = json.NewDecoder(f).Decode(&nl); err == nil {
if len(nl.Name) > 0 {
return []string{nl.Name}, nil
}
}

err = json.NewDecoder(f).Decode(&nl)
if err != nil {
return "", fmt.Errorf("unable to decode '%s': %w", configFileName, err)
var nls []NeighLink
if _, err := f.Seek(0, io.SeekStart); err != nil {
return nil, err
}
if err := json.NewDecoder(f).Decode(&nls); err != nil {
return nil, fmt.Errorf("unable to decode '%s': %w", configFileName, err)
}
var names []string
for _, nl := range nls {
names = append(names, nl.Name)
}
return nl.Name, nil
return names, nil
}

// NodeDeviceNameWithDefaultRoute returns the node's device name which
Expand Down
Loading

0 comments on commit 628e9dd

Please sign in to comment.