-
Notifications
You must be signed in to change notification settings - Fork 2
/
dedupe.go
83 lines (72 loc) · 2.49 KB
/
dedupe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package dedup
import (
"hash/maphash"
"math/rand"
"sync"
"time"
)
// PacketDeduplicator remembers which packets have already been processed so
// that duplicates can be dropped. Each packet is reduced to a uint64 key and
// stored together with the time at which it was recorded.
type PacketDeduplicator struct {
	// Mutex is embedded; it guards every read and write of seen.
	sync.Mutex
	// seen maps a packet key to the time at which that key was recorded.
	seen map[uint64]time.Time

	// seed is the maphash seed used when hashing raw packet data into a key.
	seed maphash.Seed
	// expiresAfter is how long a record stays valid before it is treated as
	// expired.
	expiresAfter time.Duration
}
// NewDeduplicator returns a PacketDeduplicator whose records expire once they
// are older than expiresAfter.
//
// The returned value starts with an empty record map and a freshly generated
// maphash seed. No background goroutine is started: expired records are
// removed lazily by later lookups.
func NewDeduplicator(expiresAfter time.Duration) *PacketDeduplicator {
	return &PacketDeduplicator{
		expiresAfter: expiresAfter,
		seed:         maphash.MakeSeed(),
		seen:         make(map[uint64]time.Time),
	}
}
// isSeen reports whether key k already has an unexpired record, treating now
// as the current time.
//
// If k has no record, or its record is older than expiresAfter, now is stored
// as the record for k and false is returned. Otherwise the existing record is
// left untouched and true is returned.
//
// On a random ~5% of calls, every expired record is purged first.
func (p *PacketDeduplicator) isSeen(now time.Time, k uint64) bool {
	// Purge only occasionally so the full-map scan is amortised over many
	// lookups. deleteUntil acquires the lock itself, so it must run before
	// the lock is taken below.
	if rand.Intn(100) < 5 {
		p.deleteUntil(now.Add(-p.expiresAfter))
	}

	p.Lock()
	defer p.Unlock()

	if d, exists := p.seen[k]; exists && now.Sub(d) <= p.expiresAfter {
		return true
	}

	// Either k was never recorded or its record has expired. In both cases
	// this is a fresh sighting and must be recorded; merely deleting an
	// expired record would let the very next duplicate through as well.
	p.seen[k] = now
	return false
}
// Seen reports whether the packet identified by sender and packetID has
// already been recorded within the expiry window. A packet that has not been
// seen is recorded and false is returned. The check is made against the
// current wall-clock time.
func (p *PacketDeduplicator) Seen(sender, packetID uint32) bool {
	now := time.Now()
	return p.seenAt(now, sender, packetID)
}
// SeenData reports whether data, identified by its hash, has already been
// recorded within the expiry window. Data that has not been seen is recorded
// and false is returned. The check is made against the current wall-clock
// time.
func (p *PacketDeduplicator) SeenData(data []byte) bool {
	now := time.Now()
	return p.seenDataAt(now, data)
}
//
// Internal helpers. They take an explicit timestamp so that tests can run
// without depending on the wall clock.
//
// deleteUntil removes every record whose timestamp is strictly before t.
// The lock is held for the whole scan.
func (p *PacketDeduplicator) deleteUntil(t time.Time) {
	p.Lock()
	defer p.Unlock()

	for key, recordedAt := range p.seen {
		if !recordedAt.Before(t) {
			continue // not older than the cutoff; keep it
		}
		delete(p.seen, key)
	}
}
// seenDataAt is SeenData with the current time supplied by the caller. The
// lookup key is the maphash of data under the deduplicator's seed.
func (p *PacketDeduplicator) seenDataAt(now time.Time, data []byte) bool {
	key := maphash.Bytes(p.seed, data)
	return p.isSeen(now, key)
}
// seenAt is Seen with the current time supplied by the caller. The lookup key
// packs sender into the upper 32 bits and packetID into the lower 32 bits.
func (p *PacketDeduplicator) seenAt(now time.Time, sender uint32, packetID uint32) bool {
	hi := uint64(sender) << 32
	lo := uint64(packetID)
	return p.isSeen(now, hi|lo)
}