Skip to content

Commit

Permalink
Fix static pod UID generation and cleanup
Browse files Browse the repository at this point in the history
Partial revert of 53d8510

This reverts static pod UID generation to use the same logic as the
kubelet, in favor of manually removing stale static pods for etcd and
the apiserver. The fixed pod UID was an attempt to trick the kubelet
into syncing static pod containers even when the apiserver was
unavailable, but it had the downside of preventing the kubelet from ever
updating the mirror pods. This was confusing for end users, who expect
the pods visible via the apiserver to reflect what is actually running
on the nodes.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
  • Loading branch information
brandond committed Jul 26, 2023
1 parent 3aaa57a commit d4bed77
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 55 deletions.
2 changes: 1 addition & 1 deletion pkg/rke2/rke2.go
Expand Up @@ -103,7 +103,7 @@ func Server(clx *cli.Context, cfg Config) error {
}
dataDir := clx.String("data-dir")
cmds.ServerConfig.StartupHooks = append(cmds.ServerConfig.StartupHooks,
checkStaticManifests(cmds.AgentConfig.ContainerRuntimeEndpoint, dataDir),
reconcileStaticPods(cmds.AgentConfig.ContainerRuntimeEndpoint, dataDir),
setNetworkPolicies(cisMode, defaultNamespaces),
setClusterRoles(),
restrictServiceAccounts(cisMode, defaultNamespaces),
Expand Down
98 changes: 58 additions & 40 deletions pkg/rke2/spw.go
Expand Up @@ -2,7 +2,6 @@ package rke2

import (
"context"
"encoding/json"
"os"
"path/filepath"
"sync"
Expand All @@ -11,6 +10,7 @@ import (
"github.com/k3s-io/k3s/pkg/agent/cri"
"github.com/k3s-io/k3s/pkg/cli/cmds"
"github.com/pkg/errors"
"github.com/rancher/rke2/pkg/staticpod"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -22,11 +22,11 @@ type containerInfo struct {
Config *runtimeapi.ContainerConfig `json:"config,omitempty"`
}

// checkStaticManifests validates that the pods started with rke2 match the static manifests
// provided in /var/lib/rancher/rke2/agent/pod-manifests. When restarting rke2, it takes time
// for any changes to static manifests to be pulled by kubelet. Additionally this prevents errors
// where something is wrong with the static manifests and RKE2 starts anyways.
func checkStaticManifests(containerRuntimeEndpoint, dataDir string) cmds.StartupHook {
// reconcileStaticPods validates that the running pods for etcd and kube-apiserver match the static pod
// manifests provided in /var/lib/rancher/rke2/agent/pod-manifests. If any old pods are found, they are
// manually terminated, as the kubelet cannot be relied upon to terminate old pod when the apiserver is
// not available.
func reconcileStaticPods(containerRuntimeEndpoint, dataDir string) cmds.StartupHook {
return func(ctx context.Context, wg *sync.WaitGroup, args cmds.StartupHookArgs) error {
go func() {
defer wg.Done()
Expand All @@ -51,22 +51,22 @@ func checkStaticManifests(containerRuntimeEndpoint, dataDir string) cmds.Startup
// Since split-role servers exist, we don't care if no manifest is found
continue
}
logrus.Infof("Container for %s not found (%v), retrying", pod, err)
logrus.Infof("Pod for %s not synced (%v), retrying", pod, err)
return false, nil
}
logrus.Infof("Container for %s is running", pod)
logrus.Infof("Pod for %s is synced", pod)
}
return true, nil
}); err != nil {
logrus.Fatalf("Failed waiting for static pods to deploy: %v", err)
logrus.Fatalf("Failed waiting for static pods to sync: %v", err)
}
}()
return nil
}
}

