Merge pull request #53332 from jsravn/automated-cherry-pick-of-#46542-upstream-release-1.7

Automatic merge from submit-queue.

Automated cherry pick of #46542

Cherry pick of #46542 on release-1.7.

#53239: Ignore pods for quota marked for deletion whose node is unreachable
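
The core of the change is that quota evaluation now takes an injected clock.Clock, so a pod that has been marked for deletion stops counting against quota once its deletion grace period has elapsed, even if its unreachable node never confirms the deletion. The sketch below is a standalone illustration of that idea and of why a clock is injected instead of calling time.Now() directly; gracePeriodExpired is a hypothetical helper for this example, not code from this PR.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
)

// gracePeriodExpired mirrors the check added to QuotaPod/QuotaV1Pod in the diff
// below: a pod marked for deletion should stop counting against quota once its
// deletion grace period has passed, according to the supplied clock.
func gracePeriodExpired(c clock.Clock, deletedAt time.Time, graceSeconds int64) bool {
	grace := time.Duration(graceSeconds) * time.Second
	return c.Now().After(deletedAt.Add(grace))
}

func main() {
	// Injecting clock.Clock lets tests control "now" with a fake clock,
	// while production code passes clock.RealClock{}.
	fake := clock.NewFakeClock(time.Now())
	deletedAt := fake.Now()

	fmt.Println(gracePeriodExpired(fake, deletedAt, 30)) // false: still inside the 30s grace period
	fake.Step(time.Minute)                               // advance the fake clock past the grace period
	fmt.Println(gracePeriodExpired(fake, deletedAt, 30)) // true: the pod no longer counts for quota
}
```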
Kubernetes Submit Queue committed Oct 4, 2017
2 parents 5a06927 + 7eb2f71 commit 862b0bc
Showing 9 changed files with 166 additions and 33 deletions.
2 changes: 2 additions & 0 deletions pkg/controller/resourcequota/BUILD
@@ -35,6 +35,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
@@ -62,6 +63,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
8 changes: 5 additions & 3 deletions pkg/controller/resourcequota/replenishment_controller.go
@@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api"
@@ -51,11 +52,11 @@ type ReplenishmentControllerOptions struct {
}

// PodReplenishmentUpdateFunc will replenish if the old pod was quota tracked but the new is not
func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions) func(oldObj, newObj interface{}) {
func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions, clock clock.Clock) func(oldObj, newObj interface{}) {
return func(oldObj, newObj interface{}) {
oldPod := oldObj.(*v1.Pod)
newPod := newObj.(*v1.Pod)
if core.QuotaV1Pod(oldPod) && !core.QuotaV1Pod(newPod) {
if core.QuotaV1Pod(oldPod, clock) && !core.QuotaV1Pod(newPod, clock) {
options.ReplenishmentFunc(options.GroupKind, newPod.Namespace, oldPod)
}
}
@@ -115,9 +116,10 @@ func (r *replenishmentControllerFactory) NewController(options *ReplenishmentCon
if err != nil {
return nil, err
}
clock := clock.RealClock{}
informer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
UpdateFunc: PodReplenishmentUpdateFunc(options),
UpdateFunc: PodReplenishmentUpdateFunc(options, clock),
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
},
options.ResyncPeriod(),
pkg/controller/resourcequota/replenishment_controller_test.go
@@ -18,10 +18,12 @@ package resourcequota

import (
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
@@ -55,7 +57,8 @@ func TestPodReplenishmentUpdateFunc(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "pod"},
Status: v1.PodStatus{Phase: v1.PodFailed},
}
updateFunc := PodReplenishmentUpdateFunc(&options)
fakeClock := clock.NewFakeClock(time.Now())
updateFunc := PodReplenishmentUpdateFunc(&options, fakeClock)
updateFunc(oldPod, newPod)
if mockReplenish.groupKind != api.Kind("Pod") {
t.Errorf("Unexpected group kind %v", mockReplenish.groupKind)
2 changes: 0 additions & 2 deletions pkg/kubelet/eviction/BUILD
@@ -59,7 +59,6 @@ go_library(
library = ":cgo_codegen",
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/api/v1/helper/qos:go_default_library",
"//pkg/features:go_default_library",
@@ -72,7 +71,6 @@ go_library(
"//pkg/kubelet/server/stats:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/quota/evaluator/core:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
31 changes: 19 additions & 12 deletions pkg/kubelet/eviction/helpers.go
@@ -26,14 +26,12 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
v1qos "k8s.io/kubernetes/pkg/api/v1/helper/qos"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/quota/evaluator/core"
)

const (
@@ -581,26 +579,35 @@ func memory(stats statsFunc) cmpFunc {

// adjust p1, p2 usage relative to the request (if any)
p1Memory := p1Usage[v1.ResourceMemory]
p1Spec, err := core.PodUsageFunc(p1)
if err != nil {
return -1
}
p1Request := p1Spec[api.ResourceRequestsMemory]
p1Request := podMemoryRequest(p1)
p1Memory.Sub(p1Request)

p2Memory := p2Usage[v1.ResourceMemory]
p2Spec, err := core.PodUsageFunc(p2)
if err != nil {
return 1
}
p2Request := p2Spec[api.ResourceRequestsMemory]
p2Request := podMemoryRequest(p2)
p2Memory.Sub(p2Request)

// if p2 is using more than p1, we want p2 first
return p2Memory.Cmp(p1Memory)
}
}

// podMemoryRequest returns the total memory request of a pod which is the
// max(sum of init container requests, sum of container requests)
func podMemoryRequest(pod *v1.Pod) resource.Quantity {
containerValue := resource.Quantity{Format: resource.BinarySI}
for i := range pod.Spec.Containers {
containerValue.Add(*pod.Spec.Containers[i].Resources.Requests.Memory())
}
initValue := resource.Quantity{Format: resource.BinarySI}
for i := range pod.Spec.InitContainers {
initValue.Add(*pod.Spec.InitContainers[i].Resources.Requests.Memory())
}
if containerValue.Cmp(initValue) > 0 {
return containerValue
}
return initValue
}

// disk compares pods by largest consumer of disk relative to request for the specified disk resource.
func disk(stats statsFunc, fsStatsToMeasure []fsStatsType, diskResource v1.ResourceName) cmpFunc {
return func(p1, p2 *v1.Pod) int {
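
The new podMemoryRequest helper above replaces the dependency on the quota evaluator and computes the effective memory request as max(sum of init container requests, sum of app container requests). Below is a minimal standalone sketch of that rule using only resource.Quantity; the values are illustrative and not taken from this PR.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Two app containers requesting 100Mi each.
	containerSum := resource.MustParse("100Mi")
	containerSum.Add(resource.MustParse("100Mi")) // containerSum = 200Mi

	// One init container requesting 300Mi.
	initSum := resource.MustParse("300Mi")

	// The effective pod memory request is the larger of the two sums,
	// because init containers run before (not alongside) app containers.
	effective := containerSum
	if initSum.Cmp(containerSum) > 0 {
		effective = initSum
	}
	fmt.Println(effective.String()) // 300Mi
}
```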
3 changes: 3 additions & 0 deletions pkg/quota/evaluator/core/BUILD
Expand Up @@ -36,6 +36,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
@@ -55,8 +56,10 @@ go_test(
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/quota:go_default_library",
"//pkg/util/node:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
],
)

68 changes: 56 additions & 12 deletions pkg/quota/evaluator/core/pods.go
@@ -19,11 +19,14 @@ package core
import (
"fmt"
"strings"
"time"

"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/admission"
@@ -68,20 +71,23 @@ func listPodsByNamespaceFuncUsingClient(kubeClient clientset.Interface) generic.

// NewPodEvaluator returns an evaluator that can evaluate pods
// if the specified shared informer factory is not nil, evaluator may use it to support listing functions.
func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Evaluator {
func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory, clock clock.Clock) quota.Evaluator {
listFuncByNamespace := listPodsByNamespaceFuncUsingClient(kubeClient)
if f != nil {
listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, v1.SchemeGroupVersion.WithResource("pods"))
}
return &podEvaluator{
listFuncByNamespace: listFuncByNamespace,
clock: clock,
}
}

// podEvaluator knows how to measure usage of pods.
type podEvaluator struct {
// knows how to list pods
listFuncByNamespace generic.ListFuncByNamespace
// used to track time
clock clock.Clock
}

// Constraints verifies that all required resources are present on the pod
@@ -148,7 +154,8 @@ func (p *podEvaluator) MatchingResources(input []api.ResourceName) []api.Resourc

// Usage knows how to measure usage associated with pods
func (p *podEvaluator) Usage(item runtime.Object) (api.ResourceList, error) {
return PodUsageFunc(item)
// delegate to normal usage
return PodUsageFunc(item, p.clock)
}

// UsageStats calculates aggregate usage for the object.
@@ -231,20 +238,22 @@ func podMatchesScopeFunc(scope api.ResourceQuotaScope, object runtime.Object) (b
return false, nil
}

// PodUsageFunc knows how to measure usage associated with pods
func PodUsageFunc(obj runtime.Object) (api.ResourceList, error) {
// PodUsageFunc returns the quota usage for a pod.
// A pod is charged for quota if none of the following is true:
// - pod has a terminal phase (failed or succeeded)
// - pod has been marked for deletion and grace period has expired
func PodUsageFunc(obj runtime.Object, clock clock.Clock) (api.ResourceList, error) {
pod, err := toInternalPodOrError(obj)
if err != nil {
return api.ResourceList{}, err
}
// by convention, we do not quota pods that have reached an end-of-life state
if !QuotaPod(pod) {
// by convention, we do not quota pods that have reached an end-of-life state
if !QuotaPod(pod, clock) {
return api.ResourceList{}, nil
}
requests := api.ResourceList{}
limits := api.ResourceList{}
// TODO: fix this when we have pod level cgroups
// when we have pod level cgroups, we can just read pod level requests/limits
// TODO: ideally, we will have pod-level requests and limits in the future.
for i := range pod.Spec.Containers {
requests = quota.Add(requests, pod.Spec.Containers[i].Resources.Requests)
limits = quota.Add(limits, pod.Spec.Containers[i].Resources.Limits)
@@ -272,12 +281,47 @@ func isTerminating(pod *api.Pod) bool {
}

// QuotaPod returns true if the pod is eligible to track against a quota
func QuotaPod(pod *api.Pod) bool {
return !(api.PodFailed == pod.Status.Phase || api.PodSucceeded == pod.Status.Phase)
// A pod is eligible for quota unless any of the following is true:
// - pod has a terminal phase (failed or succeeded)
// - pod has been marked for deletion and grace period has expired.
func QuotaPod(pod *api.Pod, clock clock.Clock) bool {
// if pod is terminal, ignore it for quota
if api.PodFailed == pod.Status.Phase || api.PodSucceeded == pod.Status.Phase {
return false
}
// deleted pods that should be gone should not be charged to user quota.
// this can happen if a node is lost, and the kubelet is never able to confirm deletion.
// even though the cluster may have drifting clocks, quota makes a reasonable effort
// to balance cluster needs against user needs. users do not control clocks,
// but at worst a small drift in clocks will only slightly impact quota.
if pod.DeletionTimestamp != nil && pod.DeletionGracePeriodSeconds != nil {
now := clock.Now()
deletionTime := pod.DeletionTimestamp.Time
gracePeriod := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
if now.After(deletionTime.Add(gracePeriod)) {
return false
}
}
return true
}

// QuotaV1Pod returns true if the pod is eligible to track against a quota:
// it is not in a terminal phase and, if marked for deletion, its grace period has not yet expired.
func QuotaV1Pod(pod *v1.Pod) bool {
return !(v1.PodFailed == pod.Status.Phase || v1.PodSucceeded == pod.Status.Phase)
func QuotaV1Pod(pod *v1.Pod, clock clock.Clock) bool {
// if pod is terminal, ignore it for quota
if v1.PodFailed == pod.Status.Phase || v1.PodSucceeded == pod.Status.Phase {
return false
}
// if pods are stuck terminating (for example, a node is lost), we do not want
// to charge the user for that pod in quota because it could prevent them from
// scaling up new pods to service their application.
if pod.DeletionTimestamp != nil && pod.DeletionGracePeriodSeconds != nil {
now := clock.Now()
deletionTime := pod.DeletionTimestamp.Time
gracePeriod := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
if now.After(deletionTime.Add(gracePeriod)) {
return false
}
}
return true
}
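
The BUILD changes for pkg/quota/evaluator/core add clock and node test dependencies, which suggests the accompanying tests (not shown in this view) drive the new grace-period behavior with a fake clock. The following is a hedged test sketch, not taken from this PR, of how QuotaPod could be exercised; the pod literal assumes the internal API types used on this branch.

```go
package core

import (
	"testing"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/kubernetes/pkg/api"
)

func TestQuotaPodIgnoresExpiredDeletion(t *testing.T) {
	fakeClock := clock.NewFakeClock(time.Now())

	// The pod was marked deleted an hour ago with a 30s grace period,
	// e.g. because its node became unreachable and never confirmed deletion.
	deletedAt := metav1.NewTime(fakeClock.Now().Add(-time.Hour))
	grace := int64(30)

	pod := &api.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:                  "test",
			Name:                       "stuck-terminating",
			DeletionTimestamp:          &deletedAt,
			DeletionGracePeriodSeconds: &grace,
		},
		Status: api.PodStatus{Phase: api.PodRunning},
	}

	if QuotaPod(pod, fakeClock) {
		t.Errorf("expected pod past its deletion grace period to be excluded from quota")
	}
}
```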
