-
Notifications
You must be signed in to change notification settings - Fork 12
/
chain.go
373 lines (302 loc) · 14.8 KB
/
chain.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
package protocol
import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/iotaledger/hive.go/ds/reactive"
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/log"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/protocol/engine"
iotago "github.com/iotaledger/iota.go/v4"
)
// Chain represents a chain of commitments.
type Chain struct {
// ForkingPoint contains the first commitment of this chain.
ForkingPoint reactive.Variable[*Commitment]
// ParentChain contains the chain that this chain forked from.
ParentChain reactive.Variable[*Chain]
// ChildChains contains the set of all chains that forked from this chain.
ChildChains reactive.Set[*Chain]
// LatestCommitment contains the latest commitment of this chain.
LatestCommitment reactive.Variable[*Commitment]
// LatestAttestedCommitment contains the latest commitment of this chain for which attestations were received.
LatestAttestedCommitment reactive.Variable[*Commitment]
// LatestProducedCommitment contains the latest commitment of this chain that we produced ourselves by booking the
// corresponding blocks in the Engine.
LatestProducedCommitment reactive.Variable[*Commitment]
// ClaimedWeight contains the claimed weight of this chain which is derived from the cumulative weight of the
// LatestCommitment.
ClaimedWeight reactive.Variable[uint64]
// AttestedWeight contains the attested weight of this chain which is derived from the cumulative weight of all
// attestations up to the LatestAttestedCommitment.
AttestedWeight reactive.Variable[uint64]
// VerifiedWeight contains the verified weight of this chain which is derived from the cumulative weight of the
// latest verified commitment.
VerifiedWeight reactive.Variable[uint64]
// WarpSyncMode contains a flag that indicates whether this chain is in warp sync mode.
WarpSyncMode reactive.Variable[bool]
// LatestSyncedSlot contains the latest commitment of this chain for which all blocks were booked.
LatestSyncedSlot reactive.Variable[iotago.SlotIndex]
// OutOfSyncThreshold contains the slot at which the chain will consider itself to be out of sync and switch to warp
// sync mode. It is derived from the latest network slot minus two times the max committable age.
OutOfSyncThreshold reactive.Variable[iotago.SlotIndex]
// RequestAttestations contains a flag that indicates whether this chain should verify the claimed weight by
// requesting attestations.
RequestAttestations reactive.Variable[bool]
// StartEngine contains a flag that indicates whether this chain should verify the state by processing blocks in an
// engine.
StartEngine reactive.Variable[bool]
// Engine contains the engine instance that is used to process blocks for this chain.
Engine reactive.Variable[*engine.Engine]
// IsEvicted contains a flag that indicates whether this chain was evicted.
IsEvicted reactive.Event
// chains contains a reference to the Chains instance that this chain belongs to.
chains *Chains
// commitments contains the commitments that make up this chain.
commitments *shrinkingmap.ShrinkingMap[iotago.SlotIndex, *Commitment]
// Logger embeds a logger that can be used to log messages emitted by this chain.
log.Logger
}
// newChain creates a new chain instance.
func newChain(chains *Chains) *Chain {
c := &Chain{
ForkingPoint: reactive.NewVariable[*Commitment](),
ParentChain: reactive.NewVariable[*Chain](),
ChildChains: reactive.NewSet[*Chain](),
LatestCommitment: reactive.NewVariable[*Commitment](),
LatestAttestedCommitment: reactive.NewVariable[*Commitment](),
LatestProducedCommitment: reactive.NewVariable[*Commitment](),
ClaimedWeight: reactive.NewVariable[uint64](),
AttestedWeight: reactive.NewVariable[uint64](),
VerifiedWeight: reactive.NewVariable[uint64](),
WarpSyncMode: reactive.NewVariable[bool]().Init(true),
LatestSyncedSlot: reactive.NewVariable[iotago.SlotIndex](),
OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](),
RequestAttestations: reactive.NewVariable[bool](),
StartEngine: reactive.NewVariable[bool](),
Engine: reactive.NewVariable[*engine.Engine](),
IsEvicted: reactive.NewEvent(),
chains: chains,
commitments: shrinkingmap.New[iotago.SlotIndex, *Commitment](),
}
shutdown := lo.Batch(
c.initLogger(),
c.initDerivedProperties(),
)
c.IsEvicted.OnTrigger(shutdown)
return c
}
// WithInitializedEngine is a reactive selector that executes the given callback once an Engine for this chain was
// initialized.
func (c *Chain) WithInitializedEngine(callback func(engineInstance *engine.Engine) (shutdown func())) (shutdown func()) {
return c.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) {
return engineInstance.Initialized.WithNonEmptyValue(func(_ bool) (shutdown func()) {
return callback(engineInstance)
})
})
}
// LastCommonSlot returns the slot of the last commitment that is common to this chain and its parent chain.
func (c *Chain) LastCommonSlot() iotago.SlotIndex {
if forkingPoint := c.ForkingPoint.Get(); forkingPoint != nil {
if isRoot := forkingPoint.IsRoot.Get(); isRoot {
return forkingPoint.Slot()
}
return forkingPoint.Slot() - 1
}
panic("chain has no forking point")
}
// DispatchBlock dispatches the given block to the chain and its children (it is allowed to call this method on a nil
// receiver, in which case it will be a no-op with a return value of false).
func (c *Chain) DispatchBlock(block *model.Block, src peer.ID) (dispatched bool) {
if c == nil {
return false
}
dispatched = c.dispatchBlockToSpawnedEngine(block, src)
for _, childChain := range c.ChildChains.ToSlice() {
dispatched = childChain.DispatchBlock(block, src) || dispatched
}
return dispatched
}
// Commitment returns the Commitment for the given slot from the perspective of this chain.
func (c *Chain) Commitment(slot iotago.SlotIndex) (commitment *Commitment, exists bool) {
for currentChain := c; currentChain != nil; {
switch forkingPoint := currentChain.ForkingPoint.Get(); {
case forkingPoint.Slot() == slot:
return forkingPoint, true
case slot > forkingPoint.Slot():
return currentChain.commitments.Get(slot)
default:
currentChain = c.ParentChain.Get()
}
}
return nil, false
}
// LatestEngine returns the latest engine instance that was spawned by the chain itself or one of its ancestors.
func (c *Chain) LatestEngine() *engine.Engine {
currentChain, currentEngine := c, c.Engine.Get()
for ; currentEngine == nil; currentEngine = currentChain.Engine.Get() {
if currentChain = c.ParentChain.Get(); currentChain == nil {
return nil
}
}
return currentEngine
}
// initLogger initializes the Logger of this chain.
func (c *Chain) initLogger() (shutdown func()) {
c.Logger = c.chains.NewChildLogger("", true)
return lo.Batch(
c.WarpSyncMode.LogUpdates(c, log.LevelTrace, "WarpSyncMode"),
c.LatestSyncedSlot.LogUpdates(c, log.LevelTrace, "LatestSyncedSlot"),
c.OutOfSyncThreshold.LogUpdates(c, log.LevelTrace, "OutOfSyncThreshold"),
c.ForkingPoint.LogUpdates(c, log.LevelTrace, "ForkingPoint", (*Commitment).LogName),
c.ClaimedWeight.LogUpdates(c, log.LevelTrace, "ClaimedWeight"),
c.AttestedWeight.LogUpdates(c, log.LevelTrace, "AttestedWeight"),
c.VerifiedWeight.LogUpdates(c, log.LevelTrace, "VerifiedWeight"),
c.LatestCommitment.LogUpdates(c, log.LevelTrace, "LatestCommitment", (*Commitment).LogName),
c.LatestAttestedCommitment.LogUpdates(c, log.LevelTrace, "LatestAttestedCommitment", (*Commitment).LogName),
c.LatestProducedCommitment.LogUpdates(c, log.LevelDebug, "LatestProducedCommitment", (*Commitment).LogName),
c.RequestAttestations.LogUpdates(c, log.LevelTrace, "RequestAttestations"),
c.StartEngine.LogUpdates(c, log.LevelDebug, "StartEngine"),
c.Engine.LogUpdates(c, log.LevelTrace, "Engine", (*engine.Engine).LogName),
c.IsEvicted.LogUpdates(c, log.LevelTrace, "IsEvicted"),
c.Logger.UnsubscribeFromParentLogger,
)
}
// initDerivedProperties initializes the behavior of this chain by setting up the relations between its properties.
func (c *Chain) initDerivedProperties() (shutdown func()) {
return lo.Batch(
c.deriveClaimedWeight(),
c.deriveVerifiedWeight(),
c.deriveLatestAttestedWeight(),
c.deriveWarpSyncMode(),
c.ForkingPoint.WithValue(c.deriveParentChain),
c.ParentChain.WithNonEmptyValue(lo.Bind(c, (*Chain).deriveChildChains)),
c.Engine.WithNonEmptyValue(c.deriveOutOfSyncThreshold),
)
}
// deriveWarpSyncMode defines how a chain determines whether it is in warp sync mode or not.
func (c *Chain) deriveWarpSyncMode() func() {
return c.WarpSyncMode.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestSyncedSlot iotago.SlotIndex, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
// if warp sync mode is enabled, keep it enabled until we have synced all slots
if warpSyncMode {
return latestSyncedSlot < latestSeenSlot
}
// if warp sync mode is disabled, enable it only if we fall below the out of sync threshold
return latestSyncedSlot < outOfSyncThreshold
}, c.LatestSyncedSlot, c.chains.LatestSeenSlot, c.OutOfSyncThreshold, c.WarpSyncMode.Get()))
}
// deriveClaimedWeight defines how a chain determines its claimed weight (by setting the cumulative weight of the
// latest commitment).
func (c *Chain) deriveClaimedWeight() (shutdown func()) {
return c.ClaimedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestCommitment *Commitment) uint64 {
if latestCommitment == nil {
return 0
}
return latestCommitment.CumulativeWeight()
}, c.LatestCommitment))
}
// deriveLatestAttestedWeight defines how a chain determines its attested weight (by inheriting the cumulative attested
// weight of the latest attested commitment). It uses inheritance instead of simply setting the value as the cumulative
// attested weight can change over time depending on the attestations that are received.
func (c *Chain) deriveLatestAttestedWeight() func() {
return c.LatestAttestedCommitment.WithNonEmptyValue(func(latestAttestedCommitment *Commitment) (shutdown func()) {
return c.AttestedWeight.InheritFrom(latestAttestedCommitment.CumulativeAttestedWeight)
})
}
// deriveVerifiedWeight defines how a chain determines its verified weight (by setting the cumulative weight of the
// latest produced commitment).
func (c *Chain) deriveVerifiedWeight() func() {
return c.VerifiedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestProducedCommitment *Commitment) uint64 {
if latestProducedCommitment == nil {
return 0
}
return latestProducedCommitment.CumulativeWeight()
}, c.LatestProducedCommitment))
}
// deriveChildChains defines how a chain determines its ChildChains (by adding each child to the set).
func (c *Chain) deriveChildChains(child *Chain) func() {
c.ChildChains.Add(child)
return func() {
c.ChildChains.Delete(child)
}
}
// deriveParentChain defines how a chain determines its parent chain from its forking point (it inherits the Chain from
// the parent commitment of the forking point or nil if either of them is still unknown).
func (c *Chain) deriveParentChain(forkingPoint *Commitment) (shutdown func()) {
if forkingPoint != nil {
return forkingPoint.Parent.WithValue(func(parentCommitment *Commitment) (shutdown func()) {
if parentCommitment != nil {
return c.ParentChain.InheritFrom(parentCommitment.Chain)
}
c.ParentChain.Set(nil)
return nil
})
}
c.ParentChain.Set(nil)
return nil
}
// deriveOutOfSyncThreshold defines how a chain determines its "out of sync" threshold (the latest seen slot minus 2
// times the max committable age or 0 if this would cause an overflow to the negative numbers).
func (c *Chain) deriveOutOfSyncThreshold(engineInstance *engine.Engine) func() {
return c.OutOfSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
if outOfSyncOffset := 2 * engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge(); outOfSyncOffset < latestSeenSlot {
return latestSeenSlot - outOfSyncOffset
}
return 0
}, c.chains.LatestSeenSlot))
}
// addCommitment adds the given commitment to this chain.
func (c *Chain) addCommitment(newCommitment *Commitment) (shutdown func()) {
c.commitments.Set(newCommitment.Slot(), newCommitment)
c.LatestCommitment.Set(newCommitment)
return lo.Batch(
newCommitment.IsAttested.OnTrigger(func() { c.LatestAttestedCommitment.Set(newCommitment) }),
newCommitment.IsVerified.OnTrigger(func() { c.LatestProducedCommitment.Set(newCommitment) }),
newCommitment.IsSynced.OnTrigger(func() { c.LatestSyncedSlot.Set(newCommitment.Slot()) }),
)
}
// dispatchBlockToSpawnedEngine dispatches the given block to the spawned engine of this chain (if it exists).
func (c *Chain) dispatchBlockToSpawnedEngine(block *model.Block, src peer.ID) (dispatched bool) {
// abort if we do not have a spawned engine
engineInstance := c.Engine.Get()
if engineInstance == nil {
return false
}
// abort if the target slot is below the latest commitment
issuingTime := block.ProtocolBlock().Header.IssuingTime
targetSlot := engineInstance.APIForTime(issuingTime).TimeProvider().SlotFromTime(issuingTime)
if targetSlot <= engineInstance.LatestCommitment.Get().Slot() {
return false
}
// perform additional checks if we are in warp sync mode (only let blocks pass that we requested)
if c.WarpSyncMode.Get() {
// abort if the target commitment does not exist
targetCommitment, targetCommitmentExists := c.Commitment(targetSlot)
if !targetCommitmentExists {
return false
}
// abort if the block is not part of the blocks to warp sync
blocksToWarpSync := targetCommitment.BlocksToWarpSync.Get()
if blocksToWarpSync == nil || !blocksToWarpSync.Has(block.ID()) {
return false
}
}
// dispatch the block to the spawned engine if all previous checks passed
engineInstance.ProcessBlockFromPeer(block, src)
return true
}
// claimedWeight is a getter for the ClaimedWeight variable of this chain, which is internally used to be able to
// "address" the variable across multiple chains in a generic way.
func (c *Chain) claimedWeight() reactive.Variable[uint64] {
return c.ClaimedWeight
}
// verifiedWeight is a getter for the VerifiedWeight variable of this chain, which is internally used to be able to
// "address" the variable across multiple chains in a generic way.
func (c *Chain) verifiedWeight() reactive.Variable[uint64] {
return c.VerifiedWeight
}
// attestedWeight is a getter for the AttestedWeight variable of this chain, which is internally used to be able to
// "address" the variable across multiple chains in a generic way.
func (c *Chain) attestedWeight() reactive.Variable[uint64] {
return c.AttestedWeight
}