Skip to content

Commit

Permalink
feat: implement KubeSpan peer generation controller
Browse files Browse the repository at this point in the history
The controller watches cluster Affiliates and generates a KubeSpanPeerSpec
for each Affiliate which is not the local node's Affiliate and has KubeSpan
configuration attached.

Example:

```
$ talosctl -n 172.20.0.2 get kubespanpeerspecs
NODE         NAMESPACE   TYPE               ID                                             VERSION   LABEL                    ENDPOINTS
172.20.0.2   kubespan    KubeSpanPeerSpec   27E8I+ekrqT21cq2iW6+fDe+H7WBw6q9J7vqLCeswiM=   1         talos-default-worker-1   ["172.20.0.3:51820"]
```

```
$ talosctl -n 172.20.0.3 get kubespanpeerspecs -o yaml
node: 172.20.0.3
metadata:
    namespace: kubespan
    type: KubeSpanPeerSpecs.kubespan.talos.dev
    id: mB6WlFOR66Jx5rtPMIpxJ3s4XHyer9NCzqWPP7idGRo=
    version: 1
    owner: kubespan.PeerSpecController
    phase: running
    created: 2021-09-07T19:26:35Z
    updated: 2021-09-07T19:26:35Z
spec:
    address: fdc8:8aee:4e2d:1202:f073:9cff:fe6c:4d67
    additionalAddresses:
        - 10.244.1.0/32
    endpoints:
        - 172.20.0.2:51820
    label: talos-default-master-1
```

KubeSpan peers will be used to drive configuration of the KubeSpan
networking components.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
  • Loading branch information
smira committed Sep 8, 2021
1 parent 14c69df commit ff90b57
Show file tree
Hide file tree
Showing 7 changed files with 450 additions and 0 deletions.
40 changes: 40 additions & 0 deletions internal/app/machined/pkg/controllers/kubespan/kubespan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package kubespan_test
import (
"context"
"log"
"reflect"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -58,6 +60,44 @@ func (suite *KubeSpanSuite) startRuntime() {
}()
}

// assertResourceIDs returns a retryable check asserting that the set of
// resource IDs listed under metadata md is exactly expectedIDs,
// order-insensitively.
//
// Note: expectedIDs is sorted in place as a side effect.
func (suite *KubeSpanSuite) assertResourceIDs(md resource.Metadata, expectedIDs []resource.ID) func() error {
	return func() error {
		l, err := suite.state.List(suite.ctx, md)
		if err != nil {
			return err
		}

		actualIDs := make([]resource.ID, 0, len(l.Items))

		for _, r := range l.Items {
			actualIDs = append(actualIDs, r.Metadata().ID())
		}

		// sort both sides so the comparison does not depend on the ordering
		// guarantees of state.List
		sort.Strings(actualIDs)
		sort.Strings(expectedIDs)

		if !reflect.DeepEqual(actualIDs, expectedIDs) {
			return retry.ExpectedErrorf("ids do not match expected %v != actual %v", expectedIDs, actualIDs)
		}

		return nil
	}
}

// assertNoResource returns a retryable check asserting that the resource
// described by md is no longer present in the state.
func (suite *KubeSpanSuite) assertNoResource(md resource.Metadata) func() error {
	return func() error {
		_, err := suite.state.Get(suite.ctx, md)

		switch {
		case err == nil:
			// resource was found: keep retrying until it disappears
			return retry.ExpectedErrorf("resource %s still exists", md)
		case state.IsNotFoundError(err):
			// gone — success
			return nil
		default:
			// any other error is fatal for the check
			return err
		}
	}
}

