-
Notifications
You must be signed in to change notification settings - Fork 574
/
even_pod_spread.go
151 lines (125 loc) · 5.43 KB
/
even_pod_spread.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*
Copyright 2021 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package evenpodspread
import (
"context"
"encoding/json"
"math"
"strings"
"k8s.io/apimachinery/pkg/types"
"knative.dev/eventing/pkg/scheduler/factory"
state "knative.dev/eventing/pkg/scheduler/state"
"knative.dev/pkg/logging"
)
// EvenPodSpread is a filter or score plugin that picks/favors pods that create an equal spread of resources across pods
type EvenPodSpread struct {
}
// Verify EvenPodSpread Implements FilterPlugin and ScorePlugin Interface
var _ state.FilterPlugin = &EvenPodSpread{}
var _ state.ScorePlugin = &EvenPodSpread{}
// Name of the plugin
const (
Name = state.EvenPodSpread
ErrReasonInvalidArg = "invalid arguments"
ErrReasonUnschedulable = "pod will cause an uneven spread"
)
func init() {
factory.RegisterFP(Name, &EvenPodSpread{})
factory.RegisterSP(Name, &EvenPodSpread{})
}
// Name returns name of the plugin
func (pl *EvenPodSpread) Name() string {
return Name
}
// Filter invoked at the filter extension point.
func (pl *EvenPodSpread) Filter(ctx context.Context, args interface{}, states *state.State, key types.NamespacedName, podID int32) *state.Status {
logger := logging.FromContext(ctx).With("Filter", pl.Name())
spreadArgs, ok := args.(string)
if !ok {
logger.Errorf("Filter args %v for predicate %q are not valid", args, pl.Name())
return state.NewStatus(state.Unschedulable, ErrReasonInvalidArg)
}
skewVal := state.EvenPodSpreadArgs{}
decoder := json.NewDecoder(strings.NewReader(spreadArgs))
decoder.DisallowUnknownFields()
if err := decoder.Decode(&skewVal); err != nil {
return state.NewStatus(state.Unschedulable, ErrReasonInvalidArg)
}
if states.Replicas > 0 { //need at least a pod to compute spread
currentReps := states.PodSpread[key][state.PodNameFromOrdinal(states.StatefulSetName, podID)] //get #vreps on this podID
var skew int32
for _, otherPodID := range states.SchedulablePods { //compare with #vreps on other pods
if otherPodID != podID {
otherReps := states.PodSpread[key][state.PodNameFromOrdinal(states.StatefulSetName, otherPodID)]
if otherReps == 0 && states.Free(otherPodID) <= 0 { //other pod fully occupied by other vpods - so ignore
continue
}
if skew = (currentReps + 1) - otherReps; skew < 0 {
skew = skew * int32(-1)
}
//logger.Infof("Current Pod %d with %d and Other Pod %d with %d causing skew %d", podID, currentReps, otherPodID, otherReps, skew)
if skew > skewVal.MaxSkew {
logger.Infof("Unschedulable! Pod %d will cause an uneven spread %v with other pod %v", podID, states.PodSpread[key], otherPodID)
return state.NewStatus(state.Unschedulable, ErrReasonUnschedulable)
}
}
}
}
return state.NewStatus(state.Success)
}
// Score invoked at the score extension point. The "score" returned in this function is higher for pods that create an even spread across pods.
func (pl *EvenPodSpread) Score(ctx context.Context, args interface{}, states *state.State, feasiblePods []int32, key types.NamespacedName, podID int32) (uint64, *state.Status) {
logger := logging.FromContext(ctx).With("Score", pl.Name())
var score uint64 = 0
spreadArgs, ok := args.(string)
if !ok {
logger.Errorf("Scoring args %v for priority %q are not valid", args, pl.Name())
return 0, state.NewStatus(state.Unschedulable, ErrReasonInvalidArg)
}
skewVal := state.EvenPodSpreadArgs{}
decoder := json.NewDecoder(strings.NewReader(spreadArgs))
decoder.DisallowUnknownFields()
if err := decoder.Decode(&skewVal); err != nil {
return 0, state.NewStatus(state.Unschedulable, ErrReasonInvalidArg)
}
if states.Replicas > 0 { //need at least a pod to compute spread
currentReps := states.PodSpread[key][state.PodNameFromOrdinal(states.StatefulSetName, podID)] //get #vreps on this podID
var skew int32
for _, otherPodID := range states.SchedulablePods { //compare with #vreps on other pods
if otherPodID != podID {
otherReps := states.PodSpread[key][state.PodNameFromOrdinal(states.StatefulSetName, otherPodID)]
if otherReps == 0 && states.Free(otherPodID) == 0 { //other pod fully occupied by other vpods - so ignore
continue
}
if skew = (currentReps + 1) - otherReps; skew < 0 {
skew = skew * int32(-1)
}
//logger.Infof("Current Pod %d with %d and Other Pod %d with %d causing skew %d", podID, currentReps, otherPodID, otherReps, skew)
if skew > skewVal.MaxSkew {
logger.Infof("Pod %d will cause an uneven spread %v with other pod %v", podID, states.PodSpread[key], otherPodID)
}
score = score + uint64(skew)
}
}
score = math.MaxUint64 - score //lesser skews get higher score
}
return score, state.NewStatus(state.Success)
}
// ScoreExtensions of the Score plugin.
func (pl *EvenPodSpread) ScoreExtensions() state.ScoreExtensions {
return pl
}
// NormalizeScore invoked after scoring all pods.
func (pl *EvenPodSpread) NormalizeScore(ctx context.Context, states *state.State, scores state.PodScoreList) *state.Status {
return nil
}