warp_sync.go
package protocol

import (
	"sync/atomic"

	"github.com/libp2p/go-libp2p/core/peer"

	"github.com/iotaledger/hive.go/ads"
	"github.com/iotaledger/hive.go/core/eventticker"
	"github.com/iotaledger/hive.go/ds"
	"github.com/iotaledger/hive.go/ds/reactive"
	"github.com/iotaledger/hive.go/ierrors"
	"github.com/iotaledger/hive.go/kvstore/mapdb"
	"github.com/iotaledger/hive.go/log"
	"github.com/iotaledger/hive.go/runtime/workerpool"
	"github.com/iotaledger/iota-core/pkg/protocol/engine"
	iotago "github.com/iotaledger/iota.go/v4"
	"github.com/iotaledger/iota.go/v4/merklehasher"
)

// WarpSync is a subcomponent of the protocol that is responsible for handling warp sync requests and responses.
type WarpSync struct {
	// protocol contains a reference to the Protocol instance that this component belongs to.
	protocol *Protocol

	// workerPool contains the worker pool that is used to process warp sync requests and responses asynchronously.
	workerPool *workerpool.WorkerPool

	// ticker contains the ticker that is used to send warp sync requests.
	ticker *eventticker.EventTicker[iotago.SlotIndex, iotago.CommitmentID]

	// Logger embeds a logger that can be used to log messages emitted by this component.
	log.Logger
}

// newWarpSync creates a new warp sync protocol instance for the given protocol.
func newWarpSync(protocol *Protocol) *WarpSync {
	c := &WarpSync{
		Logger:     protocol.NewChildLogger("WarpSync"),
		protocol:   protocol,
		workerPool: protocol.Workers.CreatePool("WarpSync", workerpool.WithWorkerCount(1)),
		ticker:     eventticker.New[iotago.SlotIndex, iotago.CommitmentID](protocol.Options.WarpSyncRequesterOptions...),
	}

	c.ticker.Events.Tick.Hook(c.SendRequest)

	protocol.ConstructedEvent().OnTrigger(func() {
		protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) {
			return chain.WarpSyncMode.OnUpdate(func(_ bool, warpSyncModeEnabled bool) {
				if warpSyncModeEnabled {
					// We need to wait for all workers of the engine to finish and reset in a separate worker,
					// since otherwise we're locking downstream (c.LatestSyncedSlot, c.chains.LatestSeenSlot, c.OutOfSyncThreshold of the chain),
					// which in turn can lead to a deadlock where the engine can't update the LatestSyncedSlot.
					// By running it in the warp sync's single worker we also make sure that the engine is reset before
					// actually warp syncing/processing new slots.
					c.workerPool.Submit(func() {
						engine.Workers.WaitChildren()
						engine.Reset()
					})
				}
			})
		})

		protocol.Commitments.WithElements(func(commitment *Commitment) (shutdown func()) {
			return commitment.WarpSyncBlocks.OnUpdate(func(_ bool, warpSyncBlocks bool) {
				if warpSyncBlocks {
					c.ticker.StartTicker(commitment.ID())
				} else {
					c.ticker.StopTicker(commitment.ID())
				}
			})
		})
	})

	return c
}

// SendRequest sends a warp sync request for the given commitment ID to all peers.
func (w *WarpSync) SendRequest(commitmentID iotago.CommitmentID) {
	w.workerPool.Submit(func() {
		w.protocol.Network.SendWarpSyncRequest(commitmentID)

		w.LogDebug("request", "commitmentID", commitmentID)
	})
}

// SendResponse sends a warp sync response for the given commitment ID to the given peer.
func (w *WarpSync) SendResponse(commitment *Commitment, blockIDsBySlotCommitment map[iotago.CommitmentID]iotago.BlockIDs, roots *iotago.Roots, transactionIDs iotago.TransactionIDs, to peer.ID) {
	w.workerPool.Submit(func() {
		w.protocol.Network.SendWarpSyncResponse(commitment.ID(), blockIDsBySlotCommitment, roots.TangleProof(), transactionIDs, roots.MutationProof(), to)

		w.LogTrace("sent response", "commitment", commitment.LogName(), "toPeer", to)
	})
}

// ProcessResponse processes the given warp sync response.
func (w *WarpSync) ProcessResponse(commitmentID iotago.CommitmentID, blockIDsBySlotCommitment map[iotago.CommitmentID]iotago.BlockIDs, proof *merklehasher.Proof[iotago.Identifier], transactionIDs iotago.TransactionIDs, mutationProof *merklehasher.Proof[iotago.Identifier], from peer.ID) {
	w.workerPool.Submit(func() {
		commitment, err := w.protocol.Commitments.Get(commitmentID)
		if err != nil {
			if !ierrors.Is(err, ErrorCommitmentNotFound) {
				w.LogError("failed to load commitment for response", "commitmentID", commitmentID, "fromPeer", from, "err", err)
			} else {
				w.LogTrace("failed to load commitment for response", "commitmentID", commitmentID, "fromPeer", from, "err", err)
			}

			return
		}

		chain := commitment.Chain.Get()
		if chain == nil {
			w.LogTrace("failed to get chain for response", "commitment", commitment.LogName(), "fromPeer", from)

			return
		}

		if !chain.WarpSyncMode.Get() {
			w.LogTrace("response for chain without warp-sync", "chain", chain.LogName(), "fromPeer", from)

			return
		}

		targetEngine := commitment.TargetEngine()
		if targetEngine == nil {
			w.LogDebug("failed to get target engine for response", "commitment", commitment.LogName())

			return
		}

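		// The Compute call below atomically decides whether this response still needs to be handled: a
		// non-nil blocksToWarpSync set (or a cleared WarpSyncBlocks flag) means the commitment was already
		// processed, so the response is ignored.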
		commitment.BlocksToWarpSync.Compute(func(blocksToWarpSync ds.Set[iotago.BlockID]) ds.Set[iotago.BlockID] {
			if blocksToWarpSync != nil || !commitment.WarpSyncBlocks.Get() {
				w.LogTrace("response for already synced commitment", "commitment", commitment.LogName(), "fromPeer", from)

				return blocksToWarpSync
			}

			totalBlocks := uint32(0)
			acceptedBlocks := ads.NewSet[iotago.Identifier](mapdb.NewMapDB(), iotago.Identifier.Bytes, iotago.IdentifierFromBytes, iotago.BlockID.Bytes, iotago.BlockIDFromBytes)
			for _, blockIDs := range blockIDsBySlotCommitment {
				for _, blockID := range blockIDs {
					_ = acceptedBlocks.Add(blockID) // a mapdb can never return an error
					totalBlocks++
				}
			}

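			// The received block IDs were rebuilt into an authenticated set above; the Merkle proof must show
			// that this set's root is part of the roots committed to by the commitment's RootsID.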
			if !iotago.VerifyProof(proof, acceptedBlocks.Root(), commitment.RootsID()) {
				w.LogError("failed to verify blocks proof", "commitment", commitment.LogName(), "blockIDs", blockIDsBySlotCommitment, "proof", proof, "fromPeer", from)

				return blocksToWarpSync
			}

			acceptedTransactionIDs := ads.NewSet[iotago.Identifier](mapdb.NewMapDB(), iotago.Identifier.Bytes, iotago.IdentifierFromBytes, iotago.TransactionID.Bytes, iotago.TransactionIDFromBytes)
			for _, transactionID := range transactionIDs {
				_ = acceptedTransactionIDs.Add(transactionID) // a mapdb can never return an error
			}

			if !iotago.VerifyProof(mutationProof, acceptedTransactionIDs.Root(), commitment.RootsID()) {
				w.LogError("failed to verify mutations proof", "commitment", commitment.ID(), "commitmentDetails", commitment.Commitment.Commitment().String(), "acceptedTransactionIDsRoot", acceptedTransactionIDs.Root(), "transactionIDsCount", len(transactionIDs), "proof", mutationProof, "fromPeer", from)

				return blocksToWarpSync
			}

			w.ticker.StopTicker(commitmentID)

			targetEngine.Workers.WaitChildren()

			if !chain.WarpSyncMode.Get() {
				w.LogTrace("response for chain without warp-sync", "chain", chain.LogName(), "fromPeer", from)

				return blocksToWarpSync
			}

			// Once all blocks are booked we
			//   1. Mark all transactions as accepted
			//   2. Mark all blocks as accepted
			//   3. Force commitment of the slot
			commitmentFunc := func() {
				if !chain.WarpSyncMode.Get() {
					return
				}

				// 0. Prepare data flow
				var (
					notarizedBlocksCount uint64
					allBlocksNotarized   = reactive.NewEvent()
				)

				// 1. Mark all transactions as accepted
				for _, transactionID := range transactionIDs {
					targetEngine.Ledger.SpendDAG().SetAccepted(transactionID)
				}

				// 2. Mark all blocks as accepted and wait for them to be notarized
				if totalBlocks == 0 {
					allBlocksNotarized.Trigger()
				} else {
					for _, blockIDs := range blockIDsBySlotCommitment {
						for _, blockID := range blockIDs {
							block, exists := targetEngine.BlockCache.Block(blockID)
							if !exists { // this should never happen as we just booked these blocks in this slot.
								continue
							}

							targetEngine.BlockGadget.SetAccepted(block)

							block.Notarized().OnTrigger(func() {
								if atomic.AddUint64(&notarizedBlocksCount, 1) == uint64(totalBlocks) {
									allBlocksNotarized.Trigger()
								}
							})
						}
					}
				}

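				// Once the last block of the response has been notarized (tracked by the atomic counter above),
				// the slot can be force committed.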
				allBlocksNotarized.OnTrigger(func() {
					// This needs to happen in a separate worker since the trigger for block notarization fires
					// while the lock in the notarization is still held.
					w.workerPool.Submit(func() {
						// 3. Force commitment of the slot
						producedCommitment, err := targetEngine.Notarization.ForceCommit(commitmentID.Slot())
						if err != nil {
							w.protocol.LogError("failed to force commitment", "commitmentID", commitmentID, "err", err)

							return
						}

						// 4. Verify that the produced commitment is the same as the initially requested one
						if producedCommitment.ID() != commitmentID {
							w.protocol.LogError("commitment does not match", "expectedCommitmentID", commitmentID, "producedCommitmentID", producedCommitment.ID())

							return
						}
					})
				})
			}

			// Once all blocks are fully booked we can mark the commitment that is minCommittableAge older than
			// this commitment as committable.
			commitment.IsSynced.OnUpdateOnce(func(_ bool, _ bool) {
				// update the flag in a worker since it can potentially cause a commit
				w.workerPool.Submit(func() {
					// we add +1 here to enable syncing of chains of empty commitments since we can assume that there is
					// at least 1 additional slot building on top of the synced commitment as it would have otherwise
					// not turned into a commitment in the first place.
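					// For example, assuming a MinCommittableAge of 4 and a synced commitment at slot 20, the
					// commitment at slot 20 - 4 + 1 = 17 is marked as committable.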
					if committableCommitment, exists := chain.Commitment(commitmentID.Slot() - targetEngine.LatestAPI().ProtocolParameters().MinCommittableAge() + 1); exists {
						committableCommitment.IsCommittable.Set(true)
					}
				})
			})

			// force commit one by one and wait for the parent to be verified before we commit the next one
			commitment.Parent.WithNonEmptyValue(func(parent *Commitment) (teardown func()) {
				return parent.IsVerified.WithNonEmptyValue(func(_ bool) (teardown func()) {
					return commitment.IsCommittable.OnTrigger(commitmentFunc)
				})
			})

			if totalBlocks == 0 {
				// mark empty slots as committable and synced
				commitment.IsCommittable.Set(true)
				commitment.IsSynced.Set(true)

				return blocksToWarpSync
			}

			var bookedBlocks atomic.Uint32

			blocksToWarpSync = ds.NewSet[iotago.BlockID]()
			for _, blockIDs := range blockIDsBySlotCommitment {
				for _, blockID := range blockIDs {
					blocksToWarpSync.Add(blockID)

					block, _ := targetEngine.BlockDAG.GetOrRequestBlock(blockID)
					if block == nil {
						w.protocol.LogError("failed to request block", "blockID", blockID)

						continue
					}

					// We need to make sure that all blocks are fully booked and their weight propagated before we can
					// move the window forward. This is in order to ensure that confirmation and finalization are correctly propagated.
					block.WeightPropagated().OnUpdate(func(_ bool, _ bool) {
						if bookedBlocks.Add(1) != totalBlocks {
							return
						}

						commitment.IsSynced.Set(true)
					})
				}
			}

			w.LogDebug("received response", "commitment", commitment.LogName())

			return blocksToWarpSync
		})
	})
}

// ProcessRequest processes the given warp sync request.
func (w *WarpSync) ProcessRequest(commitmentID iotago.CommitmentID, from peer.ID) {
	loggedWorkerPoolTask(w.workerPool, func() (err error) {
		commitmentAPI, err := w.protocol.Commitments.API(commitmentID)
		if err != nil {
			return ierrors.Wrap(err, "failed to load slot api")
		}

		blocks, blocksProof, transactionIDs, transactionIDsProof, err := commitmentAPI.Mutations()
		if err != nil {
			return ierrors.Wrap(err, "failed to get mutations")
		}

		w.protocol.Network.SendWarpSyncResponse(commitmentID, blocks, blocksProof, transactionIDs, transactionIDsProof, from)

		return nil
	}, w, "commitmentID", commitmentID, "fromPeer", from)
}

// Shutdown shuts down the warp sync protocol.
func (w *WarpSync) Shutdown() {
	w.ticker.Shutdown()
}