func (suite *KubeSpanSuite) assertResource(md resource.Metadata, check func(res resource.Resource) error) func() error {
return func() error {
r, err := suite.state.Get(suite.ctx, md)
Expand Down
142 changes: 142 additions & 0 deletions internal/app/machined/pkg/controllers/kubespan/peer_spec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package kubespan

import (
"context"
"fmt"

"github.com/AlekSi/pointer"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"go.uber.org/zap"
"inet.af/netaddr"

"github.com/talos-systems/talos/pkg/resources/cluster"
"github.com/talos-systems/talos/pkg/resources/config"
"github.com/talos-systems/talos/pkg/resources/kubespan"
)

// peerSpecControllerName is the name this controller registers under.
const peerSpecControllerName = "kubespan.PeerSpecController"

// PeerSpecController watches cluster.Affiliates and generates KubeSpan
// PeerSpec resources from them.
type PeerSpecController struct{}

// Name implements controller.Controller interface.
func (ctrl *PeerSpecController) Name() string {
	return peerSpecControllerName
}

// Inputs implements controller.Controller interface.
func (ctrl *PeerSpecController) Inputs() []controller.Input {
	// KubeSpan configuration (single well-known resource).
	cfgInput := controller.Input{
		Namespace: config.NamespaceName,
		Type:      kubespan.ConfigType,
		ID:        pointer.ToString(kubespan.ConfigID),
		Kind:      controller.InputWeak,
	}

	// All cluster affiliates (no ID filter).
	affiliateInput := controller.Input{
		Namespace: cluster.NamespaceName,
		Type:      cluster.AffiliateType,
		Kind:      controller.InputWeak,
	}

	// Local node identity, used to skip our own affiliate.
	identityInput := controller.Input{
		Namespace: cluster.NamespaceName,
		Type:      cluster.IdentityType,
		ID:        pointer.ToString(cluster.LocalIdentity),
		Kind:      controller.InputWeak,
	}

	return []controller.Input{cfgInput, affiliateInput, identityInput}
}

// Outputs implements controller.Controller interface.
func (ctrl *PeerSpecController) Outputs() []controller.Output {
	// This controller exclusively owns all PeerSpec resources.
	peerSpecs := controller.Output{
		Type: kubespan.PeerSpecType,
		Kind: controller.OutputExclusive,
	}

	return []controller.Output{peerSpecs}
}

// Run implements controller.Controller interface.
//
// On every event, Run rebuilds the set of kubespan.PeerSpec resources: one per
// cluster Affiliate which is not the local node's own Affiliate and which has
// KubeSpan data (a public key) attached. If KubeSpan is disabled or either the
// configuration or the local identity is missing, no peers are generated and
// previously generated PeerSpecs are destroyed.
//
//nolint:gocyclo,cyclop
func (ctrl *PeerSpecController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-r.EventCh():
			// configuration and local identity are optional inputs: a
			// not-found error leaves the corresponding variable nil, which
			// disables peer generation below (only cleanup runs)
			cfg, err := r.Get(ctx, resource.NewMetadata(config.NamespaceName, kubespan.ConfigType, kubespan.ConfigID, resource.VersionUndefined))
			if err != nil && !state.IsNotFoundError(err) {
				return fmt.Errorf("error getting kubespan configuration: %w", err)
			}

			localIdentity, err := r.Get(ctx, resource.NewMetadata(cluster.NamespaceName, cluster.IdentityType, cluster.LocalIdentity, resource.VersionUndefined))
			if err != nil && !state.IsNotFoundError(err) {
				// error message fixed: this lookup fetches the local node
				// identity, not a MAC address
				return fmt.Errorf("error getting local node identity: %w", err)
			}

			affiliates, err := r.List(ctx, resource.NewMetadata(cluster.NamespaceName, cluster.AffiliateType, "", resource.VersionUndefined))
			if err != nil {
				return fmt.Errorf("error listing cluster affiliates: %w", err)
			}

			// IDs of PeerSpecs written in this pass; anything owned by this
			// controller but not in this set is destroyed below
			touchedIDs := make(map[resource.ID]struct{})

			if cfg != nil && localIdentity != nil && cfg.(*kubespan.Config).TypedSpec().Enabled {
				localAffiliateID := localIdentity.(*cluster.Identity).TypedSpec().NodeID

				for _, affiliate := range affiliates.Items {
					if affiliate.Metadata().ID() == localAffiliateID {
						// skip local affiliate, it's not a peer
						continue
					}

					spec := affiliate.(*cluster.Affiliate).TypedSpec()

					if spec.KubeSpan.PublicKey == "" {
						// no kubespan information, skip it
						continue
					}

					// PeerSpec is keyed by the peer's WireGuard public key;
					// slices are copied so the PeerSpec doesn't alias the
					// Affiliate's backing arrays
					if err = r.Modify(ctx, kubespan.NewPeerSpec(kubespan.NamespaceName, spec.KubeSpan.PublicKey), func(res resource.Resource) error {
						*res.(*kubespan.PeerSpec).TypedSpec() = kubespan.PeerSpecSpec{
							Address:             spec.KubeSpan.Address,
							AdditionalAddresses: append([]netaddr.IPPrefix(nil), spec.KubeSpan.AdditionalAddresses...),
							Endpoints:           append([]netaddr.IPPort(nil), spec.KubeSpan.Endpoints...),
							Label:               spec.Nodename,
						}

						return nil
					}); err != nil {
						return err
					}

					touchedIDs[spec.KubeSpan.PublicKey] = struct{}{}
				}
			}

			// list keys for cleanup
			list, err := r.List(ctx, resource.NewMetadata(kubespan.NamespaceName, kubespan.PeerSpecType, "", resource.VersionUndefined))
			if err != nil {
				return fmt.Errorf("error listing resources: %w", err)
			}

			for _, res := range list.Items {
				if res.Metadata().Owner() != ctrl.Name() {
					// leave resources owned by other controllers alone
					continue
				}

				if _, ok := touchedIDs[res.Metadata().ID()]; !ok {
					if err = r.Destroy(ctx, res.Metadata()); err != nil {
						return fmt.Errorf("error cleaning up specs: %w", err)
					}
				}
			}
		}
	}
}
170 changes: 170 additions & 0 deletions internal/app/machined/pkg/controllers/kubespan/peer_spec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package kubespan_test

