fix pod eviction storage #33927

Merged
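In brief: this PR stops wiring the pods/eviction subresource to PodDisruptionBudget storage from master.go (via cross-group rest.Lister/rest.Updater type assertions) and instead injects a typed policy client into the pod registry. The visible signature change, taken directly from the diff below:

    // Old: PDB access was patched in after construction by master.go.
    func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter,
        proxyTransport http.RoundTripper) PodStorage

    // New: a typed policy client is passed in up front.
    func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter,
        proxyTransport http.RoundTripper,
        podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage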
15 changes: 1 addition & 14 deletions pkg/master/master.go
@@ -81,7 +81,6 @@ import (
storagerest "k8s.io/kubernetes/pkg/registry/storage/rest"

// direct etcd registry dependencies
podetcd "k8s.io/kubernetes/pkg/registry/core/pod/etcd"
"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
thirdpartyresourcedataetcd "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata/etcd"
)
@@ -220,6 +219,7 @@ func (c completedConfig) New() (*Master, error) {
ServiceClusterIPRange: c.GenericConfig.ServiceClusterIPRange,
ServiceNodePortRange: c.GenericConfig.ServiceNodePortRange,
ComponentStatusServerFunc: func() map[string]apiserver.Server { return getServersToValidate(c.StorageFactory) },
+ LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
},
}

@@ -328,19 +328,6 @@ func (m *Master) InstallAPIs(c *Config) {
}
}

- // This is here so that, if the policy group is present, the eviction
- // subresource handler will be able to find poddisruptionbudgets
- // TODO(lavalamp) find a better way for groups to discover and interact
- // with each other
- if group == "policy" {
- storage := apiGroupsInfo[0].VersionedResourcesStorageMap["v1"]["pods/eviction"]
- evictionStorage := storage.(*podetcd.EvictionREST)
-
- storage = apiGroupInfo.VersionedResourcesStorageMap["v1alpha1"]["poddisruptionbudgets"]
- evictionStorage.PodDisruptionBudgetLister = storage.(rest.Lister)
- evictionStorage.PodDisruptionBudgetUpdater = storage.(rest.Updater)
- }

apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
}

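The LoopbackClientConfig plumbed through above is what lets the server construct that policy client without reaching into another API group's storage. A minimal sketch of the replacement wiring, assuming the generated clientset's standard NewForConfig constructor; the actual call site is outside this diff, and the helper here is hypothetical:

    // Hypothetical helper in the master package; names are assumptions.
    func buildPodStorage(c completedConfig, opts generic.RESTOptions,
        kubeletConn client.ConnectionInfoGetter, proxy http.RoundTripper) (podetcd.PodStorage, error) {
        // Build a typed policy client against our own (loopback) API endpoint.
        policyClient, err := policyclient.NewForConfig(c.GenericConfig.LoopbackClientConfig)
        if err != nil {
            return podetcd.PodStorage{}, err
        }
        return podetcd.NewStorage(opts, kubeletConn, proxy, policyClient), nil
    }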
2 changes: 2 additions & 0 deletions pkg/master/master_test.go
@@ -44,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/apis/rbac"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/kubelet/client"
@@ -95,6 +96,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
config.GenericConfig.ProxyTLSClientConfig = &tls.Config{}
config.GenericConfig.RequestContextMapper = api.NewRequestContextMapper()
config.GenericConfig.EnableVersion = true
+ config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}}
config.EnableCoreControllers = false

// TODO: this is kind of hacky. The trouble is that the sync loop
156 changes: 3 additions & 153 deletions pkg/registry/core/pod/etcd/etcd.go
@@ -27,9 +27,8 @@ import (
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/apis/policy"
policyclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/policy/unversioned"
"k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/cachesize"
"k8s.io/kubernetes/pkg/registry/core/pod"
podrest "k8s.io/kubernetes/pkg/registry/core/pod/rest"
@@ -59,7 +58,7 @@ type REST struct {
}

// NewStorage returns a RESTStorage object that will work against pods.
- func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage {
+ func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
prefix := "/" + opts.ResourcePrefix

newListFunc := func() runtime.Object { return &api.PodList{} }
@@ -105,7 +104,7 @@ func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTr
return PodStorage{
Pod: &REST{store, proxyTransport},
Binding: &BindingREST{store: store},
Eviction: &EvictionREST{store: store},
Eviction: newEvictionStorage(store, podDisruptionBudgetClient),
Status: &StatusREST{store: &statusStore},
Log: &podrest.LogREST{Store: store, KubeletConn: k},
Proxy: &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
@@ -123,155 +122,6 @@ func (r *REST) ResourceLocation(ctx api.Context, name string) (*url.URL, http.Ro
return pod.ResourceLocation(r, r.proxyTransport, ctx, name)
}

- // EvictionREST implements the REST endpoint for evicting pods from nodes when etcd is in use.
- type EvictionREST struct {
- store *registry.Store
- PodDisruptionBudgetLister rest.Lister
- PodDisruptionBudgetUpdater rest.Updater
- }
-
- var _ = rest.Creater(&EvictionREST{})
-
- // New creates a new eviction resource
- func (r *EvictionREST) New() runtime.Object {
- return &policy.Eviction{}
- }
-
- // Create attempts to create a new eviction. That is, it tries to evict a pod.
- func (r *EvictionREST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
- eviction := obj.(*policy.Eviction)
-
- obj, err := r.store.Get(ctx, eviction.Name)
- if err != nil {
- return nil, err
- }
- pod := obj.(*api.Pod)
- pdbs, err := r.getPodDisruptionBudgets(ctx, pod)
- if err != nil {
- return nil, err
- }
-
- if len(pdbs) > 1 {
- return &unversioned.Status{
- Status: unversioned.StatusFailure,
- Message: "This pod has more than one PodDisruptionBudget, which the eviction subresource does not support.",
- Code: 500,
- }, nil
- } else if len(pdbs) == 1 {
- pdb := pdbs[0]
- // Try to verify-and-decrement
-
- // If it was false already, or if it becomes false during the course of our retries,
- // raise an error marked as a 429.
- ok, err := r.checkAndDecrement(ctx, pdb)
- if err != nil {
- return nil, err
- }
-
- if !ok {
- return &unversioned.Status{
- Status: unversioned.StatusFailure,
- // TODO(mml): Include some more details about why the eviction is disallowed.
- // Ideally any such text is generated by the DisruptionController (offline).
- Message: "Cannot evict pod as it would violate the pod's disruption budget.",
- Code: 429,
- // TODO(mml): Add a Retry-After header. Once there are time-based
- // budgets, we can sometimes compute a sensible suggested value. But
- // even without that, we can give a suggestion (10 minutes?) that
- // prevents well-behaved clients from hammering us.
- }, nil
- }
- }
-
- // At this point there was either no PDB or we succeeded in decrementing
-
- // Try the delete
- _, err = r.store.Delete(ctx, eviction.Name, eviction.DeleteOptions)
- if err != nil {
- return nil, err
- }
-
- // Success!
- return &unversioned.Status{Status: unversioned.StatusSuccess}, nil
- }
-
- // UpdatedObjectInfo is a simple interface for attempting updates to
- // runtime.Objects. EvictionREST implements it directly.
- var _ = rest.UpdatedObjectInfo(&EvictionREST{})
-
- // Preconditions returns any preconditions required prior to updating the
- // PDB. None currently.
- func (r *EvictionREST) Preconditions() *api.Preconditions {
- return nil
- }
-
- // UpdatedObject returns the updated PDB if it is able to update
- // PodDisruptionAllowed from true->false.
- func (r *EvictionREST) UpdatedObject(ctx api.Context, oldObj runtime.Object) (newObj runtime.Object, err error) {
- copy, err := api.Scheme.DeepCopy(oldObj)
- if err != nil {
- return
- }
- newObj = copy.(runtime.Object)
- pdb := oldObj.(*policy.PodDisruptionBudget)
- if !pdb.Status.PodDisruptionAllowed {
- return nil, fmt.Errorf("PodDisruptionAllowed is already false")
- }
- pdb.Status.PodDisruptionAllowed = false
-
- return
- }
-
- func (r *EvictionREST) checkAndDecrement(ctx api.Context, pdb policy.PodDisruptionBudget) (ok bool, err error) {
- if !pdb.Status.PodDisruptionAllowed {
- return false, nil
- }
- newObj, _, err := r.PodDisruptionBudgetUpdater.Update(ctx, pdb.Name, r)
- if err != nil {
- return false, err
- }
-
- newPdb := newObj.(*policy.PodDisruptionBudget)
- if newPdb.Status.PodDisruptionAllowed {
- return false, fmt.Errorf("update did not succeed")
- }
-
- return true, nil
- }
-
- // Returns any PDBs that match the pod.
- // err is set if there's an error.
- func (r *EvictionREST) getPodDisruptionBudgets(ctx api.Context, pod *api.Pod) (pdbs []policy.PodDisruptionBudget, err error) {
- if len(pod.Labels) == 0 {
- return
- }
-
- l, err := r.PodDisruptionBudgetLister.List(ctx, nil)
- if err != nil {
- return
- }
-
- pdbList := l.(*policy.PodDisruptionBudgetList)
-
- for _, pdb := range pdbList.Items {
- if pdb.Namespace != pod.Namespace {
- continue
- }
- selector, err := unversioned.LabelSelectorAsSelector(pdb.Spec.Selector)
- if err != nil {
- continue
- }
- // If a PDB with a nil or empty selector creeps in, it should match nothing, not everything.
- if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
- continue
- }
-
- pdbs = append(pdbs, pdb)
- }
-
- return pdbs, nil
- }

// BindingREST implements the REST endpoint for binding pods to nodes when etcd is in use.
type BindingREST struct {
store *registry.Store
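newEvictionStorage is called above but not defined in this file's diff; the EvictionREST implementation moves out of etcd.go elsewhere in the PR. A plausible shape for the constructor, assuming the field simply mirrors the parameter (names beyond what the call site shows are guesses):

    // Sketch only: the real definition lives in a file not shown here.
    func newEvictionStorage(store *registry.Store, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) *EvictionREST {
        return &EvictionREST{store: store, podDisruptionBudgetClient: podDisruptionBudgetClient}
    }

Note that the deleted code's verify-and-decrement semantics (reject with a 429 while PodDisruptionAllowed is false, and flip it to false on a successful eviction so concurrent evictions conflict on update) can be kept intact with the typed client; only the mechanism for listing and updating PDBs changes.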
4 changes: 2 additions & 2 deletions pkg/registry/core/pod/etcd/etcd_test.go
@@ -40,7 +40,7 @@ import (
func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 3}
- storage := NewStorage(restOptions, nil, nil)
+ storage := NewStorage(restOptions, nil, nil, nil)
return storage.Pod, storage.Binding, storage.Status, server
}

@@ -148,7 +148,7 @@ func (f FailDeletionStorage) Delete(ctx context.Context, key string, out runtime
func newFailDeleteStorage(t *testing.T, called *bool) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 3}
- storage := NewStorage(restOptions, nil, nil)
+ storage := NewStorage(restOptions, nil, nil, nil)
storage.Pod.Store.Storage = FailDeletionStorage{storage.Pod.Store.Storage, called}
return storage.Pod, server
}
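For context, a caller triggers eviction by POSTing a policy.Eviction to the pod's eviction subresource, which is what ultimately exercises EvictionREST.Create. A hedged sketch of a client-side call, assuming a generated internal clientset with a raw REST accessor (the accessor name varies across releases and is an assumption here):

    eviction := &policy.Eviction{
        ObjectMeta:    api.ObjectMeta{Name: "mypod", Namespace: "default"},
        DeleteOptions: &api.DeleteOptions{},
    }
    // POST .../namespaces/default/pods/mypod/eviction
    err := clientset.Core().RESTClient().Post().
        Namespace("default").
        Resource("pods").
        Name("mypod").
        SubResource("eviction").
        Body(eviction).
        Do().Error()

A success status means the pod was deleted (or had no matching PDB); a 429 means the budget currently disallows disruption and the client should retry later.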