-
Notifications
You must be signed in to change notification settings - Fork 2
/
misseddata.go
95 lines (82 loc) · 2.2 KB
/
misseddata.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package controller
import (
"context"
"sync"
"time"
"github.com/insolar/block-explorer/instrumentation/belogger"
)
// missedData is a single record of the missed data pool: a pulse range
// together with the moment the record was created.
type missedData struct {
	// ts is the creation time of the record; it is compared against the
	// manager's ttl to decide when the record expires.
	ts time.Time
	// fromPulse and toPulse are the lower and upper bounds of the pulse range.
	fromPulse, toPulse int64
}
// MissedDataManager keeps a pool of missed pulse ranges and periodically
// removes the ones that are older than ttl.
// It is safe for concurrent use: access to the pool is guarded by mutex.
type MissedDataManager struct {
	// mutex guards missedDataPool.
	mutex sync.Mutex
	// missedDataPool holds the currently remembered pulse ranges.
	missedDataPool []missedData
	// ttl is how long a record stays in the pool before it is considered
	// expired; cleanPeriod is the interval between cleanup passes.
	ttl, cleanPeriod time.Duration
	// stopped carries both the stop request and its acknowledgement between
	// Stop and the goroutine launched by Start.
	stopped chan struct{}
}
// NewMissedDataManager builds a MissedDataManager with the given record
// time-to-live and cleanup interval. The pool starts empty and the stop
// channel is unbuffered. The cleanup goroutine is not started here; call
// Start for that.
func NewMissedDataManager(ttl time.Duration, cleanPeriod time.Duration) *MissedDataManager {
	return &MissedDataManager{
		ttl:         ttl,
		cleanPeriod: cleanPeriod,
		stopped:     make(chan struct{}),
	}
}
// Start launches a background goroutine that calls deleteExpired on every
// tick of a ticker with period cleanPeriod. The goroutine runs until a value
// arrives on the stopped channel; it then stops the ticker and sends a value
// back on the same channel as an acknowledgement before exiting.
func (mdm *MissedDataManager) Start() {
	// The ticker is created synchronously, before the goroutine is spawned.
	cleanup := time.NewTicker(mdm.cleanPeriod)
	go func() {
		for {
			select {
			case <-cleanup.C:
				mdm.deleteExpired()
			case <-mdm.stopped:
				cleanup.Stop()
				// Acknowledge the request so the sender can unblock.
				mdm.stopped <- struct{}{}
				return
			}
		}
	}()
}
// Stop asks the goroutine launched by Start to finish and waits for its
// acknowledgement. Both steps use the stopped channel, so the call blocks
// until the request has been received and a value has been sent back.
func (mdm *MissedDataManager) Stop() {
	request := struct{}{}
	mdm.stopped <- request
	// Wait for the cleanup goroutine to confirm that it has shut down.
	<-mdm.stopped
}
// Add registers the pulse range [fromPulse, toPulse] in the pool.
// If a record already in the pool fully contains the requested range, nothing
// is stored, an info message is written to the logger taken from ctx, and
// false is returned. Otherwise a new record stamped with the current time is
// appended and true is returned.
func (mdm *MissedDataManager) Add(ctx context.Context, fromPulse, toPulse int64) bool {
	mdm.mutex.Lock()
	defer mdm.mutex.Unlock()

	for i := range mdm.missedDataPool {
		known := mdm.missedDataPool[i]
		covered := known.fromPulse <= fromPulse && toPulse <= known.toPulse
		if !covered {
			continue
		}
		belogger.FromContext(ctx).Infof("Data from pulse %d to %d was already reload", fromPulse, toPulse)
		return false
	}

	record := missedData{
		ts:        time.Now(),
		fromPulse: fromPulse,
		toPulse:   toPulse,
	}
	mdm.missedDataPool = append(mdm.missedDataPool, record)
	return true
}
// isExpired reports whether more than ttl has elapsed since ts.
func (mdm *MissedDataManager) isExpired(ts time.Time) bool {
	age := time.Since(ts)
	return age > mdm.ttl
}
// deleteExpired removes every record whose age exceeds ttl from the pool.
//
// The pool is filtered in place: surviving records are compacted to the front
// of the existing backing array, so no allocation happens and every record is
// examined exactly once. (A swap-with-last removal inside a range loop would
// skip the record that was just moved into the current index, letting expired
// records survive the pass.)
func (mdm *MissedDataManager) deleteExpired() {
	mdm.mutex.Lock()
	defer mdm.mutex.Unlock()

	// kept shares the backing array with the pool; the write index never
	// overtakes the read index, so appending here is safe.
	kept := mdm.missedDataPool[:0]
	for _, missed := range mdm.missedDataPool {
		if !mdm.isExpired(missed.ts) {
			kept = append(kept, missed)
		}
	}

	// Zero the freed tail so stale records do not linger in the backing array.
	for i := len(kept); i < len(mdm.missedDataPool); i++ {
		mdm.missedDataPool[i] = missedData{}
	}
	mdm.missedDataPool = kept
}