Skip to content

Commit

Permalink
feat: implement KubeSpan identity controller
Browse files Browse the repository at this point in the history
Fixes #4138

When KubeSpan is enabled, Talos automatically generates or loads
KubeSpan identity which consists of Wireguard key pair. ULA address is
calculated based on ClusterID and first NIC MAC address.

Some code was borrowed from #3577.

Example:

```
$ talosctl -n 172.20.0.2 get ksi
NODE         NAMESPACE   TYPE               ID      VERSION   ADDRESS                                       PUBLICKEY
172.20.0.2   kubespan    KubeSpanIdentity   local   1         fd71:6e1d:86be:6302:e871:1bff:feb2:ccee/128   Oak2fBEWngBhwslBxDVgnRNHXs88OAp4kjroSX0uqUE=
```

Additional changes:

* `--with-kubespan` flag for `talosctl cluster create` for quick testing
* validate that cluster discovery (and KubeSpan) requires ClusterID and
ClusterSecret.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
Signed-off-by: Seán C McCord <ulexus@gmail.com>
Co-authored-by: Seán C McCord <ulexus@gmail.com>
  • Loading branch information
smira and Ulexus committed Aug 27, 2021
1 parent da0f6e7 commit caee24b
Show file tree
Hide file tree
Showing 30 changed files with 1,134 additions and 57 deletions.
11 changes: 11 additions & 0 deletions cmd/talosctl/cmd/mgmt/cluster/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ var (
encryptStatePartition bool
encryptEphemeralPartition bool
useVIP bool
enableKubeSpan bool
configPatch string
configPatchControlPlane string
configPatchWorker string
Expand Down Expand Up @@ -382,6 +383,15 @@ func create(ctx context.Context) (err error) {
)
}

if enableKubeSpan {
genOptions = append(genOptions,
generate.WithNetworkOptions(
v1alpha1.WithKubeSpan(),
),
generate.WithClusterDiscovery(),
)
}

defaultInternalLB, defaultEndpoint := provisioner.GetLoadBalancers(request.Network)

