-
Notifications
You must be signed in to change notification settings - Fork 0
/
shuffleOutCloser.go
106 lines (87 loc) · 2.84 KB
/
shuffleOutCloser.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package closing
import (
"context"
"fmt"
"time"
"github.com/subrahamanyam341/andes-xyz/core"
"github.com/subrahamanyam341/andes-xyz/core/check"
"github.com/subrahamanyam341/andes-xyz/core/random"
"github.com/subrahamanyam341/andes-xyz/data/endProcess"
)
// minDuration is the smallest value NewShuffleOutCloser accepts for both
// minWaitDuration and maxWaitDuration.
const minDuration = time.Second
// shuffleOutCloser writes an end-of-process event on a signal channel after a
// randomized delay, and lets the pending write be cancelled through Close.
type shuffleOutCloser struct {
// minWaitDuration is the lower bound of the delay applied before the write.
minWaitDuration time.Duration
// maxWaitDuration is the upper bound of the delay applied before the write.
maxWaitDuration time.Duration
// signalChan receives the event once the delay has elapsed.
signalChan chan endProcess.ArgEndProcess
// randomizer supplies the random offset that is added to minWaitDuration.
randomizer IntRandomizer
// log records the scheduled delay, the cancellation and the final write.
log core.Logger
// ctx is created in the constructor; its cancellation aborts a pending delay.
ctx context.Context
// cancelFunc cancels ctx; it is invoked by Close.
cancelFunc func()
}
// NewShuffleOutCloser creates a shuffle out component that is able to trigger a
// node restart and to cancel that request if necessary.
//
// Both durations must be at least minDuration and minWaitDuration must not
// exceed maxWaitDuration, otherwise an error wrapping core.ErrInvalidValue is
// returned. A nil signalChan yields core.ErrNilSignalChan and a nil log yields
// core.ErrNilLogger. The checks run in that order and the first failure wins.
func NewShuffleOutCloser(
	minWaitDuration time.Duration,
	maxWaitDuration time.Duration,
	signalChan chan endProcess.ArgEndProcess,
	log core.Logger,
) (*shuffleOutCloser, error) {
	// Cases are evaluated top to bottom, so the validation order is explicit.
	switch {
	case minWaitDuration < minDuration:
		return nil, fmt.Errorf("%w for minWaitDuration", core.ErrInvalidValue)
	case maxWaitDuration < minDuration:
		return nil, fmt.Errorf("%w for maxWaitDuration", core.ErrInvalidValue)
	case minWaitDuration > maxWaitDuration:
		return nil, fmt.Errorf("%w, minWaitDuration > maxWaitDuration", core.ErrInvalidValue)
	case signalChan == nil:
		return nil, core.ErrNilSignalChan
	case check.IfNil(log):
		return nil, core.ErrNilLogger
	}

	// The context is what Close cancels in order to abort a pending delay.
	ctx, cancel := context.WithCancel(context.Background())

	return &shuffleOutCloser{
		minWaitDuration: minWaitDuration,
		maxWaitDuration: maxWaitDuration,
		signalChan:      signalChan,
		randomizer:      &random.ConcurrentSafeIntRandomizer{},
		log:             log,
		ctx:             ctx,
		cancelFunc:      cancel,
	}, nil
}
// EndOfProcessingHandler schedules a delayed end of processing for the given
// event. The waiting and the channel write happen on a separate goroutine, so
// this call returns immediately; the returned error is always nil.
func (soc *shuffleOutCloser) EndOfProcessingHandler(event endProcess.ArgEndProcess) error {
	go func() {
		soc.writeOnChanDelayed(event)
	}()

	return nil
}
// writeOnChanDelayed waits a randomized duration that starts at
// minWaitDuration and is extended by a random offset bounded by
// maxWaitDuration - minWaitDuration, then writes event on signalChan. It is
// meant to run on its own goroutine (see EndOfProcessingHandler).
//
// The goroutine returns without delivering the event as soon as the closer's
// context is cancelled, both while the delay is pending and while the write is
// blocked waiting for a receiver, so it cannot outlive a closed component.
func (soc *shuffleOutCloser) writeOnChanDelayed(event endProcess.ArgEndProcess) {
	timeToWait := soc.minWaitDuration

	// The constructor accepts minWaitDuration == maxWaitDuration, which gives an
	// empty interval; only ask the randomizer for an offset when the bound is
	// positive.
	// NOTE(review): int(...) narrows an int64 nanosecond count, so on platforms
	// with a 32-bit int large intervals are truncated; the n > 0 guard also
	// covers a truncation that ends up non-positive.
	if n := int(soc.maxWaitDuration - soc.minWaitDuration); n > 0 {
		timeToWait += time.Duration(soc.randomizer.Intn(n))
	}

	soc.log.Info("the application will stop in",
		"waiting time", fmt.Sprintf("%v", timeToWait),
		"description", event.Description,
		"reason", event.Reason)

	// An explicit timer (instead of time.After) can be released right away when
	// the wait is cancelled.
	timer := time.NewTimer(timeToWait)
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-soc.ctx.Done():
		soc.log.Debug("canceled the application stop go routine")
		return
	}

	soc.log.Info("the application will stop now after",
		"waiting time", fmt.Sprintf("%v", timeToWait),
		"description", event.Description,
		"reason", event.Reason,
	)

	// The send blocks until a receiver is ready; keep it cancellable so the
	// goroutine is not stuck forever if nobody reads after Close.
	select {
	case soc.signalChan <- event:
	case <-soc.ctx.Done():
		soc.log.Debug("canceled the application stop go routine")
	}
}
// Close cancels the context created in the constructor. A delayed write that is
// still waiting out its delay observes the cancellation and returns without
// writing on the signal channel. Close always returns nil.
func (soc *shuffleOutCloser) Close() error {
soc.cancelFunc()
return nil
}
// IsInterfaceNil returns true if there is no value under the interface, i.e.
// when the receiver is a nil *shuffleOutCloser.
func (soc *shuffleOutCloser) IsInterfaceNil() bool {
return soc == nil
}