Skip to content

Commit

Permalink
feat: provide building of local Affiliate structure (for the node)
Browse files Browse the repository at this point in the history
Fixes #4139

This builds the local (for the node) `Affiliate` structure which
describes node for the cluster discovery. Dependending on the
configuration, KubeSpan information might be included as well.

`NodeAddresses` were updated to hold CIDRs instead of simple IPs.

The `Affiliate` will be pushed to the registries, while `Affiliate`s for
other nodes will be fetched back from the registries.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
  • Loading branch information
smira committed Sep 3, 2021
1 parent d69bd2a commit 2c66e1b
Show file tree
Hide file tree
Showing 27 changed files with 931 additions and 128 deletions.
108 changes: 108 additions & 0 deletions internal/app/machined/pkg/controllers/cluster/cluster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package cluster_test

import (
"context"
"log"
"sync"
"time"

"github.com/cosi-project/runtime/pkg/controller/runtime"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"github.com/cosi-project/runtime/pkg/state/impl/inmem"
"github.com/cosi-project/runtime/pkg/state/impl/namespaced"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/go-retry/retry"

"github.com/talos-systems/talos/pkg/logging"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1"
"github.com/talos-systems/talos/pkg/resources/config"
)

type ClusterSuite struct {
suite.Suite

state state.State

runtime *runtime.Runtime
wg sync.WaitGroup

ctx context.Context
ctxCancel context.CancelFunc
}

func (suite *ClusterSuite) SetupTest() {
suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 3*time.Minute)

suite.state = state.WrapCore(namespaced.NewState(inmem.Build))

var err error

logger := logging.Wrap(log.Writer())

suite.runtime, err = runtime.NewRuntime(suite.state, logger)
suite.Require().NoError(err)
}

func (suite *ClusterSuite) startRuntime() {
suite.wg.Add(1)

go func() {
defer suite.wg.Done()

suite.Assert().NoError(suite.runtime.Run(suite.ctx))
}()
}

func (suite *ClusterSuite) assertResource(md resource.Metadata, check func(res resource.Resource) error) func() error {
return func() error {
r, err := suite.state.Get(suite.ctx, md)
if err != nil {
if state.IsNotFoundError(err) {
return retry.ExpectedError(err)
}

return err
}

return check(r)
}
}

func (suite *ClusterSuite) assertNoResource(md resource.Metadata) func() error {
return func() error {
_, err := suite.state.Get(suite.ctx, md)
if err == nil {
return retry.ExpectedErrorf("resource %s still exists", md)
}

if state.IsNotFoundError(err) {
return nil
}

return err
}
}

func (suite *ClusterSuite) TearDownTest() {
suite.T().Log("tear down")

suite.ctxCancel()

suite.wg.Wait()

// trigger updates in resources to stop watch loops
err := suite.state.Create(context.Background(), config.NewMachineConfig(&v1alpha1.Config{
ConfigVersion: "v1alpha1",
MachineConfig: &v1alpha1.MachineConfig{},
}))
if state.IsConflictError(err) {
err = suite.state.Destroy(context.Background(), config.NewMachineConfig(nil).Metadata())
}

suite.Assert().NoError(err)
}
102 changes: 102 additions & 0 deletions internal/app/machined/pkg/controllers/cluster/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package cluster

import (
"context"
"fmt"

"github.com/AlekSi/pointer"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"go.uber.org/zap"

"github.com/talos-systems/talos/pkg/resources/cluster"
"github.com/talos-systems/talos/pkg/resources/config"
)

// ConfigController watches v1alpha1.Config, updates discovery config.
type ConfigController struct{}

// Name implements controller.Controller interface.
func (ctrl *ConfigController) Name() string {
return "cluster.ConfigController"
}

// Inputs implements controller.Controller interface.
func (ctrl *ConfigController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: config.NamespaceName,
Type: config.MachineConfigType,
ID: pointer.ToString(config.V1Alpha1ID),
Kind: controller.InputWeak,
},
}
}

