/
warpsync.go
371 lines (315 loc) · 13.1 KB
/
warpsync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
package gossip
import (
"context"
"sync"
"time"
"github.com/pkg/errors"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/syncutils"
"github.com/iotaledger/hornet/pkg/common"
"github.com/iotaledger/hornet/pkg/dag"
"github.com/iotaledger/hornet/pkg/model/hornet"
"github.com/iotaledger/hornet/pkg/model/milestone"
"github.com/iotaledger/hornet/pkg/model/storage"
"github.com/iotaledger/hornet/pkg/model/syncmanager"
)
// NewWarpSync builds a WarpSync that uses advRange as its advancement range.
// Only the first entry of advanceCheckpointCriteriaFunc is taken into account; when none
// is passed, AdvanceAtPercentageReached(DefaultAdvancementThreshold) is used as criteria.
func NewWarpSync(advRange int, advanceCheckpointCriteriaFunc ...AdvanceCheckpointCriteria) *WarpSync {
	var criteria AdvanceCheckpointCriteria
	if len(advanceCheckpointCriteriaFunc) == 0 {
		// fall back to the default advancement criteria
		criteria = AdvanceAtPercentageReached(DefaultAdvancementThreshold)
	} else {
		criteria = advanceCheckpointCriteriaFunc[0]
	}

	return &WarpSync{
		AdvancementRange:      advRange,
		advCheckpointCriteria: criteria,
		Events: &Events{
			CheckpointUpdated: events.NewEvent(CheckpointCaller),
			TargetUpdated:     events.NewEvent(TargetCaller),
			Start:             events.NewEvent(SyncStartCaller),
			Done:              events.NewEvent(SyncDoneCaller),
		},
	}
}
// SyncStartCaller is the event caller registered for the Start event.
// It expects handler to be a func(target, newCheckpoint milestone.Index, msRange int32) and
// params to hold the matching three values in that order; any mismatch panics.
func SyncStartCaller(handler interface{}, params ...interface{}) {
	callback := handler.(func(target milestone.Index, newCheckpoint milestone.Index, msRange int32))
	target := params[0].(milestone.Index)
	checkpoint := params[1].(milestone.Index)
	msRange := params[2].(int32)
	callback(target, checkpoint, msRange)
}
// SyncDoneCaller is the event caller registered for the Done event.
// It expects handler to be a func(delta int, referencedMessagesTotal int, dur time.Duration)
// and params to hold the matching three values in that order; any mismatch panics.
func SyncDoneCaller(handler interface{}, params ...interface{}) {
	callback := handler.(func(delta int, referencedMessagesTotal int, dur time.Duration))
	delta := params[0].(int)
	referencedTotal := params[1].(int)
	duration := params[2].(time.Duration)
	callback(delta, referencedTotal, duration)
}
// CheckpointCaller is the event caller registered for the CheckpointUpdated event.
// It expects handler to be a
// func(newCheckpoint, oldCheckpoint milestone.Index, msRange int32, target milestone.Index)
// and params to hold the matching four values in that order; any mismatch panics.
func CheckpointCaller(handler interface{}, params ...interface{}) {
	callback := handler.(func(newCheckpoint milestone.Index, oldCheckpoint milestone.Index, msRange int32, target milestone.Index))
	newCheckpoint := params[0].(milestone.Index)
	oldCheckpoint := params[1].(milestone.Index)
	msRange := params[2].(int32)
	target := params[3].(milestone.Index)
	callback(newCheckpoint, oldCheckpoint, msRange, target)
}
// TargetCaller is the event caller registered for the TargetUpdated event.
// It expects handler to be a func(checkpoint, target milestone.Index) and params to hold
// the matching two values in that order; any mismatch panics.
func TargetCaller(handler interface{}, params ...interface{}) {
	callback := handler.(func(checkpoint milestone.Index, target milestone.Index))
	checkpoint := params[0].(milestone.Index)
	target := params[1].(milestone.Index)
	callback(checkpoint, target)
}
// Events holds WarpSync related events.
// The parameters passed to the handlers of each event are defined by the
// corresponding caller function (CheckpointCaller, TargetCaller, SyncStartCaller, SyncDoneCaller).
type Events struct {
	// Fired when a new set of milestones should be requested.
	// Handlers receive the new checkpoint, the old checkpoint, the milestone range and the target.
	CheckpointUpdated *events.Event
	// Fired when the target milestone is updated.
	// Handlers receive the current checkpoint and the new target.
	TargetUpdated *events.Event
	// Fired when warp synchronization starts.
	// Handlers receive the target, the first checkpoint and the milestone range.
	Start *events.Event
	// Fired when the warp synchronization is done.
	// Handlers receive the milestone delta, the total of referenced messages and the duration.
	Done *events.Event
}
// AdvanceCheckpointCriteria is a function which determines whether the checkpoint should be advanced.
// It is given the current confirmed milestone index together with the previous and the
// current checkpoint and returns true if the checkpoint should be advanced.
type AdvanceCheckpointCriteria func(currentConfirmed, previousCheckpoint, currentCheckpoint milestone.Index) bool
// DefaultAdvancementThreshold is the default threshold at which a checkpoint advancement is done.
// By default an advancement is always done as soon as the confirmed milestone enters the range
// between the previous and current checkpoint.
const DefaultAdvancementThreshold = 0.0
// AdvanceAtPercentageReached returns an AdvanceCheckpointCriteria that reports true once the
// current confirmed milestone has covered at least the given threshold (as a fraction) of the
// distance between the previous and the current checkpoint.
func AdvanceAtPercentageReached(threshold float64) AdvanceCheckpointCriteria {
	return func(currentConfirmed, previousCheckpoint, currentCheckpoint milestone.Index) bool {
		// advancements move the checkpoint window above the confirmed milestone,
		// so the previous checkpoint may still be ahead of the confirmed milestone
		if previousCheckpoint > currentConfirmed {
			return false
		}

		reached := float64(currentConfirmed - previousCheckpoint)
		window := float64(currentCheckpoint - previousCheckpoint)

		return reached/window >= threshold
	}
}
// WarpSync is metadata about doing a synchronization via STING messages.
// The embedded mutex is taken by the exported Update*/Add* methods; the unexported
// helpers (advanceCheckpoint, reset) do not lock by themselves.
type WarpSync struct {
	sync.Mutex
	// The used advancement range per checkpoint.
	AdvancementRange int
	// The Events of the warpsync.
	Events *Events
	// The criteria whether to advance to the next checkpoint.
	advCheckpointCriteria AdvanceCheckpointCriteria
	// The current confirmed milestone of the node.
	CurrentConfirmedMilestone milestone.Index
	// The starting time of the synchronization.
	StartTime time.Time
	// The starting point of the synchronization.
	InitMilestone milestone.Index
	// The target milestone to which to synchronize to.
	TargetMilestone milestone.Index
	// The previous checkpoint of the synchronization.
	PreviousCheckpoint milestone.Index
	// The current checkpoint of the synchronization.
	// A value of zero means that no synchronization is running.
	CurrentCheckpoint milestone.Index
	// The amount of referenced messages during this warpsync run.
	referencedMessagesTotal int
}
// UpdateCurrentConfirmedMilestone records a newer confirmed milestone index.
// While a synchronization is running it either finishes it (Done event followed by a reset)
// once the target is reached, or advances the checkpoint (CheckpointUpdated event) when
// the advancement criteria is fulfilled. Indexes which are not newer are ignored.
func (ws *WarpSync) UpdateCurrentConfirmedMilestone(current milestone.Index) {
	ws.Lock()
	defer ws.Unlock()

	// only ever move the confirmed milestone forward
	if ws.CurrentConfirmedMilestone >= current {
		return
	}
	ws.CurrentConfirmedMilestone = current

	switch {
	case ws.CurrentCheckpoint == 0:
		// synchronization not started
		return

	case ws.TargetMilestone != 0 && current >= ws.TargetMilestone:
		// finished
		ws.Events.Done.Trigger(int(ws.TargetMilestone-ws.InitMilestone), ws.referencedMessagesTotal, time.Since(ws.StartTime))
		ws.reset()
		return

	case !ws.advCheckpointCriteria(current, ws.PreviousCheckpoint, ws.CurrentCheckpoint):
		// advancement criteria not fulfilled yet
		return
	}

	previous := ws.CurrentCheckpoint
	msRange := ws.advanceCheckpoint()
	if msRange == 0 {
		// the checkpoint did not move, nothing to announce
		return
	}
	ws.Events.CheckpointUpdated.Trigger(ws.CurrentCheckpoint, previous, msRange, ws.TargetMilestone)
}
// UpdateTargetMilestone raises the synchronization target; targets which are not higher than
// the current one are ignored. If a synchronization is already running, only the target
// (and possibly the checkpoint) is updated. Otherwise a synchronization is started, provided
// the target is at least 2 milestones ahead of the current confirmed milestone.
func (ws *WarpSync) UpdateTargetMilestone(target milestone.Index) {
	ws.Lock()
	defer ws.Unlock()

	if ws.TargetMilestone >= target {
		return
	}
	ws.TargetMilestone = target

	if ws.CurrentCheckpoint != 0 {
		// synchronization was already started.
		//
		// as a special case, while we are warp syncing and within the last checkpoint range,
		// new target milestones need to shift the checkpoint to the new target, in order
		// to fire an 'updated checkpoint event', which in turn updates the request queue filter.
		// since we will request missing parents for the new target, it will still solidify
		// even though requests were discarded for a short period of time while the
		// request filter wasn't yet updated.
		if ws.CurrentCheckpoint+milestone.Index(ws.AdvancementRange) > target {
			previous := ws.CurrentCheckpoint
			ws.CurrentCheckpoint = target
			ws.Events.CheckpointUpdated.Trigger(ws.CurrentCheckpoint, previous, int32(target-previous), ws.TargetMilestone)
		}

		// apart from that, only the target is updated
		ws.Events.TargetUpdated.Trigger(ws.CurrentCheckpoint, ws.TargetMilestone)
		return
	}

	// do not start the synchronization if current confirmed is newer than the target or the delta is smaller than 2
	confirmed := ws.CurrentConfirmedMilestone
	if confirmed >= target || target-confirmed < 2 {
		return
	}

	// start the synchronization from the current confirmed milestone
	ws.StartTime = time.Now()
	ws.InitMilestone = confirmed
	ws.PreviousCheckpoint = confirmed

	msRange := ws.advanceCheckpoint()
	ws.Events.Start.Trigger(ws.TargetMilestone, ws.CurrentCheckpoint, msRange)
}
// AddReferencedMessagesCount increases the referenced messages statistic of the
// warpsync by the given amount while holding the lock.
func (ws *WarpSync) AddReferencedMessagesCount(referencedMessagesCount int) {
	ws.Lock()
	ws.referencedMessagesTotal = ws.referencedMessagesTotal + referencedMessagesCount
	ws.Unlock()
}
// advanceCheckpoint moves the current checkpoint forward, either by the advancement range
// or, if the target is within that range, directly to the target of the synchronization.
// A non-zero current checkpoint is remembered as the previous checkpoint beforehand.
// Returns the chosen range. It does not lock.
func (ws *WarpSync) advanceCheckpoint() int32 {
	if ws.CurrentCheckpoint != 0 {
		ws.PreviousCheckpoint = ws.CurrentCheckpoint
	}

	step := milestone.Index(ws.AdvancementRange)
	fromConfirmed := ws.TargetMilestone - ws.CurrentConfirmedMilestone
	fromCheckpoint := ws.TargetMilestone - ws.CurrentCheckpoint

	// make sure we advance max to the target milestone
	if fromConfirmed <= step || fromCheckpoint <= step {
		// report the smaller of the two remaining distances
		remaining := fromCheckpoint
		if fromConfirmed < remaining {
			remaining = fromConfirmed
		}
		ws.CurrentCheckpoint = ws.TargetMilestone
		return int32(remaining)
	}

	// at start (no checkpoint yet) simply advance from the current confirmed milestone
	base := ws.CurrentCheckpoint
	if base == 0 {
		base = ws.CurrentConfirmedMilestone
	}
	ws.CurrentCheckpoint = base + step

	return int32(step)
}
// reset clears the state of the finished synchronization run (start time, init and target
// milestone, both checkpoints and the referenced messages counter).
// The current confirmed milestone is left untouched. It does not lock.
func (ws *WarpSync) reset() {
	var zeroTime time.Time

	ws.StartTime = zeroTime
	ws.InitMilestone, ws.TargetMilestone = 0, 0
	ws.PreviousCheckpoint, ws.CurrentCheckpoint = 0, 0
	ws.referencedMessagesTotal = 0
}
// WarpSyncMilestoneRequester walks the cones of existing but non-solid milestones and memoizes already walked messages and milestones.
type WarpSyncMilestoneRequester struct {
	syncutils.Mutex
	// used to cancel the warp sync requester.
	ctx context.Context
	// used to access the node storage.
	storage *storage.Storage
	// used to determine the sync status of the node.
	syncManager *syncmanager.SyncManager
	// used to request messages from peers.
	requester *Requester
	// do not remove requests if the enqueue time is over the given threshold.
	// The flag is handed over to the requester for every missing parent request.
	preventDiscard bool
	// map of already traversed messages to prevent traversing the same cones multiple times.
	// It is keyed by the map key of the message ID and emptied by Cleanup.
	traversed map[string]struct{}
}
// NewWarpSyncMilestoneRequester returns a WarpSyncMilestoneRequester wired to the given
// context, storage, sync manager and requester, starting with an empty set of traversed messages.
func NewWarpSyncMilestoneRequester(
	ctx context.Context,
	dbStorage *storage.Storage,
	syncManager *syncmanager.SyncManager,
	requester *Requester,
	preventDiscard bool) *WarpSyncMilestoneRequester {

	w := new(WarpSyncMilestoneRequester)
	w.ctx = ctx
	w.storage = dbStorage
	w.syncManager = syncManager
	w.requester = requester
	w.preventDiscard = preventDiscard
	w.traversed = map[string]struct{}{}

	return w
}
// requestMissingMilestoneParents walks the parents of the given milestone message and
// requests every parent that is reported as missing.
// Milestones at or below the confirmed milestone index are skipped, and messages which were
// already traversed (by this or an earlier call) are not walked again, to circumvent
// requesting the same parents multiple times.
// It accesses the traversed map without locking by itself.
func (w *WarpSyncMilestoneRequester) requestMissingMilestoneParents(msIndex milestone.Index, milestoneMessageID hornet.MessageID) error {
	if w.syncManager.ConfirmedMilestoneIndex() >= msIndex {
		return nil
	}

	// traversal stops if no more messages pass the given condition
	// Caution: condition func is not in DFS order
	shouldTraverse := func(cachedMsgMeta *storage.CachedMetadata) (bool, error) { // meta +1
		defer cachedMsgMeta.Release(true) // meta -1

		metadata := cachedMsgMeta.Metadata()
		key := metadata.MessageID().ToMapKey()

		if _, seen := w.traversed[key]; seen {
			// this cone was walked before
			return false, nil
		}
		w.traversed[key] = struct{}{}

		// only continue below messages which are not solid yet
		return !metadata.IsSolid(), nil
	}

	// called on missing parents
	requestMissingParent := func(parentMessageID hornet.MessageID) error {
		w.requester.Request(parentMessageID, msIndex, w.preventDiscard)
		return nil
	}

	return dag.TraverseParentsOfMessage(
		w.ctx,
		w.storage,
		milestoneMessageID,
		shouldTraverse,
		// consumer
		nil,
		requestMissingParent,
		// called on solid entry points
		// Ignore solid entry points (snapshot milestone included)
		nil,
		false)
}
// Cleanup drops the set of traversed messages by replacing it with an empty map,
// so that the memory of the old one can be freed.
func (w *WarpSyncMilestoneRequester) Cleanup() {
	w.Lock()
	defer w.Unlock()

	w.traversed = map[string]struct{}{}
}
// RequestMilestoneRange requests up to N milestones nearest to the current confirmed milestone index.
// If an index is passed via from, the first entry is used as starting point instead of the
// confirmed milestone index.
//
// For every index in (startingPoint, startingPoint+rangeToRequest]:
//   - if the milestone is not in the storage, the milestone itself is enqueued as request.
//   - if the milestone exists, its missing parents are requested instead.
//
// Returns the number of milestones that were enqueued as requests, followed by the first and
// the last milestone index of the examined range. If the parent traversal was aborted with
// common.ErrOperationAborted (node shutdown), it returns 0, 0, 0 without enqueueing any
// milestone request. Other traversal errors are ignored.
func (w *WarpSyncMilestoneRequester) RequestMilestoneRange(rangeToRequest int, from ...milestone.Index) (int, milestone.Index, milestone.Index) {
	w.Lock()
	defer w.Unlock()

	startingPoint := w.syncManager.ConfirmedMilestoneIndex()
	if len(from) > 0 {
		startingPoint = from[0]
	}

	startIndex := startingPoint + 1
	endIndex := startingPoint + milestone.Index(rangeToRequest)

	var msIndexes []milestone.Index
	for i := milestone.Index(1); i <= milestone.Index(rangeToRequest); i++ {
		msIndexToRequest := startingPoint + i

		cachedMilestone := w.storage.CachedMilestoneOrNil(msIndexToRequest) // milestone +1
		if cachedMilestone == nil {
			// only request if we do not have the milestone
			msIndexes = append(msIndexes, msIndexToRequest)
			continue
		}

		// milestone already exists.
		// read the message ID while the cached milestone is still held, the cached object
		// must not be accessed anymore after it was released.
		milestoneMessageID := cachedMilestone.Milestone().MessageID
		cachedMilestone.Release(true) // milestone -1

		if err := w.requestMissingMilestoneParents(msIndexToRequest, milestoneMessageID); err != nil && errors.Is(err, common.ErrOperationAborted) {
			// do not proceed if the node was shut down
			return 0, 0, 0
		}
	}

	// enqueue every milestone request to the request queue
	for _, msIndex := range msIndexes {
		w.requester.Request(msIndex, msIndex)
	}

	return len(msIndexes), startIndex, endIndex
}