Skip to content

Commit

Permalink
fix: watch buffer overrun for RouteStatus
Browse files Browse the repository at this point in the history
Fixes #8157

This PR contains two fixes, both related to the same problem.

Several routes for different links but with the same IPv6 destination might exist
at the same time, so the route resource ID should handle that. The problem
was that these routes were misreported, internally causing updates for
the same resources multiple times (equal to the number of links).

Don't trigger controllers more often than 10 times per second (with a burst of
5) for kernel notifications. This ensures Talos doesn't try to reflect the
current state of the network subsystem as resources too often, which
causes excessive CPU usage and might potentially lead to a buffer
overrun under a high rate of changes.

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
  • Loading branch information
smira committed Jan 17, 2024
1 parent cc06b5d commit 474eccd
Show file tree
Hide file tree
Showing 16 changed files with 152 additions and 19 deletions.
2 changes: 1 addition & 1 deletion internal/app/machined/pkg/controllers/kubespan/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, lo
if err = safe.WriterModify(ctx, r,
network.NewRouteSpec(
network.ConfigNamespaceName,
network.LayeredID(network.ConfigOperator, network.RouteID(spec.Table, spec.Family, spec.Destination, spec.Gateway, spec.Priority)),
network.LayeredID(network.ConfigOperator, network.RouteID(spec.Table, spec.Family, spec.Destination, spec.Gateway, spec.Priority, spec.OutLinkName)),
),
func(r *network.RouteSpec) error {
*r.TypedSpec() = spec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (suite *ManagerSuite) TestReconcile() {
netip.Prefix{},
netip.Addr{},
1,
"kubespan",
),
),
func(res *network.RouteSpec, asrt *assert.Assertions) {},
Expand All @@ -160,6 +161,7 @@ func (suite *ManagerSuite) TestReconcile() {
netip.Prefix{},
netip.Addr{},
1,
"kubespan",
),
),
func(res *network.RouteSpec, asrt *assert.Assertions) {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (ctrl *AddressSpecController) Outputs() []controller.Output {
//nolint:gocyclo
func (ctrl *AddressSpecController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
// watch link changes as some address might need to be re-applied if the link appears
watcher, err := watch.NewRtNetlink(r, unix.RTMGRP_LINK)
watcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_LINK)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (ctrl *AddressStatusController) Outputs() []controller.Output {
//
//nolint:gocyclo
func (ctrl *AddressStatusController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
watcher, err := watch.NewRtNetlink(r, unix.RTMGRP_LINK|unix.RTMGRP_IPV4_IFADDR|unix.RTMGRP_IPV6_IFADDR)
watcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_LINK|unix.RTMGRP_IPV4_IFADDR|unix.RTMGRP_IPV6_IFADDR)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/app/machined/pkg/controllers/network/link_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (ctrl *LinkSpecController) Run(ctx context.Context, r controller.Runtime, l
}

// watch link changes as some routes might need to be re-applied if the link appears
watcher, err := watch.NewRtNetlink(r, unix.RTMGRP_LINK)
watcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_LINK)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/app/machined/pkg/controllers/network/link_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ func (ctrl *LinkStatusController) Run(ctx context.Context, r controller.Runtime,
// create watch connections to rtnetlink and ethtool via genetlink
// these connections are used only to join multicast groups and receive notifications on changes
// other connections are used to send requests and receive responses, as we can't mix the notifications and request/responses
rtnetlinkWatcher, err := watch.NewRtNetlink(r, unix.RTMGRP_LINK)
rtnetlinkWatcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_LINK)
if err != nil {
return err
}

defer rtnetlinkWatcher.Done()

ethtoolWatcher, err := watch.NewEthtool(r)
ethtoolWatcher, err := watch.NewEthtool(watch.NewDefaultRateLimitedTrigger(ctx, r))
if err != nil {
logger.Warn("ethtool watcher failed to start", zap.Error(err))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,10 @@ func (ctrl *OperatorSpecController) reconcileOperatorOutputs(ctx context.Context
if err := apply(
network.NewRouteSpec(
network.ConfigNamespaceName,
fmt.Sprintf("%s/%s", op.Operator.Prefix(), network.RouteID(routeSpec.Table, routeSpec.Family, routeSpec.Destination, routeSpec.Gateway, routeSpec.Priority)),
fmt.Sprintf("%s/%s",
op.Operator.Prefix(),
network.RouteID(routeSpec.Table, routeSpec.Family, routeSpec.Destination, routeSpec.Gateway, routeSpec.Priority, routeSpec.OutLinkName),
),
),
func(r resource.Resource) {
*r.(*network.RouteSpec).TypedSpec() = routeSpec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,10 @@ func (ctrl *PlatformConfigController) apply(ctx context.Context, r controller.Ru
idBuilder: func(spec interface{}) (resource.ID, error) {
routeSpec := spec.(network.RouteSpecSpec) //nolint:errcheck,forcetypeassert

return network.LayeredID(network.ConfigPlatform, network.RouteID(routeSpec.Table, routeSpec.Family, routeSpec.Destination, routeSpec.Gateway, routeSpec.Priority)), nil
return network.LayeredID(
network.ConfigPlatform,
network.RouteID(routeSpec.Table, routeSpec.Family, routeSpec.Destination, routeSpec.Gateway, routeSpec.Priority, routeSpec.OutLinkName),
), nil
},
resourceBuilder: func(id string) resource.Resource {
return network.NewRouteSpec(network.ConfigNamespaceName, id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (ctrl *RouteConfigController) apply(ctx context.Context, r controller.Runti

for _, route := range routes {
route := route
id := network.LayeredID(route.ConfigLayer, network.RouteID(route.Table, route.Family, route.Destination, route.Gateway, route.Priority))
id := network.LayeredID(route.ConfigLayer, network.RouteID(route.Table, route.Family, route.Destination, route.Gateway, route.Priority, route.OutLinkName))

if err := r.Modify(
ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (suite *RouteConfigSuite) TestMachineConfiguration() {

suite.assertRoutes(
[]string{
"configuration/inet6/2001:470:6d:30e:8ed2:b60c:9d2f:803b//1024",
"configuration/eth2/inet6/2001:470:6d:30e:8ed2:b60c:9d2f:803b//1024",
"configuration/inet4/10.0.3.1/10.0.3.0/24/1024",
"configuration/inet4/192.168.0.25/192.168.0.0/18/25",
"configuration/inet4/192.244.0.1/192.244.0.0/24/1024",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (ctrl *RouteMergeController) Run(ctx context.Context, r controller.Runtime,

for _, res := range list.Items {
route := res.(*network.RouteSpec) //nolint:errcheck,forcetypeassert
id := network.RouteID(route.TypedSpec().Table, route.TypedSpec().Family, route.TypedSpec().Destination, route.TypedSpec().Gateway, route.TypedSpec().Priority)
id := network.RouteID(route.TypedSpec().Table, route.TypedSpec().Family, route.TypedSpec().Destination, route.TypedSpec().Gateway, route.TypedSpec().Priority, route.TypedSpec().OutLinkName)

existing, ok := routes[id]
if ok && existing.TypedSpec().ConfigLayer > route.TypedSpec().ConfigLayer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (ctrl *RouteSpecController) Outputs() []controller.Output {
//nolint:gocyclo
func (ctrl *RouteSpecController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
// watch link changes as some routes might need to be re-applied if the link appears
watcher, err := watch.NewRtNetlink(r, unix.RTMGRP_LINK|unix.RTMGRP_IPV4_ROUTE)
watcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_LINK|unix.RTMGRP_IPV4_ROUTE)
if err != nil {
return err
}
Expand Down
8 changes: 5 additions & 3 deletions internal/app/machined/pkg/controllers/network/route_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (ctrl *RouteStatusController) Outputs() []controller.Output {
//
//nolint:gocyclo
func (ctrl *RouteStatusController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
watcher, err := watch.NewRtNetlink(r, unix.RTMGRP_IPV4_MROUTE|unix.RTMGRP_IPV4_ROUTE|unix.RTMGRP_IPV6_MROUTE|unix.RTMGRP_IPV6_ROUTE)
watcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_IPV4_MROUTE|unix.RTMGRP_IPV4_ROUTE|unix.RTMGRP_IPV6_MROUTE|unix.RTMGRP_IPV6_ROUTE)
if err != nil {
return err
}
Expand Down Expand Up @@ -104,7 +104,9 @@ func (ctrl *RouteStatusController) Run(ctx context.Context, r controller.Runtime
dstPrefix := netip.PrefixFrom(dstAddr, int(route.DstLength))
srcAddr, _ := netip.AddrFromSlice(route.Attributes.Src)
gatewayAddr, _ := netip.AddrFromSlice(route.Attributes.Gateway)
id := network.RouteID(nethelpers.RoutingTable(route.Table), nethelpers.Family(route.Family), dstPrefix, gatewayAddr, route.Attributes.Priority)
outLinkName := linkLookup[route.Attributes.OutIface]

id := network.RouteID(nethelpers.RoutingTable(route.Table), nethelpers.Family(route.Family), dstPrefix, gatewayAddr, route.Attributes.Priority, outLinkName)

if err = r.Modify(ctx, network.NewRouteStatus(network.NamespaceName, id), func(r resource.Resource) error {
status := r.(*network.RouteStatus).TypedSpec()
Expand All @@ -114,7 +116,7 @@ func (ctrl *RouteStatusController) Run(ctx context.Context, r controller.Runtime
status.Source = srcAddr
status.Gateway = gatewayAddr
status.OutLinkIndex = route.Attributes.OutIface
status.OutLinkName = linkLookup[route.Attributes.OutIface]
status.OutLinkName = outLinkName
status.Priority = route.Attributes.Priority
status.Table = nethelpers.RoutingTable(route.Table)
status.Scope = nethelpers.Scope(route.Scope)
Expand Down
74 changes: 74 additions & 0 deletions internal/app/machined/pkg/controllers/network/watch/trigger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package watch

import (
"context"

"golang.org/x/time/rate"
)

// RateLimitedTrigger wraps a Trigger with rate limiting.
//
// Calls to QueueReconcile are signaled over ch to the run goroutine, which
// waits on limiter before forwarding each request to the wrapped trigger.
type RateLimitedTrigger struct {
// trigger is the wrapped Trigger that receives the rate-limited requests.
trigger Trigger
// limiter paces how often requests are forwarded to trigger.
limiter *rate.Limiter
// ch carries wake-up signals from QueueReconcile to the run goroutine.
ch chan struct{}
}

// Interface check: compilation fails if *RateLimitedTrigger stops satisfying Trigger.
var _ Trigger = &RateLimitedTrigger{}

// NewRateLimitedTrigger creates a new RateLimitedTrigger with specified params.
//
// Reconcile requests are forwarded to trigger through a rate limiter built
// from rateLimit (events per second) and burst (maximum burst size).
//
// The signal channel has a capacity of one: a request that arrives while the
// background goroutine is busy (waiting on the limiter or forwarding a
// previous request) is kept as a single pending wake-up instead of being
// lost, so the state after the most recent notification is always reconciled.
// Any further requests that arrive while one is already pending are coalesced
// into it, so the forwarding rate is still bounded by the limiter.
//
// The trigger's goroutine exits when the context is canceled.
func NewRateLimitedTrigger(ctx context.Context, trigger Trigger, rateLimit rate.Limit, burst int) *RateLimitedTrigger {
	t := &RateLimitedTrigger{
		trigger: trigger,
		limiter: rate.NewLimiter(rateLimit, burst),
		// Capacity 1 avoids a lost wake-up between forwarding a request and
		// returning to the receive in run.
		ch: make(chan struct{}, 1),
	}

	go t.run(ctx)

	return t
}

// NewDefaultRateLimitedTrigger builds a RateLimitedTrigger around trigger
// using the default parameters: 10 events per second with a burst of 5 events.
func NewDefaultRateLimitedTrigger(ctx context.Context, trigger Trigger) *RateLimitedTrigger {
	// Untyped constants: they take on the parameter types of
	// NewRateLimitedTrigger at the call site.
	const eventsPerSecond, maxBurst = 10, 5

	limited := NewRateLimitedTrigger(ctx, trigger, eventsPerSecond, maxBurst)

	return limited
}

// QueueReconcile implements the Trigger interface.
//
// It attempts a non-blocking hand-off of a wake-up signal on the internal
// channel. If the signal cannot be sent right away, it is discarded.
// The call never blocks.
func (t *RateLimitedTrigger) QueueReconcile() {
	var signal struct{}

	select {
	case t.ch <- signal:
		// Signal handed over.
		return
	default:
		// Channel not ready to accept the signal: drop it.
		return
	}
}

// run is the background loop started by NewRateLimitedTrigger.
//
// For every signal received on the internal channel it waits on the limiter
// and then forwards a single QueueReconcile call to the wrapped trigger.
// The loop ends when ctx is done or when waiting on the limiter fails.
func (t *RateLimitedTrigger) run(ctx context.Context) {
	done := ctx.Done()

	for {
		select {
		case <-t.ch:
			// A reconcile was requested: wait on the limiter before forwarding.
			if t.limiter.Wait(ctx) != nil {
				return
			}

			t.trigger.QueueReconcile()
		case <-done:
			return
		}
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package watch_test

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/siderolabs/talos/internal/app/machined/pkg/controllers/network/watch"
)

// mockTrigger is a test double that counts how many times QueueReconcile
// was called on it.
type mockTrigger struct {
	calls atomic.Int64
}

// QueueReconcile records one more reconcile request.
func (m *mockTrigger) QueueReconcile() {
	const one int64 = 1

	m.calls.Add(one)
}

// Get returns the number of QueueReconcile calls recorded so far.
func (m *mockTrigger) Get() int64 {
	total := m.calls.Load()

	return total
}

// TestRateLimitedTrigger calls the rate-limited trigger in a tight loop for
// one second (limit 10 events/s, burst 5) and checks that the number of
// requests forwarded to the mock is within 5 of 14.
func TestRateLimitedTrigger(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	mock := &mockTrigger{}
	trigger := watch.NewRateLimitedTrigger(ctx, mock, 10, 5)

	// Queue reconciles as fast as possible until one second has elapsed.
	for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); {
		trigger.QueueReconcile()
	}

	assert.InDelta(t, int64(14), mock.Get(), 5)
}
12 changes: 8 additions & 4 deletions pkg/machinery/resources/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,21 @@ func LinkID(linkName string) string {
}

// RouteID builds ID (primary key) for the route.
func RouteID(table nethelpers.RoutingTable, family nethelpers.Family, destination netip.Prefix, gateway netip.Addr, priority uint32) string {
func RouteID(table nethelpers.RoutingTable, family nethelpers.Family, destination netip.Prefix, gateway netip.Addr, priority uint32, outLinkName string) string {
dst, _ := destination.MarshalText() //nolint:errcheck
gw, _ := gateway.MarshalText() //nolint:errcheck

tablePrefix := ""
prefix := ""

if table != nethelpers.TableMain {
tablePrefix = fmt.Sprintf("%s/", table)
prefix = fmt.Sprintf("%s/", table)
}

return fmt.Sprintf("%s%s/%s/%s/%d", tablePrefix, family, string(gw), string(dst), priority)
if family == nethelpers.FamilyInet6 {
prefix += fmt.Sprintf("%s/", outLinkName)
}

return fmt.Sprintf("%s%s/%s/%s/%d", prefix, family, string(gw), string(dst), priority)
}

// OperatorID builds ID (primary key) for the operators.
Expand Down

0 comments on commit 474eccd

Please sign in to comment.