// Outputs implements controller.Controller interface.
func (ctrl *ConfigController) Outputs() []controller.Output {
return []controller.Output{
{
Type: cluster.ConfigType,
Kind: controller.OutputExclusive,
},
}
}

// Run implements controller.Controller interface.
//
//nolint:gocyclo
func (ctrl *ConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
cfg, err := r.Get(ctx, resource.NewMetadata(config.NamespaceName, config.MachineConfigType, config.V1Alpha1ID, resource.VersionUndefined))
if err != nil {
if !state.IsNotFoundError(err) {
return fmt.Errorf("error getting config: %w", err)
}
}

touchedIDs := make(map[resource.ID]struct{})

if cfg != nil {
c := cfg.(*config.MachineConfig).Config()

if err = r.Modify(ctx, cluster.NewConfig(config.NamespaceName, cluster.ConfigID), func(res resource.Resource) error {
res.(*cluster.Config).TypedSpec().DiscoveryEnabled = c.Cluster().Discovery().Enabled()

return nil
}); err != nil {
return err
}

touchedIDs[cluster.ConfigID] = struct{}{}
}

// list keys for cleanup
list, err := r.List(ctx, resource.NewMetadata(config.NamespaceName, cluster.ConfigType, "", resource.VersionUndefined))
if err != nil {
return fmt.Errorf("error listing resources: %w", err)
}

for _, res := range list.Items {
if res.Metadata().Owner() != ctrl.Name() {
continue
}

if _, ok := touchedIDs[res.Metadata().ID()]; !ok {
if err = r.Destroy(ctx, res.Metadata()); err != nil {
return fmt.Errorf("error cleaning up specs: %w", err)
}
}
}
}
}
}
87 changes: 87 additions & 0 deletions internal/app/machined/pkg/controllers/cluster/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package cluster_test

import (
"testing"
"time"

"github.com/cosi-project/runtime/pkg/resource"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/go-retry/retry"

clusterctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/cluster"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1"
"github.com/talos-systems/talos/pkg/resources/cluster"
"github.com/talos-systems/talos/pkg/resources/config"
)

type ConfigSuite struct {
ClusterSuite
}

func (suite *ConfigSuite) TestReconcileConfig() {
suite.Require().NoError(suite.runtime.RegisterController(&clusterctrl.ConfigController{}))

suite.startRuntime()

cfg := config.NewMachineConfig(&v1alpha1.Config{
ConfigVersion: "v1alpha1",
ClusterConfig: &v1alpha1.ClusterConfig{
ClusterDiscoveryConfig: v1alpha1.ClusterDiscoveryConfig{
DiscoveryEnabled: true,
},
},
})

suite.Require().NoError(suite.state.Create(suite.ctx, cfg))

specMD := resource.NewMetadata(config.NamespaceName, cluster.ConfigType, cluster.ConfigID, resource.VersionUndefined)

suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertResource(
specMD,
func(res resource.Resource) error {
spec := res.(*cluster.Config).TypedSpec()

suite.Assert().True(spec.DiscoveryEnabled)

return nil
},
),
))
}

func (suite *ConfigSuite) TestReconcileDisabled() {
suite.Require().NoError(suite.runtime.RegisterController(&clusterctrl.ConfigController{}))

suite.startRuntime()

cfg := config.NewMachineConfig(&v1alpha1.Config{
ConfigVersion: "v1alpha1",
MachineConfig: &v1alpha1.MachineConfig{},
ClusterConfig: &v1alpha1.ClusterConfig{},
})

suite.Require().NoError(suite.state.Create(suite.ctx, cfg))

specMD := resource.NewMetadata(config.NamespaceName, cluster.ConfigType, cluster.ConfigID, resource.VersionUndefined)

suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertResource(
specMD,
func(res resource.Resource) error {
spec := res.(*cluster.Config).TypedSpec()

suite.Assert().False(spec.DiscoveryEnabled)

return nil
},
),
))
}

func TestConfigSuite(t *testing.T) {
suite.Run(t, new(ConfigSuite))
}

0 comments on commit 2c66e1b

Please sign in to comment.