import (
"testing"
"time"

"github.com/cosi-project/runtime/pkg/resource"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/go-retry/retry"
"inet.af/netaddr"

kubespanctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/kubespan"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/resources/cluster"
"github.com/talos-systems/talos/pkg/resources/config"
"github.com/talos-systems/talos/pkg/resources/kubespan"
runtimeres "github.com/talos-systems/talos/pkg/resources/runtime"
"github.com/talos-systems/talos/pkg/resources/v1alpha1"
)

// PeerSpecSuite tests the kubespan.PeerSpecController reconciliation logic.
type PeerSpecSuite struct {
	KubeSpanSuite

	// statePath holds a temporary directory created in TestReconcile.
	// NOTE(review): it is assigned but never read in the visible code —
	// possibly leftover from another suite; confirm before removing.
	statePath string
}

// TestReconcile exercises PeerSpecController end to end: it registers the
// controller, creates a KubeSpan config, the local node identity, and four
// cluster Affiliates, then verifies which PeerSpecs get generated — and that
// disabling KubeSpan removes all of them again.
func (suite *PeerSpecSuite) TestReconcile() {
	suite.statePath = suite.T().TempDir()

	suite.Require().NoError(suite.runtime.RegisterController(&kubespanctrl.PeerSpecController{}))

	suite.startRuntime()

	// NOTE(review): the STATE mount status is not a declared input of
	// PeerSpecController — possibly leftover setup; confirm it is needed.
	stateMount := runtimeres.NewMountStatus(v1alpha1.NamespaceName, constants.StatePartitionLabel)

	suite.Assert().NoError(suite.state.Create(suite.ctx, stateMount))

	// enable KubeSpan so the controller starts generating peers
	cfg := kubespan.NewConfig(config.NamespaceName, kubespan.ConfigID)
	cfg.TypedSpec().Enabled = true

	suite.Require().NoError(suite.state.Create(suite.ctx, cfg))

	// local node identity: the affiliate with this NodeID must be skipped
	nodeIdentity := cluster.NewIdentity(cluster.NamespaceName, cluster.LocalIdentity)
	suite.Require().NoError(nodeIdentity.TypedSpec().Generate())
	suite.Require().NoError(suite.state.Create(suite.ctx, nodeIdentity))

	// affiliate1: remote control plane node with KubeSpan data -> becomes a peer
	affiliate1 := cluster.NewAffiliate(cluster.NamespaceName, "7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC")
	*affiliate1.TypedSpec() = cluster.AffiliateSpec{
		NodeID:      "7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC",
		Hostname:    "foo.com",
		Nodename:    "bar",
		MachineType: machine.TypeControlPlane,
		Addresses:   []netaddr.IP{netaddr.MustParseIP("192.168.3.4")},
		KubeSpan: cluster.KubeSpanAffiliateSpec{
			PublicKey:           "PLPNBddmTgHJhtw0vxltq1ZBdPP9RNOEUd5JjJZzBRY=",
			Address:             netaddr.MustParseIP("fd50:8d60:4238:6302:f857:23ff:fe21:d1e0"),
			AdditionalAddresses: []netaddr.IPPrefix{netaddr.MustParseIPPrefix("10.244.3.1/24")},
			Endpoints:           []netaddr.IPPort{netaddr.MustParseIPPort("10.0.0.2:51820"), netaddr.MustParseIPPort("192.168.3.4:51820")},
		},
	}

	// affiliate2: no KubeSpan data (empty public key) -> must NOT become a peer
	affiliate2 := cluster.NewAffiliate(cluster.NamespaceName, "9dwHNUViZlPlIervqX9Qo256RUhrfhgO0xBBnKcKl4F")
	*affiliate2.TypedSpec() = cluster.AffiliateSpec{
		NodeID:      "9dwHNUViZlPlIervqX9Qo256RUhrfhgO0xBBnKcKl4F",
		Hostname:    "worker-1",
		Nodename:    "worker-1",
		MachineType: machine.TypeWorker,
		Addresses:   []netaddr.IP{netaddr.MustParseIP("192.168.3.5")},
	}

	// affiliate3: remote worker with KubeSpan data -> becomes a peer
	affiliate3 := cluster.NewAffiliate(cluster.NamespaceName, "xCnFFfxylOf9i5ynhAkt6ZbfcqaLDGKfIa3gwpuaxe7F")
	*affiliate3.TypedSpec() = cluster.AffiliateSpec{
		NodeID:      "xCnFFfxylOf9i5ynhAkt6ZbfcqaLDGKfIa3gwpuaxe7F",
		MachineType: machine.TypeWorker,
		Nodename:    "worker-2",
		Addresses:   []netaddr.IP{netaddr.MustParseIP("192.168.3.6")},
		KubeSpan: cluster.KubeSpanAffiliateSpec{
			PublicKey:           "mB6WlFOR66Jx5rtPMIpxJ3s4XHyer9NCzqWPP7idGRo",
			Address:             netaddr.MustParseIP("fdc8:8aee:4e2d:1202:f073:9cff:fe6c:4d67"),
			AdditionalAddresses: []netaddr.IPPrefix{netaddr.MustParseIPPrefix("10.244.4.1/24")},
			Endpoints:           []netaddr.IPPort{netaddr.MustParseIPPort("192.168.3.6:51820")},
		},
	}

	// local node affiliate, should be skipped as a peer
	affiliate4 := cluster.NewAffiliate(cluster.NamespaceName, nodeIdentity.TypedSpec().NodeID)
	*affiliate4.TypedSpec() = cluster.AffiliateSpec{
		NodeID:      nodeIdentity.TypedSpec().NodeID,
		MachineType: machine.TypeWorker,
		Addresses:   []netaddr.IP{netaddr.MustParseIP("192.168.3.7")},
		KubeSpan: cluster.KubeSpanAffiliateSpec{
			PublicKey:           "27E8I+ekrqT21cq2iW6+fDe+H7WBw6q9J7vqLCeswiM=",
			Address:             netaddr.MustParseIP("fdc8:8aee:4e2d:1202:f073:9cff:fe6c:4d67"),
			AdditionalAddresses: []netaddr.IPPrefix{netaddr.MustParseIPPrefix("10.244.5.1/24")},
			Endpoints:           []netaddr.IPPort{netaddr.MustParseIPPort("192.168.3.7:51820")},
		},
	}

	for _, r := range []resource.Resource{affiliate1, affiliate2, affiliate3, affiliate4} {
		suite.Require().NoError(suite.state.Create(suite.ctx, r))
	}

	// affiliate2 shouldn't be rendered as a peer, as it doesn't have kubespan data
	suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
		suite.assertResourceIDs(resource.NewMetadata(kubespan.NamespaceName, kubespan.PeerSpecType, "", resource.VersionUndefined),
			[]resource.ID{
				affiliate1.TypedSpec().KubeSpan.PublicKey,
				affiliate3.TypedSpec().KubeSpan.PublicKey,
			},
		),
	))

	// verify the full contents of the PeerSpec generated from affiliate1
	suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
		suite.assertResource(
			resource.NewMetadata(kubespan.NamespaceName, kubespan.PeerSpecType, affiliate1.TypedSpec().KubeSpan.PublicKey, resource.VersionUndefined),
			func(res resource.Resource) error {
				spec := res.(*kubespan.PeerSpec).TypedSpec()

				suite.Assert().Equal("fd50:8d60:4238:6302:f857:23ff:fe21:d1e0", spec.Address.String())
				suite.Assert().Equal([]netaddr.IPPrefix{netaddr.MustParseIPPrefix("10.244.3.1/24")}, spec.AdditionalAddresses)
				suite.Assert().Equal([]netaddr.IPPort{netaddr.MustParseIPPort("10.0.0.2:51820"), netaddr.MustParseIPPort("192.168.3.4:51820")}, spec.Endpoints)
				suite.Assert().Equal("bar", spec.Label)

				return nil
			},
		),
	))

	// verify the full contents of the PeerSpec generated from affiliate3
	suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
		suite.assertResource(
			resource.NewMetadata(kubespan.NamespaceName, kubespan.PeerSpecType, affiliate3.TypedSpec().KubeSpan.PublicKey, resource.VersionUndefined),
			func(res resource.Resource) error {
				spec := res.(*kubespan.PeerSpec).TypedSpec()

				suite.Assert().Equal("fdc8:8aee:4e2d:1202:f073:9cff:fe6c:4d67", spec.Address.String())
				suite.Assert().Equal([]netaddr.IPPrefix{netaddr.MustParseIPPrefix("10.244.4.1/24")}, spec.AdditionalAddresses)
				suite.Assert().Equal([]netaddr.IPPort{netaddr.MustParseIPPort("192.168.3.6:51820")}, spec.Endpoints)
				suite.Assert().Equal("worker-2", spec.Label)

				return nil
			},
		),
	))

	// disabling kubespan should remove all peers
	oldVersion := cfg.Metadata().Version()
	cfg.TypedSpec().Enabled = false
	cfg.Metadata().BumpVersion()

	suite.Require().NoError(suite.state.Update(suite.ctx, oldVersion, cfg))

	suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
		suite.assertNoResource(
			resource.NewMetadata(kubespan.NamespaceName, kubespan.PeerSpecType, affiliate1.TypedSpec().KubeSpan.PublicKey, resource.VersionUndefined),
		),
	))
	suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
		suite.assertNoResource(
			resource.NewMetadata(kubespan.NamespaceName, kubespan.PeerSpecType, affiliate3.TypedSpec().KubeSpan.PublicKey, resource.VersionUndefined),
		),
	))
}

// TestPeerSpecSuite runs the PeerSpec controller test suite.
func TestPeerSpecSuite(t *testing.T) {
	suite.Run(t, &PeerSpecSuite{})
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (ctrl *Controller) Run(ctx context.Context) error {
&k8s.RenderSecretsStaticPodController{},
&kubespan.ConfigController{},
&kubespan.IdentityController{},
&kubespan.PeerSpecController{},
&network.AddressConfigController{
Cmdline: procfs.ProcCmdline(),
V1Alpha1Mode: ctrl.v1alpha1Runtime.State().Platform().Mode(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func NewState() (*State, error) {
&k8s.SecretsStatus{},
&kubespan.Config{},
&kubespan.Identity{},
&kubespan.PeerSpec{},
&network.AddressStatus{},
&network.AddressSpec{},
&network.HardwareAddr{},
Expand Down
1 change: 1 addition & 0 deletions pkg/resources/kubespan/kubespan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestRegisterResource(t *testing.T) {
for _, resource := range []resource.Resource{
&kubespan.Config{},
&kubespan.Identity{},
&kubespan.PeerSpec{},
} {
assert.NoError(t, resourceRegistry.Register(ctx, resource))
}
Expand Down

0 comments on commit ff90b57

Please sign in to comment.