-
Notifications
You must be signed in to change notification settings - Fork 50
/
stage.go
184 lines (143 loc) · 4.25 KB
/
stage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package kernel
import (
"context"
"errors"
"fmt"
"strings"
"github.com/justtrackio/gosoline/pkg/cfg"
"github.com/justtrackio/gosoline/pkg/clock"
"github.com/justtrackio/gosoline/pkg/coffin"
"github.com/justtrackio/gosoline/pkg/conc"
"github.com/justtrackio/gosoline/pkg/log"
)
// ErrKernelStopping is the reason a stage's coffin is killed with when the
// stage is shut down by stopWait. Because it is an expected shutdown reason,
// stopWait does not log an error when the coffin's result is (or wraps) it.
//
// It is created with errors.New: the message is a constant, so there is
// nothing for fmt.Errorf to format.
var ErrKernelStopping = errors.New("stopping kernel")
// modules is the set of modules registered with a stage, keyed by module
// name, together with the lock that guards it.
type modules struct {
// lck is poisoned by stage.run before the modules are started; presumably
// registration code elsewhere takes this lock, so once poisoned no further
// modules can be added (TODO confirm against the registering caller).
lck conc.PoisonedLock
// modules maps module name to its runtime state (ms.module is the module
// instance itself).
modules map[string]*moduleState
}
// len returns the number of registered modules. It reads the map without
// taking lck.
func (m modules) len() int {
return len(m.modules)
}
// stage is a group of modules that run starts together and stopWait stops
// together. Before run returns, the stage waits for all of its health-checked
// modules to report healthy.
type stage struct {
// cfn supervises the module goroutines spawned by run; stopWait kills it
// with ErrKernelStopping and waits for it.
cfn coffin.Coffin
// ctx is derived from cfn in newStage and is handed to the modules and to
// their health checks.
// NOTE(review): storing a context in a struct is unidiomatic in Go; kept
// because the stage owns the coffin that produces it.
ctx context.Context
// clk measures elapsed time while waiting for the stage to become healthy.
clk clock.Clock
logger log.Logger
// index identifies the stage in log/error messages and health check results.
index int
// healthCheckSettings supplies the Timeout and WaitInterval used by
// waitUntilHealthy.
healthCheckSettings HealthCheckSettings
// err is the result of cfn.Wait, stored by stopWait.
err error
// running is signalled by run once every module goroutine has been spawned.
running conc.SignalOnce
// terminated is signalled by stopWait after cfn.Wait has returned.
terminated conc.SignalOnce
// modules holds the modules registered with this stage.
modules modules
}
// newStage builds an empty stage with the given index.
//
// The stage gets its own coffin, whose context (derived from ctx) is used for
// all modules of the stage. Health check settings are read from the "kernel"
// config key, and the stage measures time with the real clock.
func newStage(ctx context.Context, config cfg.Config, logger log.Logger, index int) *stage {
	stageCoffin, stageCtx := coffin.WithContext(ctx)

	var settings Settings
	config.UnmarshalKey("kernel", &settings)

	st := &stage{
		cfn:                 stageCoffin,
		ctx:                 stageCtx,
		clk:                 clock.NewRealClock(),
		logger:              logger,
		index:               index,
		healthCheckSettings: settings.HealthCheck,
		running:             conc.NewSignalOnce(),
		terminated:          conc.NewSignalOnce(),
	}

	// The module registry starts empty; modules are added before run is called.
	st.modules = modules{
		lck:     conc.NewPoisonedLock(),
		modules: make(map[string]*moduleState),
	}

	return st
}
// run starts every module of the stage in its own coffin goroutine and then
// blocks in waitUntilHealthy until the stage is healthy, times out, or its
// context is done.
//
// A stage can only be run once: the module lock is poisoned up front, and a
// failure to poison it is reported as "stage was already run". When a module
// returns an error, the kernel is asked to stop and the error is handed to the
// coffin.
func (s *stage) run(k *kernel) error {
	if err := s.modules.lck.Poison(); err != nil {
		return fmt.Errorf("stage was already run: %w", err)
	}

	for name, ms := range s.modules.modules {
		// Give each closure its own copies of the loop variables (needed before
		// Go 1.22, harmless afterwards).
		name, ms := name, ms

		s.cfn.Gof(func() error {
			// Hold every module back until all of them have been spawned. If one
			// exited before the others were added, tomb.Go's precondition (no new
			// goroutine may be added after the last one has exited) would be
			// violated.
			<-s.running.Channel()

			err := k.runModule(s.ctx, name, ms)
			if err == nil {
				return nil
			}

			k.Stop(fmt.Sprintf("module %s returned with an error", name))

			return err
		}, "panic during running of module %s", name)
	}

	s.running.Signal()

	return s.waitUntilHealthy()
}
// healthcheck asks every module of the stage that implements
// HealthCheckedModule whether it is healthy. It produces one
// ModuleHealthCheckResult per such module. Modules without a health check are
// skipped.
//
// A panic inside IsHealthy does not propagate. It is turned into the module's
// Err via coffin.ResolveRecovery, and Healthy stays false. The results follow
// map iteration order, which is not stable between calls.
func (s *stage) healthcheck() HealthCheckResult {
	probe := func(module HealthCheckedModule) (healthy bool, err error) {
		defer func() {
			// Only look for a panic if IsHealthy did not already return an error.
			if err == nil {
				err = coffin.ResolveRecovery(recover())
			}
		}()

		return module.IsHealthy(s.ctx)
	}

	var result HealthCheckResult

	for name, ms := range s.modules.modules {
		checked, isChecked := ms.module.(HealthCheckedModule)
		if !isChecked {
			continue
		}

		healthy, err := probe(checked)

		result = append(result, ModuleHealthCheckResult{
			StageIndex: s.index,
			Name:       name,
			Healthy:    healthy,
			Err:        err,
		})
	}

	return result
}
// waitUntilHealthy re-runs healthcheck every WaitInterval until one of three
// things happens:
//   - all health-checked modules report healthy: returns nil;
//   - the stage context is done: returns nil;
//   - Timeout elapses: returns an error listing the modules that are still
//     unhealthy.
//
// Errors reported by individual health checks are only logged as warnings.
// While waiting, an info line is logged for every unhealthy module.
func (s *stage) waitUntilHealthy() error {
	startedAt := s.clk.Now()

	deadline := clock.NewRealTimer(s.healthCheckSettings.Timeout)
	poll := clock.NewRealTicker(s.healthCheckSettings.WaitInterval)

	defer deadline.Stop()
	defer poll.Stop()

	for {
		// The ticker serves as a re-armable delay. It is paused while the checks
		// run and reset right before waiting, so each round sleeps one full
		// WaitInterval.
		poll.Stop()

		result := s.healthcheck()

		if checkErr := result.Err(); checkErr != nil {
			s.logger.Warn("errors during health checks in stage %d: %s", s.index, checkErr)
		}

		if result.IsHealthy() {
			return nil
		}

		for _, unhealthyModule := range result.GetUnhealthy() {
			remaining := s.healthCheckSettings.Timeout - s.clk.Since(startedAt)
			s.logger.Info("waiting %s for module %s in stage %d to get healthy: time left %s", s.healthCheckSettings.WaitInterval, unhealthyModule.Name, s.index, remaining)
		}

		poll.Reset(s.healthCheckSettings.WaitInterval)

		select {
		case <-poll.Chan():
			// Time for the next round of health checks.
		case <-s.ctx.Done():
			return nil
		case <-deadline.Chan():
			return fmt.Errorf("stage %d was not able to get healthy in %s due to: %s", s.index, s.healthCheckSettings.Timeout, strings.Join(result.GetUnhealthyNames(), ", "))
		}
	}
}
// stopWait shuts the stage down. It:
//  1. kills the coffin with ErrKernelStopping;
//  2. waits for cfn.Wait to return and stores the result in s.err;
//  3. logs that result unless it is nil or is/wraps ErrKernelStopping;
//  4. signals terminated.
func (s *stage) stopWait() {
	s.cfn.Kill(ErrKernelStopping)

	waitErr := s.cfn.Wait()
	s.err = waitErr

	switch {
	case waitErr == nil:
		// Every module returned cleanly.
	case errors.Is(waitErr, ErrKernelStopping):
		// Expected: this is the reason we killed the coffin with.
	default:
		// NOTE(review): %w assumes the logger builds its message with fmt.Errorf
		// (as gosoline's Error is believed to); confirm before changing the verb.
		s.logger.Error("error during the execution of stage %d: %w", s.index, waitErr)
	}

	s.terminated.Signal()
}
// len reports how many modules are registered with the stage.
func (s *stage) len() int {
	return len(s.modules.modules)
}