forked from coreos/fleet
/
engine.go
189 lines (161 loc) · 4.27 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package engine
import (
"fmt"
"time"
log "github.com/coreos/fleet/Godeps/_workspace/src/github.com/golang/glog"
"github.com/coreos/fleet/machine"
"github.com/coreos/fleet/registry"
)
// engineRoleName is the registry role that the lead engine in a cluster
// leases; only the holder of this role performs reconciliation.
const engineRoleName = "engine-leader"
// Engine schedules and unschedules units through the Registry. Reconciliation
// is only performed by the machine that currently holds the engine-leader
// lease (see Run).
type Engine struct {
	rec      *Reconciler
	registry registry.Registry
	rStream  registry.EventStream // registry event stream; Run reconciles when it yields an event
	machine  machine.Machine      // local machine; its ID is used when leasing the leader role
	lease    registry.Lease       // current engine-leader lease, nil when this engine is not leader
	trigger  chan struct{}        // signalled by Trigger; created unbuffered by New
}
// New returns an Engine wired to the given Registry, registry event stream
// and local machine. The engine starts without a leader lease and with an
// unbuffered trigger channel.
func New(reg registry.Registry, rStream registry.EventStream, mach machine.Machine) *Engine {
	return &Engine{
		rec:      NewReconciler(),
		registry: reg,
		rStream:  rStream,
		machine:  mach,
		lease:    nil,
		trigger:  make(chan struct{}),
	}
}
// Run drives the engine's reconciliation loop until a value is received
// from (or the closing of) stop.
//
// A reconciliation attempt is made every ival, whenever the registry event
// stream yields an event, and whenever a signal arrives on e.trigger (see
// Trigger). Each attempt first renews or acquires the engine-leader lease
// with a TTL of 5*ival. Machines that do not hold the lease skip the attempt.
// A single reconciliation is aborted if it outlives the lease TTL or if stop
// fires while it is running.
func (e *Engine) Run(ival time.Duration, stop chan bool) {
	leaseTTL := ival * 5
	machID := e.machine.State().ID

	// time.NewTicker (unlike time.Tick) can be stopped, so the ticker is
	// released when Run returns.
	ticker := time.NewTicker(ival)
	defer ticker.Stop()

	reconcile := func() {
		e.lease = ensureLeader(e.lease, e.registry, machID, leaseTTL)
		if e.lease == nil {
			return
		}

		// abort is closed when reconciliation must stop prematurely, either
		// by a local timeout or the fleet server shutting down.
		abort := make(chan struct{})
		// monitor is closed once reconciliation finishes, which shuts down
		// the watchdog goroutine below.
		monitor := make(chan struct{})
		go func() {
			select {
			case <-monitor:
			case <-time.After(leaseTTL):
				close(abort)
			case <-stop:
				close(abort)
			}
		}()

		start := time.Now()
		e.rec.Reconcile(e, abort)
		close(monitor)

		elapsed := time.Since(start)
		msg := fmt.Sprintf("Engine completed reconciliation in %s", elapsed)
		if elapsed > ival {
			log.Warning(msg)
		} else {
			log.V(1).Info(msg)
		}
	}

	// streamEvents carries registry event-stream notifications into the main
	// loop. It is intentionally distinct from e.trigger (fed by Trigger); the
	// loop below listens on both.
	streamEvents := make(chan struct{})
	go func() {
		for {
			abort := make(chan struct{})
			select {
			case <-stop:
				close(abort)
				return
			case <-e.rStream.Next(abort):
			}
			// Forward the event, but give up if Run is stopping so this
			// goroutine never blocks forever on a send nobody will receive.
			select {
			case streamEvents <- struct{}{}:
			case <-stop:
				return
			}
		}
	}()

	for {
		select {
		case <-stop:
			log.V(1).Info("Engine exiting due to stop signal")
			return
		case <-ticker.C:
			log.V(1).Info("Engine tick")
			reconcile()
		case <-streamEvents:
			log.V(1).Info("Engine reconciliation triggered by job state change")
			reconcile()
		case <-e.trigger:
			log.V(1).Info("Engine reconciliation triggered by request")
			reconcile()
		}
	}
}
// Purge releases the engine-leader lease currently held by e, if any.
// A release failure is logged rather than returned. The e.lease field itself
// is not modified.
func (e *Engine) Purge() {
	held := e.lease
	if held == nil {
		return
	}

	if releaseErr := held.Release(); releaseErr != nil {
		log.Errorf("Failed to release lease: %v", releaseErr)
	}
}
// ensureLeader will attempt to renew a non-nil Lease, falling back to
// acquiring a new Lease on the lead engine role.
func ensureLeader(prev registry.Lease, reg registry.Registry, machID string, ttl time.Duration) (cur registry.Lease) {
if prev != nil {
err := prev.Renew(ttl)
if err == nil {
log.V(1).Infof("Engine leadership renewed")
cur = prev
return
} else {
log.Errorf("Engine leadership lost, renewal failed: %v", err)
}
}
var err error
cur, err = reg.LeaseRole(engineRoleName, machID, ttl)
if err != nil {
log.Errorf("Engine leadership acquisition failed: %v", err)
} else if cur == nil {
log.V(1).Infof("Unable to acquire engine leadership")
} else {
log.Infof("Engine leadership acquired")
}
return
}
// Trigger sends a signal on the engine's trigger channel. New creates that
// channel unbuffered, so this call blocks until some goroutine receives
// from it.
func (e *Engine) Trigger() {
	var signal struct{}
	e.trigger <- signal
}
// clusterState snapshots the Registry's units, current schedule and machines
// into a clusterState. The first Registry error encountered is logged and
// returned, and no snapshot is built in that case.
func (e *Engine) clusterState() (*clusterState, error) {
	allUnits, unitsErr := e.registry.Units()
	if unitsErr != nil {
		log.Errorf("Failed fetching Units from Registry: %v", unitsErr)
		return nil, unitsErr
	}

	scheduled, schedErr := e.registry.Schedule()
	if schedErr != nil {
		log.Errorf("Failed fetching schedule from Registry: %v", schedErr)
		return nil, schedErr
	}

	machs, machErr := e.registry.Machines()
	if machErr != nil {
		log.Errorf("Failed fetching Machines from Registry: %v", machErr)
		return nil, machErr
	}

	snapshot := newClusterState(allUnits, scheduled, machs)
	return snapshot, nil
}
// unscheduleUnit asks the Registry to unschedule the named unit from the
// machine identified by machID. The outcome is logged either way, and any
// Registry error is returned to the caller.
func (e *Engine) unscheduleUnit(name, machID string) (err error) {
	if err = e.registry.UnscheduleUnit(name, machID); err != nil {
		log.Errorf("Failed unscheduling Unit(%s) from Machine(%s): %v", name, machID, err)
		return err
	}
	// Log "Unit", matching the error path above and attemptScheduleUnit.
	log.Infof("Unscheduled Unit(%s) from Machine(%s)", name, machID)
	return nil
}
// attemptScheduleUnit records in the Registry that the named unit should run
// on the machine identified by machID. It reports whether the Registry
// accepted the decision. A failure is logged and reported as false rather
// than returned as an error.
func (e *Engine) attemptScheduleUnit(name, machID string) bool {
	err := e.registry.ScheduleUnit(name, machID)
	ok := err == nil
	if ok {
		log.Infof("Scheduled Unit(%s) to Machine(%s)", name, machID)
	} else {
		log.Errorf("Failed scheduling Unit(%s) to Machine(%s): %v", name, machID, err)
	}
	return ok
}