Skip to content

Commit

Permalink
feat: implement Kubernetes cluster discovery registry
Browse files Browse the repository at this point in the history
This implements pushing to and pulling from the Kubernetes cluster discovery
registry, which simply uses extra Talos annotations on the `Node`
resources.

Note: cluster discovery is still disabled by default.

This means that each Talos node pushes data from its own local
`Affiliate` structure to the `Node` resource, and also watches the other
`Node`s to scrape data and build an `Affiliate` for every other cluster
member.

Further down the pipeline, `Affiliate` is converted to a cluster
`Member` which is an easy way to see the cluster membership.

In its current form, `talosctl get members` is mostly equivalent to
`kubectl get nodes`, but as we add more registries, it will become more
powerful.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
  • Loading branch information
smira committed Sep 3, 2021
1 parent 2c66e1b commit af66221
Show file tree
Hide file tree
Showing 22 changed files with 1,536 additions and 5 deletions.
110 changes: 110 additions & 0 deletions internal/app/machined/pkg/controllers/cluster/affiliate_merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package cluster

import (
"context"
"fmt"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"go.uber.org/zap"

"github.com/talos-systems/talos/pkg/resources/cluster"
)

// AffiliateMergeController combines the raw Affiliates found in the RawNamespaceName
// into their final, merged representation stored in the NamespaceName.
type AffiliateMergeController struct{}

// Name implements controller.Controller interface.
//
// The returned name is also what Run compares resource owners against when cleaning up.
func (ctrl *AffiliateMergeController) Name() string {
	const controllerName = "cluster.AffiliateMergeController"

	return controllerName
}

// Inputs implements controller.Controller interface.
//
// The controller declares a single weak input: Affiliate resources in the raw namespace.
func (ctrl *AffiliateMergeController) Inputs() []controller.Input {
	rawAffiliates := controller.Input{
		Namespace: cluster.RawNamespaceName,
		Type:      cluster.AffiliateType,
		Kind:      controller.InputWeak,
	}

	return []controller.Input{rawAffiliates}
}

// Outputs implements controller.Controller interface.
//
// The controller declares a single shared output of the Affiliate type.
func (ctrl *AffiliateMergeController) Outputs() []controller.Output {
	mergedAffiliates := controller.Output{
		Type: cluster.AffiliateType,
		Kind: controller.OutputShared,
	}

	return []controller.Output{mergedAffiliates}
}

// Run implements controller.Controller interface.
//
// On every runtime event the controller:
//  1. lists the raw affiliates and merges the specs which share the same NodeID;
//  2. writes each merged spec to an Affiliate in the NamespaceName keyed by the NodeID;
//  3. destroys Affiliates in the NamespaceName owned by this controller which were not
//     written during this pass.
//
// It returns nil when ctx is done, and a wrapped error if any list, modify or destroy call fails.
//
//nolint:gocyclo
func (ctrl *AffiliateMergeController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-r.EventCh():
		}

		mergedAffiliates := make(map[resource.ID]*cluster.AffiliateSpec)

		rawAffiliates, err := r.List(ctx, resource.NewMetadata(cluster.RawNamespaceName, cluster.AffiliateType, "", resource.VersionUndefined))
		if err != nil {
			// wrap the underlying error so that callers can see (and match on) the actual cause
			return fmt.Errorf("error listing affiliates: %w", err)
		}

		for _, rawAffiliate := range rawAffiliates.Items {
			affiliateSpec := rawAffiliate.(*cluster.Affiliate).TypedSpec()
			id := affiliateSpec.NodeID

			// the first spec seen for a NodeID becomes the merge target, later ones are merged into it
			if affiliate, ok := mergedAffiliates[id]; ok {
				affiliate.Merge(affiliateSpec)
			} else {
				mergedAffiliates[id] = affiliateSpec
			}
		}

		// IDs written during this pass; anything else owned by this controller is stale
		touchedIDs := make(map[resource.ID]struct{}, len(mergedAffiliates))

		for id, affiliateSpec := range mergedAffiliates {
			affiliateSpec := affiliateSpec

			if err = r.Modify(ctx, cluster.NewAffiliate(cluster.NamespaceName, id), func(res resource.Resource) error {
				*res.(*cluster.Affiliate).TypedSpec() = *affiliateSpec

				return nil
			}); err != nil {
				return fmt.Errorf("error modifying affiliate %q: %w", id, err)
			}

			touchedIDs[id] = struct{}{}
		}

		// list keys for cleanup
		list, err := r.List(ctx, resource.NewMetadata(cluster.NamespaceName, cluster.AffiliateType, "", resource.VersionUndefined))
		if err != nil {
			return fmt.Errorf("error listing resources: %w", err)
		}

		for _, res := range list.Items {
			// the output is shared: never touch resources owned by other controllers
			if res.Metadata().Owner() != ctrl.Name() {
				continue
			}

			if _, ok := touchedIDs[res.Metadata().ID()]; !ok {
				if err = r.Destroy(ctx, res.Metadata()); err != nil {
					return fmt.Errorf("error cleaning up specs: %w", err)
				}
			}
		}
	}
}
133 changes: 133 additions & 0 deletions internal/app/machined/pkg/controllers/cluster/affiliate_merge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package cluster_test

