UPSTREAM: <carry>: add management support to kubelet
UPSTREAM: <carry>: management workloads enhancement 741

UPSTREAM: <carry>: lower verbosity of managed workloads logging

Support for managed workloads was introduced by PR#627. However, the
CPU manager reconcile loop now seems to flood the kubelet log with
"reconcileState: skipping pod; pod is managed" warnings. Lower the
verbosity of these log messages.
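For reference, klog's verbosity gating is what makes this change effective. A minimal sketch (not part of this commit; the flag value and pod name are made up) showing that a V(4) message is emitted only when the kubelet runs with -v=4 or higher, while lower verbosities suppress it:

package main

import (
	"flag"

	"k8s.io/klog/v2"
)

func main() {
	// Register klog's flags (including -v) and pick a typical low verbosity.
	klog.InitFlags(nil)
	flag.Set("v", "2") // illustrative value; the V(4) line below is suppressed
	flag.Parse()
	defer klog.Flush()

	// Always emitted, matching the startup message added in kubelet.go below.
	klog.InfoS("Pinned Workload Management Enabled")

	// Only emitted with -v=4 or higher, so the per-pod reconcile message no
	// longer floods the log at normal verbosity.
	klog.V(4).InfoS("[cpumanager] reconcileState: skipping pod; pod is managed", "pod", "etcd-member-0")
}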
rphillips authored and damemi committed Dec 6, 2021
1 parent 29ee971 commit b8502cc
Showing 9 changed files with 505 additions and 1 deletion.
5 changes: 5 additions & 0 deletions pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/managed"
"k8s.io/kubernetes/pkg/kubelet/status"
)

@@ -402,6 +403,10 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
failure = append(failure, reconciledContainer{pod.Name, "", ""})
continue
}
if enabled, _, _ := managed.IsPodManaged(pod); enabled {
klog.V(4).InfoS("[cpumanager] reconcileState: skipping pod; pod is managed", "pod", pod.Name)
continue
}

allContainers := pod.Spec.InitContainers
allContainers = append(allContainers, pod.Spec.Containers...)
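For context, the reconcile loop treats a pod as managed when it carries any annotation with the target.workload.openshift.io/ prefix. A small illustration of that check, using the managed package added later in this diff (the pod spec and annotation value are made up):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"k8s.io/kubernetes/pkg/kubelet/managed"
)

func main() {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "etcd-member", // hypothetical static pod
			Namespace: "openshift-etcd",
			Annotations: map[string]string{
				// Any key with the target.workload.openshift.io/ prefix marks the pod as managed.
				"target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`,
			},
		},
	}

	enabled, workloadName, value := managed.IsPodManaged(pod)
	fmt.Println(enabled, workloadName, value) // true management {"effect": "PreferredDuringScheduling"}
}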
4 changes: 4 additions & 0 deletions pkg/kubelet/cm/qos_container_manager_linux.go
@@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1/resource"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/managed"
)

const (
@@ -172,6 +173,9 @@ func (m *qosContainerManagerImpl) setCPUCgroupConfig(configs map[v1.PodQOSClass]
burstablePodCPURequest := int64(0)
for i := range pods {
pod := pods[i]
if enabled, _, _ := managed.IsPodManaged(pod); enabled {
continue
}
qosClass := v1qos.GetPodQOS(pod)
if qosClass != v1.PodQOSBurstable {
// we only care about the burstable qos tier
11 changes: 11 additions & 0 deletions pkg/kubelet/config/file.go
@@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/kubelet/managed"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
utilio "k8s.io/utils/io"
)
@@ -230,6 +231,16 @@ func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
if podErr != nil {
return pod, podErr
}
if managed.IsEnabled() {
if newPod, _, err := managed.ModifyStaticPodForPinnedManagement(pod); err != nil {
klog.V(2).Error(err, "Static Pod is managed but errored", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
} else if newPod != nil {
klog.V(2).InfoS("Static Pod is managed. Using modified pod", "name", newPod.ObjectMeta.Name, "namespace", newPod.ObjectMeta.Namespace, "annotations", newPod.Annotations)
pod = newPod
} else {
klog.V(2).InfoS("Static Pod is not managed", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
}
}
return pod, nil
}

2 changes: 1 addition & 1 deletion pkg/kubelet/config/file_linux.go
@@ -29,7 +29,7 @@ import (
"github.com/fsnotify/fsnotify"
"k8s.io/klog/v2"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/flowcontrol"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
5 changes: 5 additions & 0 deletions pkg/kubelet/kubelet.go
@@ -82,6 +82,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/legacy"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/logs"
"k8s.io/kubernetes/pkg/kubelet/managed"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
"k8s.io/kubernetes/pkg/kubelet/network/dns"
@@ -623,6 +624,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.dockerLegacyService = kubeDeps.dockerLegacyService
klet.runtimeService = kubeDeps.RemoteRuntimeService

if managed.IsEnabled() {
klog.InfoS("Pinned Workload Management Enabled")
}

if kubeDeps.KubeClient != nil {
klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
}
27 changes: 27 additions & 0 deletions pkg/kubelet/kubelet_node_status.go
@@ -38,7 +38,9 @@ import (
"k8s.io/klog/v2"
kubeletapis "k8s.io/kubelet/pkg/apis"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/managed"
"k8s.io/kubernetes/pkg/kubelet/nodestatus"
"k8s.io/kubernetes/pkg/kubelet/util"
taintutil "k8s.io/kubernetes/pkg/util/taints"
@@ -114,6 +116,9 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
requiresUpdate = kl.reconcileHugePageResource(node, existingNode) || requiresUpdate
if managed.IsEnabled() {
requiresUpdate = kl.addManagementNodeCapacity(node, existingNode) || requiresUpdate
}
if requiresUpdate {
if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
klog.ErrorS(err, "Unable to reconcile node with API server,error updating node", "node", klog.KObj(node))
@@ -124,6 +129,25 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
return true
}

// addManagementNodeCapacity adds the management workload capacity to the node
func (kl *Kubelet) addManagementNodeCapacity(initialNode, existingNode *v1.Node) bool {
updateDefaultResources(initialNode, existingNode)
machineInfo, err := kl.cadvisor.MachineInfo()
if err != nil {
klog.Errorf("Unable to calculate managed node capacity for %q: %v", kl.nodeName, err)
return false
}
cpuRequest := cadvisor.CapacityFromMachineInfo(machineInfo)[v1.ResourceCPU]
cpuRequestInMilli := cpuRequest.MilliValue()
newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format)
managedResourceName := managed.GenerateResourceName("management")
if existingCapacity, ok := existingNode.Status.Capacity[managedResourceName]; ok && existingCapacity.Equal(*newCPURequest) {
return false
}
existingNode.Status.Capacity[managedResourceName] = *newCPURequest
return true
}

// reconcileHugePageResource will update huge page capacity for each page size and remove huge page sizes no longer supported
func (kl *Kubelet) reconcileHugePageResource(initialNode, existingNode *v1.Node) bool {
requiresUpdate := updateDefaultResources(initialNode, existingNode)
@@ -423,6 +447,9 @@ func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
}
}
}
if managed.IsEnabled() {
kl.addManagementNodeCapacity(node, node)
}

kl.setNodeStatus(node)

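To make the unit handling in addManagementNodeCapacity concrete, here is an illustrative calculation (the 8-CPU machine is an assumption, not taken from the diff): cadvisor reports the CPU capacity, MilliValue() yields 8000, and NewMilliQuantity(8000*1000, ...) advertises 8000 whole units of the management resource, i.e. one unit per millicore of the node.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Assume cadvisor reported 8 CPUs for this node (illustrative value).
	cpuCapacity := resource.MustParse("8")

	cpuInMilli := cpuCapacity.MilliValue() // 8000

	// Same expression as addManagementNodeCapacity: advertise milli * 1000
	// milli-units of the extended resource, i.e. 8000 whole "cores" units.
	managementCapacity := resource.NewMilliQuantity(cpuInMilli*1000, cpuCapacity.Format)

	fmt.Println(managementCapacity.String()) // 8k
}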
30 changes: 30 additions & 0 deletions pkg/kubelet/managed/cpu_shares.go
@@ -0,0 +1,30 @@
package managed

const (
// These limits are defined in the kernel:
// https://github.com/torvalds/linux/blob/0bddd227f3dc55975e2b8dfa7fc6f959b062a2c7/kernel/sched/sched.h#L427-L428
MinShares = 2
MaxShares = 262144

SharesPerCPU = 1024
MilliCPUToCPU = 1000
)

// MilliCPUToShares converts the milliCPU to CFS shares.
func MilliCPUToShares(milliCPU int64) uint64 {
if milliCPU == 0 {
// Docker converts zero milliCPU to unset, which maps to kernel default
// for unset: 1024. Return 2 here to really match kernel default for
// zero milliCPU.
return MinShares
}
// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
shares := (milliCPU * SharesPerCPU) / MilliCPUToCPU
if shares < MinShares {
return MinShares
}
if shares > MaxShares {
return MaxShares
}
return uint64(shares)
}
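A few spot checks of the conversion above (illustrative inputs): zero milliCPU is clamped to the kernel minimum rather than the 1024 "unset" default, and integer division truncates.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/managed"
)

func main() {
	fmt.Println(managed.MilliCPUToShares(0))       // 2      (clamped to MinShares)
	fmt.Println(managed.MilliCPUToShares(100))     // 102    (100 * 1024 / 1000, truncated)
	fmt.Println(managed.MilliCPUToShares(500))     // 512    (500 * 1024 / 1000)
	fmt.Println(managed.MilliCPUToShares(1 << 40)) // 262144 (clamped to MaxShares)
}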
141 changes: 141 additions & 0 deletions pkg/kubelet/managed/managed.go
@@ -0,0 +1,141 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package managed

import (
"encoding/json"
"fmt"
"os"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)

var (
pinnedManagementEnabled bool
pinnedManagementFilename = "/etc/kubernetes/openshift-workload-pinning"
WorkloadsAnnotationPrefix = "target.workload.openshift.io/"
WorkloadsCapacitySuffix = "workload.openshift.io/cores"
ContainerAnnotationFormat = "resources.workload.openshift.io/%v"
)

type WorkloadContainerAnnotation struct {
CpuShares uint64 `json:"cpushares"`
}

func NewWorkloadContainerAnnotation(cpushares uint64) WorkloadContainerAnnotation {
return WorkloadContainerAnnotation{
CpuShares: cpushares,
}
}

func (w WorkloadContainerAnnotation) Serialize() ([]byte, error) {
return json.Marshal(w)
}

func init() {
readEnablementFile()
}

func readEnablementFile() {
if _, err := os.Stat(pinnedManagementFilename); err == nil {
pinnedManagementEnabled = true
}
}

func IsEnabled() bool {
return pinnedManagementEnabled
}

// IsPodManaged returns whether the pod is managed, along with the workload
// name and the annotation payload.
func IsPodManaged(pod *v1.Pod) (bool, string, string) {
if pod.ObjectMeta.Annotations == nil {
return false, "", ""
}
for annotation, value := range pod.ObjectMeta.Annotations {
if strings.HasPrefix(annotation, WorkloadsAnnotationPrefix) {
return true, strings.TrimPrefix(annotation, WorkloadsAnnotationPrefix), value
}
}
return false, "", ""
}

// ModifyStaticPodForPinnedManagement returns a modified copy of the pod when it is managed; it returns nil if the pod carries no workload annotation.
func ModifyStaticPodForPinnedManagement(pod *v1.Pod) (*v1.Pod, string, error) {
pod = pod.DeepCopy()
enabled, workloadName, value := IsPodManaged(pod)
if !enabled {
return nil, "", nil
}
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
pod.Annotations[fmt.Sprintf("%v%v", WorkloadsAnnotationPrefix, workloadName)] = value
if err := updateContainers(workloadName, pod); err != nil {
return nil, "", err
}
return pod, workloadName, nil
}

func GenerateResourceName(workloadName string) v1.ResourceName {
return v1.ResourceName(fmt.Sprintf("%v.%v", workloadName, WorkloadsCapacitySuffix))
}

func updateContainers(workloadName string, pod *v1.Pod) error {
updateContainer := func(container *v1.Container) error {
if container.Resources.Requests == nil {
return fmt.Errorf("managed container %v does not have Resource.Requests", container.Name)
}
if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
return fmt.Errorf("managed container %v does not have cpu requests", container.Name)
}
if _, ok := container.Resources.Requests[v1.ResourceMemory]; !ok {
return fmt.Errorf("managed container %v does not have memory requests", container.Name)
}
if container.Resources.Limits == nil {
container.Resources.Limits = v1.ResourceList{}
}
cpuRequest := container.Resources.Requests[v1.ResourceCPU]
cpuRequestInMilli := cpuRequest.MilliValue()

containerAnnotation := NewWorkloadContainerAnnotation(MilliCPUToShares(cpuRequestInMilli))
jsonAnnotation, _ := containerAnnotation.Serialize()
containerNameKey := fmt.Sprintf(ContainerAnnotationFormat, container.Name)

newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format)

pod.Annotations[containerNameKey] = string(jsonAnnotation)
container.Resources.Requests[GenerateResourceName(workloadName)] = *newCPURequest
container.Resources.Limits[GenerateResourceName(workloadName)] = *newCPURequest

delete(container.Resources.Requests, v1.ResourceCPU)
return nil
}
for idx := range pod.Spec.Containers {
if err := updateContainer(&pod.Spec.Containers[idx]); err != nil {
return err
}
}
for idx := range pod.Spec.InitContainers {
if err := updateContainer(&pod.Spec.InitContainers[idx]); err != nil {
return err
}
}
return nil
}
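Putting the pieces together, the static-pod rewrite wired into pkg/kubelet/config/file.go above amounts to the transformation sketched here (the pod spec is made up; it assumes the enablement file exists so IsEnabled() returns true): the native cpu request is removed, a matching <workload>.workload.openshift.io/cores request and limit are added, and a per-container resources.workload.openshift.io/<name> annotation records the computed CFS shares.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"k8s.io/kubernetes/pkg/kubelet/managed"
)

func main() {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "etcd-member", // hypothetical static pod
			Namespace: "openshift-etcd",
			Annotations: map[string]string{
				"target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`,
			},
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name: "etcd",
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse("100m"),
						v1.ResourceMemory: resource.MustParse("200Mi"),
					},
				},
			}},
		},
	}

	newPod, workloadName, err := managed.ModifyStaticPodForPinnedManagement(pod)
	if err != nil {
		panic(err)
	}
	fmt.Println(workloadName) // management

	c := newPod.Spec.Containers[0]

	// The native cpu request is gone ...
	_, hasCPU := c.Resources.Requests[v1.ResourceCPU]
	fmt.Println(hasCPU) // false

	// ... replaced by the extended workload resource (one unit per original millicore).
	cores := c.Resources.Requests[managed.GenerateResourceName(workloadName)]
	fmt.Println(cores.String()) // 100

	// The per-container annotation carries the CFS shares (100m -> 102 shares).
	fmt.Println(newPod.Annotations["resources.workload.openshift.io/etcd"]) // {"cpushares":102}
}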
