/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package core

import (
	"fmt"
	"strings"

	"k8s.io/kubernetes/pkg/admission"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/resource"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/api/validation"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/controller/informers"
	"k8s.io/kubernetes/pkg/kubelet/qos"
	"k8s.io/kubernetes/pkg/quota"
	"k8s.io/kubernetes/pkg/quota/generic"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/util/sets"
	"k8s.io/kubernetes/pkg/util/validation/field"
)

// listPodsByNamespaceFuncUsingClient returns a pod listing function based on the provided client.
func listPodsByNamespaceFuncUsingClient(kubeClient clientset.Interface) generic.ListFuncByNamespace {
	// TODO: ideally, we could pass dynamic client pool down into this code, and have one way of doing this.
	// unfortunately, dynamic client works with Unstructured objects, and when we calculate Usage, we require
	// structured objects.
	return func(namespace string, options api.ListOptions) ([]runtime.Object, error) {
		itemList, err := kubeClient.Core().Pods(namespace).List(options)
		if err != nil {
			return nil, err
		}
		results := make([]runtime.Object, 0, len(itemList.Items))
		for i := range itemList.Items {
			results = append(results, &itemList.Items[i])
		}
		return results, nil
	}
}

// NewPodEvaluator returns an evaluator that can evaluate pods.
// If the specified shared informer factory is not nil, the evaluator may use it to support listing functions.
func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Evaluator {
	computeResources := []api.ResourceName{
		api.ResourceCPU,
		api.ResourceMemory,
		api.ResourceRequestsCPU,
		api.ResourceRequestsMemory,
		api.ResourceLimitsCPU,
		api.ResourceLimitsMemory,
	}
	allResources := append(computeResources, api.ResourcePods)
	listFuncByNamespace := listPodsByNamespaceFuncUsingClient(kubeClient)
	if f != nil {
		listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, unversioned.GroupResource{Resource: "pods"})
	}
	return &generic.GenericEvaluator{
		Name:              "Evaluator.Pod",
		InternalGroupKind: api.Kind("Pod"),
		InternalOperationResources: map[admission.Operation][]api.ResourceName{
			admission.Create: allResources,
			// TODO: the quota system can only charge for deltas on compute resources when pods support updates.
			// admission.Update: computeResources,
		},
		GetFuncByNamespace: func(namespace, name string) (runtime.Object, error) {
			return kubeClient.Core().Pods(namespace).Get(name)
		},
		ConstraintsFunc:      PodConstraintsFunc,
		MatchedResourceNames: allResources,
		MatchesScopeFunc:     PodMatchesScopeFunc,
		UsageFunc:            PodUsageFunc,
		ListFuncByNamespace:  listFuncByNamespace,
	}
}
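
// As a hypothetical wiring sketch (kubeClient and informerFactory are assumed
// to come from the caller; they are not defined in this file), the evaluator
// is typically constructed once and handed to the quota system:
//
//	evaluator := NewPodEvaluator(kubeClient, informerFactory)
//
// Passing a non-nil factory makes listings come from shared informer caches
// rather than direct API calls, which reduces load on the apiserver.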

// PodConstraintsFunc verifies that all required resources are present on the pod.
// In addition, it validates that the resources are valid (i.e. requests <= limits).
func PodConstraintsFunc(required []api.ResourceName, object runtime.Object) error {
	pod, ok := object.(*api.Pod)
	if !ok {
		return fmt.Errorf("unexpected input object %v", object)
	}

	// Pod level resources are often set during admission control.
	// As a consequence, we want to verify that resources are valid prior
	// to ever charging quota prematurely in case they are not.
	allErrs := field.ErrorList{}
	fldPath := field.NewPath("spec").Child("containers")
	for i, ctr := range pod.Spec.Containers {
		allErrs = append(allErrs, validation.ValidateResourceRequirements(&ctr.Resources, fldPath.Index(i).Child("resources"))...)
	}
	fldPath = field.NewPath("spec").Child("initContainers")
	for i, ctr := range pod.Spec.InitContainers {
		allErrs = append(allErrs, validation.ValidateResourceRequirements(&ctr.Resources, fldPath.Index(i).Child("resources"))...)
	}
	if len(allErrs) > 0 {
		return allErrs.ToAggregate()
	}

	// TODO: fix this when we have pod level cgroups.
	// Since we do not yet support pod level requests/limits, we need to ensure each
	// container makes an explicit request or limit for a quota tracked resource.
	requiredSet := quota.ToSet(required)
	missingSet := sets.NewString()
	for i := range pod.Spec.Containers {
		enforcePodContainerConstraints(&pod.Spec.Containers[i], requiredSet, missingSet)
	}
	for i := range pod.Spec.InitContainers {
		enforcePodContainerConstraints(&pod.Spec.InitContainers[i], requiredSet, missingSet)
	}
	if len(missingSet) == 0 {
		return nil
	}
	return fmt.Errorf("must specify %s", strings.Join(missingSet.List(), ","))
}

// enforcePodContainerConstraints checks for required resources that are not set on this container and
// adds them to missingSet.
func enforcePodContainerConstraints(container *api.Container, requiredSet, missingSet sets.String) {
	requests := container.Resources.Requests
	limits := container.Resources.Limits
	containerUsage := podUsageHelper(requests, limits)
	containerSet := quota.ToSet(quota.ResourceNames(containerUsage))
	if !containerSet.Equal(requiredSet) {
		difference := requiredSet.Difference(containerSet)
		missingSet.Insert(difference.List()...)
	}
}
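
// As a worked illustration (values hypothetical): with requiredSet equal to
// {"cpu", "memory"}, a container that sets only a CPU request produces a
// usage set of {"pods", "cpu", "requests.cpu"}. The difference {"memory"}
// is added to missingSet, so PodConstraintsFunc ultimately returns the
// error "must specify memory".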

// podUsageHelper summarizes pod quota usage based on the given requests and limits.
func podUsageHelper(requests api.ResourceList, limits api.ResourceList) api.ResourceList {
	result := api.ResourceList{}
	result[api.ResourcePods] = resource.MustParse("1")
	if request, found := requests[api.ResourceCPU]; found {
		result[api.ResourceCPU] = request
		result[api.ResourceRequestsCPU] = request
	}
	if limit, found := limits[api.ResourceCPU]; found {
		result[api.ResourceLimitsCPU] = limit
	}
	if request, found := requests[api.ResourceMemory]; found {
		result[api.ResourceMemory] = request
		result[api.ResourceRequestsMemory] = request
	}
	if limit, found := limits[api.ResourceMemory]; found {
		result[api.ResourceLimitsMemory] = limit
	}
	return result
}
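
// For example (hypothetical quantities): requests of cpu=100m and memory=64Mi
// with a limit of cpu=200m yield
//
//	pods=1, cpu=100m, requests.cpu=100m, limits.cpu=200m,
//	memory=64Mi, requests.memory=64Mi
//
// No limits.memory entry appears, because no memory limit was set.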

// PodUsageFunc knows how to measure usage associated with pods.
func PodUsageFunc(object runtime.Object) api.ResourceList {
	pod, ok := object.(*api.Pod)
	if !ok {
		return api.ResourceList{}
	}

	// by convention, we do not quota pods that have reached an end-of-life state
	if !QuotaPod(pod) {
		return api.ResourceList{}
	}

	// TODO: fix this when we have pod level cgroups.
	// When we have pod level cgroups, we can just read pod level requests/limits.
	requests := api.ResourceList{}
	limits := api.ResourceList{}
	for i := range pod.Spec.Containers {
		requests = quota.Add(requests, pod.Spec.Containers[i].Resources.Requests)
		limits = quota.Add(limits, pod.Spec.Containers[i].Resources.Limits)
	}

	// InitContainers are run sequentially before other containers start, so the highest
	// init container resource is compared against the sum of app containers to determine
	// the effective usage for both requests and limits.
	for i := range pod.Spec.InitContainers {
		requests = quota.Max(requests, pod.Spec.InitContainers[i].Resources.Requests)
		limits = quota.Max(limits, pod.Spec.InitContainers[i].Resources.Limits)
	}

	return podUsageHelper(requests, limits)
}
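
// Worked example (hypothetical quantities): two app containers requesting
// 100m CPU each sum to 200m; an init container requesting 300m CPU raises
// the effective request to max(200m, 300m) = 300m, because init containers
// run to completion before app containers start and so never run alongside
// them.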

// PodMatchesScopeFunc is a function that knows how to evaluate if a pod matches a scope.
func PodMatchesScopeFunc(scope api.ResourceQuotaScope, object runtime.Object) bool {
	pod, ok := object.(*api.Pod)
	if !ok {
		return false
	}
	switch scope {
	case api.ResourceQuotaScopeTerminating:
		return isTerminating(pod)
	case api.ResourceQuotaScopeNotTerminating:
		return !isTerminating(pod)
	case api.ResourceQuotaScopeBestEffort:
		return isBestEffort(pod)
	case api.ResourceQuotaScopeNotBestEffort:
		return !isBestEffort(pod)
	}
	return false
}

func isBestEffort(pod *api.Pod) bool {
	return qos.GetPodQOS(pod) == qos.BestEffort
}

func isTerminating(pod *api.Pod) bool {
	return pod.Spec.ActiveDeadlineSeconds != nil && *pod.Spec.ActiveDeadlineSeconds >= int64(0)
}
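
// For instance (hypothetical pod specs): a pod created with
// spec.activeDeadlineSeconds=3600 matches the Terminating scope and not
// NotTerminating, while a pod whose containers set no requests or limits
// is BestEffort QoS and matches the BestEffort scope.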

// QuotaPod returns true if the pod is eligible to track against a quota,
// i.e. it is not in a terminal state according to its phase.
func QuotaPod(pod *api.Pod) bool {
	// see GetPhase in kubelet.go for details on how it covers all restart policy conditions
	// https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/kubelet.go#L3001
	return pod.Status.Phase != api.PodFailed && pod.Status.Phase != api.PodSucceeded
}
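
// For example (hypothetical): a completed Job pod whose Status.Phase is
// PodSucceeded stops counting against the namespace's pod quota, since
// PodUsageFunc returns an empty ResourceList for it.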