import (
"testing"
"time"

"github.com/cosi-project/runtime/pkg/resource"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/go-retry/retry"
"inet.af/netaddr"

clusterctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/cluster"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/resources/cluster"
)

// AffiliateMergeSuite is the test suite for AffiliateMergeController; it embeds ClusterSuite,
// whose startRuntime, runtime, state, ctx and assert helpers the test below relies on.
type AffiliateMergeSuite struct {
ClusterSuite
}

// TestReconcileDefault creates three raw affiliates (two of which share a NodeID) and checks that
// the controller produces the expected merged affiliates, then removes raw affiliates one by one
// and checks that the merged state follows.
func (suite *AffiliateMergeSuite) TestReconcileDefault() {
suite.startRuntime()

suite.Require().NoError(suite.runtime.RegisterController(&clusterctrl.AffiliateMergeController{}))

// affiliate1 and affiliate2 describe the same node (same NodeID); only affiliate1 carries KubeSpan data
affiliate1 := cluster.NewAffiliate(cluster.RawNamespaceName, "k8s/7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC")
*affiliate1.TypedSpec() = cluster.AffiliateSpec{
NodeID: "7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC",
Hostname: "foo.com",
Nodename: "bar",
MachineType: machine.TypeControlPlane,
Addresses: []netaddr.IP{netaddr.MustParseIP("192.168.3.4")},
KubeSpan: cluster.KubeSpanAffiliateSpec{
PublicKey: "PLPNBddmTgHJhtw0vxltq1ZBdPP9RNOEUd5JjJZzBRY=",
Address: netaddr.MustParseIP("fd50:8d60:4238:6302:f857:23ff:fe21:d1e0"),
AdditionalAddresses: []netaddr.IPPrefix{netaddr.MustParseIPPrefix("10.244.3.1/24")},
Endpoints: []netaddr.IPPort{netaddr.MustParseIPPort("10.0.0.2:51820"), netaddr.MustParseIPPort("192.168.3.4:51820")},
},
}

affiliate2 := cluster.NewAffiliate(cluster.RawNamespaceName, "service/7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC")
*affiliate2.TypedSpec() = cluster.AffiliateSpec{
NodeID: "7x1SuC8Ege5BGXdAfTEff5iQnlWZLfv9h1LGMxA2pYkC",
Hostname: "foo.com",
Nodename: "bar",
MachineType: machine.TypeControlPlane,
Addresses: []netaddr.IP{netaddr.MustParseIP("192.168.3.4"), netaddr.MustParseIP("10.5.0.2")},
}

// affiliate3 is a different node with no KubeSpan data
affiliate3 := cluster.NewAffiliate(cluster.RawNamespaceName, "service/9dwHNUViZlPlIervqX9Qo256RUhrfhgO0xBBnKcKl4F")
*affiliate3.TypedSpec() = cluster.AffiliateSpec{
NodeID: "9dwHNUViZlPlIervqX9Qo256RUhrfhgO0xBBnKcKl4F",
Hostname: "worker-1",
Nodename: "worker-1",
MachineType: machine.TypeWorker,
Addresses: []netaddr.IP{netaddr.MustParseIP("192.168.3.5")},
}

for _, r := range []resource.Resource{affiliate1, affiliate2, affiliate3} {
suite.Require().NoError(suite.state.Create(suite.ctx, r))
}

// there should be two merged affiliates: one from affiliate1+affiliate2, and another from affiliate3
suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertResource(*cluster.NewAffiliate(cluster.NamespaceName, affiliate1.TypedSpec().NodeID).Metadata(), func(r resource.Resource) error {
spec := r.(*cluster.Affiliate).TypedSpec()

suite.Assert().Equal(affiliate1.TypedSpec().NodeID, spec.NodeID)
suite.Assert().Equal([]netaddr.IP{netaddr.MustParseIP("192.168.3.4"), netaddr.MustParseIP("10.5.0.2")}, spec.Addresses)
suite.Assert().Equal("foo.com", spec.Hostname)
suite.Assert().Equal("bar", spec.Nodename)
suite.Assert().Equal(machine.TypeControlPlane, spec.MachineType)
suite.Assert().Equal(netaddr.MustParseIP("fd50:8d60:4238:6302:f857:23ff:fe21:d1e0"), spec.KubeSpan.Address)
suite.Assert().Equal("PLPNBddmTgHJhtw0vxltq1ZBdPP9RNOEUd5JjJZzBRY=", spec.KubeSpan.PublicKey)
suite.Assert().Equal([]netaddr.IPPrefix{netaddr.MustParseIPPrefix("10.244.3.1/24")}, spec.KubeSpan.AdditionalAddresses)
suite.Assert().Equal([]netaddr.IPPort{netaddr.MustParseIPPort("10.0.0.2:51820"), netaddr.MustParseIPPort("192.168.3.4:51820")}, spec.KubeSpan.Endpoints)

return nil
}),
))

suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertResource(*cluster.NewAffiliate(cluster.NamespaceName, affiliate3.TypedSpec().NodeID).Metadata(), func(r resource.Resource) error {
spec := r.(*cluster.Affiliate).TypedSpec()

suite.Assert().Equal(affiliate3.TypedSpec().NodeID, spec.NodeID)
suite.Assert().Equal([]netaddr.IP{netaddr.MustParseIP("192.168.3.5")}, spec.Addresses)
suite.Assert().Equal("worker-1", spec.Hostname)
suite.Assert().Equal("worker-1", spec.Nodename)
suite.Assert().Equal(machine.TypeWorker, spec.MachineType)
suite.Assert().Zero(spec.KubeSpan.PublicKey)

return nil
}),
))

// remove affiliate1 (the only raw affiliate with KubeSpan data), KubeSpan information should eventually go away
suite.Require().NoError(suite.state.Destroy(suite.ctx, affiliate1.Metadata()))

suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertResource(*cluster.NewAffiliate(cluster.NamespaceName, affiliate1.TypedSpec().NodeID).Metadata(), func(r resource.Resource) error {
spec := r.(*cluster.Affiliate).TypedSpec()

suite.Assert().Equal(affiliate1.TypedSpec().NodeID, spec.NodeID)

// the merged resource still exists (affiliate2 remains), so keep retrying until the KubeSpan data is dropped
if spec.KubeSpan.PublicKey != "" {
return retry.ExpectedErrorf("not reconciled yet")
}

suite.Assert().Zero(spec.KubeSpan.Address)
suite.Assert().Zero(spec.KubeSpan.PublicKey)
suite.Assert().Zero(spec.KubeSpan.AdditionalAddresses)
suite.Assert().Zero(spec.KubeSpan.Endpoints)

return nil
}),
))

// remove affiliate3, merged affiliate should be removed
suite.Require().NoError(suite.state.Destroy(suite.ctx, affiliate3.Metadata()))

suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertNoResource(*cluster.NewAffiliate(cluster.NamespaceName, affiliate3.TypedSpec().NodeID).Metadata()),
))
}

func TestAffiliateMergeSuite(t *testing.T) {
suite.Run(t, new(AffiliateMergeSuite))
}
4 changes: 4 additions & 0 deletions internal/app/machined/pkg/controllers/cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (ctrl *ConfigController) Run(ctx context.Context, r controller.Runtime, log
if err = r.Modify(ctx, cluster.NewConfig(config.NamespaceName, cluster.ConfigID), func(res resource.Resource) error {
res.(*cluster.Config).TypedSpec().DiscoveryEnabled = c.Cluster().Discovery().Enabled()

if c.Cluster().Discovery().Enabled() {
res.(*cluster.Config).TypedSpec().RegistryKubernetesEnabled = c.Cluster().Discovery().Registries().Kubernetes().Enabled()
}

return nil
}); err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions internal/app/machined/pkg/controllers/cluster/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (suite *ConfigSuite) TestReconcileConfig() {
spec := res.(*cluster.Config).TypedSpec()

suite.Assert().True(spec.DiscoveryEnabled)
suite.Assert().True(spec.RegistryKubernetesEnabled)

return nil
},
Expand Down Expand Up @@ -75,6 +76,7 @@ func (suite *ConfigSuite) TestReconcileDisabled() {
spec := res.(*cluster.Config).TypedSpec()

suite.Assert().False(spec.DiscoveryEnabled)
suite.Assert().False(spec.RegistryKubernetesEnabled)

return nil
},
Expand Down

0 comments on commit af66221

Please sign in to comment.