Skip to content

Commit

Permalink
Support cluster-level extended resources in kubelet and kube-scheduler
Browse files Browse the repository at this point in the history
Co-authored-by: Yang Guo <ygg@google.com>
Co-authored-by: Chun Chen <chenchun.feed@gmail.com>
  • Loading branch information
yguo0905 and chenchun committed Feb 28, 2018
1 parent c1a97c3 commit 8d88050
Show file tree
Hide file tree
Showing 25 changed files with 613 additions and 58 deletions.
12 changes: 10 additions & 2 deletions pkg/kubelet/kubelet_node_status.go
Expand Up @@ -606,8 +606,16 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
}

for _, removedResource := range removedDevicePlugins {
glog.V(2).Infof("Remove capacity for %s", removedResource)
delete(node.Status.Capacity, v1.ResourceName(removedResource))
glog.V(2).Infof("Set capacity for %s to 0 on device removal", removedResource)
// Set the capacity of the removed resource to 0 instead of
// removing the resource from the node status. This is to indicate
// that the resource is managed by device plugin and had been
// registered before.
//
// This is required to differentiate the device plugin managed
// resources and the cluster-level resources, which are absent in
// node status.
node.Status.Capacity[v1.ResourceName(removedResource)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
}
}

Expand Down
56 changes: 36 additions & 20 deletions pkg/kubelet/kubelet_test.go
Expand Up @@ -583,18 +583,20 @@ func TestHandlePluginResources(t *testing.T) {
kl := testKubelet.kubelet

adjustedResource := v1.ResourceName("domain1.com/adjustedResource")
unadjustedResouce := v1.ResourceName("domain2.com/unadjustedResouce")
emptyResource := v1.ResourceName("domain2.com/emptyResource")
missingResource := v1.ResourceName("domain2.com/missingResource")
failedResource := v1.ResourceName("domain2.com/failedResource")
resourceQuantity0 := *resource.NewQuantity(int64(0), resource.DecimalSI)
resourceQuantity1 := *resource.NewQuantity(int64(1), resource.DecimalSI)
resourceQuantity2 := *resource.NewQuantity(int64(2), resource.DecimalSI)
resourceQuantityInvalid := *resource.NewQuantity(int64(-1), resource.DecimalSI)
allowedPodQuantity := *resource.NewQuantity(int64(10), resource.DecimalSI)
nodes := []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: v1.ResourceList{
adjustedResource: resourceQuantity1,
unadjustedResouce: resourceQuantity1,
v1.ResourcePods: allowedPodQuantity,
adjustedResource: resourceQuantity1,
emptyResource: resourceQuantity0,
v1.ResourcePods: allowedPodQuantity,
}}},
}
kl.nodeInfo = testNodeInfo{nodes: nodes}
Expand All @@ -607,6 +609,7 @@ func TestHandlePluginResources(t *testing.T) {
// quantity unchanged.
updateResourceMap := map[v1.ResourceName]resource.Quantity{
adjustedResource: resourceQuantity2,
emptyResource: resourceQuantity0,
failedResource: resourceQuantityInvalid,
}
pod := attrs.Pod
Expand Down Expand Up @@ -634,7 +637,7 @@ func TestHandlePluginResources(t *testing.T) {

// pod requiring adjustedResource can be successfully allocated because updatePluginResourcesFunc
// adjusts node.allocatableResource for this resource to a sufficient value.
fittingPodspec := v1.PodSpec{NodeName: string(kl.nodeName),
fittingPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
Containers: []v1.Container{{Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
adjustedResource: resourceQuantity2,
Expand All @@ -644,14 +647,30 @@ func TestHandlePluginResources(t *testing.T) {
},
}}},
}
// pod requiring unadjustedResouce with insufficient quantity will still fail PredicateAdmit.
exceededPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
// pod requiring emptyResource (extended resources with 0 allocatable) will
// not pass PredicateAdmit.
emptyPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
Containers: []v1.Container{{Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
unadjustedResouce: resourceQuantity2,
emptyResource: resourceQuantity2,
},
Requests: v1.ResourceList{
unadjustedResouce: resourceQuantity2,
emptyResource: resourceQuantity2,
},
}}},
}
// pod requiring missingResource will pass PredicateAdmit.
//
// Extended resources missing in node status are ignored in PredicateAdmit.
// This is required to support extended resources that are not managed by
// device plugin, such as cluster-level resources.
missingPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
Containers: []v1.Container{{Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
missingResource: resourceQuantity2,
},
Requests: v1.ResourceList{
missingResource: resourceQuantity2,
},
}}},
}
Expand All @@ -666,21 +685,18 @@ func TestHandlePluginResources(t *testing.T) {
},
}}},
}
pods := []*v1.Pod{
podWithUIDNameNsSpec("123", "fittingpod", "foo", fittingPodspec),
podWithUIDNameNsSpec("456", "exceededpod", "foo", exceededPodSpec),
podWithUIDNameNsSpec("789", "failedpod", "foo", failedPodSpec),
}
// The latter two pod should be rejected.
fittingPod := pods[0]
exceededPod := pods[1]
failedPod := pods[2]

