Fix static pod UID generation and cleanup #4508

Merged
merged 1 commit on Jul 31, 2023
pkg/rke2/rke2.go (2 changes: 1 addition & 1 deletion)
@@ -103,7 +103,7 @@ func Server(clx *cli.Context, cfg Config) error {
}
dataDir := clx.String("data-dir")
cmds.ServerConfig.StartupHooks = append(cmds.ServerConfig.StartupHooks,
checkStaticManifests(cmds.AgentConfig.ContainerRuntimeEndpoint, dataDir),
reconcileStaticPods(cmds.AgentConfig.ContainerRuntimeEndpoint, dataDir),
setNetworkPolicies(cisMode, defaultNamespaces),
setClusterRoles(),
restrictServiceAccounts(cisMode, defaultNamespaces),
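
The renamed hook keeps the startup-hook shape visible in pkg/rke2/spw.go below: it returns a cmds.StartupHook that launches a goroutine, polls until each static pod manifest has a matching running pod, and only then releases the WaitGroup so startup can proceed. A minimal sketch of that pattern, assuming the cmds package is k3s's pkg/cli/cmds (which rke2 imports); the 15-second interval and the staticPodsReady helper are illustrative stand-ins, not the exact rke2 code.

// Sketch only: mirrors the hook shape shown in the spw.go diff below.
package example

import (
	"context"
	"sync"
	"time"

	"github.com/k3s-io/k3s/pkg/cli/cmds" // assumed source of StartupHook/StartupHookArgs
	"github.com/sirupsen/logrus"
	"k8s.io/apimachinery/pkg/util/wait"
)

func exampleReconcileHook(containerRuntimeEndpoint, dataDir string) cmds.StartupHook {
	return func(ctx context.Context, wg *sync.WaitGroup, args cmds.StartupHookArgs) error {
		go func() {
			// The caller is expected to have added this hook to the WaitGroup;
			// Done() signals that the static pods are reconciled.
			defer wg.Done()
			// The poll interval is an assumption; the real value is not shown in this diff.
			if err := wait.PollImmediateUntilWithContext(ctx, 15*time.Second, func(ctx context.Context) (bool, error) {
				return staticPodsReady(ctx, containerRuntimeEndpoint, dataDir)
			}); err != nil {
				logrus.Fatalf("Failed waiting for static pods to sync: %v", err)
			}
		}()
		return nil
	}
}

// staticPodsReady is a hypothetical stand-in for the per-manifest
// checkManifestDeployed calls made against <dataDir>/agent/pod-manifests.
func staticPodsReady(ctx context.Context, containerRuntimeEndpoint, dataDir string) (bool, error) {
	return true, nil
}
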
pkg/rke2/spw.go (74 changes: 34 additions & 40 deletions)
@@ -2,7 +2,6 @@ package rke2

import (
"context"
"encoding/json"
"os"
"path/filepath"
"sync"
@@ -22,11 +21,11 @@ type containerInfo struct {
Config *runtimeapi.ContainerConfig `json:"config,omitempty"`
}

// checkStaticManifests validates that the pods started with rke2 match the static manifests
// provided in /var/lib/rancher/rke2/agent/pod-manifests. When restarting rke2, it takes time
// for any changes to static manifests to be pulled by the kubelet. Additionally, this prevents errors
// where something is wrong with the static manifests and RKE2 starts anyway.
func checkStaticManifests(containerRuntimeEndpoint, dataDir string) cmds.StartupHook {
// reconcileStaticPods validates that the running pods for etcd and kube-apiserver match the static pod
// manifests provided in /var/lib/rancher/rke2/agent/pod-manifests. If any old pods are found, they are
// manually terminated, as the kubelet cannot be relied upon to terminate old pods when the apiserver is
// not available.
func reconcileStaticPods(containerRuntimeEndpoint, dataDir string) cmds.StartupHook {
return func(ctx context.Context, wg *sync.WaitGroup, args cmds.StartupHookArgs) error {
go func() {
defer wg.Done()
@@ -51,22 +50,22 @@ func checkStaticManifests(containerRuntimeEndpoint, dataDir string) cmds.Startup
// Since split-role servers exist, we don't care if no manifest is found
continue
}
logrus.Infof("Container for %s not found (%v), retrying", pod, err)
logrus.Infof("Pod for %s not synced (%v), retrying", pod, err)
return false, nil
}
logrus.Infof("Container for %s is running", pod)
logrus.Infof("Pod for %s is synced", pod)
}
return true, nil
}); err != nil {
logrus.Fatalf("Failed waiting for static pods to deploy: %v", err)
logrus.Fatalf("Failed waiting for static pods to sync: %v", err)
}
}()
return nil
}
}

// checkManifestDeployed returns an error if the static pod's manifest cannot be decoded and
// verified as present and running with the current pod hash in the container runtime.
// checkManifestDeployed returns an error if the static pod's manifest cannot be decoded and verified as present
// and exclusively running with the current pod uid. If old pods are found, they will be terminated and an error returned.
func checkManifestDeployed(ctx context.Context, cRuntime runtimeapi.RuntimeServiceClient, manifestFile string) error {
f, err := os.Open(manifestFile)
if err != nil {
@@ -81,43 +80,38 @@ func checkManifestDeployed(ctx context.Context, cRuntime runtimeapi.RuntimeServi
return errors.Wrap(err, "failed to decode manifest")
}

var podHash string
for _, env := range podManifest.Spec.Containers[0].Env {
if env.Name == "POD_HASH" {
podHash = env.Value
break
}
}

filter := &runtimeapi.ContainerFilter{
State: &runtimeapi.ContainerStateValue{
State: runtimeapi.ContainerState_CONTAINER_RUNNING,
},
filter := &runtimeapi.PodSandboxFilter{
LabelSelector: map[string]string{
"io.kubernetes.pod.uid": string(podManifest.UID),
"component": podManifest.Labels["component"],
"io.kubernetes.pod.namespace": podManifest.Namespace,
"tier": podManifest.Labels["tier"],
},
}

resp, err := cRuntime.ListContainers(ctx, &runtimeapi.ListContainersRequest{Filter: filter})
resp, err := cRuntime.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{Filter: filter})
if err != nil {
return errors.Wrap(err, "failed to list containers")
return errors.Wrap(err, "failed to list pods")
}

for _, container := range resp.Containers {
resp, err := cRuntime.ContainerStatus(ctx, &runtimeapi.ContainerStatusRequest{ContainerId: container.Id, Verbose: true})
if err != nil {
return errors.Wrap(err, "failed to get container status")
var currentPod, stalePod bool
for _, pod := range resp.Items {
if pod.Annotations["kubernetes.io/config.source"] != "file" {
continue
}
info := &containerInfo{}
err = json.Unmarshal([]byte(resp.Info["info"]), &info)
if err != nil || info.Config == nil {
return errors.Wrap(err, "failed to unmarshal container config")
}
for _, env := range info.Config.Envs {
if env.Key == "POD_HASH" && env.Value == podHash {
return nil
if pod.Labels["io.kubernetes.pod.uid"] == string(podManifest.UID) {
currentPod = pod.State == runtimeapi.PodSandboxState_SANDBOX_READY
} else {
stalePod = true
if _, err := cRuntime.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{PodSandboxId: pod.Id}); err != nil {
logrus.Warnf("Failed to terminate old %s pod: %v", pod.Metadata.Name, err)
}
}
}
return errors.New("no matching container found")

if stalePod {
return errors.New("waiting for termination of old pod")
}
if !currentPod {
return errors.New("no current running pod found")
}
return nil
}
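
The checkManifestDeployed shown above takes a CRI RuntimeServiceClient, but this diff does not show how that client is built. A minimal, self-contained sketch of querying pod sandboxes the same way, assuming rke2's embedded containerd listens on /run/k3s/containerd/containerd.sock, and using illustrative label values for an etcd static pod (the component, namespace, and tier values are assumptions, not taken from this diff):

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

func main() {
	// Assumed socket path; adjust to your container runtime endpoint.
	const sock = "unix:///run/k3s/containerd/containerd.sock"

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	conn, err := grpc.DialContext(ctx, sock, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	cRuntime := runtimeapi.NewRuntimeServiceClient(conn)

	// Same filter shape as checkManifestDeployed: select sandboxes by component,
	// namespace, and tier labels, then inspect the pod uid label of each result.
	resp, err := cRuntime.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{
		Filter: &runtimeapi.PodSandboxFilter{
			LabelSelector: map[string]string{
				"component":                   "etcd",          // illustrative value
				"io.kubernetes.pod.namespace": "kube-system",   // illustrative value
				"tier":                        "control-plane", // illustrative value
			},
		},
	})
	if err != nil {
		panic(err)
	}
	for _, pod := range resp.Items {
		fmt.Printf("%s uid=%s state=%s source=%s\n",
			pod.Metadata.Name,
			pod.Labels["io.kubernetes.pod.uid"],
			pod.State,
			pod.Annotations["kubernetes.io/config.source"])
	}
}
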
pkg/staticpod/staticpod.go (21 changes: 7 additions & 14 deletions)
@@ -94,22 +94,15 @@ func Run(dir string, args Args) error {

manifestPath := filepath.Join(dir, args.Command+".yaml")

// Generate a stable UID based on the manifest path. This allows the kubelet to reconcile the pod's
// containers even when the apiserver is unavailable. If the UID is not stable, the kubelet
// will consider the manifest change as two separate add/remove operations, and may start the new pod
// before terminating the old one. Cleanup of removed pods is disabled until all sources have synced,
// so if the apiserver is down, the newly added pod may get stuck in a crash loop due to the old pod
// still using its ports. See https://github.com/rancher/rke2/issues/3387
// We hash the completed pod manifest and use that as the UID; this mimics what upstream does:
// https://github.com/kubernetes/kubernetes/blob/v1.24.0/pkg/kubelet/config/common.go#L58-68
// We manually terminate static pods with incorrect UIDs, as the kubelet cannot be relied
// upon to clean up the old one while the apiserver is down.
// See https://github.com/rancher/rke2/issues/3387 and https://github.com/rancher/rke2/issues/3725
hasher := md5.New()
fmt.Fprint(hasher, manifestPath)
pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)[0:]))

// Append a hash of the completed pod manifest to the container environment for later use when checking
// to see if the pod has been updated. It's fine that setting this changes the actual hash; we
// just need a stable value that we can compare between the file on disk and the running
// container to see if the kubelet has reconciled yet.
hash.DeepHashObject(hasher, pod)
pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, v1.EnvVar{Name: "POD_HASH", Value: hex.EncodeToString(hasher.Sum(nil)[0:])})
fmt.Fprintf(hasher, "file:%s", manifestPath)
pod.UID = types.UID(hex.EncodeToString(hasher.Sum(nil)[0:]))

b, err := yaml.Marshal(pod)
if err != nil {
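
For reference, after this change the static pod UID derives only from the manifest path, so editing the manifest contents no longer changes the UID and the kubelet can reconcile the pod in place. A minimal standalone sketch of the derivation shown above; the example path is illustrative:

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

// staticPodUID hashes only "file:<manifestPath>", so the result is stable
// across changes to the manifest contents.
func staticPodUID(manifestPath string) types.UID {
	hasher := md5.New()
	fmt.Fprintf(hasher, "file:%s", manifestPath)
	return types.UID(hex.EncodeToString(hasher.Sum(nil)))
}

func main() {
	path := "/var/lib/rancher/rke2/agent/pod-manifests/etcd.yaml" // example manifest path
	fmt.Println(staticPodUID(path)) // same hex UID on every run for the same path
}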