forked from hashicorp/consul
/
tombstone_gc.go
150 lines (129 loc) · 3.95 KB
/
tombstone_gc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package consul
import (
"fmt"
"sync"
"time"
)
// TombstoneGC is used to track creation of tombstones
// so that they can be garbage collected after their TTL
// expires. The tombstones allow queries to provide monotonic
// index values within the TTL window. The GC is used to
// prevent monotonic growth in storage usage. This is a trade off
// between the length of the TTL and the storage overhead.
//
// In practice, this is required to fix the issue of delete
// visibility. When data is deleted from the KV store, the
// "latest" row can go backwards if the newest row is removed.
// The tombstones provide a way to ensure time doesn't move
// backwards within some interval.
//
type TombstoneGC struct {
ttl time.Duration
granularity time.Duration
// enabled controls if we actually setup any timers.
enabled bool
// expires maps the time of expiration to the highest
// tombstone value that should be expired.
expires map[time.Time]*expireInterval
// expireCh is used to stream expiration
expireCh chan uint64
// lock is used to ensure safe access to all the fields
lock sync.Mutex
}
// expireInterval is used to track the maximum index
// to expire in a given interval with a timer
type expireInterval struct {
maxIndex uint64
timer *time.Timer
}
// NewTombstoneGC is used to construct a new TombstoneGC given
// a TTL for tombstones and a tracking granularity. Longer TTLs
// ensure correct behavior for more time, but use more storage.
// A shorter granularity increases the number of Raft transactions
// and reduce how far past the TTL we perform GC.
func NewTombstoneGC(ttl, granularity time.Duration) (*TombstoneGC, error) {
// Sanity check the inputs
if ttl <= 0 || granularity <= 0 {
return nil, fmt.Errorf("Tombstone TTL and granularity must be positive")
}
t := &TombstoneGC{
ttl: ttl,
granularity: granularity,
enabled: false,
expires: make(map[time.Time]*expireInterval),
expireCh: make(chan uint64, 1),
}
return t, nil
}
// ExpireCh is used to return a channel that streams the next index
// that should be expired
func (t *TombstoneGC) ExpireCh() <-chan uint64 {
return t.expireCh
}
// SetEnabled is used to control if the tombstone GC is
// enabled. Should only be enabled by the leader node.
func (t *TombstoneGC) SetEnabled(enabled bool) {
t.lock.Lock()
defer t.lock.Unlock()
if enabled == t.enabled {
return
}
// Stop all the timers and clear
if !enabled {
for _, exp := range t.expires {
exp.timer.Stop()
}
t.expires = make(map[time.Time]*expireInterval)
}
// Update the status
t.enabled = enabled
}
// Hint is used to indicate that keys at the given index have been
// deleted, and that their GC should be scheduled.
func (t *TombstoneGC) Hint(index uint64) {
expires := t.nextExpires()
t.lock.Lock()
defer t.lock.Unlock()
if !t.enabled {
return
}
// Check for an existing expiration timer
exp, ok := t.expires[expires]
if ok {
// Increment the highest index to be expired at that time
if index > exp.maxIndex {
exp.maxIndex = index
}
return
}
// Create new expiration time
t.expires[expires] = &expireInterval{
maxIndex: index,
timer: time.AfterFunc(expires.Sub(time.Now()), func() {
t.expireTime(expires)
}),
}
}
// PendingExpiration is used to check if any expirations are pending
func (t *TombstoneGC) PendingExpiration() bool {
t.lock.Lock()
defer t.lock.Unlock()
return len(t.expires) > 0
}
// nextExpires is used to calculate the next experation time
func (t *TombstoneGC) nextExpires() time.Time {
expires := time.Now().Add(t.ttl)
remain := expires.UnixNano() % int64(t.granularity)
adj := expires.Add(t.granularity - time.Duration(remain))
return adj
}
// expireTime is used to expire the entries at the given time
func (t *TombstoneGC) expireTime(expires time.Time) {
// Get the maximum index and clear the entry
t.lock.Lock()
exp := t.expires[expires]
delete(t.expires, expires)
t.lock.Unlock()
// Notify the expires channel
t.expireCh <- exp.maxIndex
}