UPSTREAM: <carry>: add management support to kubelet
UPSTREAM: <carry>: management workloads enhancement 741

UPSTREAM: <carry>: lower verbosity of managed workloads logging

Support for managed workloads was introduced by PR#627.  However, the
CPU manager reconcile loop now seems to flood the kubelet log with
"reconcileState: skipping pod; pod is managed" warnings.  Lower the
verbosity of these log messages.

UPSTREAM: <carry>: correctly set static pod CPUs when workload partitioning is disabled

UPSTREAM: <carry>: Remove reserved CPUs from default set

Remove reserved CPUs from default set when workload partitioning is
enabled.

Co-Authored-By: Brent Rowsell <browsell@redhat.com>
Signed-off-by: Artyom Lukianov <alukiano@redhat.com>
Signed-off-by: Don Penney <dpenney@redhat.com>
OpenShift-Rebase-Source: b762ced
OpenShift-Rebase-Source: 63cf793
OpenShift-Rebase-Source: 32af64c

rphillips authored and sairameshv committed Dec 4, 2023
1 parent b7ae612 commit 50cdcde
Showing 10 changed files with 511 additions and 1 deletion.
6 changes: 6 additions & 0 deletions pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/managed"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/utils/cpuset"
)
@@ -407,13 +408,18 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
failure = []reconciledContainer{}

m.removeStaleState()
workloadEnabled := managed.IsEnabled()
for _, pod := range m.activePods() {
pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
if !ok {
klog.V(4).InfoS("ReconcileState: skipping pod; status not found", "pod", klog.KObj(pod))
failure = append(failure, reconciledContainer{pod.Name, "", ""})
continue
}
if enabled, _, _ := managed.IsPodManaged(pod); workloadEnabled && enabled {
klog.V(4).InfoS("reconcileState: skipping pod; pod is managed", "pod", klog.KObj(pod))
continue
}

allContainers := pod.Spec.InitContainers
allContainers = append(allContainers, pod.Spec.Containers...)
5 changes: 5 additions & 0 deletions pkg/kubelet/cm/cpumanager/policy_static.go
@@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/pkg/kubelet/managed"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/utils/cpuset"
@@ -203,6 +204,10 @@ func (p *staticPolicy) validateState(s state.State) error {
// state is empty initialize
allCPUs := p.topology.CPUDetails.CPUs()
s.SetDefaultCPUSet(allCPUs)
if managed.IsEnabled() {
defaultCpus := s.GetDefaultCPUSet().Difference(p.reservedCPUs)
s.SetDefaultCPUSet(defaultCpus)
}
return nil
}

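The validateState change above shrinks the default (shared) CPU pool by the reserved set when workload partitioning is enabled. A minimal sketch of that set arithmetic using k8s.io/utils/cpuset, for a hypothetical 8-CPU machine that reserves CPUs 0-1 (the values are illustrative, not taken from the commit):

package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

func main() {
	// Hypothetical topology: p.topology.CPUDetails.CPUs() reports CPUs 0-7
	// and p.reservedCPUs holds CPUs 0-1.
	allCPUs := cpuset.New(0, 1, 2, 3, 4, 5, 6, 7)
	reserved := cpuset.New(0, 1)

	// With workload partitioning enabled, the default pool excludes the
	// reserved CPUs instead of spanning the whole machine.
	defaultSet := allCPUs.Difference(reserved)
	fmt.Println(defaultSet.String()) // "2-7"
}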
4 changes: 4 additions & 0 deletions pkg/kubelet/cm/qos_container_manager_linux.go
@@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1/resource"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/managed"
)

const (
@@ -173,6 +174,9 @@ func (m *qosContainerManagerImpl) setCPUCgroupConfig(configs map[v1.PodQOSClass]
reuseReqs := make(v1.ResourceList, 4)
for i := range pods {
pod := pods[i]
if enabled, _, _ := managed.IsPodManaged(pod); enabled {
continue
}
qosClass := v1qos.GetPodQOS(pod)
if qosClass != v1.PodQOSBurstable {
// we only care about the burstable qos tier
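The hunk above keeps managed pods out of the burstable-tier CPU-shares accounting. Below is a hypothetical sketch of that accounting, not the kubelet implementation; it assumes the v1 and managed imports shown elsewhere in this diff and only builds inside the kubernetes tree. Managed pods are skipped because, for static pods at least, ModifyStaticPodForPinnedManagement later in this diff strips the native cpu request and records it in a workload annotation instead.

package sketch

import (
	v1 "k8s.io/api/core/v1"

	"k8s.io/kubernetes/pkg/kubelet/managed"
)

// burstableCPURequestMilli sums the cpu requests that feed the burstable
// cgroup's cpu.shares, skipping managed pods the same way the hunk above does.
func burstableCPURequestMilli(pods []*v1.Pod) int64 {
	var total int64
	for _, pod := range pods {
		if enabled, _, _ := managed.IsPodManaged(pod); enabled {
			continue // managed pods are accounted via workload annotations
		}
		for _, c := range pod.Spec.Containers {
			if req, ok := c.Resources.Requests[v1.ResourceCPU]; ok {
				total += req.MilliValue()
			}
		}
	}
	return total
}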
11 changes: 11 additions & 0 deletions pkg/kubelet/config/file.go
@@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/kubelet/managed"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
utilio "k8s.io/utils/io"
)
@@ -230,6 +231,16 @@ func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
if podErr != nil {
return pod, podErr
}
if managed.IsEnabled() {
if newPod, _, err := managed.ModifyStaticPodForPinnedManagement(pod); err != nil {
klog.V(2).Error(err, "Static Pod is managed but errored", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
} else if newPod != nil {
klog.V(2).InfoS("Static Pod is managed. Using modified pod", "name", newPod.ObjectMeta.Name, "namespace", newPod.ObjectMeta.Namespace, "annotations", newPod.Annotations)
pod = newPod
} else {
klog.V(2).InfoS("Static Pod is not managed", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace)
}
}
return pod, nil
}

2 changes: 1 addition & 1 deletion pkg/kubelet/config/file_linux.go
@@ -29,7 +29,7 @@ import (
"github.com/fsnotify/fsnotify"
"k8s.io/klog/v2"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/flowcontrol"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
5 changes: 5 additions & 0 deletions pkg/kubelet/kubelet.go
@@ -88,6 +88,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/logs"
"k8s.io/kubernetes/pkg/kubelet/managed"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
"k8s.io/kubernetes/pkg/kubelet/network/dns"
@@ -616,6 +617,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,

klet.runtimeService = kubeDeps.RemoteRuntimeService

if managed.IsEnabled() {
klog.InfoS("Pinned Workload Management Enabled")
}

if kubeDeps.KubeClient != nil {
klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
}
27 changes: 27 additions & 0 deletions pkg/kubelet/kubelet_node_status.go
@@ -38,7 +38,9 @@ import (
"k8s.io/klog/v2"
kubeletapis "k8s.io/kubelet/pkg/apis"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/managed"
"k8s.io/kubernetes/pkg/kubelet/nodestatus"
"k8s.io/kubernetes/pkg/kubelet/util"
taintutil "k8s.io/kubernetes/pkg/util/taints"
@@ -118,6 +120,9 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
requiresUpdate = kl.reconcileHugePageResource(node, existingNode) || requiresUpdate
if managed.IsEnabled() {
requiresUpdate = kl.addManagementNodeCapacity(node, existingNode) || requiresUpdate
}
if requiresUpdate {
if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
klog.ErrorS(err, "Unable to reconcile node with API server,error updating node", "node", klog.KObj(node))
@@ -128,6 +133,25 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
return true
}

// addManagementNodeCapacity adds the managed node capacity to the node
func (kl *Kubelet) addManagementNodeCapacity(initialNode, existingNode *v1.Node) bool {
updateDefaultResources(initialNode, existingNode)
machineInfo, err := kl.cadvisor.MachineInfo()
if err != nil {
klog.Errorf("Unable to calculate managed node capacity for %q: %v", kl.nodeName, err)
return false
}
cpuRequest := cadvisor.CapacityFromMachineInfo(machineInfo)[v1.ResourceCPU]
cpuRequestInMilli := cpuRequest.MilliValue()
newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format)
managedResourceName := managed.GenerateResourceName("management")
if existingCapacity, ok := existingNode.Status.Capacity[managedResourceName]; ok && existingCapacity.Equal(*newCPURequest) {
return false
}
existingNode.Status.Capacity[managedResourceName] = *newCPURequest
return true
}

// reconcileHugePageResource will update huge page capacity for each page size and remove huge page sizes no longer supported
func (kl *Kubelet) reconcileHugePageResource(initialNode, existingNode *v1.Node) bool {
requiresUpdate := updateDefaultResources(initialNode, existingNode)
@@ -427,6 +451,9 @@ func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
}
}
}
if managed.IsEnabled() {
kl.addManagementNodeCapacity(node, node)
}

kl.setNodeStatus(ctx, node)

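A worked example of the capacity arithmetic in addManagementNodeCapacity above, for a hypothetical node where cadvisor reports 8 CPUs. resource.NewMilliQuantity divides its argument by 1000, so multiplying the millicore count by 1000 yields a quantity whose value equals the number of millicores; the node therefore advertises one unit of management.workload.openshift.io/cores per millicore of CPU capacity (8000 here).

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Hypothetical: cadvisor reports a CPU capacity of 8 cores.
	cpuRequest := resource.MustParse("8")
	cpuRequestInMilli := cpuRequest.MilliValue() // 8000

	// Mirrors addManagementNodeCapacity: the advertised quantity's value is
	// the millicore count, i.e. 8000 units of the workload capacity resource.
	newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format)
	fmt.Println(newCPURequest.Value()) // 8000
}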
30 changes: 30 additions & 0 deletions pkg/kubelet/managed/cpu_shares.go
@@ -0,0 +1,30 @@
package managed

const (
// These limits are defined in the kernel:
// https://github.com/torvalds/linux/blob/0bddd227f3dc55975e2b8dfa7fc6f959b062a2c7/kernel/sched/sched.h#L427-L428
MinShares = 2
MaxShares = 262144

SharesPerCPU = 1024
MilliCPUToCPU = 1000
)

// MilliCPUToShares converts the milliCPU to CFS shares.
func MilliCPUToShares(milliCPU int64) uint64 {
if milliCPU == 0 {
// Docker converts zero milliCPU to unset, which maps to kernel default
// for unset: 1024. Return 2 here to really match kernel default for
// zero milliCPU.
return MinShares
}
// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
shares := (milliCPU * SharesPerCPU) / MilliCPUToCPU
if shares < MinShares {
return MinShares
}
if shares > MaxShares {
return MaxShares
}
return uint64(shares)
}
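A few spot checks of the conversion above, written as an illustrative table-driven test (not part of the commit); the input values are arbitrary.

package managed

import "testing"

func TestMilliCPUToShares(t *testing.T) {
	cases := []struct {
		milliCPU int64
		want     uint64
	}{
		{0, MinShares},       // zero request maps to the kernel minimum
		{1, MinShares},       // 1*1024/1000 rounds below MinShares, clamped up
		{100, 102},           // 100 * 1024 / 1000
		{500, 512},
		{1000, SharesPerCPU}, // one full CPU
		{1 << 30, MaxShares}, // clamped to the kernel maximum
	}
	for _, c := range cases {
		if got := MilliCPUToShares(c.milliCPU); got != c.want {
			t.Errorf("MilliCPUToShares(%d) = %d, want %d", c.milliCPU, got, c.want)
		}
	}
}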
141 changes: 141 additions & 0 deletions pkg/kubelet/managed/managed.go
@@ -0,0 +1,141 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package managed

import (
"encoding/json"
"fmt"
"os"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)

var (
pinnedManagementEnabled bool
pinnedManagementFilename = "/etc/kubernetes/openshift-workload-pinning"
WorkloadsAnnotationPrefix = "target.workload.openshift.io/"
WorkloadsCapacitySuffix = "workload.openshift.io/cores"
ContainerAnnotationFormat = "resources.workload.openshift.io/%v"
)

type WorkloadContainerAnnotation struct {
CpuShares uint64 `json:"cpushares"`
}

func NewWorkloadContainerAnnotation(cpushares uint64) WorkloadContainerAnnotation {
return WorkloadContainerAnnotation{
CpuShares: cpushares,
}
}

func (w WorkloadContainerAnnotation) Serialize() ([]byte, error) {
return json.Marshal(w)
}

func init() {
readEnablementFile()
}

func readEnablementFile() {
if _, err := os.Stat(pinnedManagementFilename); err == nil {
pinnedManagementEnabled = true
}
}

func IsEnabled() bool {
return pinnedManagementEnabled
}

// IsPodManaged reports whether the pod carries a workload annotation and, if
// so, returns the workload name and the annotation payload.
func IsPodManaged(pod *v1.Pod) (bool, string, string) {
if pod.ObjectMeta.Annotations == nil {
return false, "", ""
}
for annotation, value := range pod.ObjectMeta.Annotations {
if strings.HasPrefix(annotation, WorkloadsAnnotationPrefix) {
return true, strings.TrimPrefix(annotation, WorkloadsAnnotationPrefix), value
}
}
return false, "", ""
}

// ModifyStaticPodForPinnedManagement rewrites a static pod for pinned workload management.
func ModifyStaticPodForPinnedManagement(pod *v1.Pod) (*v1.Pod, string, error) {
pod = pod.DeepCopy()
enabled, workloadName, value := IsPodManaged(pod)
if !enabled {
return nil, "", nil
}
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
pod.Annotations[fmt.Sprintf("%v%v", WorkloadsAnnotationPrefix, workloadName)] = value
if err := updateContainers(workloadName, pod); err != nil {
return nil, "", err
}
return pod, workloadName, nil
}

func GenerateResourceName(workloadName string) v1.ResourceName {
return v1.ResourceName(fmt.Sprintf("%v.%v", workloadName, WorkloadsCapacitySuffix))
}

func updateContainers(workloadName string, pod *v1.Pod) error {
updateContainer := func(container *v1.Container) error {
if container.Resources.Requests == nil {
return fmt.Errorf("managed container %v does not have Resource.Requests", container.Name)
}
if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
return fmt.Errorf("managed container %v does not have cpu requests", container.Name)
}
if _, ok := container.Resources.Requests[v1.ResourceMemory]; !ok {
return fmt.Errorf("managed container %v does not have memory requests", container.Name)
}
if container.Resources.Limits == nil {
container.Resources.Limits = v1.ResourceList{}
}
cpuRequest := container.Resources.Requests[v1.ResourceCPU]
cpuRequestInMilli := cpuRequest.MilliValue()

containerAnnotation := NewWorkloadContainerAnnotation(MilliCPUToShares(cpuRequestInMilli))
jsonAnnotation, _ := containerAnnotation.Serialize()
containerNameKey := fmt.Sprintf(ContainerAnnotationFormat, container.Name)

newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format)

pod.Annotations[containerNameKey] = string(jsonAnnotation)
container.Resources.Requests[GenerateResourceName(workloadName)] = *newCPURequest
container.Resources.Limits[GenerateResourceName(workloadName)] = *newCPURequest

delete(container.Resources.Requests, v1.ResourceCPU)
return nil
}
for idx := range pod.Spec.Containers {
if err := updateContainer(&pod.Spec.Containers[idx]); err != nil {
return err
}
}
for idx := range pod.Spec.InitContainers {
if err := updateContainer(&pod.Spec.InitContainers[idx]); err != nil {
return err
}
}
return nil
}
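An end-to-end sketch of the rewrite performed by ModifyStaticPodForPinnedManagement, using a hypothetical static pod annotated for the "management" workload with a 100m cpu / 200Mi memory request; the annotation payload is only carried through, not interpreted, and the snippet only builds inside the kubernetes tree since pkg/kubelet/managed is an internal package. The container ends up with the annotation resources.workload.openshift.io/app = {"cpushares":102} and its native cpu request replaced by 100 units of management.workload.openshift.io/cores.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"k8s.io/kubernetes/pkg/kubelet/managed"
)

func main() {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "example-static-pod", // hypothetical
			Namespace: "openshift-example",  // hypothetical
			Annotations: map[string]string{
				// Marks the pod as belonging to the "management" workload.
				"target.workload.openshift.io/management": `{"effect":"PreferredDuringScheduling"}`,
			},
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name: "app",
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse("100m"),
						v1.ResourceMemory: resource.MustParse("200Mi"),
					},
				},
			}},
		},
	}

	newPod, workload, err := managed.ModifyStaticPodForPinnedManagement(pod)
	if err != nil {
		panic(err)
	}
	fmt.Println(workload) // "management"

	// Per-container CPU shares are recorded in an annotation...
	fmt.Println(newPod.Annotations["resources.workload.openshift.io/app"]) // {"cpushares":102}

	// ...and the cpu request is replaced by the opaque workload resource.
	req := newPod.Spec.Containers[0].Resources.Requests[managed.GenerateResourceName(workload)]
	fmt.Println(req.String()) // "100"
}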
