Skip to content

Commit

Permalink
feat: make kubelet service apply changes immediately
Browse files Browse the repository at this point in the history
The gist is that `kubelet` service code only manages the container
lifecycle, while `kubelet` configuration is managed now in the
controllers and resources.

New resources:

* `secrets.Kubelet` contains Kubelet PKI derived directly from the
machine configuration
* `k8s.KubeletConfig` contains Kubelet non-secret config derived
directly from the machine configuration
* `k8s.NodeIPConfig` contains configuration on picking up Node IP for
the kubelet (from machine configuration)
* `k8s.NodeIP` contains actual Node IPs picked from the node addresses
based on `NodeIPConfig`
* `k8s.KubeletSpec` contains final `kubelet` container configuration,
including merged arguments, KubeletConfig, etc. It is derived from
`KubeletConfig`, `Nodename` and `NodeIP`.

Final controller `KubeletServiceController` writes down configuration
and PKI to disk, and manages restart/start of the `kubelet` service
which is a pure wrapper around container lifecycle.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
  • Loading branch information
smira committed Dec 3, 2021
1 parent 4f5d9da commit d2fd7c2
Show file tree
Hide file tree
Showing 35 changed files with 2,324 additions and 330 deletions.
3 changes: 3 additions & 0 deletions hack/release.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ Generated archive does not contain any secret information so it is safe to send
[notes.kubelet]
title = "Kubelet"
description = """\
Kubelet configuration can be updated without node restart (`.machine.kubelet` section of machine configuration) with commands
`talosctl edit mc --immediate`, `talosctl apply-config --immediate`, `talosctl patch mc --immediate`.
Kubelet service can now be restarted with `talosctl service kubelet restart`.
Kubelet node IP configuration (`.machine.kubelet.nodeIP.validSubnets`) can now include negative subnet matches (prefixed with `!`).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (ctrl *KubernetesPullController) Inputs() []controller.Input {
Kind: controller.InputWeak,
},
{
Namespace: k8s.ControlPlaneNamespaceName,
Namespace: k8s.NamespaceName,
Type: k8s.NodenameType,
ID: pointer.ToString(k8s.NodenameID),
Kind: controller.InputWeak,
Expand Down Expand Up @@ -105,7 +105,7 @@ func (ctrl *KubernetesPullController) Run(ctx context.Context, r controller.Runt
return err
}

nodename, err := r.Get(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.NodenameType, k8s.NodenameID, resource.VersionUndefined))
nodename, err := r.Get(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.NodenameType, k8s.NodenameID, resource.VersionUndefined))
if err != nil {
if !state.IsNotFoundError(err) {
return fmt.Errorf("error getting nodename: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (ctrl *LocalAffiliateController) Inputs() []controller.Input {
Kind: controller.InputWeak,
},
{
Namespace: k8s.ControlPlaneNamespaceName,
Namespace: k8s.NamespaceName,
Type: k8s.NodenameType,
ID: pointer.ToString(k8s.NodenameID),
Kind: controller.InputWeak,
Expand Down Expand Up @@ -126,7 +126,7 @@ func (ctrl *LocalAffiliateController) Run(ctx context.Context, r controller.Runt
continue
}

nodename, err := r.Get(ctx, resource.NewMetadata(k8s.ControlPlaneNamespaceName, k8s.NodenameType, k8s.NodenameID, resource.VersionUndefined))
nodename, err := r.Get(ctx, resource.NewMetadata(k8s.NamespaceName, k8s.NodenameType, k8s.NodenameID, resource.VersionUndefined))
if err != nil {
if !state.IsNotFoundError(err) {
return fmt.Errorf("error getting nodename: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (suite *LocalAffiliateSuite) TestGeneration() {
hostnameStatus.TypedSpec().Hostname = "example1"
suite.Require().NoError(suite.state.Create(suite.ctx, hostnameStatus))

nodename := k8s.NewNodename(k8s.ControlPlaneNamespaceName, k8s.NodenameID)
nodename := k8s.NewNodename(k8s.NamespaceName, k8s.NodenameID)
nodename.TypedSpec().Nodename = "example1.com"
suite.Require().NoError(suite.state.Create(suite.ctx, nodename))

Expand Down
108 changes: 108 additions & 0 deletions internal/app/machined/pkg/controllers/k8s/kubelet_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package k8s

import (
"context"
"fmt"
"net"

"github.com/AlekSi/pointer"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"go.uber.org/zap"

"github.com/talos-systems/talos/pkg/machinery/resources/config"
"github.com/talos-systems/talos/pkg/machinery/resources/k8s"
)

// KubeletConfigController renders manifests based on templates and config/secrets.
type KubeletConfigController struct{}

// Name implements controller.Controller interface.
func (ctrl *KubeletConfigController) Name() string {
return "k8s.KubeletConfigController"
}

// Inputs implements controller.Controller interface.
func (ctrl *KubeletConfigController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: config.NamespaceName,
Type: config.MachineConfigType,
ID: pointer.ToString(config.V1Alpha1ID),
Kind: controller.InputWeak,
},
}
}

// Outputs implements controller.Controller interface.
func (ctrl *KubeletConfigController) Outputs() []controller.Output {
return []controller.Output{
{
Type: k8s.KubeletConfigType,
Kind: controller.OutputExclusive,
},
}
}

// Run implements controller.Controller interface.
func (ctrl *KubeletConfigController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
}

cfg, err := r.Get(ctx, resource.NewMetadata(config.NamespaceName, config.MachineConfigType, config.V1Alpha1ID, resource.VersionUndefined))
if err != nil {
if state.IsNotFoundError(err) {
continue
}

return fmt.Errorf("error getting config: %w", err)
}

cfgProvider := cfg.(*config.MachineConfig).Config()

if err = r.Modify(
ctx,
k8s.NewKubeletConfig(k8s.NamespaceName, k8s.KubeletID),
func(r resource.Resource) error {
kubeletConfig := r.(*k8s.KubeletConfig).TypedSpec()

kubeletConfig.Image = cfgProvider.Machine().Kubelet().Image()

kubeletConfig.ClusterDNS = cfgProvider.Machine().Kubelet().ClusterDNS()

if len(kubeletConfig.ClusterDNS) == 0 {
var addrs []net.IP

addrs, err = cfgProvider.Cluster().Network().DNSServiceIPs()
if err != nil {
return fmt.Errorf("error building DNS service IPs: %w", err)
}

kubeletConfig.ClusterDNS = make([]string, 0, len(addrs))

for _, addr := range addrs {
kubeletConfig.ClusterDNS = append(kubeletConfig.ClusterDNS, addr.String())
}
}

kubeletConfig.ClusterDomain = cfgProvider.Cluster().Network().DNSDomain()
kubeletConfig.ExtraArgs = cfgProvider.Machine().Kubelet().ExtraArgs()
kubeletConfig.ExtraMounts = cfgProvider.Machine().Kubelet().ExtraMounts()
kubeletConfig.CloudProviderExternal = cfgProvider.Cluster().ExternalCloudProvider().Enabled()

return nil
},
); err != nil {
return fmt.Errorf("error modifying KubeletConfig resource: %w", err)
}
}
}
208 changes: 208 additions & 0 deletions internal/app/machined/pkg/controllers/k8s/kubelet_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//nolint:dupl
package k8s_test

import (
"context"
"log"
"net/url"
"sync"
"testing"
"time"

"github.com/cosi-project/runtime/pkg/controller/runtime"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/state"
"github.com/cosi-project/runtime/pkg/state/impl/inmem"
"github.com/cosi-project/runtime/pkg/state/impl/namespaced"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/go-retry/retry"

k8sctrl "github.com/talos-systems/talos/internal/app/machined/pkg/controllers/k8s"
"github.com/talos-systems/talos/pkg/logging"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/machinery/resources/config"
"github.com/talos-systems/talos/pkg/machinery/resources/k8s"
)

type KubeletConfigSuite struct {
suite.Suite

state state.State

runtime *runtime.Runtime
wg sync.WaitGroup

ctx context.Context
ctxCancel context.CancelFunc
}

func (suite *KubeletConfigSuite) SetupTest() {
suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 3*time.Minute)

suite.state = state.WrapCore(namespaced.NewState(inmem.Build))

var err error

suite.runtime, err = runtime.NewRuntime(suite.state, logging.Wrap(log.Writer()))
suite.Require().NoError(err)

suite.Require().NoError(suite.runtime.RegisterController(&k8sctrl.KubeletConfigController{}))

suite.startRuntime()
}

func (suite *KubeletConfigSuite) startRuntime() {
suite.wg.Add(1)

go func() {
defer suite.wg.Done()

suite.Assert().NoError(suite.runtime.Run(suite.ctx))
}()
}

func (suite *KubeletConfigSuite) TestReconcile() {
u, err := url.Parse("https://foo:6443")
suite.Require().NoError(err)

cfg := config.NewMachineConfig(&v1alpha1.Config{
ConfigVersion: "v1alpha1",
MachineConfig: &v1alpha1.MachineConfig{
MachineKubelet: &v1alpha1.KubeletConfig{
KubeletImage: "kubelet",
KubeletClusterDNS: []string{"10.0.0.1"},
KubeletExtraArgs: map[string]string{
"enable-feature": "foo",
},
KubeletExtraMounts: []v1alpha1.ExtraMount{
{
Mount: specs.Mount{
Destination: "/tmp",
Source: "/var",
Type: "tmpfs",
},
},
},
},
},
ClusterConfig: &v1alpha1.ClusterConfig{
ControlPlane: &v1alpha1.ControlPlaneConfig{
Endpoint: &v1alpha1.Endpoint{
URL: u,
},
},
ExternalCloudProviderConfig: &v1alpha1.ExternalCloudProviderConfig{
ExternalEnabled: true,
},
ClusterNetwork: &v1alpha1.ClusterNetworkConfig{
DNSDomain: "service.svc",
},
},
})

suite.Require().NoError(suite.state.Create(suite.ctx, cfg))

suite.Assert().NoError(retry.Constant(10*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
func() error {
kubeletConfig, err := suite.state.Get(suite.ctx, resource.NewMetadata(k8s.NamespaceName, k8s.KubeletConfigType, k8s.KubeletID, resource.VersionUndefined))
if err != nil {
if state.IsNotFoundError(err) {
return retry.ExpectedError(err)
}

return err
}

spec := kubeletConfig.(*k8s.KubeletConfig).TypedSpec()

suite.Assert().Equal("kubelet", spec.Image)
suite.Assert().Equal([]string{"10.0.0.1"}, spec.ClusterDNS)
suite.Assert().Equal("service.svc", spec.ClusterDomain)
suite.Assert().Equal(
map[string]string{
"enable-feature": "foo",
},
spec.ExtraArgs)
suite.Assert().Equal(
[]specs.Mount{
{
Destination: "/tmp",
Source: "/var",
Type: "tmpfs",
},
},
spec.ExtraMounts)
suite.Assert().True(spec.CloudProviderExternal)

return nil
},
))
}

func (suite *KubeletConfigSuite) TestReconcileDefaults() {
u, err := url.Parse("https://foo:6443")
suite.Require().NoError(err)

cfg := config.NewMachineConfig(&v1alpha1.Config{
ConfigVersion: "v1alpha1",
MachineConfig: &v1alpha1.MachineConfig{
MachineKubelet: &v1alpha1.KubeletConfig{
KubeletImage: "kubelet",
},
},
ClusterConfig: &v1alpha1.ClusterConfig{
ControlPlane: &v1alpha1.ControlPlaneConfig{
Endpoint: &v1alpha1.Endpoint{
URL: u,
},
},
ClusterNetwork: &v1alpha1.ClusterNetworkConfig{
ServiceSubnet: []string{constants.DefaultIPv4ServiceNet},
},
},
})

suite.Require().NoError(suite.state.Create(suite.ctx, cfg))

suite.Assert().NoError(retry.Constant(10*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
func() error {
kubeletConfig, err := suite.state.Get(suite.ctx, resource.NewMetadata(k8s.NamespaceName, k8s.KubeletConfigType, k8s.KubeletID, resource.VersionUndefined))
if err != nil {
if state.IsNotFoundError(err) {
return retry.ExpectedError(err)
}

return err
}

spec := kubeletConfig.(*k8s.KubeletConfig).TypedSpec()

suite.Assert().Equal("kubelet", spec.Image)
suite.Assert().Equal([]string{"10.96.0.10"}, spec.ClusterDNS)
suite.Assert().Equal("", spec.ClusterDomain)
suite.Assert().Empty(spec.ExtraArgs)
suite.Assert().Empty(spec.ExtraMounts)
suite.Assert().False(spec.CloudProviderExternal)

return nil
},
))
}

func (suite *KubeletConfigSuite) TearDownTest() {
suite.T().Log("tear down")

suite.ctxCancel()

suite.wg.Wait()
}

func TestKubeletConfigSuite(t *testing.T) {
suite.Run(t, new(KubeletConfigSuite))
}

0 comments on commit d2fd7c2

Please sign in to comment.