feat: upgrade kubelet version in talosctl upgrade-k8s
Fixes #4656

Now that changes to the kubelet configuration can be applied without a reboot,
`talosctl upgrade-k8s` can handle kubelet upgrades as well.

The gist is simply modifying the machine config and waiting for the `Node`
version to be updated; the rest of the code is there to make the process
reliable.

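In short, each node is updated with the following sequence (a condensed
sketch using the helpers introduced in pkg/cluster/kubernetes/kubelet.go
below; the kubelet service watch and error handling are omitted):

    // patch the node's machine config to point the kubelet at the new image
    if err := patchNodeConfig(ctx, cluster, node, upgradeKubeletPatcher(options, kubeletSpec)); err != nil {
        return err
    }

    // the config change restarts the kubelet in place (no reboot);
    // poll the Kubernetes Node object until it reports the target version
    return retry.Constant(3*time.Minute, retry.WithUnits(10*time.Second)).Retry(func() error {
        return checkNodeKubeletVersion(ctx, cluster, node, "v"+options.ToVersion)
    })
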
Also fixed a bug in the resources API when watching deleted items
(tombstones).

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
(cherry picked from commit 97ffa7a)
smira committed Dec 13, 2021
1 parent 4daff78 commit fe2e953
Showing 9 changed files with 624 additions and 56 deletions.
1 change: 1 addition & 0 deletions cmd/talosctl/cmd/talos/upgrade-k8s.go
@@ -35,6 +35,7 @@ func init() {
    upgradeK8sCmd.Flags().StringVar(&upgradeOptions.ToVersion, "to", constants.DefaultKubernetesVersion, "the Kubernetes control plane version to upgrade to")
    upgradeK8sCmd.Flags().StringVar(&upgradeOptions.ControlPlaneEndpoint, "endpoint", "", "the cluster control plane endpoint")
    upgradeK8sCmd.Flags().BoolVar(&upgradeOptions.DryRun, "dry-run", false, "skip the actual upgrade and show the upgrade plan instead")
    upgradeK8sCmd.Flags().BoolVar(&upgradeOptions.UpgradeKubelet, "upgrade-kubelet", true, "upgrade kubelet service")
    cli.Should(upgradeK8sCmd.MarkFlagRequired("to"))
    addCommand(upgradeK8sCmd)
}
5 changes: 4 additions & 1 deletion hack/release.toml
@@ -24,7 +24,10 @@ Talos now supports setting MTU and Virtual IPs on VLAN interfaces.
[notes.upgrades]
title = "Kubernetes Upgrade Enhancements"
description="""\
`talosctl upgrade-k8s` now syncs all Talos manifest resources generated from templates.
`talosctl upgrade-k8s` was improved to:

* sync all bootstrap manifest resources in the Kubernetes cluster to the versions bundled with the current Talos version
* upgrade `kubelet` to the version of the control plane components (without a node reboot)

So there is no longer any need to update the CoreDNS and Flannel containers manually after running `upgrade-k8s`.
"""
2 changes: 1 addition & 1 deletion internal/app/resources/server.go
@@ -48,7 +48,7 @@ func marshalResource(r resource.Resource) (*resourceapi.Resource, error) {

    spec := &resourceapi.Spec{}

    if r.Spec() != nil {
    if !resource.IsTombstone(r) && r.Spec() != nil {
        var err error

        spec.Yaml, err = yaml.Marshal(r.Spec())
11 changes: 9 additions & 2 deletions internal/integration/provision/upgrade.go
@@ -60,6 +60,8 @@ type upgradeSpec struct {
    TargetVersion    string
    TargetK8sVersion string

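    // SkipKubeletUpgrade disables the kubelet upgrade step (needed while the source Talos version predates no-reboot kubelet updates).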
    SkipKubeletUpgrade bool

    MasterNodes int
    WorkerNodes int

@@ -95,6 +97,9 @@ func upgradePreviousToStable() upgradeSpec {
        TargetVersion:    stableRelease,
        TargetK8sVersion: stableK8sVersion,

        // TODO: remove when StableVersion >= 0.14.0-beta.0
        SkipKubeletUpgrade: true,

        MasterNodes: DefaultSettings.MasterNodes,
        WorkerNodes: DefaultSettings.WorkerNodes,
    }
@@ -565,7 +570,7 @@ func (suite *UpgradeSuite) upgradeNode(client *talosclient.Client, node provisio
    suite.waitForClusterHealth()
}

func (suite *UpgradeSuite) upgradeKubernetes(fromVersion, toVersion string) {
func (suite *UpgradeSuite) upgradeKubernetes(fromVersion, toVersion string, skipKubeletUpgrade bool) {
    if fromVersion == toVersion {
        suite.T().Logf("skipping Kubernetes upgrade, as versions are equal %q -> %q", fromVersion, toVersion)

@@ -579,6 +584,8 @@ func (suite *UpgradeSuite) upgradeKubernetes(fromVersion, toVersion string) {
        ToVersion: toVersion,

        ControlPlaneEndpoint: suite.controlPlaneEndpoint,

        UpgradeKubelet: !skipKubeletUpgrade,
    }

    suite.Require().NoError(kubernetes.UpgradeTalosManaged(suite.ctx, suite.clusterAccess, options))
@@ -643,7 +650,7 @@ func (suite *UpgradeSuite) TestRolling() {
    suite.assertSameVersionCluster(client, suite.spec.TargetVersion)

    // upgrade Kubernetes if required
    suite.upgradeKubernetes(suite.spec.SourceK8sVersion, suite.spec.TargetK8sVersion)
    suite.upgradeKubernetes(suite.spec.SourceK8sVersion, suite.spec.TargetK8sVersion, suite.spec.SkipKubeletUpgrade)

    // run e2e test
    suite.runE2E(suite.spec.TargetK8sVersion)
317 changes: 317 additions & 0 deletions pkg/cluster/kubernetes/kubelet.go
@@ -0,0 +1,317 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package kubernetes

import (
    "context"
    "errors"
    "fmt"
    "strings"
    "time"

    "github.com/cosi-project/runtime/pkg/resource"
    "github.com/cosi-project/runtime/pkg/state"
    "github.com/talos-systems/go-retry/retry"
    "gopkg.in/yaml.v3"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "github.com/talos-systems/talos/pkg/kubernetes"
    "github.com/talos-systems/talos/pkg/machinery/client"
    v1alpha1config "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1"
    "github.com/talos-systems/talos/pkg/machinery/constants"
    "github.com/talos-systems/talos/pkg/machinery/resources/k8s"
    "github.com/talos-systems/talos/pkg/machinery/resources/v1alpha1"
)

const kubelet = "kubelet"

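// upgradeKubelet upgrades the kubelet on all control plane and worker nodes, one node at a time.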
func upgradeKubelet(ctx context.Context, cluster UpgradeProvider, options UpgradeOptions) error {
    if !options.UpgradeKubelet {
        options.Log("skipped updating kubelet")

        return nil
    }

    options.Log("updating kubelet to version %q", options.ToVersion)

    for _, node := range append(append([]string(nil), options.masterNodes...), options.workerNodes...) {
        if err := upgradeKubeletOnNode(ctx, cluster, options, node); err != nil {
            return fmt.Errorf("error updating node %q: %w", node, err)
        }
    }

    return nil
}

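// serviceSpecFromResource extracts a ServiceSpec from a watch event resource by round-tripping it through YAML.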
func serviceSpecFromResource(r resource.Resource) (*v1alpha1.ServiceSpec, error) {
    marshaled, err := resource.MarshalYAML(r)
    if err != nil {
        return nil, err
    }

    yml, err := yaml.Marshal(marshaled)
    if err != nil {
        return nil, err
    }

    var mock struct {
        Spec v1alpha1.ServiceSpec `yaml:"spec"`
    }

    if err = yaml.Unmarshal(yml, &mock); err != nil {
        return nil, err
    }

    return &mock.Spec, nil
}

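// upgradeKubeletOnNode patches the node's machine config with the new kubelet image,
// waits for the kubelet service to restart, and then waits for the Node to report the new version.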
//nolint:gocyclo,cyclop
func upgradeKubeletOnNode(ctx context.Context, cluster UpgradeProvider, options UpgradeOptions, node string) error {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    c, err := cluster.Client()
    if err != nil {
        return fmt.Errorf("error building Talos API client: %w", err)
    }

    ctx = client.WithNodes(ctx, node)

    options.Log(" > %q: starting update", node)

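    // establish the watch on the kubelet service before patching the machine config, so the restart is not missed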
    watchClient, err := c.Resources.Watch(ctx, v1alpha1.NamespaceName, v1alpha1.ServiceType, kubelet)
    if err != nil {
        return fmt.Errorf("error watching service: %w", err)
    }

    // first response is resource definition
    _, err = watchClient.Recv()
    if err != nil {
        return fmt.Errorf("error watching service: %w", err)
    }

    // second is the initial state
    watchInitial, err := watchClient.Recv()
    if err != nil {
        return fmt.Errorf("error watching service: %w", err)
    }

    if watchInitial.EventType != state.Created {
        return fmt.Errorf("unexpected event type: %s", watchInitial.EventType)
    }

    initialService, err := serviceSpecFromResource(watchInitial.Resource)
    if err != nil {
        return fmt.Errorf("error inspecting service: %w", err)
    }

    if !initialService.Running || !initialService.Healthy {
        return fmt.Errorf("kubelet is not healthy")
    }

    // find out the current kubelet version: the machine config might be missing the image field,
    // so look it up from the kubelet spec resource
    kubeletSpec, err := c.Resources.Get(ctx, k8s.NamespaceName, k8s.KubeletSpecType, kubelet)
    if err != nil {
        return fmt.Errorf("error fetching kubelet spec: %w", err)
    }

    if len(kubeletSpec) != 1 {
        return fmt.Errorf("unexpected number of responses: %d", len(kubeletSpec))
    }

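    // skipWait is set when no kubelet restart is expected: the node already runs the desired image, or this is a dry run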
    skipWait := false

    err = patchNodeConfig(ctx, cluster, node, upgradeKubeletPatcher(options, kubeletSpec[0].Resource))
    if err != nil {
        if errors.Is(err, errUpdateSkipped) {
            skipWait = true
        } else {
            return fmt.Errorf("error patching node config: %w", err)
        }
    }

    if options.DryRun {
        return nil
    }

    options.Log(" > %q: machine configuration patched", node)

    if !skipWait {
        options.Log(" > %q: waiting for kubelet restart", node)

        // first, wait for kubelet to go down
        for {
            var watchUpdated client.WatchResponse

            watchUpdated, err = watchClient.Recv()
            if err != nil {
                return fmt.Errorf("error watching service: %w", err)
            }

            if watchUpdated.EventType == state.Destroyed {
                break
            }
        }

        // now wait for kubelet to go up & healthy
        for {
            var watchUpdated client.WatchResponse

            watchUpdated, err = watchClient.Recv()
            if err != nil {
                return fmt.Errorf("error watching service: %w", err)
            }

            if watchUpdated.EventType == state.Created || watchUpdated.EventType == state.Updated {
                var service *v1alpha1.ServiceSpec

                service, err = serviceSpecFromResource(watchUpdated.Resource)
                if err != nil {
                    return fmt.Errorf("error inspecting service: %w", err)
                }

                if service.Running && service.Healthy {
                    break
                }
            }
        }
    }

options.Log(" > %q: waiting for node update", node)

if err = retry.Constant(3*time.Minute, retry.WithUnits(10*time.Second)).Retry(func() error {
return checkNodeKubeletVersion(ctx, cluster, node, "v"+options.ToVersion)
}); err != nil {
return err
}

options.Log(" < %q: successfully updated", node)

return nil
}

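// upgradeKubeletPatcher returns a machine config patch which points the kubelet image at the target version;
// it returns errUpdateSkipped when the image is already up to date or on dry run.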
func upgradeKubeletPatcher(options UpgradeOptions, kubeletSpec resource.Resource) func(config *v1alpha1config.Config) error {
    return func(config *v1alpha1config.Config) error {
        if config.MachineConfig == nil {
            config.MachineConfig = &v1alpha1config.MachineConfig{}
        }

        if config.MachineConfig.MachineKubelet == nil {
            config.MachineConfig.MachineKubelet = &v1alpha1config.KubeletConfig{}
        }

        var (
            any *resource.Any
            ok  bool
        )

        if any, ok = kubeletSpec.(*resource.Any); !ok {
            return fmt.Errorf("unexpected resource type")
        }

        oldImage := any.Value().(map[string]interface{})["image"].(string) //nolint:errcheck,forcetypeassert

        logUpdate := func(oldImage string) {
            parts := strings.Split(oldImage, ":")
            version := options.FromVersion

            if len(parts) > 1 {
                version = parts[1]
            }

            options.Log(" > update %s: %s -> %s", kubelet, version, options.ToVersion)

            if options.DryRun {
                options.Log(" > skipped in dry-run")
            }
        }

        image := fmt.Sprintf("%s:v%s", constants.KubeletImage, options.ToVersion)

        if oldImage == image {
            return errUpdateSkipped
        }

        logUpdate(oldImage)

        if options.DryRun {
            return errUpdateSkipped
        }

        config.MachineConfig.MachineKubelet.KubeletImage = image

        return nil
    }
}

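// checkNodeKubeletVersion verifies that the Node with the given address reports the expected kubelet version and is Ready.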
//nolint:gocyclo
func checkNodeKubeletVersion(ctx context.Context, cluster UpgradeProvider, nodeToCheck, version string) error {
    k8sClient, err := cluster.K8sHelper(ctx)
    if err != nil {
        return fmt.Errorf("error building kubernetes client: %w", err)
    }

    nodes, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
    if err != nil {
        if kubernetes.IsRetryableError(err) {
            return retry.ExpectedError(err)
        }

        return err
    }

    nodeFound := false

    for _, node := range nodes.Items {
        matchingNode := false

        for _, address := range node.Status.Addresses {
            if address.Address == nodeToCheck {
                matchingNode = true

                break
            }
        }

        if !matchingNode {
            continue
        }

        nodeFound = true

        if node.Status.NodeInfo.KubeletVersion != version {
            return retry.ExpectedError(fmt.Errorf("node version mismatch: got %q, expected %q", node.Status.NodeInfo.KubeletVersion, version))
        }

        ready := false

        for _, condition := range node.Status.Conditions {
            if condition.Type != v1.NodeReady {
                continue
            }

            if condition.Status == v1.ConditionTrue {
                ready = true

                break
            }
        }

        if !ready {
            return retry.ExpectedError(fmt.Errorf("node is not ready"))
        }

        break
    }

    if !nodeFound {
        return retry.ExpectedError(fmt.Errorf("node %q not found", nodeToCheck))
    }

    return nil
}
