Skip to content

Commit

Permalink
fix: multiple fixes for KubeSpan and Wireguard implementation
Browse files Browse the repository at this point in the history
* calculate covering IPPrefixes for the KubeSpan peer `AllowedIPs`,
check for overlap
* don't use KubeSpan IP as potential node endpoint (inception!)
* allow Wireguard config to be applied which doesn't change peer
endpoint
* support for pre-shared Wireguard peer keys

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
Signed-off-by: Seán C McCord <ulexus@gmail.com>
Co-authored-by: Seán C McCord <ulexus@gmail.com>
  • Loading branch information
smira and Ulexus committed Sep 10, 2021
1 parent b1bd642 commit 5ca1fb8
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ func (ctrl *KubernetesPullController) Run(ctx context.Context, r controller.Runt
continue
}

logger.Debug("waiting for kubelet client config", zap.String("file", constants.KubeletKubeconfig))

if err = conditions.WaitForFileToExist(constants.KubeletKubeconfig).Wait(ctx); err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,15 @@ func (ctrl *LocalAffiliateController) Run(ctx context.Context, r controller.Runt
spec.KubeSpan.AdditionalAddresses = append([]netaddr.IPPrefix(nil), ksAdditionalAddresses.(*network.NodeAddress).TypedSpec().Addresses...)

nodeIPs := addresses.(*network.NodeAddress).TypedSpec().IPs()
endpoints := make([]netaddr.IPPort, len(nodeIPs))
endpoints := make([]netaddr.IPPort, 0, len(nodeIPs))

for i := range nodeIPs {
endpoints[i] = netaddr.IPPortFrom(nodeIPs[i], constants.KubeSpanDefaultPort)
if nodeIPs[i] == spec.KubeSpan.Address {
// skip kubespan local address
continue
}

endpoints = append(endpoints, netaddr.IPPortFrom(nodeIPs[i], constants.KubeSpanDefaultPort))
}

spec.KubeSpan.Endpoints = endpoints
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ func (suite *LocalAffiliateSuite) TestGeneration() {
suite.Require().NoError(ksIdentity.TypedSpec().UpdateAddress("8XuV9TZHW08DOk3bVxQjH9ih_TBKjnh-j44tsCLSBzo=", mac))
suite.Require().NoError(suite.state.Create(suite.ctx, ksIdentity))

// add KS address to the list of node addresses, it should be ignored in the endpoints
oldVersion := nonK8sAddresses.Metadata().Version()
nonK8sAddresses.TypedSpec().Addresses = append(nonK8sAddresses.TypedSpec().Addresses, ksIdentity.TypedSpec().Address)
nonK8sAddresses.Metadata().BumpVersion()
suite.Require().NoError(suite.state.Update(suite.ctx, oldVersion, nonK8sAddresses))

onlyK8sAddresses := network.NewNodeAddress(network.NamespaceName, network.FilteredNodeAddressID(network.NodeAddressCurrentID, k8s.NodeAddressFilterOnlyK8s))
onlyK8sAddresses.TypedSpec().Addresses = []netaddr.IPPrefix{netaddr.MustParseIPPrefix("10.244.1.0/24")}
suite.Require().NoError(suite.state.Create(suite.ctx, onlyK8sAddresses))
Expand All @@ -88,7 +94,11 @@ func (suite *LocalAffiliateSuite) TestGeneration() {
suite.assertResource(*cluster.NewAffiliate(cluster.NamespaceName, nodeIdentity.TypedSpec().NodeID).Metadata(), func(r resource.Resource) error {
spec := r.(*cluster.Affiliate).TypedSpec()

suite.Assert().Equal([]netaddr.IP{netaddr.MustParseIP("172.20.0.2"), netaddr.MustParseIP("10.5.0.1")}, spec.Addresses)
if len(spec.Addresses) < 3 {
return retry.ExpectedErrorf("not reconciled yet")
}

suite.Assert().Equal([]netaddr.IP{netaddr.MustParseIP("172.20.0.2"), netaddr.MustParseIP("10.5.0.1"), ksIdentity.TypedSpec().Address.IP()}, spec.Addresses)
suite.Assert().Equal("example1", spec.Hostname)
suite.Assert().Equal("example1.com", spec.Nodename)
suite.Assert().Equal(machine.TypeWorker, spec.MachineType)
Expand All @@ -111,7 +121,7 @@ func (suite *LocalAffiliateSuite) TestGeneration() {
))

// disable discovery, local affiliate should be removed
oldVersion := discoveryConfig.Metadata().Version()
oldVersion = discoveryConfig.Metadata().Version()
discoveryConfig.TypedSpec().DiscoveryEnabled = false
discoveryConfig.Metadata().BumpVersion()
suite.Require().NoError(suite.state.Update(suite.ctx, oldVersion, discoveryConfig))
Expand Down
38 changes: 34 additions & 4 deletions internal/app/machined/pkg/controllers/kubespan/peer_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ func (ctrl *PeerSpecController) Run(ctx context.Context, r controller.Runtime, l
if cfg != nil && localIdentity != nil && cfg.(*kubespan.Config).TypedSpec().Enabled {
localAffiliateID := localIdentity.(*cluster.Identity).TypedSpec().NodeID

peerIPSets := make(map[string]*netaddr.IPSet, len(affiliates.Items))

affiliateLoop:
for _, affiliate := range affiliates.Items {
if affiliate.Metadata().ID() == localAffiliateID {
// skip local affiliate, it's not a peer
Expand All @@ -103,12 +106,39 @@ func (ctrl *PeerSpecController) Run(ctx context.Context, r controller.Runtime, l
continue
}

var builder netaddr.IPSetBuilder

for _, ipPrefix := range spec.KubeSpan.AdditionalAddresses {
builder.AddPrefix(ipPrefix)
}

builder.Add(spec.KubeSpan.Address)

var ipSet *netaddr.IPSet

ipSet, err = builder.IPSet()
if err != nil {
logger.Warn("failed building list of IP ranges for the peer", zap.String("ignored_peer", spec.KubeSpan.PublicKey), zap.String("label", spec.Nodename), zap.Error(err))

continue
}

for otherPublicKey, otherIPSet := range peerIPSets {
if otherIPSet.Overlaps(ipSet) {
logger.Warn("peer address overlap", zap.String("ignored_peer", spec.KubeSpan.PublicKey), zap.String("other_peer", otherPublicKey))

continue affiliateLoop
}
}

peerIPSets[spec.KubeSpan.PublicKey] = ipSet

if err = r.Modify(ctx, kubespan.NewPeerSpec(kubespan.NamespaceName, spec.KubeSpan.PublicKey), func(res resource.Resource) error {
*res.(*kubespan.PeerSpec).TypedSpec() = kubespan.PeerSpecSpec{
Address: spec.KubeSpan.Address,
AdditionalAddresses: append([]netaddr.IPPrefix(nil), spec.KubeSpan.AdditionalAddresses...),
Endpoints: append([]netaddr.IPPort(nil), spec.KubeSpan.Endpoints...),
Label: spec.Nodename,
Address: spec.KubeSpan.Address,
AllowedIPs: ipSet.Prefixes(),
Endpoints: append([]netaddr.IPPort(nil), spec.KubeSpan.Endpoints...),
Label: spec.Nodename,
}

return nil
Expand Down
74 changes: 71 additions & 3 deletions internal/app/machined/pkg/controllers/kubespan/peer_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package kubespan_test

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -58,7 +59,7 @@ func (suite *PeerSpecSuite) TestReconcile() {
KubeSpan: cluster.KubeSpanAffiliateSpec{
PublicKey: "PLPNBddmTgHJhtw0vxltq1ZBdPP9RNOEUd5JjJZzBRY=",
Address: netaddr.MustParseIP("fd50:8d60:4238:6302:f857:23ff:fe21:d1e0"),
AdditionalAddresses: []netaddr.IPPrefix{netaddr.MustParseIPPrefix("10.244.3.1/24")},
AdditionalAddresses: []netaddr.IPPrefix{netaddr.MustParseIPPrefix("10.244.3.1/24"), netaddr.MustParseIPPrefix("10.244.3.0/32")},
Endpoints: []netaddr.IPPort{netaddr.MustParseIPPort("10.0.0.2:51820"), netaddr.MustParseIPPort("192.168.3.4:51820")},
},
}
Expand Down Expand Up @@ -121,7 +122,7 @@ func (suite *PeerSpecSuite) TestReconcile() {
spec := res.(*kubespan.PeerSpec).TypedSpec()

suite.Assert().Equal("fd50:8d60:4238:6302:f857:23ff:fe21:d1e0", spec.Address.String())
suite.Assert().Equal([]netaddr.IPPrefix{netaddr.MustParseIPPrefix("10.244.3.1/24")}, spec.AdditionalAddresses)
suite.Assert().Equal("[10.244.3.0/24 fd50:8d60:4238:6302:f857:23ff:fe21:d1e0/128]", fmt.Sprintf("%v", spec.AllowedIPs))
suite.Assert().Equal([]netaddr.IPPort{netaddr.MustParseIPPort("10.0.0.2:51820"), netaddr.MustParseIPPort("192.168.3.4:51820")}, spec.Endpoints)
suite.Assert().Equal("bar", spec.Label)

Expand All @@ -137,7 +138,7 @@ func (suite *PeerSpecSuite) TestReconcile() {
spec := res.(*kubespan.PeerSpec).TypedSpec()

suite.Assert().Equal("fdc8:8aee:4e2d:1202:f073:9cff:fe6c:4d67", spec.Address.String())
suite.Assert().Equal([]netaddr.IPPrefix{netaddr.MustParseIPPrefix("10.244.4.1/24")}, spec.AdditionalAddresses)
suite.Assert().Equal("[10.244.4.0/24 fdc8:8aee:4e2d:1202:f073:9cff:fe6c:4d67/128]", fmt.Sprintf("%v", spec.AllowedIPs))
suite.Assert().Equal([]netaddr.IPPort{netaddr.MustParseIPPort("192.168.3.6:51820")}, spec.Endpoints)
suite.Assert().Equal("worker-2", spec.Label)

Expand Down Expand Up @@ -165,6 +166,73 @@ func (suite *PeerSpecSuite) TestReconcile() {
))
}

func (suite *PeerSpecSuite) TestIPOverlap() {
suite.statePath = suite.T().TempDir()

suite.Require().NoError(suite.runtime.RegisterController(&kubespanctrl.PeerSpecController{}))

suite.startRuntime()

stateMount := runtimeres.NewMountStatus(v1alpha1.NamespaceName, constants.StatePartitionLabel)

suite.Assert().NoError(suite.state.Create(suite.ctx, stateMount))

cfg := kubespan.NewConfig(config.NamespaceName, kubespan.ConfigID)
cfg.TypedSpec().Enabled = true

suite.Require().NoError(suite.state.Create(suite.ctx, cfg))

nodeIdentity := cluster.NewIdentity(cluster.NamespaceName, cluster.LocalIdentity)
suite.Require().NoError(nodeIdentity.TypedSpec().Generate())
suite.Require().NoError(suite.state.Create(suite.ctx, nodeIdentity))

affiliate1 := cluster.NewAffiliate(cluster.NamespaceName, "7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC")
*affiliate1.TypedSpec() = cluster.AffiliateSpec{
NodeID: "7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC",
Nodename: "bar",
MachineType: machine.TypeControlPlane,
KubeSpan: cluster.KubeSpanAffiliateSpec{
PublicKey: "PLPNBddmTgHJhtw0vxltq1ZBdPP9RNOEUd5JjJZzBRY=",
Address: netaddr.MustParseIP("fd50:8d60:4238:6302:f857:23ff:fe21:d1e0"),
AdditionalAddresses: []netaddr.IPPrefix{netaddr.MustParseIPPrefix("10.244.3.1/24"), netaddr.MustParseIPPrefix("10.244.3.0/32")},
Endpoints: []netaddr.IPPort{netaddr.MustParseIPPort("10.0.0.2:51820"), netaddr.MustParseIPPort("192.168.3.4:51820")},
},
}

affiliate2 := cluster.NewAffiliate(cluster.NamespaceName, "9dwHNUViZlPlIervqX9Qo256RUhrfhgO0xBBnKcKl4F")
*affiliate2.TypedSpec() = cluster.AffiliateSpec{
NodeID: "9dwHNUViZlPlIervqX9Qo256RUhrfhgO0xBBnKcKl4F",
Hostname: "worker-1",
Nodename: "worker-1",
MachineType: machine.TypeWorker,
KubeSpan: cluster.KubeSpanAffiliateSpec{
PublicKey: "Zr5ewpUm2Ywo1c+/59WFKIBjZ3c/nVbIWsT5elbjwCU=",
Address: netaddr.MustParseIP("fd50:8d60:4238:6302:f857:23ff:fe21:d1e0"),
AdditionalAddresses: []netaddr.IPPrefix{netaddr.MustParseIPPrefix("10.244.3.128/28"), netaddr.MustParseIPPrefix("192.168.3.0/24")},
Endpoints: []netaddr.IPPort{netaddr.MustParseIPPort("10.0.0.2:51820"), netaddr.MustParseIPPort("192.168.3.4:51820")},
},
}

for _, r := range []resource.Resource{affiliate1, affiliate2} {
suite.Require().NoError(suite.state.Create(suite.ctx, r))
}

// affiliate2 shouldn't be rendered as a peer, as its AdditionalAddresses overlap with affiliate1 addresses
suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertResourceIDs(resource.NewMetadata(kubespan.NamespaceName, kubespan.PeerSpecType, "", resource.VersionUndefined),
[]resource.ID{
affiliate1.TypedSpec().KubeSpan.PublicKey,
},
),
))

suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertNoResource(
resource.NewMetadata(kubespan.NamespaceName, kubespan.PeerSpecType, affiliate2.TypedSpec().KubeSpan.PublicKey, resource.VersionUndefined),
),
))
}

func TestPeerSpecSuite(t *testing.T) {
suite.Run(t, new(PeerSpecSuite))
}
16 changes: 8 additions & 8 deletions pkg/resources/kubespan/peer_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ type PeerSpec struct {

// PeerSpecSpec describes PeerSpec state.
type PeerSpecSpec struct {
Address netaddr.IP `yaml:"address"`
AdditionalAddresses []netaddr.IPPrefix `yaml:"additionalAddresses"`
Endpoints []netaddr.IPPort `yaml:"endpoints"`
Label string `yaml:"label"`
Address netaddr.IP `yaml:"address"`
AllowedIPs []netaddr.IPPrefix `yaml:"allowedIPs"`
Endpoints []netaddr.IPPort `yaml:"endpoints"`
Label string `yaml:"label"`
}

// NewPeerSpec initializes a PeerSpec resource.
Expand Down Expand Up @@ -62,10 +62,10 @@ func (r *PeerSpec) DeepCopy() resource.Resource {
return &PeerSpec{
md: r.md,
spec: PeerSpecSpec{
Address: r.spec.Address,
AdditionalAddresses: append([]netaddr.IPPrefix(nil), r.spec.AdditionalAddresses...),
Endpoints: append([]netaddr.IPPort(nil), r.spec.Endpoints...),
Label: r.spec.Label,
Address: r.spec.Address,
AllowedIPs: append([]netaddr.IPPrefix(nil), r.spec.AllowedIPs...),
Endpoints: append([]netaddr.IPPort(nil), r.spec.Endpoints...),
Label: r.spec.Label,
},
}
}
Expand Down
37 changes: 35 additions & 2 deletions pkg/resources/network/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,18 +263,27 @@ type WireguardSpec struct {
// WireguardPeer describes a single peer.
type WireguardPeer struct {
PublicKey string `yaml:"publicKey"`
PresharedKey string `yaml:"presharedKey"`
Endpoint string `yaml:"endpoint"`
PersistentKeepaliveInterval time.Duration `yaml:"persistenKeepaliveInterval"`
PersistentKeepaliveInterval time.Duration `yaml:"persistentKeepaliveInterval"`
AllowedIPs []netaddr.IPPrefix `yaml:"allowedIPs"`
}

// Equal checks two WireguardPeer structs for equality.
//
// `spec` is considered to be the result of getting current Wireguard configuration,
// while `other` is the new (updated configuration).
func (peer *WireguardPeer) Equal(other *WireguardPeer) bool {
if peer.PublicKey != other.PublicKey {
return false
}

if peer.Endpoint != other.Endpoint {
if peer.PresharedKey != other.PresharedKey {
return false
}

// if the Endpoint is not set in `other`, don't consider this to be a change
if other.Endpoint != "" && peer.Endpoint != other.Endpoint {
return false
}

Expand Down Expand Up @@ -307,6 +316,9 @@ func (spec *WireguardSpec) IsZero() bool {
// Equal checks two WireguardSpecs for equality.
//
// Both specs should be sorted before calling this method.
//
// `spec` is considered to be the result of getting current Wireguard configuration,
// while `other` is the new (updated configuration).
func (spec *WireguardSpec) Equal(other *WireguardSpec) bool {
if spec.PrivateKey != other.PrivateKey {
return false
Expand Down Expand Up @@ -397,6 +409,19 @@ func (spec *WireguardSpec) Encode(existing *WireguardSpec) (*wgtypes.Config, err
return err
}

var presharedKey *wgtypes.Key

if peer.PresharedKey != "" {
var parsedKey wgtypes.Key

parsedKey, err = wgtypes.ParseKey(peer.PresharedKey)
if err != nil {
return err
}

presharedKey = &parsedKey
}

var endpoint *net.UDPAddr

if peer.Endpoint != "" {
Expand All @@ -415,7 +440,9 @@ func (spec *WireguardSpec) Encode(existing *WireguardSpec) (*wgtypes.Config, err
cfg.Peers = append(cfg.Peers, wgtypes.PeerConfig{
PublicKey: pubKey,
Endpoint: endpoint,
PresharedKey: presharedKey,
PersistentKeepaliveInterval: &peer.PersistentKeepaliveInterval,
ReplaceAllowedIPs: true,
AllowedIPs: allowedIPs,
})

Expand Down Expand Up @@ -494,6 +521,12 @@ func (spec *WireguardSpec) Decode(dev *wgtypes.Device) {
spec.Peers[i].Endpoint = dev.Peers[i].Endpoint.String()
}

var zeroKey wgtypes.Key

if dev.Peers[i].PresharedKey != zeroKey {
spec.Peers[i].PresharedKey = dev.Peers[i].PresharedKey.String()
}

spec.Peers[i].PersistentKeepaliveInterval = dev.Peers[i].PersistentKeepaliveInterval
spec.Peers[i].AllowedIPs = make([]netaddr.IPPrefix, len(dev.Peers[i].AllowedIPs))

Expand Down
1 change: 1 addition & 0 deletions pkg/resources/network/link_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func (r *LinkSpec) ResourceDefinition() meta.ResourceDefinitionSpec {
Aliases: []resource.Type{},
DefaultNamespace: NamespaceName,
PrintColumns: []meta.PrintColumn{},
Sensitivity: meta.Sensitive,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/resources/network/link_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (r *LinkStatus) ResourceDefinition() meta.ResourceDefinitionSpec {
JSONPath: `{.linkState}`,
},
},
Sensitivity: meta.Sensitive,
}
}

Expand Down

0 comments on commit 5ca1fb8

Please sign in to comment.