-
Notifications
You must be signed in to change notification settings - Fork 246
/
cache.go
125 lines (98 loc) · 2.95 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package dedup
import (
"time"
"github.com/status-im/status-go/db"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
"golang.org/x/crypto/sha3"
)
// cache represents a cache of whisper messages with a limit of 2 days.
// The limit is counted from the time when the message was added to the cache.
type cache struct {
	// db is the leveldb database in which the cache entries are stored.
	db *leveldb.DB
	// now is the clock used to derive the date strings for today and
	// yesterday; newCache sets it to time.Now.
	now func() time.Time
}
// newCache returns a cache that stores its entries in db and uses time.Now
// as its clock.
func newCache(db *leveldb.DB) *cache {
	return &cache{
		db:  db,
		now: time.Now,
	}
}
// Has reports whether message is present in the cache for filterID, looking
// first under today's key and then under yesterday's key. Any error returned
// by the underlying database lookup is passed back to the caller.
func (d *cache) Has(filterID string, message *whisper.Message) (bool, error) {
	todayKey := d.KeyToday(filterID, message)
	found, err := d.db.Has(todayKey, nil)
	switch {
	case err != nil:
		return false, err
	case found:
		return true, nil
	}
	// Not stored under today's date: fall back to yesterday's key.
	return d.db.Has(d.keyYesterday(filterID, message), nil)
}
// Put records every message in messages under today's key for filterID,
// writing them in a single batch, and then runs cleanOldEntries. It returns
// the first error hit by either the write or the cleanup.
func (d *cache) Put(filterID string, messages []*whisper.Message) error {
	batch := new(leveldb.Batch)
	for i := range messages {
		// Only the key matters for deduplication, so the value is empty.
		batch.Put(d.KeyToday(filterID, messages[i]), []byte{})
	}
	if err := d.db.Write(batch, nil); err != nil {
		return err
	}
	return d.cleanOldEntries()
}
// PutIDs stores each element of messageIDs as a key with an empty value,
// writing them in a single batch, and then runs cleanOldEntries. The IDs are
// used as database keys exactly as given; no prefix is added here.
func (d *cache) PutIDs(messageIDs [][]byte) error {
	batch := new(leveldb.Batch)
	for i := range messageIDs {
		batch.Put(messageIDs[i], []byte{})
	}
	if err := d.db.Write(batch, nil); err != nil {
		return err
	}
	return d.cleanOldEntries()
}
// cleanOldEntries deletes every key in the deduplicator cache range that
// sorts before yesterday's date string, i.e. the range
// [<prefix>00000000, <prefix><yesterday>).
//
// It returns an error if iterating over that range fails or if the batch of
// deletions cannot be written. On an iteration error nothing is deleted.
func (d *cache) cleanOldEntries() error {
	// Cleaning up everything that is older than 2 days.
	// We are using the fact that leveldb can do prefix queries and that
	// the entries are sorted by keys.
	// Here, we are looking for all the keys that are between
	// 00000000.* and <yesterday's date>.*
	// e.g. (00000000.* -> 20180424.*)
	limit := d.yesterdayDateString()

	r := &util.Range{
		Start: db.Key(db.DeduplicatorCache, []byte("00000000")),
		Limit: db.Key(db.DeduplicatorCache, []byte(limit)),
	}

	batch := leveldb.Batch{}
	iter := d.db.NewIterator(r, nil)
	for iter.Next() {
		batch.Delete(iter.Key())
	}
	iter.Release()

	// Next() returns false both when the range is exhausted and when the
	// iterator hits an error; only Error() tells the two apart. Without this
	// check a failed scan would be silently treated as a complete one.
	if err := iter.Error(); err != nil {
		return err
	}

	return d.db.Write(&batch, nil)
}
// keyYesterday builds the cache key for message under filterID, using
// yesterday's date string as the date component.
func (d *cache) keyYesterday(filterID string, message *whisper.Message) []byte {
	date := d.yesterdayDateString()
	return prefixedKey(date, filterID, message)
}
// KeyToday builds the cache key for message under filterID, using today's
// date string as the date component.
func (d *cache) KeyToday(filterID string, message *whisper.Message) []byte {
	date := d.todayDateString()
	return prefixedKey(date, filterID, message)
}
// todayDateString returns the current time of the cache's clock, formatted
// by dateString.
func (d *cache) todayDateString() string {
	today := d.now()
	return dateString(today)
}
// yesterdayDateString returns the calendar day before the current time of
// the cache's clock, formatted by dateString.
func (d *cache) yesterdayDateString() string {
	// Step back one calendar day rather than a fixed 24 hours. In a location
	// with daylight saving time a day can be 23 or 25 hours long, so
	// subtracting 24h can land on the same date as today or skip a date
	// entirely, which would make lookups and cleanup use the wrong key.
	yesterday := d.now().AddDate(0, 0, -1)
	return dateString(yesterday)
}
func dateString(t time.Time) string {
// Layouts must use the reference time Mon Jan 2 15:04:05 MST 2006
return t.Format("20060102")
}
// prefixedKey assembles a cache key by passing the DeduplicatorCache prefix,
// the date, the filterID and the hash of message (see key) to db.Key.
func prefixedKey(date, filterID string, message *whisper.Message) []byte {
	digest := key(message)
	return db.Key(db.DeduplicatorCache, []byte(date), []byte(filterID), digest)
}
// key returns the sha3.Sum512 digest of the message payload followed by the
// message topic.
func key(message *whisper.Message) []byte {
	payload, topic := message.Payload, message.Topic[:]

	// Concatenate payload and topic into one buffer sized up front.
	buf := make([]byte, 0, len(payload)+len(topic))
	buf = append(buf, payload...)
	buf = append(buf, topic...)

	sum := sha3.Sum512(buf)
	return sum[:]
}