if defaultInternalLB == "" {
Expand Down Expand Up @@ -818,6 +828,7 @@ func init() {
createCmd.Flags().BoolVar(&encryptEphemeralPartition, "encrypt-ephemeral", false, "enable ephemeral partition encryption")
createCmd.Flags().StringVar(&talosVersion, "talos-version", "", "the desired Talos version to generate config for (if not set, defaults to image version)")
createCmd.Flags().BoolVar(&useVIP, "use-vip", false, "use a virtual IP for the controlplane endpoint instead of the loadbalancer")
createCmd.Flags().BoolVar(&enableKubeSpan, "with-kubespan", false, "enable KubeSpan system")
createCmd.Flags().StringVar(&configPatch, "config-patch", "", "patch generated machineconfigs (applied to all node types)")
createCmd.Flags().StringVar(&configPatchControlPlane, "config-patch-control-plane", "", "patch generated machineconfigs (applied to 'init' and 'controlplane' types)")
createCmd.Flags().StringVar(&configPatchWorker, "config-patch-worker", "", "patch generated machineconfigs (applied to 'worker' type)")
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ require (
github.com/mdlayher/ethtool v0.0.0-20210210192532-2b88debcdd43
github.com/mdlayher/genetlink v1.0.0
github.com/mdlayher/netlink v1.4.1
github.com/mdlayher/netx v0.0.0-20200512211805-669a06fde734
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417
github.com/packethost/packngo v0.19.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,8 @@ github.com/mdlayher/netlink v1.3.0/go.mod h1:xK/BssKuwcRXHrtN04UBkwQ6dY9VviGGuri
github.com/mdlayher/netlink v1.4.0/go.mod h1:dRJi5IABcZpBD2A3D0Mv/AiX8I9uDEu5oGkAVrekmf8=
github.com/mdlayher/netlink v1.4.1 h1:I154BCU+mKlIf7BgcAJB2r7QjveNPty6uNY1g9ChVfI=
github.com/mdlayher/netlink v1.4.1/go.mod h1:e4/KuJ+s8UhfUpO9z00/fDZZmhSrs+oxyqAS9cNgn6Q=
github.com/mdlayher/netx v0.0.0-20200512211805-669a06fde734 h1:DzkgdcT/W7794xU5P7GdZvok/lJECZ8g4xS+vMNLREI=
github.com/mdlayher/netx v0.0.0-20200512211805-669a06fde734/go.mod h1:iN5Y6R8oOaC0KMzLtw/dqCJ2ZOipmk+bncXKStCHr7Q=
github.com/mdlayher/raw v0.0.0-20190313224157-43dbcdd7739d/go.mod h1:r1fbeITl2xL/zLbVnNHFyOzQJTgr/3fpf1lJX/cjzR8=
github.com/mdlayher/raw v0.0.0-20190606142536-fef19f00fc18/go.mod h1:7EpbotpCmVZcu+KCX4g9WaRNuu11uyhiW7+Le1dKawg=
github.com/mdlayher/raw v0.0.0-20191009151244-50f2db8cc065 h1:aFkJ6lx4FPip+S+Uw4aTegFMct9shDvP+79PsSxpm3w=
Expand Down
53 changes: 2 additions & 51 deletions internal/app/machined/pkg/controllers/cluster/node_identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,15 @@ package cluster
import (
"context"
"fmt"
"os"
"path/filepath"
"reflect"

"github.com/AlekSi/pointer"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"go.uber.org/zap"
"gopkg.in/yaml.v3"

"github.com/talos-systems/talos/internal/app/machined/pkg/controllers"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/resources/cluster"
Expand Down Expand Up @@ -60,53 +58,6 @@ func (ctrl *NodeIdentityController) Outputs() []controller.Output {
}
}

func loadOrNewFromState(statePath, path string, empty interface{}, generate func(interface{}) error) error {
fullPath := filepath.Join(statePath, path)

f, err := os.OpenFile(fullPath, os.O_RDONLY, 0)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("error reading state file: %w", err)
}

// file doesn't exist yet, generate new value and save it
if f == nil {
if err = generate(empty); err != nil {
return err
}

f, err = os.OpenFile(fullPath, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0o600)
if err != nil {
return fmt.Errorf("error creating state file: %w", err)
}

defer f.Close() //nolint:errcheck

encoder := yaml.NewEncoder(f)
if err = encoder.Encode(empty); err != nil {
return fmt.Errorf("error marshaling: %w", err)
}

if err = encoder.Close(); err != nil {
return err
}

return f.Close()
}

// read existing cached value
defer f.Close() //nolint:errcheck

if err = yaml.NewDecoder(f).Decode(empty); err != nil {
return fmt.Errorf("error unmarshaling: %w", err)
}

if reflect.ValueOf(empty).Elem().IsZero() {
return fmt.Errorf("value is still zero after unmarshaling")
}

return f.Close()
}

// Run implements controller.Controller interface.
//
//nolint:gocyclo
Expand Down Expand Up @@ -136,7 +87,7 @@ func (ctrl *NodeIdentityController) Run(ctx context.Context, r controller.Runtim

var localIdentity cluster.IdentitySpec

if err := loadOrNewFromState(ctrl.StatePath, constants.NodeIdentityFilename, &localIdentity, func(v interface{}) error {
if err := controllers.LoadOrNewFromFile(filepath.Join(ctrl.StatePath, constants.NodeIdentityFilename), &localIdentity, func(v interface{}) error {
return v.(*cluster.IdentitySpec).Generate()
}); err != nil {
return fmt.Errorf("error caching node identity: %w", err)
Expand Down
103 changes: 103 additions & 0 deletions internal/app/machined/pkg/controllers/kubespan/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package kubespan

import (
"context"
"fmt"

"github.com/AlekSi/pointer"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"go.uber.org/zap"

"github.com/talos-systems/talos/pkg/resources/config"
"github.com/talos-systems/talos/pkg/resources/kubespan"
)

// ConfigController watches v1alpha1.Config, updates KubeSpan config.
type ConfigController struct{}

// Name implements controller.Controller interface.
func (ctrl *ConfigController) Name() string {
return "kubespan.ConfigController"
}

// Inputs implements controller.Controller interface.
func (ctrl *ConfigController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: config.NamespaceName,
Type: config.MachineConfigType,
ID: pointer.ToString(config.V1Alpha1ID),
Kind: controller.InputWeak,
},
}
}

// Outputs implements controller.Controller interface.
func (ctrl *ConfigController) Outputs() []controller.Output {
return []controller.Output{
{
Type: kubespan.ConfigType,
Kind: controller.OutputExclusive,
},
}
}

// Run implements controller.Controller interface.
//
//nolint:gocyclo
func (ctrl *ConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
cfg, err := r.Get(ctx, resource.NewMetadata(config.NamespaceName, config.MachineConfigType, config.V1Alpha1ID, resource.VersionUndefined))
if err != nil {
if !state.IsNotFoundError(err) {
return fmt.Errorf("error getting config: %w", err)
}
}

touchedIDs := make(map[resource.ID]struct{})

if cfg != nil {
c := cfg.(*config.MachineConfig).Config()

if err = r.Modify(ctx, kubespan.NewConfig(config.NamespaceName, kubespan.ConfigID), func(res resource.Resource) error {
res.(*kubespan.Config).TypedSpec().Enabled = c.Machine().Network().KubeSpan().Enabled()
res.(*kubespan.Config).TypedSpec().ClusterID = c.Cluster().ID()

return nil
}); err != nil {
return err
}

touchedIDs[kubespan.ConfigID] = struct{}{}
}

// list keys for cleanup
list, err := r.List(ctx, resource.NewMetadata(config.NamespaceName, kubespan.ConfigType, "", resource.VersionUndefined))
if err != nil {
return fmt.Errorf("error listing resources: %w", err)
}

for _, res := range list.Items {
if res.Metadata().Owner() != ctrl.Name() {
continue
}

if _, ok := touchedIDs[res.Metadata().ID()]; !ok {
if err = r.Destroy(ctx, res.Metadata()); err != nil {
return fmt.Errorf("error cleaning up specs: %w", err)
}
}
}
}
}
}
93 changes: 93 additions & 0 deletions internal/app/machined/pkg/controllers/kubespan/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package kubespan_test

import (
"testing"
"time"

"github.com/cosi-project/runtime/pkg/resource"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/go-retry/retry"

kubespanctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/kubespan"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1"
"github.com/talos-systems/talos/pkg/resources/config"
"github.com/talos-systems/talos/pkg/resources/kubespan"
)

type ConfigSuite struct {
KubeSpanSuite
}

func (suite *ConfigSuite) TestReconcileConfig() {
suite.Require().NoError(suite.runtime.RegisterController(&kubespanctrl.ConfigController{}))

suite.startRuntime()

cfg := config.NewMachineConfig(&v1alpha1.Config{
ConfigVersion: "v1alpha1",
MachineConfig: &v1alpha1.MachineConfig{
MachineNetwork: &v1alpha1.NetworkConfig{
NetworkKubeSpan: v1alpha1.NetworkKubeSpan{
KubeSpanEnabled: true,
},
},
},
ClusterConfig: &v1alpha1.ClusterConfig{
ClusterID: "8XuV9TZHW08DOk3bVxQjH9ih_TBKjnh-j44tsCLSBzo=",
},
})

suite.Require().NoError(suite.state.Create(suite.ctx, cfg))

specMD := resource.NewMetadata(config.NamespaceName, kubespan.ConfigType, kubespan.ConfigID, resource.VersionUndefined)

suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertResource(
specMD,
func(res resource.Resource) error {
spec := res.(*kubespan.Config).TypedSpec()

suite.Assert().True(spec.Enabled)
suite.Assert().Equal("8XuV9TZHW08DOk3bVxQjH9ih_TBKjnh-j44tsCLSBzo=", spec.ClusterID)

return nil
},
),
))
}

func (suite *ConfigSuite) TestReconcileDisabled() {
suite.Require().NoError(suite.runtime.RegisterController(&kubespanctrl.ConfigController{}))

suite.startRuntime()

cfg := config.NewMachineConfig(&v1alpha1.Config{
ConfigVersion: "v1alpha1",
MachineConfig: &v1alpha1.MachineConfig{},
ClusterConfig: &v1alpha1.ClusterConfig{},
})

suite.Require().NoError(suite.state.Create(suite.ctx, cfg))

specMD := resource.NewMetadata(config.NamespaceName, kubespan.ConfigType, kubespan.ConfigID, resource.VersionUndefined)

suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
suite.assertResource(
specMD,
func(res resource.Resource) error {
spec := res.(*kubespan.Config).TypedSpec()

suite.Assert().False(spec.Enabled)

return nil
},
),
))
}

func TestConfigSuite(t *testing.T) {
suite.Run(t, new(ConfigSuite))
}

0 comments on commit caee24b

Please sign in to comment.