Skip to content

Commit

Permalink
fix: retry in the fixed amount of time if grpc relay failed
Browse files Browse the repository at this point in the history
Before this commit, if tunnel failed with error, it would never restart again until `siderolink.TunnelType` event happen.
For most of the time it's a good idea, because it might mean that destination has changed.

But tunnel can also fail because allowed peer list is not yet loaded on newly started Omni instance.

Because of that, we want to try again and not be tied to the runtime event channel.

Signed-off-by: Dmitriy Matrenichev <dmitry.matrenichev@siderolabs.com>
  • Loading branch information
DmitriyMV committed Apr 3, 2024
1 parent d320498 commit 71d90ba
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,8 @@ func (suite *AddressMergeSuite) TestMergeFlapping() {
resource.VersionUndefined,
),
"foo",
); err != nil {
if err != nil && !state.IsNotFoundError(err) {
return err
}
); err != nil && !state.IsNotFoundError(err) {
return err
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,8 @@ func (suite *LinkMergeSuite) TestMergeFlapping() {
resource.VersionUndefined,
),
"foo",
); err != nil {
if err != nil && !state.IsNotFoundError(err) {
return err
}
); err != nil && !state.IsNotFoundError(err) {
return err
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,8 @@ func (suite *OperatorMergeSuite) TestMergeFlapping() {
resource.VersionUndefined,
),
"foo",
); err != nil {
if err != nil && !state.IsNotFoundError(err) {
return err
}
); err != nil && !state.IsNotFoundError(err) {
return err
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,8 @@ func (suite *RouteMergeSuite) TestMergeFlapping() {
resource.VersionUndefined,
),
"foo",
); err != nil {
if err != nil && !state.IsNotFoundError(err) {
return err
}
); err != nil && !state.IsNotFoundError(err) {
return err
}
}

Expand Down
98 changes: 52 additions & 46 deletions internal/app/machined/pkg/controllers/siderolink/userspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"fmt"
"net/netip"
"sync/atomic"
"time"

