-
Notifications
You must be signed in to change notification settings - Fork 246
/
history.go
224 lines (194 loc) · 5.72 KB
/
history.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
package db
import (
"encoding/binary"
"encoding/json"
"errors"
"time"
"github.com/syndtr/goleveldb/leveldb/util"
"github.com/status-im/status-go/eth-node/types"
)
var (
// ErrEmptyKey is returned by LoadTopicHistoryFromKey when it is given the
// zero-valued (all bytes zero) TopicHistoryKey.
ErrEmptyKey = errors.New("TopicHistoryKey is empty")
)
// DB is a common interface for DB operations.
// Keys and values are raw byte slices. Within this file only Get, Put and
// Delete are called; Range and NewIterator are not used here.
type DB interface {
// Get looks up the value stored under the given key.
Get([]byte) ([]byte, error)
// Put stores a value (second argument) under a key (first argument).
Put([]byte, []byte) error
// Delete removes the entry stored under the given key.
Delete([]byte) error
// Range builds a *util.Range from two byte slices.
// NOTE(review): presumably the lower and upper key bounds - confirm against implementations.
Range([]byte, []byte) *util.Range
// NewIterator returns a NamespaceIterator for the given range.
NewIterator(*util.Range) NamespaceIterator
}
// TopicHistoryKey defines bytes that are used as unique key for TopicHistory.
// Layout (12 bytes in total):
//   - bytes 0-3 are the types.TopicType bytes;
//   - bytes 4-11 are the time.Duration, written as an unsigned 64-bit integer
//     in big endian notation.
//
// The all-zero key is treated as empty: LoadTopicHistoryFromKey and
// TopicHistory.Load both refuse it.
type TopicHistoryKey [12]byte
// LoadTopicHistoryFromKey unmarshalls key into topic and duration and loads
// the value of the matching topic history from the given database.
//
// The first 4 bytes of key become the Topic and the remaining 8 bytes, read as
// a big endian unsigned integer, become the Duration. The returned
// TopicHistory keeps db so that it can later be saved or deleted.
//
// It returns ErrEmptyKey (and a zero TopicHistory) when key is the zero value.
// Otherwise the error, if any, is the one reported by TopicHistory.Load; in
// that case the returned TopicHistory still carries db, Topic and Duration.
func LoadTopicHistoryFromKey(db DB, key TopicHistoryKey) (th TopicHistory, err error) {
	if (key == TopicHistoryKey{}) {
		return th, ErrEmptyKey
	}
	th.db = db
	copy(th.Topic[:], key[:4])
	th.Duration = time.Duration(binary.BigEndian.Uint64(key[4:]))
	// Load mutates th through its pointer receiver. Call it in its own
	// statement: in "return th, th.Load()" the Go spec does not define whether
	// th is read before or after the call, so the result could be the
	// unloaded value.
	err = th.Load()
	return th, err
}
// TopicHistory stores necessary information.
// It is serialized with encoding/json (see Value), so only the exported fields
// are persisted. A record is identified in the store by Topic and Duration
// (see Key).
type TopicHistory struct {
// db is the store used by Load, Save and Delete. It is unexported and
// therefore not part of the serialized value.
db DB
// whisper topic
Topic types.TopicType
// Duration is encoded, together with Topic, into the storage key.
Duration time.Duration
// Timestamp that was used for the first request with this topic.
// Used to identify overlapping ranges.
First time.Time
// Timestamp of the last synced envelope.
Current time.Time
// NOTE(review): End is not described in this file; presumably the upper
// bound of the requested range - confirm against callers.
End time.Time
// RequestID is set by HistoryRequest.Save to the ID of the request that
// includes this topic. Pending treats the zero value as "no request".
RequestID types.Hash
}
// Key builds the unique storage identifier of this TopicHistory: the Topic
// bytes at the start of the key, followed from offset 4 by the Duration as a
// big endian unsigned 64-bit integer.
func (t TopicHistory) Key() TopicHistoryKey {
	var k TopicHistoryKey
	copy(k[:], t.Topic[:])
	span := uint64(t.Duration)
	binary.BigEndian.PutUint64(k[4:], span)
	return k
}
// Value encodes the TopicHistory as JSON. Only exported fields are part of the
// output; the error is the one reported by json.Marshal.
func (t TopicHistory) Value() ([]byte, error) {
	encoded, err := json.Marshal(t)
	return encoded, err
}
// Load reads the stored value for this TopicHistory's key from db and decodes
// the JSON into the receiver.
// It fails with a "key is empty" error when the key is the zero value, and
// otherwise returns any error from the store lookup or from JSON decoding.
func (t *TopicHistory) Load() error {
	var empty TopicHistoryKey
	k := t.Key()
	if k == empty {
		return errors.New("key is empty")
	}
	raw, err := t.db.Get(k[:])
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, t)
}
// Save writes the JSON form of the TopicHistory to db under its Key.
// It returns the encoding error, if any, without touching the store;
// otherwise it returns the result of the store's Put.
func (t TopicHistory) Save() error {
	encoded, err := t.Value()
	if err != nil {
		return err
	}
	k := t.Key()
	return t.db.Put(k[:], encoded)
}
// Delete removes the entry stored under this TopicHistory's Key from db and
// returns the store's error, if any.
func (t TopicHistory) Delete() error {
	// Key returns an array value; it has to live in a variable to be sliced.
	k := t.Key()
	id := k[:]
	return t.db.Delete(id)
}
// SameRange reports whether t and other have the same range, which means:
//   - if Current is the zero time in both, true when the Durations are equal;
//   - otherwise, true when both Current values are the same instant.
//
// Timestamps are checked with IsZero and Equal rather than ==. The ==
// operator on time.Time also compares the Location pointer and the monotonic
// clock reading, so two values for the same instant (for example one decoded
// from JSON and one built in another time zone) would wrongly compare as
// different.
func (t TopicHistory) SameRange(other TopicHistory) bool {
	if t.Current.IsZero() && other.Current.IsZero() {
		return t.Duration == other.Duration
	}
	return t.Current.Equal(other.Current)
}
// Pending returns true if this topic was requested from a mail server, which
// is detected by RequestID being different from the zero types.Hash.
func (t TopicHistory) Pending() bool {
	var unset types.Hash
	return t.RequestID != unset
}
// HistoryRequest is kept in the database while request is in the progress.
// Stores necessary information to identify topics with associated ranges included in the request.
// It is serialized with encoding/json (see Value), so only the exported fields
// ID and TopicHistoryKeys are persisted; the unexported fields are runtime state.
type HistoryRequest struct {
// requestDB stores the serialized request under its ID (see Save, Load and Delete).
requestDB DB
// topicDB is the store from which loadHistories reads the topic histories.
topicDB DB
// histories are the TopicHistory values attached with AddHistory or
// loaded from topicDB.
histories []TopicHistory
// Generated ID
ID types.Hash
// List of the topics
TopicHistoryKeys []TopicHistoryKey
}
// AddHistory attaches history to the request: the value is appended to the
// in-memory list and its key is appended to TopicHistoryKeys, the list that
// is persisted on disk with the request.
func (req *HistoryRequest) AddHistory(history TopicHistory) {
	key := history.Key()
	req.TopicHistoryKeys = append(req.TopicHistoryKeys, key)
	req.histories = append(req.histories, history)
}
// Histories returns the internal list of topic histories. The slice is
// returned as is, without copying.
func (req *HistoryRequest) Histories() []TopicHistory {
	// TODO Lazy load from database on first access
	attached := req.histories
	return attached
}
// Value encodes the HistoryRequest as JSON. Only the exported fields are part
// of the output; the error is the one reported by json.Marshal.
func (req HistoryRequest) Value() ([]byte, error) {
	encoded, err := json.Marshal(req)
	return encoded, err
}
// Save persists all attached histories and then the request itself.
// Every attached history gets its RequestID set to req.ID before it is saved;
// although req is received by value, the histories slice shares its storage
// with the caller, so that assignment is visible to the caller as well.
// The serialized request is finally written to requestDB under req.ID.
// Processing stops at the first error; histories saved before it stay saved.
func (req HistoryRequest) Save() error {
	for i := range req.histories {
		req.histories[i].RequestID = req.ID
		err := req.histories[i].Save()
		if err != nil {
			return err
		}
	}
	encoded, err := req.Value()
	if err != nil {
		return err
	}
	id := req.ID.Bytes()
	return req.requestDB.Put(id, encoded)
}
// Replace saves the request, with all data attached to it, under the new ID.
// If the current ID is set (not the zero types.Hash) the record stored under
// it is deleted first; a failure of that deletion aborts the operation.
// req is received by value, so the new ID reaches the store and the attached
// histories but the caller's HistoryRequest keeps its old ID.
func (req HistoryRequest) Replace(id types.Hash) error {
	var unset types.Hash
	if req.ID != unset {
		err := req.Delete()
		if err != nil {
			return err
		}
	}
	req.ID = id
	return req.Save()
}
// Delete removes the record stored under req.ID from requestDB and returns
// the store's error, if any. The attached topic histories are not modified.
func (req HistoryRequest) Delete() error {
	id := req.ID.Bytes()
	return req.requestDB.Delete(id)
}
// Load fetches the record stored under req.ID from requestDB and passes it to
// RawUnmarshall, which decodes the request and loads its topic histories.
// A failed lookup is returned as is, without attempting to decode.
func (req *HistoryRequest) Load() error {
	raw, err := req.requestDB.Get(req.ID.Bytes())
	if err == nil {
		err = req.RawUnmarshall(raw)
	}
	return err
}
// loadHistories rebuilds req.histories by loading from topicDB one
// TopicHistory for every entry of req.TopicHistoryKeys, in the same order.
//
// The list is rebuilt from scratch instead of being appended to, so loading
// the same request more than once does not duplicate entries. req.histories
// is replaced only after every key has been loaded; on the first failure it
// is left untouched and that error is returned.
func (req *HistoryRequest) loadHistories() error {
	// Zero length and zero capacity: append has to allocate new storage, so a
	// slice handed out earlier by Histories is never overwritten.
	loaded := req.histories[:0:0]
	for _, hk := range req.TopicHistoryKeys {
		th, err := LoadTopicHistoryFromKey(req.topicDB, hk)
		if err != nil {
			return err
		}
		loaded = append(loaded, th)
	}
	req.histories = loaded
	return nil
}
// RawUnmarshall decodes the given JSON bytes into the request and then loads
// the topic histories it refers to.
// Used in range queries to unmarshall content of the iter.Value directly into request struct.
// A decoding error is returned before any history is loaded.
func (req *HistoryRequest) RawUnmarshall(val []byte) error {
	if err := json.Unmarshal(val, req); err != nil {
		return err
	}
	return req.loadHistories()
}
// Includes reports whether the key of the given TopicHistory is present in
// the request's TopicHistoryKeys.
func (req *HistoryRequest) Includes(history TopicHistory) bool {
	wanted := history.Key()
	for _, candidate := range req.TopicHistoryKeys {
		if candidate == wanted {
			return true
		}
	}
	return false
}