-
Notifications
You must be signed in to change notification settings - Fork 12
/
chains.go
300 lines (240 loc) · 12.1 KB
/
chains.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
package protocol
import (
"cmp"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/iotaledger/hive.go/ds/reactive"
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/log"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/protocol/engine"
iotago "github.com/iotaledger/iota.go/v4"
)
// region Chains ///////////////////////////////////////////////////////////////////////////////////////////////////////
// Chains is a subcomponent of the protocol that exposes the chains that are managed by the protocol and that implements
// the chain switching logic.
//
// It embeds the reactive set of managed chains as well as the component's logger, so the methods of both are
// available directly on a Chains instance.
type Chains struct {
	// Set contains all non-evicted chains that are managed by the protocol (chains are added by newChain and removed
	// again once their IsEvicted event triggers).
	reactive.Set[*Chain]
	// Main contains the main chain. It starts out as a freshly created chain and is replaced whenever
	// HeaviestVerifiedCandidate is updated to a non-nil chain.
	Main reactive.Variable[*Chain]
	// HeaviestClaimedCandidate contains the candidate chain with the heaviest claimed weight according to its latest
	// commitment. The weight has neither been checked via attestations nor verified by downloading all data.
	HeaviestClaimedCandidate *ChainsCandidate
	// HeaviestAttestedCandidate contains the candidate chain with the heaviest weight as checked by attestations. The
	// chain has not been instantiated into an engine yet.
	HeaviestAttestedCandidate *ChainsCandidate
	// HeaviestVerifiedCandidate contains the candidate chain with the heaviest verified weight, meaning the chain has
	// been instantiated into an engine and the commitments have been produced by the engine itself.
	HeaviestVerifiedCandidate *ChainsCandidate
	// LatestSeenSlot contains the highest commitment slot observed so far, taken from the slot commitment referenced by
	// received blocks and from the latest commitment of initialized engines. The variable is created with the
	// increasing transformation, so it never moves backwards.
	LatestSeenSlot reactive.Variable[iotago.SlotIndex]
	// protocol contains a reference to the Protocol instance that this component belongs to.
	protocol *Protocol
	// Logger contains a reference to the logger that is used by this component (assigned in initLogger).
	log.Logger
}
// newChains builds the Chains component of the given protocol.
//
// It creates the reactive containers, attaches one candidate tracker per weight flavour (claimed, attested, verified),
// runs the individual initializers and hooks the combined teardown of those initializers into the protocol's shutdown
// event. The derivation of LatestSeenSlot is only set up once the protocol's constructed event carries a non-empty
// value.
func newChains(protocol *Protocol) *Chains {
	chains := &Chains{
		Set:            reactive.NewSet[*Chain](),
		Main:           reactive.NewVariable[*Chain](),
		LatestSeenSlot: reactive.NewVariable[iotago.SlotIndex](increasing[iotago.SlotIndex]),
		protocol:       protocol,
	}

	// the trackers need a back-reference to the component, so they can only be attached after it exists
	chains.HeaviestClaimedCandidate = newChainsCandidate(chains, (*Commitment).cumulativeWeight)
	chains.HeaviestAttestedCandidate = newChainsCandidate(chains, (*Commitment).cumulativeAttestedWeight)
	chains.HeaviestVerifiedCandidate = newChainsCandidate(chains, (*Commitment).cumulativeVerifiedWeight)

	// the initializers are executed in exactly this order: logger first, then chain switching, then the slot tracking
	stopLogger := chains.initLogger(protocol.NewChildLogger("Chains"))
	stopChainSwitching := chains.initChainSwitching()
	stopLatestSeenSlot := protocol.ConstructedEvent().WithNonEmptyValue(func(_ bool) (shutdown func()) {
		return chains.deriveLatestSeenSlot(protocol)
	})

	shutdown := lo.BatchReverse(stopLogger, stopChainSwitching, stopLatestSeenSlot)
	protocol.ShutdownEvent().OnTrigger(shutdown)

	return chains
}
// WithInitializedEngines is a reactive selector over all managed chains: for every element of the set it subscribes to
// the chain's initialized engine and forwards the (chain, engine) pair to the given callback. The function returned by
// the callback is handed back to the underlying selector as its teardown, and the returned shutdown function is the
// one produced by the subscription on the set.
func (c *Chains) WithInitializedEngines(callback func(chain *Chain, engine *engine.Engine) (shutdown func())) (shutdown func()) {
	// forEachChain binds the chain so that the callback receives it together with its engine
	forEachChain := func(chain *Chain) func() {
		return chain.WithInitializedEngine(func(initializedEngine *engine.Engine) func() {
			return callback(chain, initializedEngine)
		})
	}

	return c.WithElements(forEachChain)
}
// initLogger stores the given logger in the component and enables update logging for the main chain (trace level) and
// for the three heaviest candidates (debug level). The returned function combines the teardown of all log
// subscriptions with the shutdown of the logger itself.
func (c *Chains) initLogger(logger log.Logger) (shutdown func()) {
	c.Logger = logger

	teardowns := []func(){
		c.Main.LogUpdates(c, log.LevelTrace, "Main", (*Chain).LogName),
		c.HeaviestClaimedCandidate.LogUpdates(c, log.LevelDebug, "HeaviestClaimedCandidate", (*Chain).LogName),
		c.HeaviestAttestedCandidate.LogUpdates(c, log.LevelDebug, "HeaviestAttestedCandidate", (*Chain).LogName),
		c.HeaviestVerifiedCandidate.LogUpdates(c, log.LevelDebug, "HeaviestVerifiedCandidate", (*Chain).LogName),
		logger.Shutdown,
	}

	return lo.BatchReverse(teardowns...)
}
// initChainSwitching sets up the chain switching pipeline.
//
// It first creates the initial main chain and flags its engine to be started. Afterwards it wires the three stages
// together: the heaviest claimed candidate gets its attestations requested, the heaviest attested candidate gets its
// engine started and the heaviest verified candidate becomes the new main chain. Finally, every managed chain is
// tracked for the weight measurement and the measured slot follows LatestSeenSlot. The returned function tears down
// all of these subscriptions.
func (c *Chains) initChainSwitching() (shutdown func()) {
	initialChain := c.newChain()
	initialChain.StartEngine.Set(true)
	c.Main.Set(initialChain)

	// stage 1: request attestations for as long as a chain is the heaviest claimed candidate
	requestAttestations := func(candidate *Chain) func() {
		return candidate.RequestAttestations.ToggleValue(true)
	}

	// stage 2: start the engine of a chain that became the heaviest attested candidate
	startEngine := func(_ *Chain, candidate *Chain) {
		if candidate == nil {
			return
		}

		candidate.StartEngine.Set(true)
	}

	// stage 3: switch to a chain that became the heaviest verified candidate
	switchMainChain := func(_ *Chain, candidate *Chain) {
		if candidate == nil {
			return
		}

		c.Main.Set(candidate)
	}

	return lo.BatchReverse(
		c.HeaviestClaimedCandidate.WithNonEmptyValue(requestAttestations),
		c.HeaviestAttestedCandidate.OnUpdate(startEngine),
		c.HeaviestVerifiedCandidate.OnUpdate(switchMainChain),
		c.WithElements(c.trackHeaviestCandidates),
		c.LatestSeenSlot.WithNonEmptyValue(c.updateMeasuredSlot),
	)
}
// trackHeaviestCandidates feeds the latest commitments of the given chain into the weight measurement.
//
// For every update of the chain's latest commitment it waits for the chain's DivergencePointVerified event and then,
// unless the eviction event of the commitment's slot was already triggered, registers the commitment with the claimed
// candidate tracker. The attested and the verified tracker receive the commitment once its IsAttested respectively
// IsVerified event triggers. The returned teardown is the one of the subscription to the latest commitment.
func (c *Chains) trackHeaviestCandidates(chain *Chain) (teardown func()) {
	return chain.LatestCommitment.OnUpdate(func(_ *Commitment, latestCommitment *Commitment) {
		chain.DivergencePointVerified.OnTrigger(func() {
			slot := latestCommitment.ID().Index()

			// commitments of slots that are already evicted are not measured anymore
			slotEvicted := c.protocol.EvictionEvent(slot)
			if slotEvicted.WasTriggered() {
				return
			}

			c.HeaviestClaimedCandidate.registerCommitment(slot, latestCommitment, slotEvicted)

			latestCommitment.IsAttested.OnTrigger(func() {
				c.HeaviestAttestedCandidate.registerCommitment(slot, latestCommitment, slotEvicted)
			})

			latestCommitment.IsVerified.OnTrigger(func() {
				c.HeaviestVerifiedCandidate.registerCommitment(slot, latestCommitment, slotEvicted)
			})
		})
	})
}
// updateMeasuredSlot starts the weight measurement of all three candidate trackers at the slot that lies
// chainSwitchingMeasurementOffset slots before the given latest seen slot. The returned teardown combines the
// teardown functions of the individual measurements.
func (c *Chains) updateMeasuredSlot(latestSeenSlot iotago.SlotIndex) (teardown func()) {
	slotToMeasure := latestSeenSlot - chainSwitchingMeasurementOffset

	// the order (claimed, attested, verified) determines the order in which the measurements are started
	candidates := []*ChainsCandidate{
		c.HeaviestClaimedCandidate,
		c.HeaviestAttestedCandidate,
		c.HeaviestVerifiedCandidate,
	}

	teardowns := make([]func(), 0, len(candidates))
	for _, candidate := range candidates {
		teardowns = append(teardowns, candidate.measureAt(slotToMeasure))
	}

	return lo.BatchReverse(teardowns...)
}
// deriveLatestSeenSlot keeps LatestSeenSlot up to date for as long as the protocol has a main engine.
//
// Two sources are observed: the latest commitment of every initialized engine and the slot commitment referenced in
// the header of every block received from the network. The returned function is the teardown of the subscription to
// the main engine.
func (c *Chains) deriveLatestSeenSlot(protocol *Protocol) func() {
	// raise forwards the given slot to LatestSeenSlot if it is higher than the current value
	raise := func(seenSlot iotago.SlotIndex) {
		// Check the value to avoid having to acquire write locks inside of the Set method.
		if c.LatestSeenSlot.Get() < seenSlot {
			c.LatestSeenSlot.Set(seenSlot)
		}
	}

	//nolint:revive
	return protocol.Engines.Main.WithNonEmptyValue(func(mainEngine *engine.Engine) (shutdown func()) {
		stopEngineTracking := c.WithInitializedEngines(func(_ *Chain, chainEngine *engine.Engine) (shutdown func()) {
			return chainEngine.LatestCommitment.OnUpdate(func(_ *model.Commitment, latestCommitment *model.Commitment) {
				raise(latestCommitment.Slot())
			})
		})

		stopBlockTracking := protocol.Network.OnBlockReceived(func(block *model.Block, _ peer.ID) {
			raise(block.ProtocolBlock().Header.SlotCommitmentID.Slot())
		})

		return lo.BatchReverse(stopEngineTracking, stopBlockTracking)
	})
}
// newChain creates a new chain that belongs to this component and adds it to the set of managed chains. If the chain
// was added, it is removed from the set again as soon as its IsEvicted event triggers.
func (c *Chains) newChain() *Chain {
	chain := newChain(c)

	if added := c.Add(chain); !added {
		return chain
	}

	chain.IsEvicted.OnTrigger(func() { c.Delete(chain) })

	return chain
}
// increasing returns the larger of the current and the new value. It is used as the transformation function of
// LatestSeenSlot, which thereby can only grow.
func increasing[T cmp.Ordered](currentValue T, newValue T) T {
	larger := max(newValue, currentValue)

	return larger
}
// chainSwitchingMeasurementOffset is the number of slots that updateMeasuredSlot subtracts from the latest seen slot
// to obtain the slot at which the heaviest candidates are measured.
const chainSwitchingMeasurementOffset iotago.SlotIndex = 1
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
// region ChainsCandidate //////////////////////////////////////////////////////////////////////////////////////////////
// ChainsCandidate implements a wrapper for the logic of tracking the heaviest candidate of all Chains with respect to
// some monitored weight variable.
//
// Commitments are registered per slot via registerCommitment, and measureAt decides which chain (if any) is exposed
// through the embedded variable.
type ChainsCandidate struct {
	// Variable contains the heaviest chain candidate (nil while no chain satisfies the switching threshold).
	reactive.Variable[*Chain]
	// chains contains a reference to the Chains instance that this candidate belongs to.
	chains *Chains
	// weightVariable returns the reactive weight of a commitment that this candidate ranks by; it is also the sort key
	// of the per-slot sorted sets.
	weightVariable func(element *Commitment) reactive.Variable[uint64]
	// sortedCommitmentsBySlot contains the sorted commitments for each slot. Entries are created on demand and deleted
	// once the eviction event of their slot triggers.
	sortedCommitmentsBySlot *shrinkingmap.ShrinkingMap[iotago.SlotIndex, reactive.SortedSet[*Commitment]]
}
// newChainsCandidate creates a candidate tracker for the given Chains instance that ranks commitments by the weight
// returned from weightVariable. The tracker starts with an empty candidate variable and without any registered
// commitments.
func newChainsCandidate(chains *Chains, weightVariable func(element *Commitment) reactive.Variable[uint64]) *ChainsCandidate {
	candidate := new(ChainsCandidate)

	candidate.Variable = reactive.NewVariable[*Chain]()
	candidate.chains = chains
	candidate.sortedCommitmentsBySlot = shrinkingmap.New[iotago.SlotIndex, reactive.SortedSet[*Commitment]]()
	candidate.weightVariable = weightVariable

	return candidate
}
// measureAt measures the heaviest chain candidate at the given slot.
//
// The protocol parameter ChainSwitchingThreshold of that slot determines how many slots are taken into account: the
// measured slot plus the threshold-1 slots before it. Whenever the counter of earlier slots whose heaviest commitment
// belongs to the same chain changes, the candidate variable is set to that chain if the counter reached threshold-1
// and reset to nil otherwise. Nothing is monitored (nil teardown) if the slot is smaller than the threshold, or - for
// a given heaviest commitment and weight - if its chain already is the main chain or the main chain's cumulative
// verified weight at that slot is larger.
func (c *ChainsCandidate) measureAt(slot iotago.SlotIndex) (teardown func()) {
	// slots below the threshold do not have enough predecessors to be measured
	threshold := c.chains.protocol.APIForSlot(slot).ProtocolParameters().ChainSwitchingThreshold()
	if slot < iotago.SlotIndex(threshold) {
		return nil
	}

	commitmentsAtSlot := c.sortedCommitments(slot, c.chains.protocol.EvictionEvent(slot))

	// re-evaluate whenever the heaviest commitment of the slot or its weight changes
	return commitmentsAtSlot.HeaviestElement().WithNonEmptyValue(func(heaviest *Commitment) (teardown func()) {
		return c.weightVariable(heaviest).WithValue(func(weight uint64) (teardown func()) {
			candidateChain := heaviest.Chain.Get()

			// there is nothing to switch to if the candidate is the main chain or the main chain is heavier
			mainChain := c.chains.Main.Get()
			if candidateChain == mainChain || mainChain.CumulativeVerifiedWeightAt(heaviest.Slot()) > weight {
				return nil
			}

			// counts the monitored slots whose heaviest commitment belongs to the candidate chain
			matchingSlots := reactive.NewCounter[*Commitment](func(commitment *Commitment) bool {
				return commitment.Chain.Get() == candidateChain
			})

			// monitor the heaviest commitment of each earlier slot that already has registered commitments
			teardowns := make([]func(), 0, int(threshold))
			for offset := uint8(1); offset < threshold; offset++ {
				earlierCommitments, exists := c.sortedCommitmentsBySlot.Get(slot - iotago.SlotIndex(offset))
				if !exists {
					continue
				}

				teardowns = append(teardowns, matchingSlots.Monitor(earlierCommitments.HeaviestElement()))
			}

			// expose the candidate only while all earlier slots of the window agree on the same chain
			requiredSlots := int(threshold) - 1
			teardowns = append(teardowns, matchingSlots.OnUpdate(func(_ int, count int) {
				if count < requiredSlots {
					c.Set(nil)

					return
				}

				c.Set(candidateChain)
			}))

			return lo.BatchReverse(teardowns...)
		})
	})
}
// registerCommitment adds the given commitment to the sorted set of the given slot (creating the set if necessary),
// which makes the commitment take part in the weight measurement of that slot. The eviction event is used to clean up
// a newly created set.
func (c *ChainsCandidate) registerCommitment(slot iotago.SlotIndex, commitment *Commitment, evictionEvent reactive.Event) {
	c.sortedCommitments(slot, evictionEvent).Add(commitment)
}
// sortedCommitments returns the set of commitments of the given slot, sorted by the candidate's weight variable. A
// missing set is created on the fly and scheduled for deletion on the given eviction event.
func (c *ChainsCandidate) sortedCommitments(slot iotago.SlotIndex, evictionEvent reactive.Event) (sortedCommitments reactive.SortedSet[*Commitment]) {
	newSortedSet := func() reactive.SortedSet[*Commitment] {
		return reactive.NewSortedSet(c.weightVariable)
	}

	sortedCommitments, created := c.sortedCommitmentsBySlot.GetOrCreate(slot, newSortedSet)
	if !created {
		return sortedCommitments
	}

	// only the caller that created the set registers the cleanup, so it is registered exactly once per set
	evictionEvent.OnTrigger(func() { c.sortedCommitmentsBySlot.Delete(slot) })

	return sortedCommitments
}
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////