Skip to content

Commit f7cadbc

Browse files
committed
fix: handle duplicate peer updates
Don't send peer updates to Wireguard when it's not required. Make all logging go via zap, convert Wireguard internal logger to use zap. Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
1 parent 0755b24 commit f7cadbc

File tree

5 files changed

+99
-22
lines changed

5 files changed

+99
-22
lines changed

cmd/siderolink-agent/main.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"os"
1313
"os/signal"
1414

15+
"go.uber.org/zap"
1516
"golang.org/x/sync/errgroup"
1617
"google.golang.org/grpc"
1718
)
@@ -31,9 +32,14 @@ func main() {
3132
}
3233

3334
func run(ctx context.Context) error {
35+
logger, err := zap.NewDevelopment()
36+
if err != nil {
37+
return fmt.Errorf("error creating logger")
38+
}
39+
3440
eg, ctx := errgroup.WithContext(ctx)
3541

36-
if err := sideroLink(ctx, eg); err != nil {
42+
if err := sideroLink(ctx, eg, logger); err != nil {
3743
return fmt.Errorf("SideroLink: %w", err)
3844
}
3945

cmd/siderolink-agent/siderolink.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"fmt"
1010
"net"
1111

12+
"go.uber.org/zap"
1213
"golang.org/x/sync/errgroup"
1314
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
1415
"google.golang.org/grpc"
@@ -24,7 +25,7 @@ var sideroLinkFlags struct {
2425
apiEndpoint string
2526
}
2627

27-
func sideroLink(ctx context.Context, eg *errgroup.Group) error {
28+
func sideroLink(ctx context.Context, eg *errgroup.Group, logger *zap.Logger) error {
2829
lis, err := net.Listen("tcp", sideroLinkFlags.apiEndpoint)
2930
if err != nil {
3031
return fmt.Errorf("error listening for gRPC API: %w", err)
@@ -61,7 +62,7 @@ func sideroLink(ctx context.Context, eg *errgroup.Group) error {
6162
pb.RegisterProvisionServiceServer(s, srv)
6263

6364
eg.Go(func() error {
64-
return wgDevice.Run(ctx, srv)
65+
return wgDevice.Run(ctx, logger, srv)
6566
})
6667

6768
eg.Go(func() error {

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786
77
github.com/stretchr/testify v1.7.0
88
github.com/talos-systems/talos/pkg/machinery v0.14.0-alpha.1.0.20211118180932-1ffa8e048008
9+
go.uber.org/zap v1.18.1
910
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
1011
golang.zx2c4.com/wireguard v0.0.0-20211109020618-685490f568cf
1112
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20211109202428-0073765f69ba
@@ -31,6 +32,8 @@ require (
3132
github.com/mdlayher/socket v0.0.0-20211102153432-57e3fa563ecb // indirect
3233
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
3334
github.com/pmezard/go-difflib v1.0.0 // indirect
35+
go.uber.org/atomic v1.7.0 // indirect
36+
go.uber.org/multierr v1.7.0 // indirect
3437
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect
3538
go4.org/unsafe/assume-no-moving-gc v0.0.0-20211027215541-db492cf91b37 // indirect
3639
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa // indirect

go.sum

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd
1313
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
1414
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
1515
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
16+
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
1617
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
1718
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
1819
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@@ -215,12 +216,16 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
215216
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
216217
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
217218
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
219+
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
218220
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
219221
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
222+
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
220223
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
221224
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
222225
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
226+
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
223227
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
228+
go.uber.org/zap v1.18.1 h1:CSUJ2mjFszzEWt4CdKISEuChVIXGBn3lAPwkRGyVrc4=
224229
go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
225230
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 h1:UXLjNohABv4S58tHmeuIZDO6e3mHpW2Dx33gaNt03LE=
226231
go4.org/intern v0.0.0-20211027215823-ae77deb06f29/go.mod h1:cS2ma+47FKrLPdXFpr7CuxiTW3eyJbWew4qx0qtQWDA=
@@ -243,6 +248,7 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx
243248
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
244249
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
245250
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
251+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
246252
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
247253
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
248254
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@@ -455,6 +461,7 @@ gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
455461
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
456462
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
457463
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
464+
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
458465
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
459466
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
460467
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

pkg/wireguard/wireguard.go

Lines changed: 79 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"os"
1313

1414
"github.com/jsimonetti/rtnetlink/rtnl"
15+
"go.uber.org/zap"
1516
"golang.zx2c4.com/wireguard/conn"
1617
"golang.zx2c4.com/wireguard/device"
1718
"golang.zx2c4.com/wireguard/ipc"
@@ -56,7 +57,7 @@ func NewDevice(address netaddr.IPPrefix, privateKey wgtypes.Key, listenPort uint
5657
}
5758

5859
// Run the device.
59-
func (dev *Device) Run(ctx context.Context, peers PeerSource) error {
60+
func (dev *Device) Run(ctx context.Context, logger *zap.Logger, peers PeerSource) error {
6061
client, err := wgctrl.New()
6162
if err != nil {
6263
return fmt.Errorf("error initializing Wireguard client: %w", err)
@@ -71,10 +72,10 @@ func (dev *Device) Run(ctx context.Context, peers PeerSource) error {
7172

7273
defer rtnlClient.Close() //nolint:errcheck
7374

74-
logger := device.NewLogger(
75-
device.LogLevelVerbose,
76-
fmt.Sprintf("(%s) ", interfaceName),
77-
)
75+
wgLogger := &device.Logger{
76+
Verbosef: logger.Sugar().Debugf,
77+
Errorf: logger.Sugar().Errorf,
78+
}
7879

7980
uapi, err := ipc.UAPIListen(interfaceName, dev.fileUAPI)
8081
if err != nil {
@@ -83,7 +84,7 @@ func (dev *Device) Run(ctx context.Context, peers PeerSource) error {
8384

8485
defer uapi.Close() //nolint:errcheck
8586

86-
device := device.NewDevice(dev.tun, conn.NewDefaultBind(), logger)
87+
device := device.NewDevice(dev.tun, conn.NewDefaultBind(), wgLogger)
8788

8889
defer device.Close()
8990

@@ -124,6 +125,8 @@ func (dev *Device) Run(ctx context.Context, peers PeerSource) error {
124125
return fmt.Errorf("error bringing link up: %w", err)
125126
}
126127

128+
logger.Info("wireguard device set up", zap.String("interface", interfaceName), zap.Stringer("address", dev.address))
129+
127130
for {
128131
select {
129132
case <-ctx.Done():
@@ -133,24 +136,81 @@ func (dev *Device) Run(ctx context.Context, peers PeerSource) error {
133136
case <-device.Wait():
134137
return nil
135138
case peerEvent := <-peers.EventCh():
136-
cfg := wgtypes.Config{
137-
Peers: []wgtypes.PeerConfig{
138-
{
139-
PublicKey: peerEvent.PubKey,
140-
Remove: peerEvent.Remove,
141-
ReplaceAllowedIPs: true,
142-
AllowedIPs: []net.IPNet{
143-
*netaddr.IPPrefixFrom(peerEvent.Address, peerEvent.Address.BitLen()).IPNet(),
144-
},
145-
},
146-
},
139+
if err := dev.handlePeerEvent(client, logger, peerEvent); err != nil {
140+
return err
141+
}
142+
}
143+
}
144+
}
145+
146+
func (dev *Device) checkDuplicateUpdate(client *wgctrl.Client, logger *zap.Logger, peerEvent PeerEvent) (bool, error) {
147+
oldCfg, err := client.Device(interfaceName)
148+
if err != nil {
149+
return false, fmt.Errorf("error retrieving Wireguard configuration: %w", err)
150+
}
151+
152+
// check if this update can be skipped
153+
pubKey := peerEvent.PubKey.String()
154+
155+
for _, oldPeer := range oldCfg.Peers {
156+
if oldPeer.PublicKey.String() == pubKey {
157+
if len(oldPeer.AllowedIPs) != 1 {
158+
break
147159
}
148160

149-
if err = client.ConfigureDevice(interfaceName, cfg); err != nil {
150-
return fmt.Errorf("error configuring Wireguard peers: %w", err)
161+
if prefix, ok := netaddr.FromStdIPNet(&oldPeer.AllowedIPs[0]); ok {
162+
if prefix.IP() == peerEvent.Address {
163+
// skip the update
164+
logger.Info("skipping peer update", zap.String("public_key", pubKey))
165+
166+
return true, nil
167+
}
151168
}
169+
170+
break
152171
}
153172
}
173+
174+
return false, nil
175+
}
176+
177+
func (dev *Device) handlePeerEvent(client *wgctrl.Client, logger *zap.Logger, peerEvent PeerEvent) error {
178+
if !peerEvent.Remove {
179+
skipEvent, err := dev.checkDuplicateUpdate(client, logger, peerEvent)
180+
if err != nil {
181+
return err
182+
}
183+
184+
if skipEvent {
185+
return nil
186+
}
187+
}
188+
189+
cfg := wgtypes.Config{
190+
Peers: []wgtypes.PeerConfig{
191+
{
192+
PublicKey: peerEvent.PubKey,
193+
Remove: peerEvent.Remove,
194+
},
195+
},
196+
}
197+
198+
if !peerEvent.Remove {
199+
cfg.Peers[0].ReplaceAllowedIPs = true
200+
cfg.Peers[0].AllowedIPs = []net.IPNet{
201+
*netaddr.IPPrefixFrom(peerEvent.Address, peerEvent.Address.BitLen()).IPNet(),
202+
}
203+
204+
logger.Info("updating peer", zap.Stringer("public_key", peerEvent.PubKey), zap.Stringer("address", peerEvent.Address))
205+
} else {
206+
logger.Info("removing peer", zap.Stringer("public_key", peerEvent.PubKey))
207+
}
208+
209+
if err := client.ConfigureDevice(interfaceName, cfg); err != nil {
210+
return fmt.Errorf("error configuring Wireguard peers: %w", err)
211+
}
212+
213+
return nil
154214
}
155215

156216
// Close the device.

0 commit comments

Comments
 (0)