// checkManifestDeployed returns an error if the static pod's manifest cannot be decoded and
// verified as present and running with the current pod hash in the container runtime.
// checkManifestDeployed returns an error if the static pod's manifest cannot be decoded and verified as present
// and exclusively running with the current pod uid. If old pods are found, they will be terminated and an error returned.
func checkManifestDeployed(ctx context.Context, cRuntime runtimeapi.RuntimeServiceClient, manifestFile string) error {
f, err := os.Open(manifestFile)
if err != nil {
Expand All @@ -81,43 +81,61 @@ func checkManifestDeployed(ctx context.Context, cRuntime runtimeapi.RuntimeServi
return errors.Wrap(err, "failed to decode manifest")
}

var podHash string
for _, env := range podManifest.Spec.Containers[0].Env {
if env.Name == "POD_HASH" {
podHash = env.Value
break
}
}

filter := &runtimeapi.ContainerFilter{
State: &runtimeapi.ContainerStateValue{
State: runtimeapi.ContainerState_CONTAINER_RUNNING,
},
filter := &runtimeapi.PodSandboxFilter{
LabelSelector: map[string]string{
"io.kubernetes.pod.uid": string(podManifest.UID),
"component": podManifest.Labels["component"],
"io.kubernetes.pod.namespace": podManifest.Namespace,
"tier": podManifest.Labels["tier"],
},
}

resp, err := cRuntime.ListContainers(ctx, &runtimeapi.ListContainersRequest{Filter: filter})
resp, err := cRuntime.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{Filter: filter})
if err != nil {
return errors.Wrap(err, "failed to list containers")
return errors.Wrap(err, "failed to list pods")
}

for _, container := range resp.Containers {
resp, err := cRuntime.ContainerStatus(ctx, &runtimeapi.ContainerStatusRequest{ContainerId: container.Id, Verbose: true})
if err != nil {
return errors.Wrap(err, "failed to get container status")
}
info := &containerInfo{}
err = json.Unmarshal([]byte(resp.Info["info"]), &info)
if err != nil || info.Config == nil {
return errors.Wrap(err, "failed to unmarshal container config")
var currentPod, stalePod bool
for _, pod := range resp.Items {
if pod.Annotations["kubernetes.io/config.source"] != "file" {
continue
}
for _, env := range info.Config.Envs {
if env.Key == "POD_HASH" && env.Value == podHash {
return nil
if pod.Labels["io.kubernetes.pod.uid"] == string(podManifest.UID) {
// Only mark the pod with matching UID as current if it is actually ready
currentPod = pod.State == runtimeapi.PodSandboxState_SANDBOX_READY
} else {
stalePod = true

// Stop containers with 10 second timeout before removing the pod. This sends the
// containers SIGTERM, and then SIGKILL 10 seconds later. Removing the pod directly just
// sends the containers SIGKILL immediately which allows no time for orderly shutdown.
cfilter := &runtimeapi.ContainerFilter{
LabelSelector: map[string]string{
"io.kubernetes.pod.uid": pod.Labels["io.kubernetes.pod.uid"],
},
}
cresp, err := cRuntime.ListContainers(ctx, &runtimeapi.ListContainersRequest{Filter: cfilter})
if err != nil {
logrus.Warnf("Failed to list containers for pod %s: %v", pod.Metadata.Name, err)
} else {
for _, container := range cresp.Containers {
logrus.Infof("Stopping %s container for previous %s pod", container.Metadata.Name, pod.Metadata.Name)
if _, err := cRuntime.StopContainer(ctx, &runtimeapi.StopContainerRequest{ContainerId: container.Id, Timeout: *staticpod.DefaultTerminationGracePeriodSeconds}); err != nil {
logrus.Warnf("Failed to stop container %s for pod %s: %v", container.Metadata.Name, pod.Metadata.Name, err)
}
}
}

logrus.Infof("Deleting previous %s pod", pod.Metadata.Name)
if _, err := cRuntime.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{PodSandboxId: pod.Id}); err != nil {
logrus.Warnf("Failed to remove old pod %s: %v", pod.Metadata.Name, err)
}
}
}
return errors.New("no matching container found")

if stalePod {
return errors.New("waiting for termination of old pod")
}
if !currentPod {
return errors.New("no current running pod found")
}
return nil
}
29 changes: 15 additions & 14 deletions pkg/staticpod/staticpod.go
Expand Up @@ -23,13 +23,18 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/kubernetes/pkg/util/hash"
"k8s.io/utils/pointer"
"sigs.k8s.io/yaml"
)

const (
extraMountPrefix = "extra-mount"
)

var (
DefaultTerminationGracePeriodSeconds = pointer.Int64(10)
)

type ProbeConf struct {
InitialDelaySeconds int32
TimeoutSeconds int32
Expand Down Expand Up @@ -94,22 +99,16 @@ func Run(dir string, args Args) error {

manifestPath := filepath.Join(dir, args.Command+".yaml")

// Generate a stable UID based on the manifest path. This allows the kubelet to reconcile the pod's
// containers even when the apiserver is unavailable. If the UID is not stable, the kubelet
// will consider the manifest change as two separate add/remove operations, and may start the new pod
// before terminating the old one. Cleanup of removed pods is disabled until all sources have synced,
// so if the apiserver is down, the newly added pod may get stuck in a crash loop due to the old pod
// still using its ports. See https://github.com/rancher/rke2/issues/3387
// We hash the completed pod manifest use that as the UID; this mimics what upstream does:
// https://github.com/kubernetes/kubernetes/blob/v1.24.0/pkg/kubelet/config/common.go#L58-68
// We manually terminate static pods with incorrect UIDs, as the kubelet cannot be relied
// upon to clean up the old one while the apiserver is down.
// See https://github.com/rancher/rke2/issues/3387 and https://github.com/rancher/rke2/issues/3725
hasher := md5.New()
fmt.Fprint(hasher, manifestPath)
pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)[0:]))

// Append a hash of the completed pod manifest to the container environment for later use when checking
// to see if the pod has been updated. It's fine that setting this changes the actual hash; we
// just need a stable values that we can compare between the file on disk and the running
// container to see if the kubelet has reconciled yet.
hash.DeepHashObject(hasher, pod)
pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, v1.EnvVar{Name: "POD_HASH", Value: hex.EncodeToString(hasher.Sum(nil)[0:])})
fmt.Fprintf(hasher, "host:%s", os.Getenv("NODE_NAME"))
fmt.Fprintf(hasher, "file:%s", manifestPath)
pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)[0:]))

b, err := yaml.Marshal(pod)
if err != nil {
Expand Down Expand Up @@ -213,6 +212,8 @@ func pod(args Args) (*v1.Pod, error) {
HostNetwork: true,
PriorityClassName: "system-cluster-critical",
SecurityContext: args.SecurityContext,

TerminationGracePeriodSeconds: DefaultTerminationGracePeriodSeconds,
},
}

Expand Down

0 comments on commit d4bed77

Please sign in to comment.