/
cpu_manager.go
466 lines (389 loc) · 17.1 KB
/
cpu_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager
import (
"fmt"
"math"
"sync"
"time"
cadvisorapi "github.com/google/cadvisor/info/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/status"
)
// ActivePodsFunc is a function that returns a list of pods to reconcile.
// The manager stores the function handed to Start and calls it on every
// reconcile pass and whenever stale state is garbage collected.
type ActivePodsFunc func() []*v1.Pod
// runtimeService is the part of the container runtime that the CPU manager
// depends on. The manager only uses it to send a cpuset to a container.
type runtimeService interface {
	// UpdateContainerResources is called with a container ID and the
	// resources (here: only CpusetCpus is populated) to set on it.
	UpdateContainerResources(id string, resources *runtimeapi.LinuxContainerResources) error
}
// policyName is the string type of CPU manager policy identifiers. NewManager
// converts the configured policy name to it before matching it against
// PolicyNone and PolicyStatic.
type policyName string
// cpuManagerStateFileName is the file name where cpu manager stores its state.
// Start passes it, together with the configured state directory, to
// state.NewCheckpointState.
const cpuManagerStateFileName = "cpu_manager_state"
// Manager interface provides methods for Kubelet to manage pod cpus.
type Manager interface {
	// Start is called during Kubelet initialization.
	Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error

	// Allocate is called to trigger the allocation of CPUs to a container.
	// This must be called at some point prior to the AddContainer() call for
	// a container, e.g. at pod admission time.
	Allocate(pod *v1.Pod, container *v1.Container) error

	// AddContainer is called between container create and container start
	// so that initial CPU affinity settings can be written through to the
	// container runtime before the first process begins to execute.
	AddContainer(p *v1.Pod, c *v1.Container, containerID string) error

	// RemoveContainer is called after Kubelet decides to kill or delete a
	// container. After this call, the CPU manager stops trying to reconcile
	// that container and any CPUs dedicated to the container are freed.
	RemoveContainer(containerID string) error

	// State returns a read-only interface to the internal CPU manager state.
	State() state.Reader

	// GetTopologyHints implements the topologymanager.HintProvider Interface
	// and is consulted to achieve NUMA aware resource alignment among this
	// and other resource controllers.
	GetTopologyHints(*v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint
}
// manager is the Manager implementation returned by NewManager.
type manager struct {
	// The embedded mutex is held around policy, state and containerMap
	// updates in Allocate, AddContainer, RemoveContainer, removeStaleState
	// and reconcileState.
	sync.Mutex

	// policy is the CPU assignment policy selected in NewManager.
	policy Policy

	// reconcilePeriod is the duration between calls to reconcileState.
	reconcilePeriod time.Duration

	// state allows pluggable CPU assignment policies while sharing a common
	// representation of state for the system to inspect and reconcile.
	// It is created in Start, not in NewManager.
	state state.State

	// containerRuntime is the container runtime service interface needed
	// to make UpdateContainerResources() calls against the containers.
	containerRuntime runtimeService

	// activePods is a method for listing active pods on the node
	// so all the containers can be updated in the reconciliation loop.
	activePods ActivePodsFunc

	// podStatusProvider provides a method for obtaining pod statuses
	// and the containerID of their containers
	podStatusProvider status.PodStatusProvider

	// containerMap provides a mapping from (pod, container) -> containerID
	// for all containers in a pod
	containerMap containermap.ContainerMap

	// topology is the CPU topology discovered by NewManager for the static
	// policy; NewManager leaves it nil for the none policy.
	topology *topology.CPUTopology

	// nodeAllocatableReservation is the reservation passed to NewManager. Its
	// CPU entry determines the number of reserved CPUs for the static policy.
	nodeAllocatableReservation v1.ResourceList

	// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
	// We use it to determine when we can purge inactive pods from checkpointed state.
	sourcesReady config.SourcesReady

	// stateFileDirectory holds the directory where the state file for checkpoints is held.
	stateFileDirectory string
}
// Compile-time check that *manager satisfies the Manager interface.
var _ Manager = &manager{}
// sourcesReadyStub is a placeholder source-readiness tracker that ignores
// every registered source and always reports ready. NewManager installs it
// until Start supplies the real implementation.
type sourcesReadyStub struct{}

// AddSource accepts a source name and does nothing with it.
func (*sourcesReadyStub) AddSource(source string) {
}

// AllReady unconditionally reports that all sources are ready.
func (*sourcesReadyStub) AllReady() bool {
	const ready = true
	return ready
}
// NewManager creates a new cpu manager for the policy named by cpuPolicyName.
//
// For the none policy nothing besides the policy itself is set up. For the
// static policy the CPU topology is discovered from machineInfo and the CPU
// entry of nodeAllocatableReservation, rounded up to whole CPUs, is handed to
// the policy as the number of reserved CPUs. An error is returned when the
// policy name is unknown, topology discovery fails, the CPU reservation is
// missing or zero, or the static policy cannot be constructed.
//
// The returned manager uses a stub source-readiness tracker that always
// reports ready; Start replaces it.
func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
	var (
		cpuTopology *topology.CPUTopology
		cpuPolicy   Policy
		err         error
	)

	switch policyName(cpuPolicyName) {
	case PolicyNone:
		cpuPolicy = NewNonePolicy()

	case PolicyStatic:
		if cpuTopology, err = topology.Discover(machineInfo); err != nil {
			return nil, err
		}
		klog.Infof("[cpumanager] detected CPU topology: %v", cpuTopology)

		reserved, found := nodeAllocatableReservation[v1.ResourceCPU]
		if !found {
			// Without a CPU reservation the static policy cannot be set up.
			return nil, fmt.Errorf("[cpumanager] unable to determine reserved CPU resources for static policy")
		}
		if reserved.IsZero() {
			// A zero reservation would let exclusive allocations drain the
			// shared pool completely, forcing either a violation of the
			// exclusivity guarantee or eviction of pods with containers that
			// request zero CPUs. policy_static.go has the full rationale.
			return nil, fmt.Errorf("[cpumanager] the static policy requires systemreserved.cpu + kubereserved.cpu to be greater than zero")
		}

		// Only whole CPUs can be reserved exclusively, so round up.
		reservedCount := int(math.Ceil(float64(reserved.MilliValue()) / 1000))

		if cpuPolicy, err = NewStaticPolicy(cpuTopology, reservedCount, specificCPUs, affinity); err != nil {
			return nil, fmt.Errorf("new static policy error: %v", err)
		}

	default:
		return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName)
	}

	return &manager{
		policy:                     cpuPolicy,
		reconcilePeriod:            reconcilePeriod,
		topology:                   cpuTopology,
		nodeAllocatableReservation: nodeAllocatableReservation,
		stateFileDirectory:         stateFileDirectory,
		sourcesReady:               &sourcesReadyStub{},
	}, nil
}
// Start wires the runtime dependencies into the manager, creates the
// checkpoint-backed state, starts the policy and, unless the none policy is
// active, launches the periodic reconcile loop in a background goroutine.
// It returns the error from checkpoint state creation or from policy start.
func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
	klog.Infof("[cpumanager] starting with %s policy", m.policy.Name())
	klog.Infof("[cpumanager] reconciling every %v", m.reconcilePeriod)

	m.sourcesReady, m.activePods = sourcesReady, activePods
	m.podStatusProvider, m.containerRuntime = podStatusProvider, containerRuntime
	m.containerMap = initialContainers

	checkpointState, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), m.containerMap)
	if err != nil {
		klog.Errorf("[cpumanager] could not initialize checkpoint manager: %v, please drain node and remove policy state file", err)
		return err
	}
	m.state = checkpointState

	if err := m.policy.Start(m.state); err != nil {
		klog.Errorf("[cpumanager] policy start error: %v", err)
		return err
	}

	if m.policy.Name() != string(PolicyNone) {
		// Keep the cpusets of all containers in sync with the assignments
		// held in state by reconciling on a fixed period, forever.
		go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, wait.NeverStop)
	}
	return nil
}
// Allocate asks the active policy to assign CPUs to container c of pod p.
// Stale assignments are purged first so stranded CPUs can be reused. Any
// policy error is logged and returned.
func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
	// removeStaleState takes the lock itself, so run it before locking.
	m.removeStaleState()

	m.Lock()
	defer m.Unlock()

	if err := m.policy.Allocate(m.state, p, c); err != nil {
		klog.Errorf("[cpumanager] Allocate error: %v", err)
		return err
	}
	return nil
}
// AddContainer pushes the cpuset recorded for container c of pod p (or the
// default cpuset when none was assigned) to the container runtime. When the
// cpuset is empty the update is skipped. When the runtime update fails, the
// container's state is rolled back through the policy and the update error
// is returned; a rollback failure is only logged.
func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
	m.Lock()
	cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name)
	m.Unlock()

	if cpus.IsEmpty() {
		klog.V(5).Infof("[cpumanager] update container resources is skipped due to cpu set is empty")
		return nil
	}

	updateErr := m.updateContainerCPUSet(containerID, cpus)
	if updateErr == nil {
		return nil
	}
	klog.Errorf("[cpumanager] AddContainer error: error updating CPUSet for container (pod: %s, container: %s, container id: %s, err: %v)", p.Name, c.Name, containerID, updateErr)

	// The runtime rejected the cpuset: release whatever was assigned.
	m.Lock()
	if rollbackErr := m.policyRemoveContainerByRef(string(p.UID), c.Name); rollbackErr != nil {
		klog.Errorf("[cpumanager] AddContainer rollback state error: %v", rollbackErr)
	}
	m.Unlock()

	return updateErr
}
// RemoveContainer releases, under the manager lock, the state held for the
// container with the given ID. A removal error is logged and returned.
func (m *manager) RemoveContainer(containerID string) error {
	m.Lock()
	defer m.Unlock()

	if err := m.policyRemoveContainerByID(containerID); err != nil {
		klog.Errorf("[cpumanager] RemoveContainer error: %v", err)
		return err
	}
	return nil
}
// policyRemoveContainerByID resolves containerID to its (pod UID, container
// name) pair and asks the policy to remove that container from state. A
// container ID that the container map cannot resolve is treated as already
// removed and reported as success. The container map entry is dropped only
// after the policy removal succeeds.
func (m *manager) policyRemoveContainerByID(containerID string) error {
	podUID, containerName, lookupErr := m.containerMap.GetContainerRef(containerID)
	if lookupErr != nil {
		// Lookup failed: there is nothing to release for this ID.
		return nil
	}

	if removeErr := m.policy.RemoveContainer(m.state, podUID, containerName); removeErr != nil {
		return removeErr
	}
	m.containerMap.RemoveByContainerID(containerID)
	return nil
}
// policyRemoveContainerByRef asks the policy to remove the container
// identified by (podUID, containerName) from state and, only when that
// succeeds, drops the matching container map entry. The policy error, if
// any, is returned unchanged.
func (m *manager) policyRemoveContainerByRef(podUID string, containerName string) error {
	if removeErr := m.policy.RemoveContainer(m.state, podUID, containerName); removeErr != nil {
		return removeErr
	}
	m.containerMap.RemoveByContainerRef(podUID, containerName)
	return nil
}
// State returns the manager's state behind the read-only state.Reader
// interface. The state is created in Start, so the result is nil before
// Start has run.
func (m *manager) State() state.Reader {
	return m.state
}
// GetTopologyHints returns the topology hints computed by the active policy
// for the given pod and container. Stale state is purged first so that the
// policy sees stranded CPUs as available.
func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
	// Garbage collect any stranded resources before providing TopologyHints
	m.removeStaleState()
	// Delegate to active policy
	return m.policy.GetTopologyHints(m.state, pod, container)
}
// reconciledContainer identifies one container visited by reconcileState.
// It is built with positional literals, so the field order must not change.
// containerName and containerID are left empty when they could not be
// determined (e.g. the pod status or the container ID was not found).
type reconciledContainer struct {
	podName       string
	containerName string
	containerID   string
}
// removeStaleState drops CPU assignments for containers that no longer
// belong to any active pod. It does nothing until all configuration sources
// report ready, and it holds the manager lock for its whole duration.
// Removal failures are logged and do not stop the sweep.
func (m *manager) removeStaleState() {
	// Only once all sources are ready do we attempt to remove any stale state.
	// This ensures that the call to `m.activePods()` below will succeed with
	// the actual active pods list.
	if !m.sourcesReady.AllReady() {
		return
	}

	// We grab the lock to ensure that no new containers will grab CPUs while
	// executing the code below. Without this lock, it's possible that we end
	// up removing state that is newly added by an asynchronous call to
	// AddContainer() during the execution of this code.
	m.Lock()
	defer m.Unlock()

	// Get the list of active pods.
	activePods := m.activePods()

	// Build the set of (podUID, containerName) pairs for all containers in
	// all active pods. The two container lists are walked separately rather
	// than joined with append(): appending onto pod.Spec.InitContainers could
	// write into spare capacity of the pod's own backing array, and the pod
	// object is shared with other goroutines.
	activeContainers := make(map[string]map[string]struct{}, len(activePods))
	for _, pod := range activePods {
		names := make(map[string]struct{}, len(pod.Spec.InitContainers)+len(pod.Spec.Containers))
		for i := range pod.Spec.InitContainers {
			names[pod.Spec.InitContainers[i].Name] = struct{}{}
		}
		for i := range pod.Spec.Containers {
			names[pod.Spec.Containers[i].Name] = struct{}{}
		}
		activeContainers[string(pod.UID)] = names
	}

	// Loop through the CPUManager state. Remove any state for containers not
	// in the `activeContainers` set built above.
	assignments := m.state.GetCPUAssignments()
	for podUID, containers := range assignments {
		for containerName := range containers {
			if _, ok := activeContainers[podUID][containerName]; ok {
				continue
			}
			klog.Errorf("[cpumanager] removeStaleState: removing (pod %s, container: %s)", podUID, containerName)
			if err := m.policyRemoveContainerByRef(podUID, containerName); err != nil {
				klog.Errorf("[cpumanager] removeStaleState: failed to remove (pod %s, container %s), error: %v", podUID, containerName, err)
			}
		}
	}
}
// reconcileState re-applies the cpuset recorded in state to every running
// container of every active pod and reports which containers were updated
// (success) and which could not be (failure). Stale state is purged first.
//
// A container lands in failure when its pod status, container ID or
// container status cannot be found, when it is still waiting, when its
// cpuset is empty, or when the runtime update fails. Terminated containers
// are skipped and appear in neither list. Both returned slices are non-nil.
func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
	success = []reconciledContainer{}
	failure = []reconciledContainer{}

	m.removeStaleState()
	for _, pod := range m.activePods() {
		pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
		if !ok {
			klog.Warningf("[cpumanager] reconcileState: skipping pod; status not found (pod: %s)", pod.Name)
			failure = append(failure, reconciledContainer{pod.Name, "", ""})
			continue
		}

		// Gather container names, init containers first, into a fresh slice.
		// Appending onto pod.Spec.InitContainers directly could write into
		// spare capacity of the shared pod object's backing array.
		containerNames := make([]string, 0, len(pod.Spec.InitContainers)+len(pod.Spec.Containers))
		for i := range pod.Spec.InitContainers {
			containerNames = append(containerNames, pod.Spec.InitContainers[i].Name)
		}
		for i := range pod.Spec.Containers {
			containerNames = append(containerNames, pod.Spec.Containers[i].Name)
		}

		for _, containerName := range containerNames {
			containerID, err := findContainerIDByName(&pstatus, containerName)
			if err != nil {
				klog.Warningf("[cpumanager] reconcileState: skipping container; ID not found in pod status (pod: %s, container: %s, error: %v)", pod.Name, containerName, err)
				failure = append(failure, reconciledContainer{pod.Name, containerName, ""})
				continue
			}

			cstatus, err := findContainerStatusByName(&pstatus, containerName)
			if err != nil {
				klog.Warningf("[cpumanager] reconcileState: skipping container; container status not found in pod status (pod: %s, container: %s, error: %v)", pod.Name, containerName, err)
				failure = append(failure, reconciledContainer{pod.Name, containerName, ""})
				continue
			}

			// A container with no state set at all is treated as waiting.
			// There is no error to report at this point, so none is logged.
			if cstatus.State.Waiting != nil ||
				(cstatus.State.Running == nil && cstatus.State.Terminated == nil) {
				klog.Warningf("[cpumanager] reconcileState: skipping container; container still in the waiting state (pod: %s, container: %s)", pod.Name, containerName)
				failure = append(failure, reconciledContainer{pod.Name, containerName, ""})
				continue
			}

			m.Lock()
			if cstatus.State.Terminated != nil {
				// The container is terminated but we can't call m.RemoveContainer()
				// here because it could remove the allocated cpuset for the container
				// which may be in the process of being restarted. That would result
				// in the container losing any exclusively-allocated CPUs that it
				// was allocated.
				if _, _, lookupErr := m.containerMap.GetContainerRef(containerID); lookupErr == nil {
					klog.Warningf("[cpumanager] reconcileState: ignoring terminated container (pod: %s, container id: %s)", pod.Name, containerID)
				}
				m.Unlock()
				continue
			}

			// Once we make it here we know we have a running container.
			// Idempotently add it to the containerMap in case it is missing.
			// This can happen after a kubelet restart, for example.
			m.containerMap.Add(string(pod.UID), containerName, containerID)
			m.Unlock()

			cset := m.state.GetCPUSetOrDefault(string(pod.UID), containerName)
			if cset.IsEmpty() {
				// NOTE: This should not happen outside of tests.
				klog.Infof("[cpumanager] reconcileState: skipping container; assigned cpuset is empty (pod: %s, container: %s)", pod.Name, containerName)
				failure = append(failure, reconciledContainer{pod.Name, containerName, containerID})
				continue
			}

			klog.V(4).Infof("[cpumanager] reconcileState: updating container (pod: %s, container: %s, container id: %s, cpuset: \"%v\")", pod.Name, containerName, containerID, cset)
			err = m.updateContainerCPUSet(containerID, cset)
			if err != nil {
				klog.Errorf("[cpumanager] reconcileState: failed to update container (pod: %s, container: %s, container id: %s, cpuset: \"%v\", error: %v)", pod.Name, containerName, containerID, cset, err)
				failure = append(failure, reconciledContainer{pod.Name, containerName, containerID})
				continue
			}
			success = append(success, reconciledContainer{pod.Name, containerName, containerID})
		}
	}
	return success, failure
}
// findContainerIDByName returns the runtime container ID of the first
// container status in the pod status (init containers are searched before
// regular containers) whose name matches and whose ContainerID is set.
// It returns an error when no such status exists or when the recorded
// ContainerID cannot be parsed.
func findContainerIDByName(status *v1.PodStatus, name string) (string, error) {
	// Search the two status lists one after the other instead of joining
	// them with append(), which could write into spare capacity of the
	// caller's InitContainerStatuses backing array.
	for _, statuses := range [][]v1.ContainerStatus{status.InitContainerStatuses, status.ContainerStatuses} {
		for i := range statuses {
			cs := &statuses[i]
			if cs.Name != name || cs.ContainerID == "" {
				continue
			}
			cid := &kubecontainer.ContainerID{}
			if err := cid.ParseString(cs.ContainerID); err != nil {
				return "", err
			}
			return cid.ID, nil
		}
	}
	return "", fmt.Errorf("unable to find ID for container with name %v in pod status (it may not be running)", name)
}
// findContainerStatusByName returns a copy of the first container status in
// the pod status (init containers are searched before regular containers)
// whose name matches. It returns an error when there is no such status.
func findContainerStatusByName(status *v1.PodStatus, name string) (*v1.ContainerStatus, error) {
	// Search the two status lists one after the other instead of joining
	// them with append(), which could write into spare capacity of the
	// caller's InitContainerStatuses backing array.
	for _, statuses := range [][]v1.ContainerStatus{status.InitContainerStatuses, status.ContainerStatuses} {
		for i := range statuses {
			if statuses[i].Name != name {
				continue
			}
			// Hand back a copy so the caller cannot modify the pod status
			// through the returned pointer.
			found := statuses[i]
			return &found, nil
		}
	}
	return nil, fmt.Errorf("unable to find status for container with name %v in pod status (it may not be running)", name)
}
// updateContainerCPUSet asks the container runtime to pin the container with
// the given ID to cpus. Only the cpuset field of the resources is populated;
// the runtime's error is returned unchanged.
func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) error {
	// TODO: Consider adding a `ResourceConfigForContainer` helper in
	// helpers_linux.go similar to what exists for pods.
	// It would be better to pass the full container resources here instead of
	// this patch-like partial resources.
	cpusetOnly := &runtimeapi.LinuxContainerResources{
		CpusetCpus: cpus.String(),
	}
	return m.containerRuntime.UpdateContainerResources(containerID, cpusetOnly)
}