kl.HandlePodAdditions(pods)
fittingPod := podWithUIDNameNsSpec("1", "fittingpod", "foo", fittingPodSpec)
emptyPod := podWithUIDNameNsSpec("2", "emptypod", "foo", emptyPodSpec)
missingPod := podWithUIDNameNsSpec("3", "missingpod", "foo", missingPodSpec)
failedPod := podWithUIDNameNsSpec("4", "failedpod", "foo", failedPodSpec)

kl.HandlePodAdditions([]*v1.Pod{fittingPod, emptyPod, missingPod, failedPod})

// Check pod status stored in the status map.
checkPodStatus(t, kl, fittingPod, v1.PodPending)
checkPodStatus(t, kl, exceededPod, v1.PodFailed)
checkPodStatus(t, kl, emptyPod, v1.PodFailed)
checkPodStatus(t, kl, missingPod, v1.PodPending)
checkPodStatus(t, kl, failedPod, v1.PodFailed)
}

Expand Down
8 changes: 7 additions & 1 deletion pkg/kubelet/lifecycle/BUILD
Expand Up @@ -18,6 +18,7 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/kubelet/lifecycle",
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
Expand All @@ -34,12 +35,17 @@ go_library(

go_test(
name = "go_default_test",
srcs = ["handlers_test.go"],
srcs = [
"handlers_test.go",
"predicate_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/scheduler/schedulercache:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
],
)
Expand Down
34 changes: 33 additions & 1 deletion pkg/kubelet/lifecycle/predicate.go
Expand Up @@ -20,7 +20,9 @@ import (
"fmt"

"github.com/golang/glog"

"k8s.io/api/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
Expand Down Expand Up @@ -77,7 +79,18 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
Message: message,
}
}
fit, reasons, err := predicates.GeneralPredicates(pod, nil, nodeInfo)

// Remove the requests of the extended resources that are missing in the
// node info. This is required to support cluster-level resources, which
// are extended resources unknown to nodes.
//
// Caveat: If a pod was manually bound to a node (e.g., static pod) where a
// node-level extended resource it requires is not found, then kubelet will
// not fail admission while it should. This issue will be addressed with
// the Resource Class API in the future.
podWithoutMissingExtendedResources := removeMissingExtendedResources(pod, nodeInfo)

fit, reasons, err := predicates.GeneralPredicates(podWithoutMissingExtendedResources, nil, nodeInfo)
if err != nil {
message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err)
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
Expand Down Expand Up @@ -141,3 +154,22 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
Admit: true,
}
}

func removeMissingExtendedResources(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) *v1.Pod {
podCopy := pod.DeepCopy()
for i, c := range pod.Spec.Containers {
// We only handle requests in Requests but not Limits because the
// PodFitsResources predicate, to which the result pod will be passed,
// does not use Limits.
podCopy.Spec.Containers[i].Resources.Requests = make(v1.ResourceList)
for rName, rQuant := range c.Resources.Requests {
if v1helper.IsExtendedResourceName(rName) {
if _, found := nodeInfo.AllocatableResource().ScalarResources[rName]; !found {
continue
}
}
podCopy.Spec.Containers[i].Resources.Requests[rName] = rQuant
}
}
return podCopy
}
114 changes: 114 additions & 0 deletions pkg/kubelet/lifecycle/predicate_test.go
@@ -0,0 +1,114 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lifecycle

import (
"reflect"
"testing"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
)

var (
quantity = *resource.NewQuantity(1, resource.DecimalSI)
extendedResourceName1 = "example.com/er1"
extendedResourceName2 = "example.com/er2"
)

