Remove patching with null value for descheduling strategy
Signed-off-by: Madalina Lazar <madalina.lazar@intel.com>
madalazar committed Dec 22, 2022
1 parent 4a4409a commit 533c754
Showing 2 changed files with 227 additions and 55 deletions.
39 changes: 19 additions & 20 deletions telemetry-aware-scheduling/pkg/strategies/deschedule/enforce.go
@@ -17,8 +17,10 @@ import (
)

const (
l2 = 2
l4 = 4
l2 = 2
l4 = 4
failNodeListCleanUpMessage = "failed to list nodes during clean-up"
failNodeListEnforceMessage = "failed to list all nodes during enforce"
)

var errNull = errors.New("")
@@ -40,7 +42,7 @@ func (d *Strategy) Cleanup(enforcer *strategy.MetricEnforcer, policyName string)
msg := fmt.Sprintf("cannot list nodes: %v", err)
klog.V(l2).InfoS(msg, "component", "controller")

return fmt.Errorf("failed to cleanup node labels: %w", err)
return fmt.Errorf("%s: %w", failNodeListCleanUpMessage, err)
}

for _, node := range nodes.Items {
@@ -74,7 +76,7 @@ func (d *Strategy) Enforce(enforcer *strategy.MetricEnforcer, cache cache.Reader
msg := fmt.Sprintf("cannot list nodes: %v", err)
klog.V(l2).InfoS(msg, "component", "controller")

return -1, fmt.Errorf("failed to enforce strategy: %w", err)
return -1, fmt.Errorf("%s: %w", failNodeListEnforceMessage, err)
}

list := d.nodeStatusForStrategy(enforcer, cache)
@@ -147,31 +149,28 @@ func (d *Strategy) updateNodeLabels(enforcer *strategy.MetricEnforcer, viols vio

for policyName := range nonViolatedPolicies {
if _, ok := node.Labels[policyName]; ok {
// There is a duplication of work here - both label added as null and label removed. Due to some oddness in behaviour on remove label.
//TODO: Decide which behaviour is better. This leaves a constant label on every node for every strategy in the enforcer.
payload = append(payload,
patchValue{
Op: "remove",
Path: "/metadata/labels/" + policyName,
Op: "remove",
Path: "/metadata/labels/" + policyName,
Value: "",
})
payload = append(payload, patchValue{
Op: "add",
Path: "/metadata/labels/" + policyName,
Value: "null",
})
}
totalViolations++
}

err := d.patchNode(node.Name, enforcer, payload)
if err != nil {
if len(labelErrs) == 0 {
labelErrs = "could not label: "
}
if len(payload) != 0 {
err := d.patchNode(node.Name, enforcer, payload)

klog.V(l4).InfoS(err.Error(), "component", "controller")
if err != nil {
if len(labelErrs) == 0 {
labelErrs = "could not label: "
}

labelErrs = labelErrs + node.Name + ": [ " + violatedPolicies + " ]; "
klog.V(l4).InfoS(err.Error(), "component", "controller")

labelErrs = labelErrs + node.Name + ": [ " + violatedPolicies + " ]; "
}
}

if len(violatedPolicies) > 0 {
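
The substantive change in enforce.go: when a node no longer violates a policy, its policy label is now dropped with a single JSON Patch "remove" operation instead of the old remove-plus-re-add-as-"null" pair, and patchNode is only called when the payload is non-empty. Below is a minimal standalone sketch of that pattern against the client-go API; the package name, the escapeJSONPointer helper, and removePolicyLabels are illustrative additions and not the repository's code (enforce.go builds the path by direct concatenation and only appends an op when the label is present on the node).

// Illustrative sketch (package and helper names are not from the repository):
// drop policy labels from a node with JSON Patch "remove" ops, skipping the
// API call entirely when there is nothing to remove.
package example

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// patchValue mirrors the shape used in enforce.go: one JSON Patch operation.
type patchValue struct {
	Op    string `json:"op"`
	Path  string `json:"path"`
	Value string `json:"value,omitempty"`
}

// escapeJSONPointer applies RFC 6901 escaping so label keys that contain
// "~" or "/" still form a valid JSON Patch path.
func escapeJSONPointer(key string) string {
	key = strings.ReplaceAll(key, "~", "~0")
	return strings.ReplaceAll(key, "/", "~1")
}

func removePolicyLabels(ctx context.Context, client kubernetes.Interface, nodeName string, policyNames []string) error {
	payload := make([]patchValue, 0, len(policyNames))
	for _, policy := range policyNames {
		payload = append(payload, patchValue{
			Op:   "remove",
			Path: "/metadata/labels/" + escapeJSONPointer(policy),
		})
	}
	// Mirror the len(payload) != 0 guard added in enforce.go: do not send
	// an empty patch to the API server.
	if len(payload) == 0 {
		return nil
	}
	body, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("failed to marshal patch payload: %w", err)
	}
	if _, err := client.CoreV1().Nodes().Patch(ctx, nodeName, types.JSONPatchType, body, metav1.PatchOptions{}); err != nil {
		return fmt.Errorf("failed to patch node %s: %w", nodeName, err)
	}
	return nil
}

Note that a JSON Patch "remove" fails if the target path does not exist, which is why the code in updateNodeLabels checks node.Labels for the policy key before appending the remove operation.
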
243 changes: 208 additions & 35 deletions telemetry-aware-scheduling/pkg/strategies/deschedule/enforce_test.go
@@ -2,11 +2,16 @@ package deschedule

import (
"context"
"errors"
"fmt"
"reflect"
"strings"
"testing"
"time"

"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/typed/core/v1/fake"
k8stesting "k8s.io/client-go/testing"

"github.com/intel/platform-aware-scheduling/telemetry-aware-scheduling/pkg/cache"
"github.com/intel/platform-aware-scheduling/telemetry-aware-scheduling/pkg/metrics"
@@ -19,81 +24,249 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth"
)

var errMockTest = errors.New("error when calling list")

type expected struct {
nodes map[string]map[string]string // node name: string -> labels: map[string]string
labeledNodes map[string]map[string]string // node name: string -> labels: map[string]string
}

type CacheMetric struct {
metricName string
metricValue int64
}

func assertViolatingNodes(t *testing.T, nodeList *v1.NodeList, wantNodes map[string]map[string]string) {
t.Helper()

nodes := nodeList.Items
// check lengths are equal
if len(nodes) != len(wantNodes) {
t.Errorf("Number of violating nodes returned: %v not as expected: %v", len(nodes), len(wantNodes))
}

// check if the nodes are similar
for _, node := range nodes {
currentNodeName := node.Name
currentNodeLabels := node.Labels

if wantNodes[currentNodeName] == nil {
t.Errorf("Expected to find node %s in list of expected nodes, but wasn't there.", currentNodeName)
}

expectedNodeLabels := wantNodes[node.Name]
if !reflect.DeepEqual(expectedNodeLabels, currentNodeLabels) {
t.Errorf("Labels for node were different, expected %v got: %v", expectedNodeLabels, currentNodeLabels)
}
}
}

func TestDescheduleStrategy_Enforce(t *testing.T) {
type args struct {
enforcer *strategy.MetricEnforcer
cache cache.ReaderWriter
}

type expected struct {
nodeNames []string
}
clientWithListNodeException := testclient.NewSimpleClientset()
clientWithListNodeException.CoreV1().(*fake.FakeCoreV1).PrependReactor("list", "nodes",
func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &v1.NodeList{}, errMockTest
})

tests := []struct {
name string
d *Strategy
node *v1.Node
args args
wantErr bool
want expected
name string
d *Strategy
nodes []*v1.Node
args args
wantErr bool
want expected
cacheMetrics map[string]CacheMetric // node name: string -> metric : { metricName, metricValue }
wantErrMessageToken string
}{
{name: "node label test",
d: &Strategy{PolicyName: "deschedule-test", Rules: []telpol.TASPolicyRule{
{Metricname: "memory", Operator: "GreaterThan", Target: 1},
{Metricname: "cpu", Operator: "LessThan", Target: 10}}},
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1", Labels: map[string]string{"deschedule-test": ""}}},
nodes: []*v1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1", Labels: map[string]string{"deschedule-test": "", "node-1-label": "test"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "node-2", Labels: map[string]string{"deschedule-test": "violating", "node-2-label": "test"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "node-3", Labels: map[string]string{"node-3-label": "test"}}}},
cacheMetrics: map[string]CacheMetric{"node-2": {"cpu", 5}, "node-3": {"memory", 100}},
args: args{enforcer: strategy.NewEnforcer(testclient.NewSimpleClientset()),
cache: cache.MockEmptySelfUpdatingCache()},
want: expected{nodeNames: []string{"node-1"}}},
want: expected{
nodes: map[string]map[string]string{"node-1": {"node-1-label": "test"},
"node-2": {"deschedule-test": "violating", "node-2-label": "test"},
"node-3": {"deschedule-test": "violating", "node-3-label": "test"}},
labeledNodes: map[string]map[string]string{"node-2": {"deschedule-test": "violating", "node-2-label": "test"},
"node-3": {"deschedule-test": "violating", "node-3-label": "test"}},
}},
{name: "node unlabel test",
d: &Strategy{PolicyName: "deschedule-test", Rules: []telpol.TASPolicyRule{
{Metricname: "memory", Operator: "GreaterThan", Target: 1000},
{Metricname: "cpu", Operator: "LessThan", Target: 10}}},
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1", Labels: map[string]string{"deschedule-test": "violating"}}},
nodes: []*v1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1", Labels: map[string]string{"deschedule-test": "violating", "node-1-label": "test"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "node-2", Labels: map[string]string{"deschedule-test": "violating", "node-2-label": "test"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "node-3", Labels: map[string]string{"node-3-label": "test"}}}},
cacheMetrics: map[string]CacheMetric{"node-2": {"cpu", 11}, "node-3": {"memory", 100}},
args: args{enforcer: strategy.NewEnforcer(testclient.NewSimpleClientset()),
cache: cache.MockEmptySelfUpdatingCache()},
want: expected{nodeNames: []string{}}},
want: expected{
nodes: map[string]map[string]string{"node-1": {"node-1-label": "test"},
"node-2": {"node-2-label": "test"},
"node-3": {"node-3-label": "test"}},
labeledNodes: map[string]map[string]string{}}},
{name: "list nodes with exception",
d: &Strategy{PolicyName: "deschedule-test", Rules: []telpol.TASPolicyRule{
{Metricname: "memory", Operator: "GreaterThan", Target: 1000},
{Metricname: "cpu", Operator: "LessThan", Target: 10}}},
nodes: []*v1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1", Labels: map[string]string{"deschedule-test": "violating", "node-1-label": "test"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "node-2", Labels: map[string]string{"deschedule-test": "violating", "node-2-label": "test"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "node-3", Labels: map[string]string{"node-3-label": "test"}}}},
cacheMetrics: map[string]CacheMetric{"node-2": {"cpu", 11}, "node-3": {"memory", 100}},
args: args{enforcer: strategy.NewEnforcer(clientWithListNodeException),
cache: cache.MockEmptySelfUpdatingCache()},
want: expected{},
wantErr: true,
wantErrMessageToken: failNodeListEnforceMessage},
}
for _, tt := range tests {
tt := tt

err := tt.args.cache.WriteMetric("memory", metrics.NodeMetricsInfo{
"node-1": {Timestamp: time.Now(), Window: 1, Value: *resource.NewQuantity(100, resource.DecimalSI)}})
if err != nil {
t.Errorf("Cannot write metric to mock cach for test: %v", err)
for metricNodeName, metric := range tt.cacheMetrics {
err := tt.args.cache.WriteMetric(metric.metricName, metrics.NodeMetricsInfo{
metricNodeName: {Timestamp: time.Now(), Window: 1, Value: *resource.NewQuantity(metric.metricValue, resource.DecimalSI)}})
if err != nil {
t.Errorf("Cannot write metric %s to mock cache for test: %v", metricNodeName, err)
}
}

_, err = tt.args.enforcer.KubeClient.CoreV1().Nodes().Create(context.TODO(), tt.node, metav1.CreateOptions{})
if err != nil {
t.Errorf("Cannot write metric to mock cach for test: %v", err)
// create nodes
for _, node := range tt.nodes {
_, err := tt.args.enforcer.KubeClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
if err != nil {
t.Errorf("Cannot create node %s : %v", node.Name, err)
}
}

tt.args.enforcer.RegisterStrategyType(tt.d)
tt.args.enforcer.AddStrategy(tt.d, tt.d.StrategyType())
t.Run(tt.name, func(t *testing.T) {
got := []string{}
if _, err := tt.d.Enforce(tt.args.enforcer, tt.args.cache); (err != nil) != tt.wantErr {
t.Errorf("Strategy.Enforce() error = %v, wantErr %v", err, tt.wantErr)
if !strings.Contains(err.Error(), tt.wantErrMessageToken) {
t.Errorf("Expecting output to match wantErr %v, instead got %v", tt.wantErrMessageToken, err)

return
}
t.Errorf("Unexpected exception while trying to call Enforce %v", err)

return
}

labelledNodes, err := tt.args.enforcer.KubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "deschedule-test=violating"})
if err != nil {
if !tt.wantErr {
t.Errorf("Strategy.Enforce() error = %v, wantErr %v", err, tt.wantErr)
if !tt.wantErr {
// violating nodes
labeledNodes, err := tt.args.enforcer.KubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "deschedule-test=violating"})
if err != nil && !tt.wantErr {
t.Errorf("Unexpected exception while trying to fetch violating nodes %v", err)

return
}
assertViolatingNodes(t, labeledNodes, tt.want.labeledNodes)
nodes, _ := tt.args.enforcer.KubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
assertViolatingNodes(t, nodes, tt.want.nodes)
}
})
}
}

func TestDescheduleStrategy_Cleanup(t *testing.T) {
type args struct {
enforcer *strategy.MetricEnforcer
cache cache.ReaderWriter
}

clientWithException := testclient.NewSimpleClientset()
clientWithException.CoreV1().(*fake.FakeCoreV1).PrependReactor("list", "nodes", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &v1.NodeList{}, errMockTest
})

tests := []struct {
name string
d *Strategy
node *v1.Node
args args
wantErr bool
wantErrMessageToken string
want expected
}{
{name: "node with violating label",
d: &Strategy{PolicyName: "deschedule-test", Rules: []telpol.TASPolicyRule{
{Metricname: "memory", Operator: "GreaterThan", Target: 1},
{Metricname: "cpu", Operator: "LessThan", Target: 10}}},
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1", Labels: map[string]string{"deschedule-test": "violating", "node-1-label": "test"}}},
args: args{enforcer: strategy.NewEnforcer(testclient.NewSimpleClientset()),
cache: cache.MockEmptySelfUpdatingCache()},
want: expected{
nodes: map[string]map[string]string{"node-1": {"node-1-label": "test"}},
labeledNodes: map[string]map[string]string{},
}},
{name: "node without violating label",
d: &Strategy{PolicyName: "deschedule-test", Rules: []telpol.TASPolicyRule{
{Metricname: "memory", Operator: "GreaterThan", Target: 1000},
{Metricname: "cpu", Operator: "LessThan", Target: 10}}},
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2", Labels: map[string]string{"deschedule-test": "", "node-2-label": "test"}}},
args: args{enforcer: strategy.NewEnforcer(testclient.NewSimpleClientset()),
cache: cache.MockEmptySelfUpdatingCache()},
want: expected{
nodes: map[string]map[string]string{"node-2": {"deschedule-test": "", "node-2-label": "test"}},
labeledNodes: map[string]map[string]string{},
}},
{name: "list nodes throws an error",
d: &Strategy{PolicyName: "deschedule-test", Rules: []telpol.TASPolicyRule{
{Metricname: "memory", Operator: "GreaterThan", Target: 1000},
{Metricname: "cpu", Operator: "LessThan", Target: 10}}},
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2", Labels: map[string]string{"deschedule-test": "", "test": "label"}}},
args: args{enforcer: strategy.NewEnforcer(clientWithException),
cache: cache.MockEmptySelfUpdatingCache()},
wantErr: true,
wantErrMessageToken: failNodeListCleanUpMessage,
want: expected{}},
}

for _, tt := range tests {
tt := tt

_, err := tt.args.enforcer.KubeClient.CoreV1().Nodes().Create(context.TODO(), tt.node, metav1.CreateOptions{})
if err != nil {
t.Errorf("Cannot write metric to mock cach for test: %v", err)
}

t.Run(tt.name, func(t *testing.T) {
if err := tt.d.Cleanup(tt.args.enforcer, tt.d.PolicyName); (err != nil) != tt.wantErr {
if !strings.Contains(fmt.Sprint(err.Error()), tt.wantErrMessageToken) {
t.Errorf("Expecting output to match wantErr %v, instead got %v", tt.wantErrMessageToken, err)

return
}
t.Errorf("Strategy.Cleanup() unexpected error = %v, wantErr %v", err, tt.wantErr)

return
}
for _, node := range labelledNodes.Items {
got = append(got, node.Name)
}
nodys, _ := tt.args.enforcer.KubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
msg := fmt.Sprint(nodys.Items[0])
klog.InfoS(msg, "component", "testing")
if len(tt.want.nodeNames) != len(got) {
t.Errorf("Number of pods returned: %v not as expected: %v", got, tt.want.nodeNames)

if !tt.wantErr {
labelledNodes, err := tt.args.enforcer.KubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "deschedule-test=violating"})
if err != nil {
if !tt.wantErr {
t.Errorf("Strategy.Enforce() error = %v, wantErr %v", err, tt.wantErr)

return
}
t.Errorf("Unexpected error encountered while trying to filter for the deschedule-test=violating label...")

return
}
assertViolatingNodes(t, labelledNodes, tt.want.labeledNodes)
nodes, _ := tt.args.enforcer.KubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
assertViolatingNodes(t, nodes, tt.want.nodes)
}
})
}
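
Both new negative cases ("list nodes with exception" in the Enforce test and "list nodes throws an error" in the Cleanup test) use the same pattern: a fake clientset with a prepended reactor that makes every list-nodes call fail, after which the test asserts that the strategy wraps the failure with the new message constant. Here is a condensed, standalone sketch of that error-injection pattern, with an illustrative package, test name, and error variable:

package example

import (
	"context"
	"errors"
	"strings"
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	testclient "k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/kubernetes/typed/core/v1/fake"
	k8stesting "k8s.io/client-go/testing"
)

var errMockList = errors.New("error when calling list")

func TestListNodesErrorInjection(t *testing.T) {
	client := testclient.NewSimpleClientset()
	// PrependReactor runs ahead of the fake's default reactors, so any
	// "list nodes" action is answered with the canned error instead of
	// the tracker's stored objects.
	client.CoreV1().(*fake.FakeCoreV1).PrependReactor("list", "nodes",
		func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
			return true, &v1.NodeList{}, errMockList
		})

	_, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err == nil || !strings.Contains(err.Error(), errMockList.Error()) {
		t.Errorf("expected injected list error, got %v", err)
	}
}

The tests in this commit apply the same idea one level up: they call Enforce or Cleanup with the reactor-backed client and check the returned error against failNodeListEnforceMessage or failNodeListCleanUpMessage using strings.Contains.
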
