Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explain the cause of the TopologyAffinityError #96890

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 11 additions & 3 deletions pkg/kubelet/cm/topologymanager/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ limitations under the License.
package topologymanager

import (
"fmt"
"strings"

"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
)
Expand All @@ -27,7 +30,7 @@ type Policy interface {
Name() string
// Returns a merged TopologyHint based on input from hint providers
// and a Pod Admit Handler Response based on hints and policy type
Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool)
Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool, error)
}

// Merge a TopologyHints permutation to a single hint by performing a bitwise-AND
Expand Down Expand Up @@ -59,14 +62,17 @@ func mergePermutation(numaNodes []int, permutation []TopologyHint) TopologyHint
return TopologyHint{mergedAffinity, preferred}
}

func filterProvidersHints(providersHints []map[string][]TopologyHint) [][]TopologyHint {
func filterProvidersHints(providersHints []map[string][]TopologyHint) ([][]TopologyHint, error) {
// Loop through all hint providers and save an accumulated list of the
// hints returned by each hint provider. If no hints are provided, assume
// that provider has no preference for topology-aware allocation.
var allProviderHints [][]TopologyHint
var err error
lackingResources := []string{}
for _, hints := range providersHints {
// If hints is nil, insert a single, preferred any-numa hint into allProviderHints.
if len(hints) == 0 {

klog.Infof("[topologymanager] Hint Provider has no preference for NUMA affinity with any resource")
allProviderHints = append(allProviderHints, []TopologyHint{{nil, true}})
continue
Expand All @@ -83,13 +89,15 @@ func filterProvidersHints(providersHints []map[string][]TopologyHint) [][]Topolo
if len(hints[resource]) == 0 {
klog.Infof("[topologymanager] Hint Provider has no possible NUMA affinities for resource '%s'", resource)
allProviderHints = append(allProviderHints, []TopologyHint{{nil, false}})
lackingResources = append(lackingResources, resource)
err = fmt.Errorf("not enough '%s' to allocate a pod", strings.Join(lackingResources, ", "))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure in general about this specific log, but this can be done outside the loop, right before we return

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for diving into the PR! The idea behind it was to create slice of all missing resources and return it entirely with TopologyAffinityError, which could be truly helpful for the ones not familiar with Scope and the TopologyManager itself.

continue
}

allProviderHints = append(allProviderHints, hints[resource])
}
}
return allProviderHints
return allProviderHints, err
}

func mergeFilteredHints(numaNodes []int, filteredHints [][]TopologyHint) TopologyHint {
Expand Down
6 changes: 3 additions & 3 deletions pkg/kubelet/cm/topologymanager/policy_best_effort.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ func (p *bestEffortPolicy) canAdmitPodResult(hint *TopologyHint) bool {
return true
}

func (p *bestEffortPolicy) Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool) {
filteredProvidersHints := filterProvidersHints(providersHints)
func (p *bestEffortPolicy) Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool, error) {
filteredProvidersHints, err := filterProvidersHints(providersHints)
bestHint := mergeFilteredHints(p.numaNodes, filteredProvidersHints)
admit := p.canAdmitPodResult(&bestHint)
return bestHint, admit
return bestHint, admit, err
}
4 changes: 2 additions & 2 deletions pkg/kubelet/cm/topologymanager/policy_none.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ func (p *nonePolicy) canAdmitPodResult(hint *TopologyHint) bool {
return true
}

func (p *nonePolicy) Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool) {
return TopologyHint{}, p.canAdmitPodResult(nil)
func (p *nonePolicy) Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool, error) {
return TopologyHint{}, p.canAdmitPodResult(nil), nil
}
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/topologymanager/policy_none_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestPolicyNoneMerge(t *testing.T) {

for _, tc := range tcases {
policy := NewNonePolicy()
result, admit := policy.Merge(tc.providersHints)
result, admit, _ := policy.Merge(tc.providersHints)
if !result.IsEqual(tc.expectedHint) || admit != tc.expectedAdmit {
t.Errorf("Test Case: %s: Expected merge hint to be %v, got %v", tc.name, tc.expectedHint, result)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/kubelet/cm/topologymanager/policy_restricted.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ func (p *restrictedPolicy) canAdmitPodResult(hint *TopologyHint) bool {
return hint.Preferred
}

func (p *restrictedPolicy) Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool) {
filteredHints := filterProvidersHints(providersHints)
func (p *restrictedPolicy) Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool, error) {
filteredHints, err := filterProvidersHints(providersHints)
hint := mergeFilteredHints(p.numaNodes, filteredHints)
admit := p.canAdmitPodResult(&hint)
return hint, admit
return hint, admit, err
}
6 changes: 3 additions & 3 deletions pkg/kubelet/cm/topologymanager/policy_single_numa_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func filterSingleNumaHints(allResourcesHints [][]TopologyHint) [][]TopologyHint
return filteredResourcesHints
}

func (p *singleNumaNodePolicy) Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool) {
filteredHints := filterProvidersHints(providersHints)
func (p *singleNumaNodePolicy) Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool, error) {
filteredHints, err := filterProvidersHints(providersHints)
// Filter to only include don't cares and hints with a single NUMA node.
singleNumaHints := filterSingleNumaHints(filteredHints)
bestHint := mergeFilteredHints(p.numaNodes, singleNumaHints)
Expand All @@ -73,5 +73,5 @@ func (p *singleNumaNodePolicy) Merge(providersHints []map[string][]TopologyHint)
}