func TestRemoveMissingExtendedResources(t *testing.T) {
for _, test := range []struct {
desc string
pod *v1.Pod
node *v1.Node

expectedPod *v1.Pod
}{
{
desc: "requests in Limits should be ignored",
pod: makeTestPod(
v1.ResourceList{}, // Requests
v1.ResourceList{"foo.com/bar": quantity}, // Limits
),
node: makeTestNode(
v1.ResourceList{"foo.com/baz": quantity}, // Allocatable
),
expectedPod: makeTestPod(
v1.ResourceList{}, // Requests
v1.ResourceList{"foo.com/bar": quantity}, // Limits
),
},
{
desc: "requests for resources available in node should not be removed",
pod: makeTestPod(
v1.ResourceList{"foo.com/bar": quantity}, // Requests
v1.ResourceList{}, // Limits
),
node: makeTestNode(
v1.ResourceList{"foo.com/bar": quantity}, // Allocatable
),
expectedPod: makeTestPod(
v1.ResourceList{"foo.com/bar": quantity}, // Requests
v1.ResourceList{}), // Limits
},
{
desc: "requests for resources unavailable in node should be removed",
pod: makeTestPod(
v1.ResourceList{"foo.com/bar": quantity}, // Requests
v1.ResourceList{}, // Limits
),
node: makeTestNode(
v1.ResourceList{"foo.com/baz": quantity}, // Allocatable
),
expectedPod: makeTestPod(
v1.ResourceList{}, // Requests
v1.ResourceList{}, // Limits
),
},
} {
nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(test.node)
pod := removeMissingExtendedResources(test.pod, nodeInfo)
if !reflect.DeepEqual(pod, test.expectedPod) {
t.Errorf("%s: Expected pod\n%v\ngot\n%v\n", test.desc, test.expectedPod, pod)
}
}
}

func makeTestPod(requests, limits v1.ResourceList) *v1.Pod {
return &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: requests,
Limits: limits,
},
},
},
},
}
}

func makeTestNode(allocatable v1.ResourceList) *v1.Node {
return &v1.Node{
Status: v1.NodeStatus{
Allocatable: allocatable,
},
}
}
1 change: 1 addition & 0 deletions pkg/scheduler/algorithm/predicates/BUILD
Expand Up @@ -61,6 +61,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
)
Expand Down
28 changes: 24 additions & 4 deletions pkg/scheduler/algorithm/predicates/metadata.go
Expand Up @@ -22,6 +22,7 @@ import (

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
Expand Down Expand Up @@ -53,6 +54,13 @@ type predicateMetadata struct {
serviceAffinityInUse bool
serviceAffinityMatchingPodList []*v1.Pod
serviceAffinityMatchingPodServices []*v1.Service
// ignoredExtendedResources is a set of extended resource names that will
// be ignored in the PodFitsResources predicate.
//
// They can be scheduler extender managed resources, the consumption of
// which should be accounted only by the extenders. This set is synthesized
// from scheduler extender configuration and does not change per pod.
ignoredExtendedResources sets.String
}

// Ensure that predicateMetadata implements algorithm.PredicateMetadata.
Expand All @@ -71,6 +79,17 @@ func RegisterPredicateMetadataProducer(predicateName string, precomp PredicateMe
predicateMetadataProducers[predicateName] = precomp
}

// RegisterPredicateMetadataProducerWithExtendedResourceOptions registers a
// PredicateMetadataProducer that creates predicate metadata with the provided
// options for extended resources.
//
// See the comments in "predicateMetadata" for the explanation of the options.
func RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources sets.String) {
RegisterPredicateMetadataProducer("PredicateWithExtendedResourceOptions", func(pm *predicateMetadata) {
pm.ignoredExtendedResources = ignoredExtendedResources
})
}

// NewPredicateMetadataFactory creates a PredicateMetadataFactory.
func NewPredicateMetadataFactory(podLister algorithm.PodLister) algorithm.PredicateMetadataProducer {
factory := &PredicateMetadataFactory{
Expand Down Expand Up @@ -170,10 +189,11 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache
// its maps and slices, but it does not copy the contents of pointer values.
func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata {
newPredMeta := &predicateMetadata{
pod: meta.pod,
podBestEffort: meta.podBestEffort,
podRequest: meta.podRequest,
serviceAffinityInUse: meta.serviceAffinityInUse,
pod: meta.pod,
podBestEffort: meta.podBestEffort,
podRequest: meta.podRequest,
serviceAffinityInUse: meta.serviceAffinityInUse,
ignoredExtendedResources: meta.ignoredExtendedResources,
}
newPredMeta.podPorts = append([]*v1.ContainerPort(nil), meta.podPorts...)
newPredMeta.matchingAntiAffinityTerms = map[string][]matchingPodAntiAffinityTerm{}
Expand Down

0 comments on commit 8d88050

Please sign in to comment.