Skip to content

Commit

Permalink
Add netlink subscriber for link manager
Browse files Browse the repository at this point in the history
Now link manager will react to events for links, rather than just
running every so often.

Signed-off-by: Tim Rozet <trozet@redhat.com>
  • Loading branch information
trozet committed Dec 18, 2023
1 parent 815d6f8 commit 4de98f9
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 89 deletions.
10 changes: 3 additions & 7 deletions go-controller/pkg/node/controllers/egressip/egressip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func initController(namespaces []corev1.Namespace, pods []corev1.Pod, egressIPs
if err := watchFactory.Start(); err != nil {
return nil, nil, err
}
linkManager := linkmanager.NewController(node1Name, v4, v6)
linkManager := linkmanager.NewController(node1Name, v4, v6, nil)
c, err := NewController(watchFactory.EgressIPInformer(), watchFactory.NodeInformer(), watchFactory.NamespaceInformer(),
watchFactory.PodCoreInformer(), rm, v4, v6, node1Name, linkManager)
if err != nil {
Expand Down Expand Up @@ -261,12 +261,8 @@ func runController(testNS ns.NetNS, c *Controller) (cleanupFn, error) {
// we do not call start for our controller because the newly created goroutines will not be set to the correct network namespace,
// so we invoke them manually here and call reconcile manually
// normally executed during Run but we call it manually here because run spawns a go routine that we cannot control its netns during test
wg.Add(1)
go testNS.Do(func(netNS ns.NetNS) error {
c.linkManager.Run(stopCh, 10*time.Millisecond, nil)
wg.Done()
return nil
})
c.linkManager.Run(stopCh, wg)

wg.Add(1)
go testNS.Do(func(netNS ns.NetNS) error {
c.iptablesManager.Run(stopCh, 10*time.Millisecond)
Expand Down
8 changes: 2 additions & 6 deletions go-controller/pkg/node/default_node_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,7 @@ func (nc *DefaultNodeNetworkController) Start(ctx context.Context) error {
}

// create link manager, will work for egress IP as well as monitoring MAC changes to default gw bridge
linkManager := linkmanager.NewController(nc.name, config.IPv4Mode, config.IPv6Mode)
linkManager := linkmanager.NewController(nc.name, config.IPv4Mode, config.IPv6Mode, nc.updateGatewayMAC)

if config.OVNKubernetesFeature.EnableEgressIP && !util.PlatformTypeIsEgressIPCloudProvider() {
c, err := egressip.NewController(nc.watchFactory.EgressIPInformer(), nc.watchFactory.NodeInformer(),
Expand All @@ -1178,11 +1178,7 @@ func (nc *DefaultNodeNetworkController) Start(ctx context.Context) error {
klog.Infof("Egress IP for non-OVN managed networks is disabled")
}

nc.wg.Add(1)
go func() {
linkManager.Run(nc.stopChan, 30*time.Second, nc.updateGatewayMAC)
nc.wg.Done()
}()
linkManager.Run(nc.stopChan, nc.wg)

nc.wg.Add(1)
go func() {
Expand Down
221 changes: 147 additions & 74 deletions go-controller/pkg/node/linkmanager/link_network_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,40 +25,97 @@ type LinkAddress struct {
}

type Controller struct {
mu *sync.Mutex
name string
ipv4Enabled bool
ipv6Enabled bool
store map[string][]netlink.Addr
mu *sync.Mutex
name string
ipv4Enabled bool
ipv6Enabled bool
store map[string][]netlink.Addr
linkHandlerFunc func(link netlink.Link) error
}

// NewController creates a controller to manage linux network interfaces
func NewController(name string, v4, v6 bool) *Controller {
func NewController(name string, v4, v6 bool, linkHandlerFunc func(link netlink.Link) error) *Controller {
return &Controller{
mu: &sync.Mutex{},
name: name,
ipv4Enabled: v4,
ipv6Enabled: v6,
store: make(map[string][]netlink.Addr, 0),
mu: &sync.Mutex{},
name: name,
ipv4Enabled: v4,
ipv6Enabled: v6,
store: make(map[string][]netlink.Addr),
linkHandlerFunc: linkHandlerFunc,
}
}

// Run starts the controller and syncs at least every syncPeriod
// linkHandlerFunc fires as an additional handler when reconcile runs
func (c *Controller) Run(stopCh <-chan struct{}, syncPeriod time.Duration, linkHandlerFunc func(link netlink.Link) error) {
ticker := time.NewTicker(syncPeriod)
defer ticker.Stop()
for {
select {
case <-stopCh:
return
case <-ticker.C:
c.mu.Lock()
c.reconcile(linkHandlerFunc)
c.mu.Unlock()
// Run starts the link manager. It builds a netlink link subscription function
// and hands it to runInternal, which starts the event loop in its own
// goroutine (registered on doneWg), so Run itself does not block.
//
// Every successful subscription is followed by a full sync() so that the
// manager's view matches the addresses currently on the node. The
// controller's linkHandlerFunc, when set, fires as an additional handler for
// every link that is synced.
//
// stopCh is passed both to netlink as the subscription's done channel and to
// the event loop as its stop signal.
func (c *Controller) Run(stopCh <-chan struct{}, doneWg *sync.WaitGroup) {
	opts := netlink.LinkSubscribeOptions{}
	opts.ErrorCallback = func(err error) {
		// Deliberately only logs. The original author notes that calling
		// sync() from this callback is redundant and unsafe once the stop
		// channel is closed.
		klog.Errorf("Failed during LinkSubscribe callback: %v", err)
	}

	subscribeToLinks := func() (bool, chan netlink.LinkUpdate, error) {
		updates := make(chan netlink.LinkUpdate)
		err := netlink.LinkSubscribeWithOptions(updates, stopCh, opts)
		if err != nil {
			return false, nil, err
		}
		// sync the manager with current addresses on the node
		c.sync()
		return true, updates, nil
	}

	c.runInternal(stopCh, doneWg, subscribeToLinks)
}

// subscribeFn establishes a subscription for link updates. It returns whether
// the subscription is active, the channel on which link updates are delivered,
// and any error hit while subscribing. runInternal calls it once at start-up
// and again whenever it needs to re-subscribe.
type subscribeFn func() (bool, chan netlink.LinkUpdate, error)

// runInternal starts the link manager event loop in a new goroutine that is
// registered on doneWg, logs that the manager is running, and returns.
// It can be used by testcases to provide a fake subscription function rather
// than using netlink.
//
// The loop reacts to three things:
//   - a link update: the affected link is synced under the controller lock and
//     the periodic sync ticker is pushed back by a full period;
//   - the periodic ticker (every 2 minutes without link updates): a full sync()
//     runs if a subscription is active, otherwise a new subscription is tried;
//   - stopChan closing: the goroutine exits.
//
// If the update channel is closed the loop re-subscribes, unless stopChan is
// already closed, in which case it exits instead of subscribing again during
// shutdown.
func (c *Controller) runInternal(stopChan <-chan struct{}, doneWg *sync.WaitGroup, subscribe subscribeFn) {
	const resyncPeriod = 2 * time.Minute

	doneWg.Add(1)
	go func() {
		defer doneWg.Done()

		linkSyncTimer := time.NewTicker(resyncPeriod)
		defer linkSyncTimer.Stop()

		// On failure linkChan is nil, so its select case never fires and the
		// ticker branch below retries the subscription.
		subscribed, linkChan, err := subscribe()
		if err != nil {
			klog.Errorf("Error during netlink subscribe for Link Manager: %v", err)
		}

		for {
			select {
			case a, ok := <-linkChan:
				linkSyncTimer.Reset(resyncPeriod)
				if !ok {
					// The channel may have been closed because we are shutting
					// down; do not re-subscribe (and trigger another sync) then.
					select {
					case <-stopChan:
						return
					default:
					}
					if subscribed, linkChan, err = subscribe(); err != nil {
						klog.Errorf("Error during netlink re-subscribe due to channel closing for Link Manager: %v", err)
					}
					continue
				}
				if err := c.syncLinkLocked(a.Link); err != nil {
					klog.Errorf("Error while syncing link %q: %v", a.Link.Attrs().Name, err)
				}

			case <-linkSyncTimer.C:
				if subscribed {
					klog.V(5).Info("Link manager calling sync() explicitly")
					c.sync()
					continue
				}
				if subscribed, linkChan, err = subscribe(); err != nil {
					klog.Errorf("Error during netlink re-subscribe for Link Manager: %v", err)
				}

			case <-stopChan:
				return
			}
		}
	}()

	klog.Info("Link manager is running")
}

// AddAddress stores the address in a store and ensures it is applied
Expand All @@ -75,8 +132,7 @@ func (c *Controller) AddAddress(address netlink.Addr) error {
// overwrite label to the name of this component in-order to aid address ownership. Label must start with link name.
address.Label = GetAssignedAddressLabel(link.Attrs().Name)
c.addAddressToStore(link.Attrs().Name, address)
c.reconcile(nil)
return nil
return c.syncLink(link)
}

// DelAddress removes the address from the store and ensures it is removed from the link
Expand All @@ -91,11 +147,71 @@ func (c *Controller) DelAddress(address netlink.Addr) error {
c.mu.Lock()
defer c.mu.Unlock()
c.delAddressFromStore(link.Attrs().Name, address)
c.reconcile(nil)
return c.syncLink(link)
}

// syncLinkLocked is a wrapper around syncLink for callers that do not already
// hold the controller mutex: it locks c.mu, runs syncLink for the given link,
// and releases the mutex when it returns. It returns whatever syncLink returns.
func (c *Controller) syncLinkLocked(link netlink.Link) error {
c.mu.Lock()
defer c.mu.Unlock()
return c.syncLink(link)
}

// syncLink handles link updates by reconciling a single link against the
// controller's address store.
// It MUST be called with the controller locked.
//
// In order, it:
//  1. runs the controller's linkHandlerFunc, if one is set (a failure is only
//     logged);
//  2. lists the addresses on the link for the enabled IP families;
//  3. deletes addresses that carry this component's label but are not in the
//     store for this link;
//  4. if the store has an entry for the link, adds every wanted address that
//     is missing and, for IPv4 addresses, sends a gratuitous ARP.
//
// It returns an error only when the link's addresses cannot be listed.
// Failures for individual addresses are logged and do not stop the
// remaining addresses from being processed.
func (c *Controller) syncLink(link netlink.Link) error {
	linkName := link.Attrs().Name
	if c.linkHandlerFunc != nil {
		if err := c.linkHandlerFunc(link); err != nil {
			klog.Errorf("Failed to execute link handler function on link: %s, error: %v", linkName, err)
		}
	}

	// get all addresses associated with the link depending on which IP families we support
	foundAddresses, err := getAllLinkAddressesByIPFamily(link, c.ipv4Enabled, c.ipv6Enabled)
	if err != nil {
		return fmt.Errorf("failed to get address from link %q: %w", linkName, err)
	}
	wantedAddresses, found := c.store[linkName]
	ownedLabel := GetAssignedAddressLabel(linkName)
	nlOps := util.GetNetLinkOps()

	// cleanup any stale addresses on the link
	for _, foundAddress := range foundAddresses {
		// we label any address we create, so if we aren't managing a link, we must remove any stale addresses
		if foundAddress.Label != ownedLabel || containsAddress(wantedAddresses, foundAddress) {
			continue
		}
		if err := nlOps.AddrDel(link, &foundAddress); err != nil && !nlOps.IsLinkNotFoundError(err) {
			klog.Errorf("Link Network Manager: failed to delete address %q from link %q: %v",
				foundAddress.String(), linkName, err)
			continue
		}
		klog.Infof("Link Network Manager: successfully removed stale address %q from link %q",
			foundAddress.String(), linkName)
	}

	// we don't manage this link therefore we don't need to add any addresses
	if !found {
		return nil
	}

	// add addresses we want that are not found on the link
	for _, addressWanted := range wantedAddresses {
		if containsAddress(foundAddresses, addressWanted) {
			continue
		}
		if err := nlOps.AddrAdd(link, &addressWanted); err != nil {
			klog.Errorf("Link manager: failed to add address %q to link %q: %v", addressWanted.String(), linkName, err)
			// Do not announce or report success for an address that was not added.
			continue
		}
		// For IPv4, use arping to try to update other hosts ARP caches, in case this IP was
		// previously active on another node
		if addressWanted.IP.To4() != nil {
			if err := arping.GratuitousArpOverIfaceByName(addressWanted.IP, linkName); err != nil {
				klog.Errorf("Failed to send a GARP for IP %s over interface %s: %v", addressWanted.IP.String(),
					linkName, err)
			}
		}
		klog.Infof("Link manager completed adding address %s to link %s", addressWanted, linkName)
	}

	return nil
}

func (c *Controller) reconcile(linkHandlerFunc func(link netlink.Link) error) {
func (c *Controller) sync() {
// 1. get all the links on the node
// 2. iterate over the links and get the addresses associated with it
// 3. clean up any stale addresses on the link that we no longer manage
Expand All @@ -106,54 +222,11 @@ func (c *Controller) reconcile(linkHandlerFunc func(link netlink.Link) error) {
klog.Errorf("Link Network Manager: failed to list links: %v", err)
return
}
c.mu.Lock()
defer c.mu.Unlock()
for _, link := range links {
if linkHandlerFunc != nil {
if err := linkHandlerFunc(link); err != nil {
klog.Errorf("Failed to execute link handler function on link: %s, error: %v", link.Attrs().Name, err)
}
}
linkName := link.Attrs().Name
// get all addresses associated with the link depending on which IP families we support
foundAddresses, err := getAllLinkAddressesByIPFamily(link, c.ipv4Enabled, c.ipv6Enabled)
if err != nil {
klog.Errorf("Link Network Manager: failed to get address from link %q", linkName)
continue
}
wantedAddresses, found := c.store[linkName]
// cleanup any stale addresses on the link
for _, foundAddress := range foundAddresses {
// we label any address we create, so if we aren't managing a link, we must remove any stale addresses
if foundAddress.Label == GetAssignedAddressLabel(linkName) && !containsAddress(wantedAddresses, foundAddress) {
if err := util.GetNetLinkOps().AddrDel(link, &foundAddress); err != nil && !util.GetNetLinkOps().IsLinkNotFoundError(err) {
klog.Errorf("Link Network Manager: failed to delete address %q from link %q",
foundAddress.String(), linkName)
} else {
klog.Infof("Link Network Manager: successfully removed stale address %q from link %q",
foundAddress.String(), linkName)
}
}
}
// we don't manage this link therefore we don't need to add any addresses
if !found {
continue
}
// add addresses we want that are not found on the link
for _, addressWanted := range wantedAddresses {
if containsAddress(foundAddresses, addressWanted) {
continue
}
if err = util.GetNetLinkOps().AddrAdd(link, &addressWanted); err != nil {
klog.Errorf("Link manager: failed to add address %q to link %q: %v", addressWanted.String(), linkName, err)
}
// For IPv4, use arping to try to update other hosts ARP caches, in case this IP was
// previously active on another node
if addressWanted.IP.To4() != nil {
if err = arping.GratuitousArpOverIfaceByName(addressWanted.IP, linkName); err != nil {
klog.Errorf("Failed to send a GARP for IP %s over interface %s: %v", addressWanted.IP.String(),
linkName, err)
}
}
klog.Infof("Link manager completed adding address %s to link %s", addressWanted, linkName)
if err := c.syncLink(link); err != nil {
klog.Errorf("Sync Link failed for link %q: %v", link.Attrs().Name, err)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ var _ = ginkgo.Describe("Link network manager", func() {
nlMock.On("AddrList", nlLink1Mock, getIPFamilyInt(v4Enabled, v6Enabled)).Return(getLinkAddrs(existingLinkAddr, linkName1), nil)
nlMock.On("AddrList", nlLink2Mock, getIPFamilyInt(v4Enabled, v6Enabled)).Return(getLinkAddrs(existingLinkAddr, linkName2), nil)
nlMock.On("AddrAdd", nlLink1Mock, &expectedAddr).Return(nil)
c = NewController("test", v4Enabled, v6Enabled)
c = NewController("test", v4Enabled, v6Enabled, nil)
c.store = existingStore
err := c.AddAddress(addrToAdd)
expectedResMatcher := gomega.Succeed()
Expand Down Expand Up @@ -169,7 +169,7 @@ var _ = ginkgo.Describe("Link network manager", func() {
nlMock.On("AddrList", nlLink1Mock, getIPFamilyInt(v4Enabled, v6Enabled)).Return(getLinkAddrs(existingLinkAddr, linkName1), nil)
nlMock.On("AddrList", nlLink2Mock, getIPFamilyInt(v4Enabled, v6Enabled)).Return(getLinkAddrs(existingLinkAddr, linkName2), nil)
nlMock.On("AddrDel", nlLink1Mock, &expectedAddr).Return(nil)
c = NewController("test", v4Enabled, v6Enabled)
c = NewController("test", v4Enabled, v6Enabled, nil)
c.store = existingStore
err := c.DelAddress(addrToDel)
expectedResMatcher := gomega.Succeed()
Expand Down

0 comments on commit 4de98f9

Please sign in to comment.