-
Notifications
You must be signed in to change notification settings - Fork 38.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stop 'kubectl drain' deleting pods with local storage. #26667
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,9 +27,8 @@ import ( | |
|
||
"k8s.io/kubernetes/pkg/api" | ||
"k8s.io/kubernetes/pkg/api/meta" | ||
"k8s.io/kubernetes/pkg/controller" | ||
// "k8s.io/kubernetes/pkg/api/unversioned" | ||
client "k8s.io/kubernetes/pkg/client/unversioned" | ||
"k8s.io/kubernetes/pkg/controller" | ||
"k8s.io/kubernetes/pkg/fields" | ||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" | ||
"k8s.io/kubernetes/pkg/kubectl/resource" | ||
|
@@ -43,14 +42,31 @@ type DrainOptions struct { | |
Force bool | ||
GracePeriodSeconds int | ||
IgnoreDaemonsets bool | ||
DeleteLocalData bool | ||
mapper meta.RESTMapper | ||
nodeInfo *resource.Info | ||
out io.Writer | ||
typer runtime.ObjectTyper | ||
} | ||
|
||
// Takes a pod and returns a bool indicating whether or not to operate on the | ||
// pod, an optional warning message, and an optional fatal error. | ||
type podFilter func(api.Pod) (include bool, w *warning, f *fatal) | ||
type warning struct { | ||
string | ||
} | ||
type fatal struct { | ||
string | ||
} | ||
|
||
const ( | ||
cordon_long = `Mark node as unschedulable. | ||
kDaemonsetFatal = "DaemonSet-managed pods (use --ignore-daemonsets to ignore)" | ||
kDaemonsetWarning = "Ignoring DaemonSet-managed pods" | ||
kLocalStorageFatal = "pods with local storage (use --delete-local-data to override)" | ||
kLocalStorageWarning = "Deleting pods with local storage" | ||
kUnmanagedFatal = "pods not managed by ReplicationController, ReplicaSet, Job, or DaemonSet (use --force to override)" | ||
kUnmanagedWarning = "Deleting pods not managed by ReplicationController, ReplicaSet, Job, or DaemonSet" | ||
cordon_long = `Mark node as unschedulable. | ||
` | ||
cordon_example = `# Mark node "foo" as unschedulable. | ||
kubectl cordon foo | ||
|
@@ -136,6 +152,7 @@ func NewCmdDrain(f *cmdutil.Factory, out io.Writer) *cobra.Command { | |
} | ||
cmd.Flags().BoolVar(&options.Force, "force", false, "Continue even if there are pods not managed by a ReplicationController, ReplicaSet, Job, or DaemonSet.") | ||
cmd.Flags().BoolVar(&options.IgnoreDaemonsets, "ignore-daemonsets", false, "Ignore DaemonSet-managed pods.") | ||
cmd.Flags().BoolVar(&options.DeleteLocalData, "delete-local-data", false, "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).") | ||
cmd.Flags().IntVar(&options.GracePeriodSeconds, "grace-period", -1, "Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.") | ||
return cmd | ||
} | ||
|
@@ -195,148 +212,151 @@ func (o *DrainOptions) RunDrain() error { | |
return nil | ||
} | ||
|
||
// getPodsForDeletion returns all the pods we're going to delete. If there are | ||
// any unmanaged pods and the user didn't pass --force, we return that list in | ||
// an error. | ||
func (o *DrainOptions) getPodsForDeletion() ([]api.Pod, error) { | ||
pods, unreplicatedPodNames, daemonSetPodNames, err := GetPodsForDeletionOnNodeDrain( | ||
o.client, | ||
o.nodeInfo.Name, | ||
o.factory.Decoder(true), | ||
o.Force, | ||
o.IgnoreDaemonsets, | ||
) | ||
func (o *DrainOptions) getController(sr *api.SerializedReference) (interface{}, error) { | ||
switch sr.Reference.Kind { | ||
case "ReplicationController": | ||
return o.client.ReplicationControllers(sr.Reference.Namespace).Get(sr.Reference.Name) | ||
case "DaemonSet": | ||
return o.client.DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name) | ||
case "Job": | ||
return o.client.ExtensionsClient.Jobs(sr.Reference.Namespace).Get(sr.Reference.Name) | ||
case "ReplicaSet": | ||
return o.client.ExtensionsClient.ReplicaSets(sr.Reference.Namespace).Get(sr.Reference.Name) | ||
} | ||
return nil, fmt.Errorf("Unknown controller kind %q", sr.Reference.Kind) | ||
} | ||
|
||
func (o *DrainOptions) getPodCreator(pod api.Pod) (*api.SerializedReference, error) { | ||
creatorRef, found := pod.ObjectMeta.Annotations[controller.CreatedByAnnotation] | ||
if !found { | ||
return nil, nil | ||
} | ||
|
||
// Now verify that the specified creator actually exists. | ||
sr := &api.SerializedReference{} | ||
if err := runtime.DecodeInto(o.factory.Decoder(true), []byte(creatorRef), sr); err != nil { | ||
return nil, err | ||
} | ||
// We assume the only reason for an error is because the controller is | ||
// gone/missing, not for any other cause. TODO(mml): something more | ||
// sophisticated than this | ||
_, err := o.getController(sr) | ||
if err != nil { | ||
return []api.Pod{}, err | ||
return nil, err | ||
} | ||
return sr, nil | ||
} | ||
|
||
daemonSetErrors := !o.IgnoreDaemonsets && len(daemonSetPodNames) > 0 | ||
unreplicatedErrors := !o.Force && len(unreplicatedPodNames) > 0 | ||
func (o *DrainOptions) unreplicatedFilter(pod api.Pod) (bool, *warning, *fatal) { | ||
sr, err := o.getPodCreator(pod) | ||
if err != nil { | ||
return false, nil, &fatal{err.Error()} | ||
} | ||
if sr != nil { | ||
return true, nil, nil | ||
} | ||
if !o.Force { | ||
return false, nil, &fatal{kUnmanagedFatal} | ||
} | ||
return true, &warning{kUnmanagedWarning}, nil | ||
} | ||
|
||
switch { | ||
case daemonSetErrors && unreplicatedErrors: | ||
return []api.Pod{}, errors.New(unmanagedMsg(unreplicatedPodNames, daemonSetPodNames, true)) | ||
case daemonSetErrors && !unreplicatedErrors: | ||
return []api.Pod{}, errors.New(unmanagedMsg([]string{}, daemonSetPodNames, true)) | ||
case unreplicatedErrors && !daemonSetErrors: | ||
return []api.Pod{}, errors.New(unmanagedMsg(unreplicatedPodNames, []string{}, true)) | ||
func (o *DrainOptions) daemonsetFilter(pod api.Pod) (bool, *warning, *fatal) { | ||
// Note that we return false in all cases where the pod is DaemonSet managed, | ||
// regardless of flags. We never delete them, the only question is whether | ||
// their presence constitutes an error. | ||
sr, err := o.getPodCreator(pod) | ||
if err != nil { | ||
return false, nil, &fatal{err.Error()} | ||
} | ||
if sr == nil || sr.Reference.Kind != "DaemonSet" { | ||
return true, nil, nil | ||
} | ||
if _, err := o.client.DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name); err != nil { | ||
return false, nil, &fatal{err.Error()} | ||
} | ||
if !o.IgnoreDaemonsets { | ||
return false, nil, &fatal{kDaemonsetFatal} | ||
} | ||
return false, &warning{kDaemonsetWarning}, nil | ||
} | ||
|
||
if len(unreplicatedPodNames) > 0 { | ||
fmt.Fprintf(o.out, "WARNING: About to delete these %s\n", unmanagedMsg(unreplicatedPodNames, []string{}, false)) | ||
func mirrorPodFilter(pod api.Pod) (bool, *warning, *fatal) { | ||
if _, found := pod.ObjectMeta.Annotations[types.ConfigMirrorAnnotationKey]; found { | ||
return false, nil, nil | ||
} | ||
if len(daemonSetPodNames) > 0 { | ||
fmt.Fprintf(o.out, "WARNING: Skipping %s\n", unmanagedMsg([]string{}, daemonSetPodNames, false)) | ||
return true, nil, nil | ||
} | ||
|
||
func hasLocalStorage(pod api.Pod) bool { | ||
for _, volume := range pod.Spec.Volumes { | ||
if volume.EmptyDir != nil { | ||
[Review comment] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. — What about [comment truncated]. [Reply] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. — Matt and I discussed this and eventually decided to leave it out. We don't know what people are going to do after the drain. They might be doing something that doesn't delete all the data (for example, upgrade OS and reboot). Arguably we should have yet another flag to control whether to consider hostPath, but for simplicity we decided to just block on the case we know for sure will be deleted, namely emptyDir. For cluster scale-down, the node goes away "forever" so it makes sense to block on hostPath. But for [text truncated] |
||
return true | ||
} | ||
} | ||
|
||
return pods, nil | ||
return false | ||
} | ||
|
||
func (o *DrainOptions) localStorageFilter(pod api.Pod) (bool, *warning, *fatal) { | ||
if !hasLocalStorage(pod) { | ||
return true, nil, nil | ||
} | ||
if !o.DeleteLocalData { | ||
return false, nil, &fatal{kLocalStorageFatal} | ||
} | ||
return true, &warning{kLocalStorageWarning}, nil | ||
} | ||
|
||
// GetPodsForDeletionOnNodeDrain returns pods that should be deleted on node drain as well as some extra information | ||
// about possibly problematic pods (unreplicated and daemon sets). | ||
func GetPodsForDeletionOnNodeDrain(client *client.Client, nodename string, decoder runtime.Decoder, force bool, | ||
ignoreDeamonSet bool) (pods []api.Pod, unreplicatedPodNames []string, daemonSetPodNames []string, finalError error) { | ||
// Map of status message to a list of pod names having that status. | ||
type podStatuses map[string][]string | ||
|
||
func (ps podStatuses) Message() string { | ||
msgs := []string{} | ||
|
||
for key, pods := range ps { | ||
msgs = append(msgs, fmt.Sprintf("%s: %s", key, strings.Join(pods, ", "))) | ||
} | ||
return strings.Join(msgs, "; ") | ||
} | ||
|
||
pods = []api.Pod{} | ||
unreplicatedPodNames = []string{} | ||
daemonSetPodNames = []string{} | ||
podList, err := client.Pods(api.NamespaceAll).List(api.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodename})}) | ||
// getPodsForDeletion returns all the pods we're going to delete. If there are | ||
// any pods preventing us from deleting, we return that list in an error. | ||
func (o *DrainOptions) getPodsForDeletion() (pods []api.Pod, err error) { | ||
podList, err := o.client.Pods(api.NamespaceAll).List(api.ListOptions{ | ||
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": o.nodeInfo.Name})}) | ||
if err != nil { | ||
return []api.Pod{}, []string{}, []string{}, err | ||
return pods, err | ||
} | ||
|
||
ws := podStatuses{} | ||
fs := podStatuses{} | ||
|
||
for _, pod := range podList.Items { | ||
_, found := pod.ObjectMeta.Annotations[types.ConfigMirrorAnnotationKey] | ||
if found { | ||
// Skip mirror pod | ||
continue | ||
} | ||
replicated := false | ||
daemonset_pod := false | ||
|
||
creatorRef, found := pod.ObjectMeta.Annotations[controller.CreatedByAnnotation] | ||
if found { | ||
// Now verify that the specified creator actually exists. | ||
var sr api.SerializedReference | ||
if err := runtime.DecodeInto(decoder, []byte(creatorRef), &sr); err != nil { | ||
return []api.Pod{}, []string{}, []string{}, err | ||
podOk := true | ||
for _, filt := range []podFilter{mirrorPodFilter, o.localStorageFilter, o.unreplicatedFilter, o.daemonsetFilter} { | ||
filterOk, w, f := filt(pod) | ||
|
||
podOk = podOk && filterOk | ||
if w != nil { | ||
ws[w.string] = append(ws[w.string], pod.Name) | ||
} | ||
if sr.Reference.Kind == "ReplicationController" { | ||
rc, err := client.ReplicationControllers(sr.Reference.Namespace).Get(sr.Reference.Name) | ||
// Assume the only reason for an error is because the RC is | ||
// gone/missing, not for any other cause. TODO(mml): something more | ||
// sophisticated than this | ||
if err == nil && rc != nil { | ||
replicated = true | ||
} | ||
} else if sr.Reference.Kind == "DaemonSet" { | ||
ds, err := client.DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name) | ||
|
||
// Assume the only reason for an error is because the DaemonSet is | ||
// gone/missing, not for any other cause. TODO(mml): something more | ||
// sophisticated than this | ||
if err == nil && ds != nil { | ||
// Otherwise, treat daemonset-managed pods as unmanaged since | ||
// DaemonSet Controller currently ignores the unschedulable bit. | ||
// FIXME(mml): Add link to the issue concerning a proper way to drain | ||
// daemonset pods, probably using taints. | ||
daemonset_pod = true | ||
} | ||
} else if sr.Reference.Kind == "Job" { | ||
job, err := client.ExtensionsClient.Jobs(sr.Reference.Namespace).Get(sr.Reference.Name) | ||
|
||
// Assume the only reason for an error is because the Job is | ||
// gone/missing, not for any other cause. TODO(mml): something more | ||
// sophisticated than this | ||
if err == nil && job != nil { | ||
replicated = true | ||
} | ||
} else if sr.Reference.Kind == "ReplicaSet" { | ||
rs, err := client.ExtensionsClient.ReplicaSets(sr.Reference.Namespace).Get(sr.Reference.Name) | ||
|
||
// Assume the only reason for an error is because the RS is | ||
// gone/missing, not for any other cause. TODO(mml): something more | ||
// sophisticated than this | ||
if err == nil && rs != nil { | ||
replicated = true | ||
} | ||
if f != nil { | ||
fs[f.string] = append(fs[f.string], pod.Name) | ||
} | ||
} | ||
|
||
switch { | ||
case daemonset_pod: | ||
daemonSetPodNames = append(daemonSetPodNames, pod.Name) | ||
case !replicated: | ||
unreplicatedPodNames = append(unreplicatedPodNames, pod.Name) | ||
if force { | ||
pods = append(pods, pod) | ||
} | ||
default: | ||
if podOk { | ||
pods = append(pods, pod) | ||
} | ||
} | ||
return pods, unreplicatedPodNames, daemonSetPodNames, nil | ||
} | ||
|
||
// Helper for generating errors or warnings about unmanaged pods. | ||
func unmanagedMsg(unreplicatedNames []string, daemonSetNames []string, include_guidance bool) string { | ||
msgs := []string{} | ||
if len(unreplicatedNames) > 0 { | ||
msg := fmt.Sprintf("pods not managed by ReplicationController, ReplicaSet, Job, or DaemonSet: %s", strings.Join(unreplicatedNames, ",")) | ||
if include_guidance { | ||
msg += " (use --force to override)" | ||
} | ||
msgs = append(msgs, msg) | ||
if len(fs) > 0 { | ||
return []api.Pod{}, errors.New(fs.Message()) | ||
} | ||
if len(daemonSetNames) > 0 { | ||
msg := fmt.Sprintf("DaemonSet-managed pods: %s", strings.Join(daemonSetNames, ",")) | ||
if include_guidance { | ||
msg += " (use --ignore-daemonsets to ignore)" | ||
} | ||
msgs = append(msgs, msg) | ||
if len(ws) > 0 { | ||
fmt.Fprintf(o.out, "WARNING: %s\n", ws.Message()) | ||
} | ||
|
||
return strings.Join(msgs, " and ") | ||
return pods, nil | ||
} | ||
|
||
// deletePods deletes the pods on the api server | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. No matter what, we never delete the daemonset pods because the DS controller doesn't respect unschedulable. The flag only toggles whether we consider their presence an error or not.
I will add a comment that clarifies this.