forked from tormol/AIS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
periodic.go
166 lines (149 loc) · 4.18 KB
/
periodic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package logger
import (
"sync"
"time"
"github.com/cenkalti/backoff"
)
const (
periodicMinSleep = 2 * time.Second
periodicMaxSleep = 365 * 24 * time.Hour // FIXME max representable
)
// DebugPeriodicIntervals enables logging of periodic-logger intervals.
// After each logger is run the time until next run of that periodic logger is
// printed, as well as the time until any other logger if non-zero.
var DebugPeriodicIntervals = false
type loggerFunc func(l *Composer, sinceLast time.Duration)
// PeriodicLogger is a function that is ran periodically by a logger
type periodicLogger struct {
id string
logger loggerFunc
interval backoff.ExponentialBackOff
nextRun time.Time
lastRun time.Time
}
// groups related fields in Logger
type periodic struct {
timer *time.Timer
loggers []*periodicLogger
m sync.Mutex
stop bool // tell periodicRunner() to exit
}
func newPeriodic() periodic {
return periodic{
timer: time.NewTimer(periodicMaxSleep),
}
// NewLogger starts periodicRunner()
}
func (p *periodic) Close() {
p.m.Lock()
defer p.m.Unlock()
p.stop = true
p.timer.Stop()
p.timer.Reset(0)
}
// Find the logger with the least time remaining until it should be run,
// and update the timer to fire then.
func resetTimer(l *Logger, now time.Time) {
next := now.Add(periodicMaxSleep)
for _, pl := range l.p.loggers {
if next.After(pl.nextRun) {
next = pl.nextRun
}
}
if DebugPeriodicIntervals {
l.Debug("(%s until next periodic logger)", RoundDuration(next.Sub(now), time.Second/1000))
}
l.p.timer.Stop() // the channel is immediately drained by periodicRunner().
l.p.timer.Reset(next.Sub(now))
}
// Run all loggers that want to be run before (now + minSleep)
func runPeriodic(l *Logger, minSleep time.Duration, started time.Time) {
c := l.Compose(Info)
defer c.Close()
limit := started.Add(minSleep)
for _, pl := range l.p.loggers {
if limit.After(pl.nextRun) {
pl.logger(&c, started.Sub(pl.lastRun))
pl.lastRun = started
next := pl.interval.NextBackOff()
if next <= 0 {
// Cannot use l.Warn() because l.writeLock is locked by c
l.prefixMessage(Warning)
c.Writeln("Stopping periodic logger %s", pl.id)
next = periodicMaxSleep
}
if DebugPeriodicIntervals {
c.Writeln("(%s until next %s)", RoundDuration(next, time.Second), pl.id)
}
pl.nextRun = started.Add(next)
}
}
}
// Runs until l.p.stop is true
func periodicRunner(l *Logger) {
for {
now := <-l.p.timer.C
// Somebody else could take the lock here, but then no loggers will be run.
l.p.m.Lock()
if l.p.stop {
l.p.m.Unlock()
break
}
runPeriodic(l, periodicMinSleep, now)
resetTimer(l, now)
l.p.m.Unlock()
}
}
// RunAllPeriodic runs all the closures right now, ignoring any intervals.
func (l *Logger) RunAllPeriodic() {
l.p.m.Lock()
defer l.p.m.Unlock()
n := time.Now()
runPeriodic(l, periodicMaxSleep, n)
resetTimer(l, n)
}
// AddPeriodic stores a closure that will be called periodically
// with an interval that increases from minInterval to maxInterval exponentally.
func (l *Logger) AddPeriodic(id string, minInterval, maxInterval time.Duration, f loggerFunc) {
b := backoff.ExponentialBackOff{
InitialInterval: minInterval,
MaxInterval: maxInterval,
Multiplier: 3.0,
RandomizationFactor: 0.0,
MaxElapsedTime: 0, // disabled
Clock: backoff.SystemClock,
}
b.Reset()
l.p.m.Lock()
defer l.p.m.Unlock()
for _, p := range l.p.loggers {
if p.id == id {
l.Error("A periodic logger with ID %s already exists", id)
return
}
}
added := time.Now()
l.p.loggers = append(l.p.loggers, &periodicLogger{
id: id,
logger: f,
interval: b,
lastRun: added,
nextRun: added.Add(b.NextBackOff()),
})
resetTimer(l, added)
}
// RemovePeriodic removes a periodic logger so that it will never be called again.
// If it doesn't exist an error is printed to the logger.
func (l *Logger) RemovePeriodic(id string) {
l.p.m.Lock()
defer l.p.m.Unlock()
n := len(l.p.loggers)
for i := 0; i < n; i++ {
if id == l.p.loggers[i].id {
l.p.loggers[i] = l.p.loggers[n-1] // no-op if last
l.p.loggers = l.p.loggers[:n-1]
return
}
}
l.Error("There is no periodic logger with ID %s to remove", id)
}