-
Notifications
You must be signed in to change notification settings - Fork 211
/
clock.go
103 lines (88 loc) · 2.67 KB
/
clock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package timesync
import (
"sync"
"time"
"github.com/spacemeshos/go-spacemesh/log"
)
// Clock defines the functionality needed from any clock type.
type Clock interface {
Now() time.Time
}
// RealClock is the struct wrapping a local time struct.
type RealClock struct{}
// Now returns the current local time.
func (RealClock) Now() time.Time {
return time.Now()
}
// TimeClock is the struct holding a real clock.
type TimeClock struct {
*Ticker
tickInterval time.Duration
startEpoch time.Time
stop chan struct{}
once sync.Once
log log.Log
}
// NewClock return TimeClock struct that notifies tickInterval has passed.
func NewClock(c Clock, tickInterval time.Duration, genesisTime time.Time, logger log.Log) *TimeClock {
if tickInterval == 0 {
logger.Panic("could not create new clock: bad configuration: tick interval is zero")
}
t := &TimeClock{
Ticker: NewTicker(c, LayerConv{duration: tickInterval, genesis: genesisTime}, WithLog(logger)),
tickInterval: tickInterval,
startEpoch: genesisTime,
stop: make(chan struct{}),
once: sync.Once{},
log: logger,
}
go t.startClock()
return t
}
func (t *TimeClock) startClock() {
t.log.Info("starting global clock now=%v genesis=%v %p", t.clock.Now(), t.startEpoch, t)
for {
currLayer := t.Ticker.TimeToLayer(t.clock.Now()) // get current layer
nextTickTime := t.Ticker.LayerToTime(currLayer.Add(1)) // get next tick time for the next layer
diff := nextTickTime.Sub(t.clock.Now())
tmr := time.NewTimer(diff)
t.log.With().Info("global clock going to sleep before next layer",
log.String("diff", diff.String()),
log.FieldNamed("curr_layer", currLayer))
select {
case <-tmr.C:
t.m.Lock()
subscriberCount := len(t.subscribers)
t.m.Unlock()
t.log.With().Info("clock notifying subscribers of new layer tick",
log.Int("subscriber_count", subscriberCount),
t.TimeToLayer(t.clock.Now()))
// notify subscribers
if missed, err := t.Notify(); err != nil {
t.log.With().Error("could not notify subscribers",
log.Err(err),
log.Int("missed", missed))
}
case <-t.stop:
tmr.Stop()
t.log.Info("stopping global clock %p", t)
return
}
}
}
// GetGenesisTime returns at which time this clock has started (used to calculate current tick).
func (t *TimeClock) GetGenesisTime() time.Time {
return t.startEpoch
}
// GetInterval returns the time interval between clock ticks.
func (t *TimeClock) GetInterval() time.Duration {
return t.tickInterval
}
// Close closes the clock ticker.
func (t *TimeClock) Close() {
t.log.Info("closing clock %p", t)
t.once.Do(func() {
t.log.Info("closed clock %p", t)
close(t.stop)
})
}