Skip to content

Commit

Permalink
UPSTREAM: <carry>: advertise shared cpus for mixed cpus feature
Browse files Browse the repository at this point in the history
Kubelet should advertise the shared cpus as extended resources.
This has the benefit of limiting the number of containers
that can request access to the shared cpus.

For more information see - openshift/enhancements#1396

Signed-off-by: Talor Itzhak <titzhak@redhat.com>
  • Loading branch information
Tal-or authored and dinhxuanvu committed Apr 15, 2024
1 parent 2dc5c17 commit 9ed9af8
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/server"
servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
serverstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/sharedcpus"
"k8s.io/kubernetes/pkg/kubelet/stats"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/sysctl"
Expand Down Expand Up @@ -631,6 +632,9 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
if managed.IsEnabled() {
klog.InfoS("Pinned Workload Management Enabled")
}
if sharedcpus.IsEnabled() {
klog.InfoS("Mixed CPUs Workload Enabled")
}

if kubeDeps.KubeClient != nil {
klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
Expand Down
22 changes: 22 additions & 0 deletions pkg/kubelet/kubelet_node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/managed"
"k8s.io/kubernetes/pkg/kubelet/nodestatus"
"k8s.io/kubernetes/pkg/kubelet/sharedcpus"
"k8s.io/kubernetes/pkg/kubelet/util"
taintutil "k8s.io/kubernetes/pkg/util/taints"
volutil "k8s.io/kubernetes/pkg/volume/util"
Expand Down Expand Up @@ -123,6 +124,7 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
if managed.IsEnabled() {
requiresUpdate = kl.addManagementNodeCapacity(node, existingNode) || requiresUpdate
}
requiresUpdate = kl.reconcileSharedCPUsNodeCapacity(node, existingNode) || requiresUpdate
if requiresUpdate {
if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
klog.ErrorS(err, "Unable to reconcile node with API server,error updating node", "node", klog.KObj(node))
Expand Down Expand Up @@ -152,6 +154,25 @@ func (kl *Kubelet) addManagementNodeCapacity(initialNode, existingNode *v1.Node)
return true
}

// reconcileSharedCPUsNodeCapacity reconciles the shared-cpus extended resource
// on the node's capacity against the current state of the mixed-cpus feature.
// It returns true when existingNode was modified and a status patch is required.
func (kl *Kubelet) reconcileSharedCPUsNodeCapacity(initialNode, existingNode *v1.Node) bool {
	// Capture the result instead of discarding it (matching
	// reconcileHugePageResource); otherwise a default-resource change made
	// here could be lost when this function returns false below.
	requiresUpdate := updateDefaultResources(initialNode, existingNode)
	sharedCPUsResourceName := sharedcpus.GetResourceName()
	// delete resources in case they exist and feature has been disabled
	if !sharedcpus.IsEnabled() {
		if _, ok := existingNode.Status.Capacity[sharedCPUsResourceName]; ok {
			delete(existingNode.Status.Capacity, sharedCPUsResourceName)
			return true
		}
		return requiresUpdate
	}
	// ContainersLimit bounds how many containers may request access to the
	// shared CPU pool; it is advertised as the resource's capacity.
	q := resource.NewQuantity(sharedcpus.GetConfig().ContainersLimit, resource.DecimalSI)
	if existingCapacity, ok := existingNode.Status.Capacity[sharedCPUsResourceName]; ok && existingCapacity.Equal(*q) {
		return requiresUpdate
	}
	existingNode.Status.Capacity[sharedCPUsResourceName] = *q
	return true
}

// reconcileHugePageResource will update huge page capacity for each page size and remove huge page sizes no longer supported
func (kl *Kubelet) reconcileHugePageResource(initialNode, existingNode *v1.Node) bool {
requiresUpdate := updateDefaultResources(initialNode, existingNode)
Expand Down Expand Up @@ -454,6 +475,7 @@ func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
if managed.IsEnabled() {
kl.addManagementNodeCapacity(node, node)
}
kl.reconcileSharedCPUsNodeCapacity(node, node)

kl.setNodeStatus(ctx, node)

Expand Down
87 changes: 87 additions & 0 deletions pkg/kubelet/sharedcpus/sharedcpus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sharedcpus

import (
"encoding/json"
"errors"
"os"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
)

const (
	// configFileName is the well-known path of the mixed-cpus configuration
	// file; its presence (and successful parsing) enables the feature.
	configFileName = "/etc/kubernetes/openshift-workload-mixed-cpus"
	// sharedCpusResourceName is the extended resource name under which the
	// shared cpus are advertised in the node status.
	sharedCpusResourceName = "workload.openshift.io/enable-shared-cpus"
)

var (
	// config holds the parsed shared-cpus configuration; populated once by
	// parseConfig at package init time.
	config Config
	// sharedCpusEnabled reports whether the configuration file was found
	// and parsed successfully.
	sharedCpusEnabled bool
)

// Config is the on-disk configuration for the mixed-cpus (shared cpus) feature.
type Config struct {
	sharedCpus `json:"shared_cpus"`
}

// sharedCpus carries the settings of the shared CPU pool.
type sharedCpus struct {
	// ContainersLimit specifies the number of containers that are allowed to access the shared CPU pool.
	ContainersLimit int64 `json:"containers_limit"`
}

// init loads the shared-cpus configuration file once at package load time,
// so IsEnabled/GetConfig never perform I/O.
func init() {
	parseConfig()
}

// IsEnabled reports whether the mixed-cpus feature is enabled, i.e. whether
// the shared-cpus configuration file was successfully read and parsed.
func IsEnabled() bool {
	return sharedCpusEnabled
}

// GetResourceName returns the extended resource name under which the shared
// cpus are advertised in the node status.
func GetResourceName() corev1.ResourceName {
	return sharedCpusResourceName
}

// GetConfig returns a copy of the parsed shared-cpus configuration.
// The zero value is returned when the feature is disabled.
func GetConfig() Config {
	return config
}

// parseConfig reads and parses the shared-cpus configuration file, populating
// the package-level state. A missing file means the feature is simply
// disabled; any other failure is logged and likewise leaves it disabled.
func parseConfig() {
	data, err := os.ReadFile(configFileName)
	if errors.Is(err, os.ErrNotExist) {
		// The feature is opt-in: absence of the file is not an error.
		return
	}
	if err != nil {
		klog.ErrorS(err, "Failed to read configuration file for shared cpus", "fileName", configFileName)
		return
	}
	parsed, err := parseConfigData(data)
	if err != nil {
		return
	}
	config = *parsed
	sharedCpusEnabled = true
}

// parseConfigData decodes the raw JSON configuration bytes into a Config.
// On failure it logs the offending content and returns the decode error
// together with the (partially populated) Config.
func parseConfigData(data []byte) (*Config, error) {
	var cfg Config
	if err := json.Unmarshal(data, &cfg); err != nil {
		klog.ErrorS(err, "Failed to parse configuration file for shared cpus", "fileContent", string(data))
		return &cfg, err
	}
	return &cfg, nil
}
39 changes: 39 additions & 0 deletions pkg/kubelet/sharedcpus/sharedcpus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package sharedcpus

import "testing"

// TestParseConfigData verifies that well-formed shared-cpus JSON is parsed
// into the expected ContainersLimit, and that malformed data yields an error.
func TestParseConfigData(t *testing.T) {
	testCases := []struct {
		data                []byte
		expectedToBeParsed  bool
		containerLimitValue int64
	}{
		{
			data: []byte(`{
			"shared_cpus": {
				"containers_limit": 15
				}
			}`),
			expectedToBeParsed:  true,
			containerLimitValue: 15,
		},
		{
			// A string where an int64 is expected genuinely fails to
			// unmarshal. (An unknown field like "abc" would NOT fail:
			// encoding/json ignores unknown fields by default.)
			data: []byte(`{
			"shared_cpus": {
				"containers_limit": "25"
				}
			}`),
			expectedToBeParsed:  false,
			containerLimitValue: 0,
		},
	}
	for _, tc := range testCases {
		cfg, err := parseConfigData(tc.data)
		// Check both directions: unexpected failure AND unexpected success.
		if tc.expectedToBeParsed && err != nil {
			t.Errorf("shared cpus data expected to be parsed, got error: %v", err)
		}
		if !tc.expectedToBeParsed && err == nil {
			t.Errorf("shared cpus data expected to fail parsing, but it succeeded")
		}
		if cfg.ContainersLimit != tc.containerLimitValue {
			t.Errorf("shared cpus ContainersLimit is different than expected: want: %d; got %d", tc.containerLimitValue, cfg.ContainersLimit)
		}
	}
}

0 comments on commit 9ed9af8

Please sign in to comment.