forked from hashicorp/nomad
/
draining_node.go
155 lines (126 loc) · 3.48 KB
/
draining_node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package drainer
import (
"fmt"
"sync"
"time"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
)
// drainingNode tracks a single node that is being drained, together with the
// state store used to look up the allocations placed on that node.
type drainingNode struct {
// state is queried (via AllocsByNode) for the allocations on node.
state *state.StateStore
// node is the most recently recorded version of the node. It is replaced by
// Update and read by the other methods while holding l.
node *structs.Node
// l guards node: Update takes the write lock, the query methods take
// the read lock.
l sync.RWMutex
}
// NewDrainingNode returns a drainingNode that tracks node and reads the
// node's allocations from the given state store.
func NewDrainingNode(node *structs.Node, state *state.StateStore) *drainingNode {
	dn := new(drainingNode)
	dn.node = node
	dn.state = state
	return dn
}
// GetNode returns the currently tracked node. The returned pointer is the
// stored value itself, not a copy, so callers should treat it as read-only.
//
// Only the read lock is taken. This is a pure read, and using the shared lock
// lets it run concurrently with the other read-only methods instead of
// serializing against them.
func (n *drainingNode) GetNode() *structs.Node {
	n.l.RLock()
	defer n.l.RUnlock()
	return n.node
}
// Update replaces the tracked node with the given version. The swap is done
// under the write lock so it is serialized with every reader of the node.
func (n *drainingNode) Update(node *structs.Node) {
	n.l.Lock()
	n.node = node
	n.l.Unlock()
}
// DeadlineTime reports whether the node's drain strategy has a deadline and,
// if so, when it is. The answer is delegated to the strategy's own
// DeadlineTime. A node without a drain strategy (not expected in practice)
// reports no deadline and the zero time.
func (n *drainingNode) DeadlineTime() (bool, time.Time) {
	n.l.RLock()
	defer n.l.RUnlock()

	cur := n.node
	if cur != nil && cur.DrainStrategy != nil {
		return cur.DrainStrategy.DeadlineTime()
	}

	// Should never happen
	return false, time.Time{}
}
// IsDone reports whether every non-system allocation on the node has reached a
// terminal status, i.e. whether the batch/service part of the drain is finished.
//
// Allocations belonging to system jobs are always skipped here, whatever the
// drain strategy's IgnoreSystemJobs setting is. That setting is only consulted
// by RemainingAllocs. An error is returned if the node has no drain strategy or
// if its allocations cannot be read from the state store.
func (n *drainingNode) IsDone() (bool, error) {
	n.l.RLock()
	defer n.l.RUnlock()

	// Should never happen
	if n.node == nil || n.node.DrainStrategy == nil {
		return false, fmt.Errorf("node doesn't have a drain strategy set")
	}

	// Retrieve the allocs on the node
	allocs, err := n.state.AllocsByNode(nil, n.node.ID)
	if err != nil {
		return false, err
	}

	for _, alloc := range allocs {
		// System jobs are only stopped after a node is done draining
		// everything else, so ignore them here.
		if alloc.Job.Type == structs.JobTypeSystem {
			continue
		}

		// Any non-system alloc that is still running means the drain is
		// not done yet.
		if !alloc.TerminalStatus() {
			return false, nil
		}
	}
	return true, nil
}
// RemainingAllocs lists the allocations on the node that still have to be
// drained. Allocations already in a terminal status are excluded. System-job
// allocations are excluded only when the drain strategy sets IgnoreSystemJobs.
// An error is returned if the node has no drain strategy or if the state store
// lookup fails.
func (n *drainingNode) RemainingAllocs() ([]*structs.Allocation, error) {
	n.l.RLock()
	defer n.l.RUnlock()

	cur := n.node
	// Should never happen
	if cur == nil || cur.DrainStrategy == nil {
		return nil, fmt.Errorf("node doesn't have a drain strategy set")
	}
	skipSystem := cur.DrainStrategy.IgnoreSystemJobs

	allocs, err := n.state.AllocsByNode(nil, cur.ID)
	if err != nil {
		return nil, err
	}

	var remaining []*structs.Allocation
	for _, a := range allocs {
		switch {
		case a.TerminalStatus():
			// Already stopped; nothing left to drain.
		case a.Job.Type == structs.JobTypeSystem && skipSystem:
			// System allocs are left in place when the strategy ignores them.
		default:
			remaining = append(remaining, a)
		}
	}
	return remaining, nil
}
// DrainingJobs returns the distinct (namespace, job ID) pairs of non-system jobs
// that still have a non-terminal allocation on the node. These are the jobs
// that can block the drain from completing. Each job appears once, in the order
// its first qualifying allocation is encountered. An error is returned if the
// node has no drain strategy or if the state store lookup fails.
func (n *drainingNode) DrainingJobs() ([]structs.NamespacedID, error) {
	n.l.RLock()
	defer n.l.RUnlock()

	cur := n.node
	// Should never happen
	if cur == nil || cur.DrainStrategy == nil {
		return nil, fmt.Errorf("node doesn't have a drain strategy set")
	}

	allocs, err := n.state.AllocsByNode(nil, cur.ID)
	if err != nil {
		return nil, err
	}

	seen := make(map[structs.NamespacedID]struct{})
	var result []structs.NamespacedID
	for _, a := range allocs {
		if a.TerminalStatus() || a.Job.Type == structs.JobTypeSystem {
			continue
		}

		id := structs.NamespacedID{Namespace: a.Namespace, ID: a.JobID}
		if _, dup := seen[id]; !dup {
			seen[id] = struct{}{}
			result = append(result, id)
		}
	}
	return result, nil
}