admit := p.canAdmitPodResult(&bestHint)
return bestHint, admit
return bestHint, admit, err
}
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/topologymanager/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ func testPolicyMerge(policy Policy, tcases []policyMergeTestCase, t *testing.T)
providersHints = append(providersHints, hints)
}

actual, _ := policy.Merge(providersHints)
actual, _, _ := policy.Merge(providersHints)
if !reflect.DeepEqual(actual, tc.expected) {
t.Errorf("%v: Expected Topology Hint to be %v, got %v:", tc.name, tc.expected, actual)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/cm/topologymanager/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ func (s *scope) allocateAlignedResources(pod *v1.Pod, container *v1.Container) e
return nil
}

func topologyAffinityError() lifecycle.PodAdmitResult {
func topologyAffinityError(err error) lifecycle.PodAdmitResult {
return lifecycle.PodAdmitResult{
Message: "Resources cannot be allocated with Topology locality",
Message: fmt.Sprintf("Resources cannot be allocated with Topology locality due to %v", err),
Reason: "TopologyAffinityError",
Admit: false,
}
Expand Down
17 changes: 11 additions & 6 deletions pkg/kubelet/cm/topologymanager/scope_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package topologymanager

import (
"fmt"

"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
Expand Down Expand Up @@ -49,11 +51,11 @@ func (s *containerScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult {
}

for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
bestHint, admit := s.calculateAffinity(pod, &container)
bestHint, admit, err := s.calculateAffinity(pod, &container)
klog.Infof("[topologymanager] Best TopologyHint for (pod: %v container: %v): %v", format.Pod(pod), container.Name, bestHint)

if !admit {
return topologyAffinityError()
return topologyAffinityError(err)
}

if (s.podTopologyHints)[string(pod.UID)] == nil {
Expand All @@ -62,7 +64,7 @@ func (s *containerScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult {

klog.Infof("[topologymanager] Topology Affinity for (pod: %v container: %v): %v", format.Pod(pod), container.Name, bestHint)
(s.podTopologyHints)[string(pod.UID)][container.Name] = bestHint
err := s.allocateAlignedResources(pod, &container)
err = s.allocateAlignedResources(pod, &container)
if err != nil {
return unexpectedAdmissionError(err)
}
Expand All @@ -82,9 +84,12 @@ func (s *containerScope) accumulateProvidersHints(pod *v1.Pod, container *v1.Con
return providersHints
}

func (s *containerScope) calculateAffinity(pod *v1.Pod, container *v1.Container) (TopologyHint, bool) {
func (s *containerScope) calculateAffinity(pod *v1.Pod, container *v1.Container) (TopologyHint, bool, error) {
providersHints := s.accumulateProvidersHints(pod, container)
bestHint, admit := s.policy.Merge(providersHints)
bestHint, admit, err := s.policy.Merge(providersHints)
klog.Infof("[topologymanager] ContainerTopologyHint: %v", bestHint)
return bestHint, admit
if !admit && err == nil {
err = fmt.Errorf("policy and requirements of a pod collide with each other")
}
return bestHint, admit, err
}
15 changes: 10 additions & 5 deletions pkg/kubelet/cm/topologymanager/scope_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package topologymanager

import (
"fmt"

"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
Expand Down Expand Up @@ -48,10 +50,10 @@ func (s *podScope) Admit(pod *v1.Pod) lifecycle.PodAdmitResult {
return s.admitPolicyNone(pod)
}

bestHint, admit := s.calculateAffinity(pod)
bestHint, admit, err := s.calculateAffinity(pod)
klog.Infof("[topologymanager] Best TopologyHint for (pod: %v): %v", format.Pod(pod), bestHint)
if !admit {
return topologyAffinityError()
return topologyAffinityError(err)
}

for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
Expand Down Expand Up @@ -83,9 +85,12 @@ func (s *podScope) accumulateProvidersHints(pod *v1.Pod) []map[string][]Topology
return providersHints
}

func (s *podScope) calculateAffinity(pod *v1.Pod) (TopologyHint, bool) {
func (s *podScope) calculateAffinity(pod *v1.Pod) (TopologyHint, bool, error) {
providersHints := s.accumulateProvidersHints(pod)
bestHint, admit := s.policy.Merge(providersHints)
bestHint, admit, err := s.policy.Merge(providersHints)
klog.Infof("[topologymanager] PodTopologyHint: %v", bestHint)
return bestHint, admit
if !admit && err == nil {
err = fmt.Errorf("policy and requirements of a pod collide with each other")
}
return bestHint, admit, err
}
4 changes: 2 additions & 2 deletions pkg/kubelet/cm/topologymanager/topology_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ type mockPolicy struct {
ph []map[string][]TopologyHint
}

func (p *mockPolicy) Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool) {
func (p *mockPolicy) Merge(providersHints []map[string][]TopologyHint) (TopologyHint, bool, error) {
p.ph = providersHints
return TopologyHint{}, true
return TopologyHint{}, true, nil
}

func TestAddHintProvider(t *testing.T) {
Expand Down