Skip to content

Commit

Permalink
datapath: Fix race with a deleted device after detection
Browse files Browse the repository at this point in the history
If a device was removed after device detection, but before DeviceManager.Listen()
was invoked, then this deletion would be missed and the device would be kept.

Fix the issue by subscribing to link updates and then checking if some links
no longer exist.

While fixing this I noticed that a better way to make sure DeviceManager uses
the right namespace is to use the netlink.Handle instead of pinning the OS thread
and setting the namespace for the thread.

Signed-off-by: Jussi Maki <jussi@isovalent.com>
  • Loading branch information
joamaki authored and joestringer committed May 13, 2022
1 parent 7a1aea7 commit 8941e96
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 67 deletions.
7 changes: 6 additions & 1 deletion daemon/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,11 @@ func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointma

nd := nodediscovery.NewNodeDiscovery(nodeMngr, mtuConfig, netConf)

devMngr, err := linuxdatapath.NewDeviceManager()
if err != nil {
return nil, nil, err
}

d := Daemon{
ctx: ctx,
cancel: cancel,
Expand All @@ -488,7 +493,7 @@ func NewDaemon(ctx context.Context, cancel context.CancelFunc, epMgr *endpointma
netConf: netConf,
mtuConfig: mtuConfig,
datapath: dp,
deviceManager: linuxdatapath.NewDeviceManager(),
deviceManager: devMngr,
nodeDiscovery: nd,
endpointCreations: newEndpointCreationManager(),
apiLimiterSet: apiLimiterSet,
Expand Down
111 changes: 60 additions & 51 deletions pkg/datapath/linux/devices.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"fmt"
"net"
"runtime"
"sort"
"strings"

Expand Down Expand Up @@ -46,13 +45,25 @@ type DeviceManager struct {
lock.Mutex
devices map[string]struct{}
filter deviceFilter
handle *netlink.Handle
netns netns.NsHandle
}

func NewDeviceManager() *DeviceManager {
// NewDeviceManager builds a DeviceManager by delegating to
// NewDeviceManagerAt with the handle returned by netns.None().
// NOTE(review): netns.None() presumably means "no explicit namespace", i.e.
// the namespace of the calling thread — confirm against the netns package.
func NewDeviceManager() (*DeviceManager, error) {
	noNS := netns.None()
	return NewDeviceManagerAt(noNS)
}

// NewDeviceManagerAt creates a DeviceManager bound to the given network
// namespace handle. A netlink handle is opened with netlink.NewHandleAt and
// stored, together with the namespace handle, on the returned manager; the
// device filter is initialised from option.Config.Devices. If the netlink
// handle cannot be created, a nil manager and the error are returned.
// NOTE(review): this listing interleaves removed and added diff lines, so the
// lone closing brace in the middle of the struct literal belongs to the
// previous revision of the constructor.
func NewDeviceManagerAt(netns netns.NsHandle) (*DeviceManager, error) {
handle, err := netlink.NewHandleAt(netns)
if err != nil {
return nil, err
}
return &DeviceManager{
devices: make(map[string]struct{}),
filter: deviceFilter(option.Config.Devices),
}
handle: handle,
netns: netns,
// err is necessarily nil here because the non-nil case returned above.
}, err
}

// Detect tries to detect devices to which BPF programs may be loaded.
Expand All @@ -65,11 +76,11 @@ func (dm *DeviceManager) Detect() ([]string, error) {
defer dm.Unlock()
dm.devices = make(map[string]struct{})

if err := expandDevices(); err != nil {
if err := dm.expandDevices(); err != nil {
return nil, err
}

if err := expandDirectRoutingDevice(); err != nil {
if err := dm.expandDirectRoutingDevice(); err != nil {
return nil, err
}

Expand All @@ -91,7 +102,7 @@ func (dm *DeviceManager) Detect() ([]string, error) {
family = netlink.FAMILY_V6
}

routes, err := netlink.RouteListFiltered(family, &routeFilter, routeFilterMask)
routes, err := dm.handle.RouteListFiltered(family, &routeFilter, routeFilterMask)
if err != nil {
return nil, fmt.Errorf("cannot retrieve routes for device detection: %w", err)
}
Expand Down Expand Up @@ -229,7 +240,7 @@ func (dm *DeviceManager) isViableDevice(l3DevOK, hasDefaultRoute bool, link netl
}

if link.Attrs().MasterIndex > 0 {
if master, err := netlink.LinkByIndex(link.Attrs().MasterIndex); err == nil {
if master, err := dm.handle.LinkByIndex(link.Attrs().MasterIndex); err == nil {
switch master.Type() {
case "bridge", "openvswitch":
log.WithField(logfields.Device, name).Debug("Ignoring device attached to bridge")
Expand Down Expand Up @@ -272,7 +283,7 @@ func (dm *DeviceManager) updateDevicesFromRoutes(l3DevOK bool, routes []netlink.

changed := false
for index, info := range linkInfos {
link, err := netlink.LinkByIndex(index)
link, err := dm.handle.LinkByIndex(index)
if err != nil {
log.WithError(err).WithField(logfields.LinkIndex, index).
Warn("Failed to get link by index")
Expand All @@ -296,67 +307,59 @@ func (dm *DeviceManager) updateDevicesFromRoutes(l3DevOK bool, routes []netlink.
return changed
}

// handleLinkUpdate processes a single netlink link notification and reports
// whether the set of tracked devices changed as a result. Only RTM_DELLINK
// messages for a device currently present in dm.devices cause a change: the
// device is dropped from the set and true is returned. Every other update is
// ignored and yields false.
// NOTE(review): dm.devices is mutated without taking the lock here, so the
// caller is presumably expected to hold it — confirm at the call site.
func (dm *DeviceManager) handleLinkUpdate(update netlink.LinkUpdate) bool {
	if update.Header.Type == unix.RTM_DELLINK {
		devName := update.Attrs().Name
		if _, tracked := dm.devices[devName]; tracked {
			delete(dm.devices, devName)
			return true
		}
	}
	return false
}

// Listen starts listening to changes to network devices. When devices change the new set
// of devices is sent on the returned channel.
func (dm *DeviceManager) Listen(ctx context.Context) (chan []string, error) {
return dm.listen(ctx, nil)
}

// listen is the internal method to start listening for changes to network devices. This is split from the
// public method to allow tests to listen to devices in specific network namespace.
func (dm *DeviceManager) listen(ctx context.Context, netNS *netns.NsHandle) (chan []string, error) {
l3DevOK := supportL3Dev()
routeChan := make(chan netlink.RouteUpdate)
closeChan := make(chan struct{})

err := netlink.RouteSubscribeWithOptions(routeChan, closeChan,
netlink.RouteSubscribeOptions{
Namespace: netNS,
// List existing routes to make sure we did not miss changes that happened after Detect().
ListExisting: true,
Namespace: &dm.netns,
})
if err != nil {
return nil, err
}

linkChan := make(chan netlink.LinkUpdate)

err = netlink.LinkSubscribeWithOptions(linkChan, closeChan, netlink.LinkSubscribeOptions{
Namespace: netNS,
ListExisting: true,
ListExisting: false,
Namespace: &dm.netns,
})
if err != nil {
return nil, err
}

devicesChan := make(chan []string)
devicesChan := make(chan []string, 1)

go func() {
// If a specific namespace is requested, lock the thread and set the
// threads namespace. This is currently only relevant for testing.
if netNS != nil {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
if err := netns.Set(*netNS); err != nil {
log.WithError(err).Fatal("Failed to set network namespace")
// Find links deleted after Detect()
if allLinks, err := dm.handle.LinkList(); err == nil {
changed := false
linksByName := map[string]struct{}{}
for _, link := range allLinks {
linksByName[link.Attrs().Name] = struct{}{}
}
dm.Lock()
for name := range dm.devices {
if _, exists := linksByName[name]; !exists {
delete(dm.devices, name)
changed = true
}
}
devices := dm.getDevices()
dm.Unlock()

if changed {
log.WithField(logfields.Devices, devices).Info("Devices changed")
devicesChan <- devices
}
}

go func() {
log.Info("Listening for device changes")

for {
Expand All @@ -382,9 +385,15 @@ func (dm *DeviceManager) listen(ctx context.Context, netNS *netns.NsHandle) (cha
}

case update := <-linkChan:
dm.Lock()
devicesChanged = dm.handleLinkUpdate(update)
dm.Unlock()
if update.Header.Type == unix.RTM_DELLINK {
name := update.Attrs().Name
dm.Lock()
if _, ok := dm.devices[name]; ok {
delete(dm.devices, name)
devicesChanged = true
}
dm.Unlock()
}
}

if devicesChanged {
Expand All @@ -405,8 +414,8 @@ func (dm *DeviceManager) AreDevicesRequired() bool {

// expandDevices expands all wildcard device names to concrete devices.
// e.g. device "eth+" expands to "eth0,eth1" etc. Non-matching wildcards are ignored.
func expandDevices() error {
expandedDevices, err := expandDeviceWildcards(option.Config.Devices, option.Devices)
func (dm *DeviceManager) expandDevices() error {
expandedDevices, err := dm.expandDeviceWildcards(option.Config.Devices, option.Devices)
if err != nil {
return err
}
Expand All @@ -415,20 +424,20 @@ func expandDevices() error {
}

// expandDirectRoutingDevice expands all wildcard device names to concrete devices and picks a first one.
func expandDirectRoutingDevice() error {
func (dm *DeviceManager) expandDirectRoutingDevice() error {
if option.Config.DirectRoutingDevice == "" {
return nil
}
expandedDevices, err := expandDeviceWildcards([]string{option.Config.DirectRoutingDevice}, option.DirectRoutingDevice)
expandedDevices, err := dm.expandDeviceWildcards([]string{option.Config.DirectRoutingDevice}, option.DirectRoutingDevice)
if err != nil {
return err
}
option.Config.DirectRoutingDevice = expandedDevices[0]
return nil
}

func expandDeviceWildcards(devices []string, option string) ([]string, error) {
allLinks, err := netlink.LinkList()
func (dm *DeviceManager) expandDeviceWildcards(devices []string, option string) ([]string, error) {
allLinks, err := dm.handle.LinkList()
if err != nil {
return nil, fmt.Errorf("device wildcard expansion failed to fetch devices: %w", err)
}
Expand Down
76 changes: 61 additions & 15 deletions pkg/datapath/linux/devices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ func (s *DevicesSuite) TearDownTest(c *C) {

func (s *DevicesSuite) TestDetect(c *C) {
s.withFreshNetNS(c, func() {
dm := NewDeviceManager()
dm, err := NewDeviceManager()
c.Assert(err, IsNil)

option.Config.Devices = []string{}
option.Config.DirectRoutingDevice = ""

Expand Down Expand Up @@ -175,14 +177,17 @@ func (s *DevicesSuite) TestExpandDevices(c *C) {
c.Assert(createDummy("other1", "192.168.3.4/24", false), IsNil)
c.Assert(createDummy("unmatched", "192.168.4.5/24", false), IsNil)

dm, err := NewDeviceManager()
c.Assert(err, IsNil)

// 1. Check expansion works and non-matching prefixes are ignored
option.Config.Devices = []string{"dummy+", "missing+", "other0+" /* duplicates: */, "dum+", "other0", "other1"}
c.Assert(expandDevices(), IsNil)
c.Assert(dm.expandDevices(), IsNil)
c.Assert(option.Config.Devices, checker.DeepEquals, []string{"dummy0", "dummy1", "other0", "other1"})

// 2. Check that expansion fails if devices are specified but yields empty expansion
option.Config.Devices = []string{"none+"}
c.Assert(expandDevices(), NotNil)
c.Assert(dm.expandDevices(), NotNil)
})
}

Expand All @@ -192,14 +197,17 @@ func (s *DevicesSuite) TestExpandDirectRoutingDevice(c *C) {
c.Assert(createDummy("dummy1", "192.168.1.2/24", false), IsNil)
c.Assert(createDummy("unmatched", "192.168.4.5/24", false), IsNil)

dm, err := NewDeviceManager()
c.Assert(err, IsNil)

// 1. Check expansion works and non-matching prefixes are ignored
option.Config.DirectRoutingDevice = "dummy+"
c.Assert(expandDirectRoutingDevice(), IsNil)
c.Assert(dm.expandDirectRoutingDevice(), IsNil)
c.Assert(option.Config.DirectRoutingDevice, Equals, "dummy0")

// 2. Check that expansion fails if directRoutingDevice is specified but yields empty expansion
option.Config.DirectRoutingDevice = "none+"
c.Assert(expandDirectRoutingDevice(), NotNil)
c.Assert(dm.expandDirectRoutingDevice(), NotNil)
})
}

Expand All @@ -210,13 +218,11 @@ func (s *DevicesSuite) TestListenForNewDevices(c *C) {

timeout := time.After(time.Second)

netns, err := netns.Get()
c.Assert(err, IsNil)

option.Config.Devices = []string{}
dm := NewDeviceManager()
dm, err := NewDeviceManager()
c.Assert(err, IsNil)

devicesChan, err := dm.listen(ctx, &netns)
devicesChan, err := dm.Listen(ctx)
c.Assert(err, IsNil)

// Create the IPv4 & IPv6 devices that should be detected.
Expand Down Expand Up @@ -271,13 +277,11 @@ func (s *DevicesSuite) TestListenForNewDevicesFiltered(c *C) {

timeout := time.After(time.Second)

netns, err := netns.Get()
c.Assert(err, IsNil)

option.Config.Devices = []string{"dummy+"}
dm := NewDeviceManager()
dm, err := NewDeviceManager()
c.Assert(err, IsNil)

devicesChan, err := dm.listen(ctx, &netns)
devicesChan, err := dm.Listen(ctx)
c.Assert(err, IsNil)

// Create the IPv4 & IPv6 devices that should be detected.
Expand All @@ -301,6 +305,48 @@ func (s *DevicesSuite) TestListenForNewDevicesFiltered(c *C) {
})
}

// TestListenAfterDelete checks that a device which disappears between
// Detect() and Listen() is noticed: after listening starts, the device list
// delivered on the channel must eventually contain only the surviving device.
func (s *DevicesSuite) TestListenAfterDelete(c *C) {
	s.withFreshNetNS(c, func() {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// The whole scenario, including setup, must finish within one second.
		deadline := time.After(time.Second)

		option.Config.Devices = []string{"dummy+"}
		dm, err := NewDeviceManager()
		c.Assert(err, IsNil)

		c.Assert(createDummy("dummy0", "192.168.1.2/24", false), IsNil)
		c.Assert(createDummy("dummy1", "2001:db8::face/64", true), IsNil)

		// Both dummies must be picked up by the initial detection.
		detected, err := dm.Detect()
		c.Assert(err, IsNil)
		c.Assert(detected, checker.DeepEquals, []string{"dummy0", "dummy1"})

		// Remove dummy1 while nobody is listening for link updates yet.
		removed, err := netlink.LinkByName("dummy1")
		c.Assert(err, IsNil)
		c.Assert(netlink.LinkDel(removed), IsNil)

		// Start listening only now; the earlier deletion must still be
		// reflected in the device lists sent on the channel.
		updates, err := dm.Listen(ctx)
		c.Assert(err, IsNil)

		for {
			select {
			case <-deadline:
				c.Fatal("Test timed out")
			case current := <-updates:
				if matched, _ := checker.DeepEqual(current, []string{"dummy0"}); matched {
					return
				}
			}
		}
	})
}

func (s *DevicesSuite) withFreshNetNS(c *C, test func()) {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
Expand Down

0 comments on commit 8941e96

Please sign in to comment.