stack.go (forked from hashicorp/nomad)
167 lines (138 loc) · 5.24 KB

package scheduler

import (
	"math"
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

const (
	// serviceJobAntiAffinityPenalty is the penalty applied
	// to the score for placing an alloc on a node that
	// already has an alloc for this job.
	serviceJobAntiAffinityPenalty = 10.0

	// batchJobAntiAffinityPenalty is the same as the
	// serviceJobAntiAffinityPenalty but for batch type jobs.
	batchJobAntiAffinityPenalty = 5.0
)

// Stack is a chained collection of iterators. The stack is used to
// make placement decisions. Different schedulers may customize the
// stack they use to vary the way placements are made.
type Stack interface {
	// SetNodes is used to set the base set of potential nodes
	SetNodes([]*structs.Node)

	// SetJob is used to set the job for selection
	SetJob(job *structs.Job)

	// Select is used to select a node for the task group
	Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources)
}
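
// A Stack is typically driven in three steps: SetNodes fixes the candidate
// node pool, SetJob applies the job-level parameters, and Select is then
// called once per task group to pick a node (see the illustrative sketch at
// the end of this file).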

// GenericStack is the Stack used for the Generic scheduler. It is
// designed to make better placement decisions at the cost of performance.
type GenericStack struct {
	batch  bool
	ctx    Context
	source *StaticIterator

	jobConstraint       *ConstraintIterator
	taskGroupDrivers    *DriverIterator
	taskGroupConstraint *ConstraintIterator
	binPack             *BinPackIterator
	jobAntiAff          *JobAntiAffinityIterator
	limit               *LimitIterator
	maxScore            *MaxScoreIterator
}
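
// The iterators in a GenericStack are chained in the order they are
// constructed below: source -> jobConstraint -> taskGroupDrivers ->
// taskGroupConstraint -> feasible-to-rank upgrade -> binPack -> jobAntiAff ->
// limit -> maxScore. Each stage consumes the previous stage's output, so the
// cheaper feasibility filters run before the more expensive ranking stages.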

// NewGenericStack constructs a stack used for selecting service placements
func NewGenericStack(batch bool, ctx Context, baseNodes []*structs.Node) *GenericStack {
	// Create a new stack
	s := &GenericStack{
		batch: batch,
		ctx:   ctx,
	}

	// Create the source iterator. We randomize the order we visit nodes
	// to reduce collisions between schedulers and to do a basic load
	// balancing across eligible nodes.
	s.source = NewRandomIterator(ctx, baseNodes)

	// Attach the job constraints. The job is filled in later.
	s.jobConstraint = NewConstraintIterator(ctx, s.source, nil)

	// Filter on task group drivers first as they are faster
	s.taskGroupDrivers = NewDriverIterator(ctx, s.jobConstraint, nil)

	// Filter on task group constraints second
	s.taskGroupConstraint = NewConstraintIterator(ctx, s.taskGroupDrivers, nil)

	// Upgrade from feasible to rank iterator
	rankSource := NewFeasibleRankIterator(ctx, s.taskGroupConstraint)

	// Apply the bin packing; this depends on the resources needed
	// by a particular task group. Only enable eviction for the service
	// scheduler as that logic is expensive.
	evict := !batch
	s.binPack = NewBinPackIterator(ctx, rankSource, evict, 0)

	// Apply the job anti-affinity iterator. This is to avoid placing
	// multiple allocations on the same node for this job. The penalty
	// is less for batch jobs as it matters less.
	penalty := serviceJobAntiAffinityPenalty
	if batch {
		penalty = batchJobAntiAffinityPenalty
	}
	s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, penalty, "")

	// Apply a limit function. This is to avoid scanning *every* possible node.
	s.limit = NewLimitIterator(ctx, s.jobAntiAff, 2)

	// Select the node with the maximum score for placement
	s.maxScore = NewMaxScoreIterator(ctx, s.limit)

	// Set the nodes if given
	if len(baseNodes) != 0 {
		s.SetNodes(baseNodes)
	}
	return s
}

func (s *GenericStack) SetNodes(baseNodes []*structs.Node) {
	// Shuffle base nodes
	shuffleNodes(baseNodes)

	// Update the set of base nodes
	s.source.SetNodes(baseNodes)

	// Apply a limit function. This is to avoid scanning *every* possible node.
	// For batch jobs we only need to evaluate 2 options and depend on the
	// power of two choices. For service jobs we need to visit "enough".
	// Using a log of the total number of nodes is a good restriction, with
	// at least 2 as the floor.
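	// For example (an illustrative calculation, not from the original
	// comments): with 1024 eligible nodes a service job evaluates
	// ceil(log2(1024)) = 10 options, while a batch job stays at 2.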
	limit := 2
	if n := len(baseNodes); !s.batch && n > 0 {
		logLimit := int(math.Ceil(math.Log2(float64(n))))
		if logLimit > limit {
			limit = logLimit
		}
	}
	s.limit.SetLimit(limit)
}

func (s *GenericStack) SetJob(job *structs.Job) {
	s.jobConstraint.SetConstraints(job.Constraints)
	s.binPack.SetPriority(job.Priority)
	s.jobAntiAff.SetJob(job.ID)
}

func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources) {
	// Reset the max selector and context
	s.maxScore.Reset()
	s.ctx.Reset()
	start := time.Now()

	// Collect the constraints, drivers and resources required by each
	// sub-task to aggregate the TaskGroup totals
	constr := make([]*structs.Constraint, 0, len(tg.Constraints))
	drivers := make(map[string]struct{})
	size := new(structs.Resources)
	constr = append(constr, tg.Constraints...)
	for _, task := range tg.Tasks {
		drivers[task.Driver] = struct{}{}
		constr = append(constr, task.Constraints...)
		size.Add(task.Resources)
	}

	// Update the parameters of iterators
	s.taskGroupDrivers.SetDrivers(drivers)
	s.taskGroupConstraint.SetConstraints(constr)
	s.binPack.SetTasks(tg.Tasks)

	// Find the node with the max score
	option := s.maxScore.Next()

	// Ensure that the task resources were specified
	if option != nil && len(option.TaskResources) != len(tg.Tasks) {
		for _, task := range tg.Tasks {
			option.SetTaskResources(task, task.Resources)
		}
	}

	// Store the compute time
	s.ctx.Metrics().AllocationTime = time.Since(start)
	return option, size
}
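
// exampleSelectPlacements is an illustrative sketch and not part of the
// original file: it shows how a caller might drive a GenericStack, assuming a
// scheduler Context, a candidate node list, and a job whose TaskGroups field
// lists its task groups are already available.
func exampleSelectPlacements(ctx Context, nodes []*structs.Node, job *structs.Job) {
	// Build the stack for a service (non-batch) job; SetNodes is called by
	// the constructor, shuffling the nodes and deriving the scan limit.
	stack := NewGenericStack(false, ctx, nodes)

	// Fix the job-level constraints, priority, and anti-affinity target.
	stack.SetJob(job)

	// Ask for a placement for each task group in the job.
	for _, tg := range job.TaskGroups {
		option, size := stack.Select(tg)
		if option == nil {
			// No feasible node was found for this task group.
			continue
		}
		_ = option // the RankedNode chosen for placement
		_ = size   // the aggregate resources required by the task group
	}
}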