"github.com/cosi-project/runtime/pkg/controller"
Expand Down Expand Up @@ -61,14 +60,18 @@ func (ctrl *UserspaceWireguardController) Outputs() []controller.Output {
func (ctrl *UserspaceWireguardController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
eg, ctx := errgroup.WithContext(ctx)

relayRetryTimer := time.NewTimer(0)

safeReset(relayRetryTimer, 0)

var (
tunnelDevice atomic.Pointer[wgtunnel.TunnelDevice]
tunnelRelay atomic.Pointer[tunnelProps]
tunnelDevice tunnelDeviceProps
tunnelRelay tunnelProps
)

defer func() {
tunnelRelay.Load().Relay().Close()
tunnelDevice.Load().Close()
tunnelRelay.relay.Close()
tunnelDevice.device.Close()
}()

const (
Expand All @@ -85,37 +88,39 @@ func (ctrl *UserspaceWireguardController) Run(ctx context.Context, r controller.
case <-ctx.Done():
return ctxutil.Cause(ctx)
case <-r.EventCh():
case <-relayRetryTimer.C:
}

res, err := safe.ReaderGetByID[*siderolink.Tunnel](ctx, r, siderolink.TunnelID)
if err != nil {
if state.IsNotFoundError(err) {
tunnelRelay.Load().Relay().Close()
tunnelDevice.Load().Close()
tunnelRelay.relay.Close()
tunnelDevice.device.Close()

continue
}

return fmt.Errorf("failed to read link spec: %w", err)
}

if td := tunnelDevice.Load(); td.IsClosed() {
td.Close()
if tunnelDevice.device.IsClosed() {
tunnelDevice.device.Close()

td, err = wgtunnel.NewTunnelDevice(res.TypedSpec().LinkName, res.TypedSpec().MTU, qp, ctrl.makeLogger(logger))
dev, err := wgtunnel.NewTunnelDevice(res.TypedSpec().LinkName, res.TypedSpec().MTU, qp, ctrl.makeLogger(logger))
if err != nil {
return fmt.Errorf("failed to create tunnel device: %w", err)
}

tunnelDevice.Store(td)
// Store in outer scope because modifying the same variable will lead to the data race below
tunnelDevice = tunnelDeviceProps{device: dev, linkName: res.TypedSpec().LinkName, mtu: res.TypedSpec().MTU}

logger.Info("wg over grpc tunnel device created", zap.String("link_name", res.TypedSpec().LinkName))

eg.Go(func() error {
logger.Debug("tunnel device running")
defer logger.Debug("tunnel device exited")

return td.Run()
return dev.Run()
})
}

Expand All @@ -124,16 +129,21 @@ func (ctrl *UserspaceWireguardController) Run(ctx context.Context, r controller.
return fmt.Errorf("failed to parse siderolink API endpoint: %w", err)
}

if tn := tunnelRelay.Load(); tn.Relay().IsClosed() || tn.DstHost() != ep.Host || tn.OurAddrPort() != res.TypedSpec().NodeAddress {
tn.Relay().Close()
dstHost := ep.Host
ourAddrPort := res.TypedSpec().NodeAddress

if tunnelRelay.relay.IsClosed() ||
tunnelRelay.dstHost != ep.Host ||
tunnelRelay.ourAddrPort != res.TypedSpec().NodeAddress {
// Reset timer because we are going to start tunnel anyway
safeReset(relayRetryTimer, 0)

dstHost := ep.Host
ourAddrPort := res.TypedSpec().NodeAddress
tunnelRelay.relay.Close()

logger.Info(
"updating tunnel relay",
zap.String("old_endpoint", tn.DstHost()),
zap.Stringer("old_node_address", tn.OurAddrPort()),
zap.String("old_endpoint", tunnelRelay.dstHost),
zap.Stringer("old_node_address", tunnelRelay.ourAddrPort),
zap.String("new_endpoint", dstHost),
zap.Stringer("new_node_address", ourAddrPort),
)
Expand All @@ -143,14 +153,15 @@ func (ctrl *UserspaceWireguardController) Run(ctx context.Context, r controller.
return fmt.Errorf("failed to create tunnel relay: %w", err)
}

tunnelRelay.Store(newTunnelProps(relay, dstHost, ourAddrPort))
// Store in outer scope because modifying the same variable will lead to the data race below
tunnelRelay = tunnelProps{relay: relay, dstHost: dstHost, ourAddrPort: ourAddrPort}

eg.Go(func() error {
logger.Debug("running tunnel relay")

err := relay.Run(ctx, ctrl.makeLogger(logger))
if err == nil {
logger.Info("tunnel relay exited gracefully",
logger.Debug("tunnel relay exited gracefully",
zap.String("endpoint", dstHost),
zap.Stringer("node_address", ourAddrPort),
)
Expand All @@ -161,18 +172,35 @@ func (ctrl *UserspaceWireguardController) Run(ctx context.Context, r controller.
// Relay returned an error, close the relay and print the error, device should be kept running.
relay.Close()

const retryIn = 5 * time.Second

logger.Error("tunnel relay failed, retrying",
zap.Duration("timeout", retryIn),
zap.String("endpoint", dstHost),
zap.Stringer("node_address", ourAddrPort),
zap.Error(err),
)

safeReset(relayRetryTimer, retryIn)

return nil
})
}
}
}

func safeReset(timer *time.Timer, in time.Duration) {
if !timer.Stop() {
<-timer.C
}

if in == 0 {
return
}

timer.Reset(in)
}

// makeLogger ensures that we do not spam like crazy into our ring buffer loggers unless we explicitly want to.
func (ctrl *UserspaceWireguardController) makeLogger(logger *zap.Logger) *zap.Logger {
if ctrl.DebugDataStream {
Expand All @@ -182,36 +210,14 @@ func (ctrl *UserspaceWireguardController) makeLogger(logger *zap.Logger) *zap.Lo
return logger.WithOptions(zap.IncreaseLevel(zap.InfoLevel))
}

func newTunnelProps(relay *wggrpc.Relay, dstHost string, ourAddrPort netip.AddrPort) *tunnelProps {
return &tunnelProps{relay: relay, dstHost: dstHost, ourAddrPort: ourAddrPort}
}

type tunnelProps struct {
relay *wggrpc.Relay
dstHost string
ourAddrPort netip.AddrPort
}

func (t *tunnelProps) Relay() *wggrpc.Relay {
if t == nil {
return nil
}

return t.relay
}

func (t *tunnelProps) DstHost() string {
if t == nil {
return "<invalid_host>"
}

return t.dstHost
}

func (t *tunnelProps) OurAddrPort() netip.AddrPort {
if t == nil {
return netip.AddrPort{}
}

return t.ourAddrPort
type tunnelDeviceProps struct {
device *wgtunnel.TunnelDevice
linkName string
mtu int
}

0 comments on commit 71d90ba

Please sign in to comment.