Skip to content

Commit

Permalink
Do not re-patch when the node is still violating the strategy and the…
Browse files Browse the repository at this point in the history
… label is still present

This commit will:
* not patch a node if violating label already applied
* refactor log messages to match existing statements

Signed-off-by: Madalina Lazar <madalina.lazar@intel.com>
  • Loading branch information
madalazar committed Dec 22, 2022
1 parent 533c754 commit 6106d3f
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 33 deletions.
58 changes: 39 additions & 19 deletions telemetry-aware-scheduling/pkg/strategies/deschedule/enforce.go
Expand Up @@ -21,6 +21,9 @@ const (
l4 = 4
failNodeListCleanUpMessage = "failed to list nodes during clean-up"
failNodeListEnforceMessage = "failed to list all nodes during enforce"
failNodePatchMessage = "failed to patch node"
failedLabelingMessage = "could not label"
defaultPolicyValue = "violating"
)

var errNull = errors.New("")
Expand All @@ -33,6 +36,14 @@ type patchValue struct {
Value string `json:"value"`
}

func createLabelPatchValue(op, labelName, value string) *patchValue {
return &patchValue{
Op: op,
Path: "/metadata/labels/" + labelName,
Value: value,
}
}

// Cleanup remove node labels for violating when policy is deleted.
func (d *Strategy) Cleanup(enforcer *strategy.MetricEnforcer, policyName string) error {
lbls := metav1.LabelSelector{MatchLabels: map[string]string{policyName: "violating"}}
Expand All @@ -47,12 +58,12 @@ func (d *Strategy) Cleanup(enforcer *strategy.MetricEnforcer, policyName string)

for _, node := range nodes.Items {
var payload []patchValue

if _, ok := node.Labels[policyName]; ok {
payload = append(payload,
patchValue{
Op: "remove",
Path: "/metadata/labels/" + policyName,
})
msg := fmt.Sprintf("patch %s label for removal with empty value", policyName)
klog.V(l2).InfoS(msg, "component", "controller")

payload = append(payload, *createLabelPatchValue("remove", policyName, ""))
}

err := d.patchNode(node.Name, enforcer, payload)
Expand Down Expand Up @@ -104,7 +115,7 @@ func (d *Strategy) patchNode(nodeName string, enforcer *strategy.MetricEnforcer,
if err != nil {
klog.V(l4).InfoS(err.Error(), "component", "controller")

return fmt.Errorf("failed to patch %v the node: %w", payload, err)
return fmt.Errorf("%s with %v: %w", failNodePatchMessage, payload, err)
}

return nil
Expand All @@ -120,6 +131,21 @@ func allPolicies(enforcer *strategy.MetricEnforcer) map[string]interface{} {
return policies
}

// appendViolationPatchValue appends a de-scheduling patch to a node if it doesn't already exist.
// It returns the given payload appended by any patch value.
func appendViolationPatchValue(payload []patchValue, policyName string, node v1.Node) []patchValue {
labelValue, ok := node.Labels[policyName]

if !ok || (ok && labelValue != defaultPolicyValue) {
msg := fmt.Sprintf("patching for violation %s with value %s", policyName, defaultPolicyValue)
klog.V(l2).InfoS(msg, "component", "controller")

payload = append(payload, *createLabelPatchValue("add", policyName, defaultPolicyValue))
}

return payload
}

// updateNodeLabels takes the list of nodes violating the strategy.
// It then sets the payloads for labelling them as violators and calls for them to be labelled.
func (d *Strategy) updateNodeLabels(enforcer *strategy.MetricEnforcer, viols violationList, allNodes *v1.NodeList) (int, error) {
Expand All @@ -131,30 +157,24 @@ func (d *Strategy) updateNodeLabels(enforcer *strategy.MetricEnforcer, viols vio
var nonViolatedPolicies map[string]interface{}

for _, node := range allNodes.Items {
payload := []patchValue{}
var payload []patchValue

nonViolatedPolicies = allPolicies(enforcer)
violatedPolicies := ""

for _, policyName := range viols[node.Name] {
delete(nonViolatedPolicies, policyName)

payload = append(payload,
patchValue{
Op: "add",
Path: "/metadata/labels/" + policyName,
Value: "violating",
})
payload = appendViolationPatchValue(payload, policyName, node)
violatedPolicies += policyName + ", "
}

for policyName := range nonViolatedPolicies {
if _, ok := node.Labels[policyName]; ok {
payload = append(payload,
patchValue{
Op: "remove",
Path: "/metadata/labels/" + policyName,
Value: "",
})
klog.V(l2).InfoS("patching for removal", "name", policyName,
"labelValue", "")

payload = append(payload, *createLabelPatchValue("remove", policyName, ""))
}
totalViolations++
}
Expand Down
Expand Up @@ -61,18 +61,32 @@ func assertViolatingNodes(t *testing.T, nodeList *v1.NodeList, wantNodes map[str
}
}

func TestDescheduleStrategy_Enforce(t *testing.T) {
type args struct {
enforcer *strategy.MetricEnforcer
cache cache.ReaderWriter
}

func getClientWithListException() *testclient.Clientset {
clientWithListNodeException := testclient.NewSimpleClientset()
clientWithListNodeException.CoreV1().(*fake.FakeCoreV1).PrependReactor("list", "nodes",
func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &v1.NodeList{}, errMockTest
})

return clientWithListNodeException
}

func getClientWithPatchException() *testclient.Clientset {
clientWithPatchException := testclient.NewSimpleClientset()
clientWithPatchException.CoreV1().(*fake.FakeCoreV1).PrependReactor("patch", "nodes",
func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errMockTest
})

return clientWithPatchException
}

func TestDescheduleStrategy_Enforce(t *testing.T) {
type args struct {
enforcer *strategy.MetricEnforcer
cache cache.ReaderWriter
}

tests := []struct {
name string
d *Strategy
Expand Down Expand Up @@ -123,11 +137,22 @@ func TestDescheduleStrategy_Enforce(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "node-2", Labels: map[string]string{"deschedule-test": "violating", "node-2-label": "test"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "node-3", Labels: map[string]string{"node-3-label": "test"}}}},
cacheMetrics: map[string]CacheMetric{"node-2": {"cpu", 11}, "node-3": {"memory", 100}},
args: args{enforcer: strategy.NewEnforcer(clientWithListNodeException),
args: args{enforcer: strategy.NewEnforcer(getClientWithListException()),
cache: cache.MockEmptySelfUpdatingCache()},
want: expected{},
wantErr: true,
wantErrMessageToken: failNodeListEnforceMessage},
{name: "list nodes with patch exception",
d: &Strategy{PolicyName: "deschedule-test", Rules: []telpol.TASPolicyRule{
{Metricname: "memory", Operator: "GreaterThan", Target: 1000},
{Metricname: "cpu", Operator: "LessThan", Target: 10}}},
nodes: []*v1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1", Labels: map[string]string{"deschedule-test": "violating", "node-1-label": "test"}}}},
cacheMetrics: map[string]CacheMetric{"node-1": {"cpu", 40}},
args: args{enforcer: strategy.NewEnforcer(getClientWithPatchException()),
cache: cache.MockEmptySelfUpdatingCache()},
want: expected{},
wantErr: true,
wantErrMessageToken: failedLabelingMessage},
}
for _, tt := range tests {
tt := tt
Expand Down Expand Up @@ -184,11 +209,6 @@ func TestDescheduleStrategy_Cleanup(t *testing.T) {
cache cache.ReaderWriter
}

clientWithException := testclient.NewSimpleClientset()
clientWithException.CoreV1().(*fake.FakeCoreV1).PrependReactor("list", "nodes", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &v1.NodeList{}, errMockTest
})

tests := []struct {
name string
d *Strategy
Expand Down Expand Up @@ -225,11 +245,24 @@ func TestDescheduleStrategy_Cleanup(t *testing.T) {
{Metricname: "memory", Operator: "GreaterThan", Target: 1000},
{Metricname: "cpu", Operator: "LessThan", Target: 10}}},
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2", Labels: map[string]string{"deschedule-test": "", "test": "label"}}},
args: args{enforcer: strategy.NewEnforcer(clientWithException),
args: args{enforcer: strategy.NewEnforcer(getClientWithListException()),
cache: cache.MockEmptySelfUpdatingCache()},
wantErr: true,
wantErrMessageToken: failNodeListCleanUpMessage,
want: expected{}},
{name: "patch nodes throws an error",
d: &Strategy{PolicyName: "deschedule-test", Rules: []telpol.TASPolicyRule{
{Metricname: "memory", Operator: "GreaterThan", Target: 1000},
{Metricname: "cpu", Operator: "LessThan", Target: 10}}},
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2", Labels: map[string]string{"deschedule-test": "violating", "test": "label"}}},
args: args{enforcer: strategy.NewEnforcer(getClientWithPatchException()),
cache: cache.MockEmptySelfUpdatingCache()},
wantErr: false,
wantErrMessageToken: failNodePatchMessage,
want: expected{
nodes: map[string]map[string]string{"node-2": {"deschedule-test": "violating", "test": "label"}},
labeledNodes: map[string]map[string]string{"node-2": {"deschedule-test": "violating", "test": "label"}},
}},
}

for _, tt := range tests {
Expand All @@ -241,7 +274,8 @@ func TestDescheduleStrategy_Cleanup(t *testing.T) {
}

t.Run(tt.name, func(t *testing.T) {
if err := tt.d.Cleanup(tt.args.enforcer, tt.d.PolicyName); (err != nil) != tt.wantErr {
err := tt.d.Cleanup(tt.args.enforcer, tt.d.PolicyName)
if (err != nil) != tt.wantErr {
if !strings.Contains(fmt.Sprint(err.Error()), tt.wantErrMessageToken) {
t.Errorf("Expecting output to match wantErr %v, instead got %v", tt.wantErrMessageToken, err)

Expand Down

0 comments on commit 6106d3f

Please sign in to comment.