-
Notifications
You must be signed in to change notification settings - Fork 12
/
commitments.go
365 lines (292 loc) · 13.5 KB
/
commitments.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
package protocol
import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/iotaledger/hive.go/core/eventticker"
"github.com/iotaledger/hive.go/ds/reactive"
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/log"
"github.com/iotaledger/hive.go/runtime/workerpool"
"github.com/iotaledger/iota-core/pkg/core/promise"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/protocol/engine"
iotago "github.com/iotaledger/iota.go/v4"
)
// Commitments is a subcomponent of the protocol that exposes the commitments that are managed by the protocol and that
// are either published from the network or created by an engine of the node.
type Commitments struct {
	// Set contains all non-evicted commitments that are managed by the protocol.
	reactive.Set[*Commitment]

	// Root contains the root commitment.
	Root reactive.Variable[*Commitment]

	// protocol contains a reference to the Protocol instance that this component belongs to.
	protocol *Protocol

	// cachedRequests contains Promise instances for all non-evicted commitments that were requested by the Protocol.
	// It acts as a cache and a way to address commitments generically even if they are still unsolid.
	cachedRequests *shrinkingmap.ShrinkingMap[iotago.CommitmentID, *promise.Promise[*Commitment]]

	// workerPool contains the worker pool that is used to process commitment requests and responses asynchronously.
	workerPool *workerpool.WorkerPool

	// requester contains the ticker that is used to send commitment requests until the requested commitment arrives.
	requester *eventticker.EventTicker[iotago.SlotIndex, iotago.CommitmentID]

	// Logger contains a reference to the logger that is used by this component.
	log.Logger
}
// newCommitments creates a new commitments instance for the given protocol.
func newCommitments(protocol *Protocol) *Commitments {
	commitments := &Commitments{
		Set:            reactive.NewSet[*Commitment](),
		Root:           reactive.NewVariable[*Commitment](),
		protocol:       protocol,
		cachedRequests: shrinkingmap.New[iotago.CommitmentID, *promise.Promise[*Commitment]](),
		workerPool:     protocol.Workers.CreatePool("Commitments"),
		requester:      eventticker.New[iotago.SlotIndex, iotago.CommitmentID](protocol.Options.CommitmentRequesterOptions...),
	}

	// wire up the subcomponents and tear them down in reverse order when the protocol shuts down
	teardown := lo.BatchReverse(
		commitments.initLogger(),
		commitments.initEngineCommitmentSynchronization(),
		commitments.initRequester(),
	)
	protocol.ShutdownEvent().OnTrigger(teardown)

	return commitments
}
// Get returns the Commitment for the given commitmentID. If the Commitment is not available yet, it will return an
// ErrorCommitmentNotFound. It is possible to trigger a request for the Commitment by passing true as the second
// argument.
func (c *Commitments) Get(commitmentID iotago.CommitmentID, requestIfMissing ...bool) (commitment *Commitment, err error) {
	request, found := c.cachedRequests.Get(commitmentID)
	if !found {
		// no cached request and no permission to create one -> the commitment is simply unknown
		if !lo.First(requestIfMissing) {
			return nil, ErrorCommitmentNotFound
		}

		// create (and network-request) the commitment, surfacing an immediate rejection as an error
		if request = c.cachedRequest(commitmentID, true); request.WasRejected() {
			return nil, ierrors.Wrapf(request.Err(), "failed to request commitment %s", commitmentID)
		}
	}

	// a pending (not yet resolved) request still counts as "not found"
	if !request.WasCompleted() {
		return nil, ErrorCommitmentNotFound
	}

	return request.Result(), request.Err()
}
// API returns the CommitmentAPI for the given commitmentID. If the Commitment is not available, it will return
// ErrorCommitmentNotFound.
func (c *Commitments) API(commitmentID iotago.CommitmentID) (commitmentAPI *engine.CommitmentAPI, err error) {
	// commitments at or below the root are answered by the main engine directly
	if commitmentID.Slot() <= c.Root.Get().Slot() {
		return c.protocol.Engines.Main.Get().CommitmentAPI(commitmentID)
	}

	// everything above the root must be resolvable through our in-memory data structure
	loadedCommitment, loadErr := c.Get(commitmentID)
	if loadErr != nil {
		return nil, ierrors.Wrap(loadErr, "failed to load commitment")
	}

	return loadedCommitment.TargetEngine().CommitmentAPI(commitmentID)
}
// initLogger initializes the logger for this component and returns a function that tears it down again.
func (c *Commitments) initLogger() (shutdown func()) {
	c.Logger = c.protocol.NewChildLogger("Commitments")

	// trace every update of the root commitment through the logger
	unsubscribeRootLogging := c.Root.LogUpdates(c, log.LevelTrace, "Root", (*Commitment).LogName)

	return lo.BatchReverse(
		unsubscribeRootLogging,
		c.Logger.Shutdown,
	)
}
// initEngineCommitmentSynchronization initializes the synchronization of commitments that are published by the engines.
//
// The subscriptions become active once the protocol's ConstructedEvent carries a non-empty value and the returned
// function tears them down again (in reverse order).
func (c *Commitments) initEngineCommitmentSynchronization() func() {
	return c.protocol.ConstructedEvent().WithNonEmptyValue(func(_ bool) (shutdown func()) {
		return lo.BatchReverse(
			// advance the root commitment of the main chain (once its engine is initialized)
			c.protocol.Chains.Main.WithNonEmptyValue(func(mainChain *Chain) (shutdown func()) {
				return mainChain.WithInitializedEngine(func(mainEngine *engine.Engine) (shutdown func()) {
					return c.publishRootCommitment(mainChain, mainEngine)
				})
			}),

			// publish the commitments that are produced by the engines of all chains
			c.protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) {
				return c.publishEngineCommitments(chain, engine)
			}),
		)
	})
}
// initRequester initializes the requester that is used to request commitments from the network and returns a function
// that unhooks the tick handler and shuts the requester down.
func (c *Commitments) initRequester() (shutdown func()) {
	stopTickerSubscription := c.requester.Events.Tick.Hook(c.sendRequest).Unhook

	return func() {
		stopTickerSubscription()
		c.requester.Shutdown()
	}
}
// publishRootCommitment publishes the root commitment of the main engine.
//
// Whenever the main engine announces a new root commitment, the corresponding Commitment instance is published into
// our data structure, marked as root and exposed through c.Root.
func (c *Commitments) publishRootCommitment(mainChain *Chain, mainEngine *engine.Engine) func() {
	return mainEngine.RootCommitment.OnUpdate(func(_ *model.Commitment, rootCommitment *model.Commitment) {
		publishedCommitment, published, err := c.publishCommitment(rootCommitment)
		if err != nil {
			c.LogError("failed to publish new root commitment", "id", rootCommitment.ID(), "error", err)

			return
		}

		publishedCommitment.IsRoot.Set(true)

		// attach freshly published commitments to the main chain (commitments that already existed keep their chain)
		if published {
			publishedCommitment.Chain.Set(mainChain)
		}

		// Update the forking point of a chain only if the root is empty or root belongs to the main chain or the
		// published commitment is on the main chain, to avoid updating ForkingPoint of the new mainChain into the past.
		if c.Root.Get() == nil || c.Root.Get().Chain.Get() == mainChain || publishedCommitment.Chain.Get() == mainChain {
			mainChain.ForkingPoint.Set(publishedCommitment)
		}

		c.Root.Set(publishedCommitment)
	})
}
// publishEngineCommitments publishes the commitments of the given engine to its chain.
//
// Starting after the last slot that the chain and the engine have in common, every newly committed slot is loaded from
// the engine (or taken from the update itself) and published. latestPublishedSlot is captured by the OnUpdate closure,
// so consecutive updates only publish the not-yet-published range.
func (c *Commitments) publishEngineCommitments(chain *Chain, engine *engine.Engine) (shutdown func()) {
	latestPublishedSlot := chain.LastCommonSlot()

	return engine.LatestCommitment.OnUpdate(func(_ *model.Commitment, latestCommitment *model.Commitment) {
		// loadCommitment resolves the commitment for the given slot, falling back to storage only when necessary.
		loadCommitment := func(slot iotago.SlotIndex) (*model.Commitment, error) {
			// prevent disk access if possible
			if slot == latestCommitment.Slot() {
				return latestCommitment, nil
			}

			return engine.Storage.Commitments().Load(slot)
		}

		// publish every slot between the last published one and the engine's latest commitment
		for ; latestPublishedSlot < latestCommitment.Slot(); latestPublishedSlot++ {
			// retrieve the commitment to publish
			commitment, err := loadCommitment(latestPublishedSlot + 1)
			if err != nil {
				c.LogError("failed to load commitment to publish from engine", "slot", latestPublishedSlot+1, "err", err)

				return
			}

			// publish the commitment (deferring behavior initialization until the chain is forced below)
			publishedCommitment, published, err := c.publishCommitment(commitment, true)
			if err != nil {
				c.LogError("failed to publish commitment from engine", "engine", engine.LogName(), "commitment", commitment, "err", err)

				return
			}

			// force the chain before initializing the behavior to prevent the creation of unnecessary chains
			publishedCommitment.forceChain(chain)
			if published {
				publishedCommitment.initBehavior()
			}

			// mark it as produced by ourselves and force it to be on the right chain (in case our chain produced a
			// different commitment than the one we erroneously expected it to be - we always trust our engine most).
			publishedCommitment.AttestedWeight.Set(publishedCommitment.Weight.Get())
			publishedCommitment.IsVerified.Set(true)
		}
	})
}
// publishCommitment publishes the given commitment and returns the singleton Commitment instance that is used to
// represent it in our data structure (together with a boolean that indicates if we were the first goroutine to publish
// the commitment).
//
// If initManually is true, the freshly created Commitment's behavior is NOT initialized here; the caller is expected
// to invoke initBehavior itself (see publishEngineCommitments, which forces the chain first).
func (c *Commitments) publishCommitment(commitment *model.Commitment, initManually ...bool) (publishedCommitment *Commitment, published bool, err error) {
	// retrieve the singleton promise for this ID and abort if it was already rejected (e.g. because the slot was evicted)
	cachedRequest := c.cachedRequest(commitment.ID())
	if cachedRequest.WasRejected() {
		return nil, false, ierrors.Wrapf(cachedRequest.Err(), "failed to request commitment %s", commitment.ID())
	}

	// otherwise try to publish it and determine if we were the goroutine that published it (the callback mutates the
	// named results publishedCommitment/published only when this call actually resolves the promise)
	cachedRequest.ResolveDynamically(func() *Commitment {
		publishedCommitment = newCommitment(c, commitment)
		published = true

		if !lo.First(initManually) {
			publishedCommitment.initBehavior()
		}

		return publishedCommitment
	})

	return cachedRequest.Result(), published, nil
}
// cachedRequest returns a singleton Promise for the given commitmentID. If the Promise does not exist yet, it will be
// created and optionally requested from the network if missing. Once the promise is resolved, the Commitment is
// initialized and provided to the consumers.
//
// NOTE(review): requestIfMissing only takes effect when this call is the one that creates the promise — a later call
// with requestIfMissing=true for an already-cached, unresolved promise returns early without starting the ticker.
// Confirm this is intended by all callers.
func (c *Commitments) cachedRequest(commitmentID iotago.CommitmentID, requestIfMissing ...bool) *promise.Promise[*Commitment] {
	// handle evicted slots: requests for already evicted slots are rejected immediately
	slotEvicted := c.protocol.EvictionEvent(commitmentID.Index())
	if slotEvicted.WasTriggered() {
		return promise.New[*Commitment]().Reject(ErrorSlotEvicted)
	}

	// create a new promise or return the existing one (only the creator wires up the callbacks below)
	cachedRequest, promiseCreated := c.cachedRequests.GetOrCreate(commitmentID, lo.NoVariadic(promise.New[*Commitment]))
	if !promiseCreated {
		return cachedRequest
	}

	// start ticker if requested (it is stopped again as soon as the promise completes)
	if lo.First(requestIfMissing) {
		c.requester.StartTicker(commitmentID)

		cachedRequest.OnComplete(func() {
			c.requester.StopTicker(commitmentID)
		})
	}

	// handle successful resolutions: initialize the resolved commitment in the protocol
	cachedRequest.OnSuccess(func(commitment *Commitment) {
		c.initCommitment(commitment, slotEvicted)
	})

	// handle failed resolutions (debug only - failures are surfaced to the requesters via the promise itself)
	cachedRequest.OnError(func(err error) {
		c.LogDebug("request failed", "commitmentID", commitmentID, "error", err)
	})

	// tear down the promise once the slot is evicted
	slotEvicted.OnTrigger(func() {
		c.cachedRequests.Delete(commitmentID)

		cachedRequest.Reject(ErrorSlotEvicted)
	})

	return cachedRequest
}
// initCommitment initializes the given commitment in the protocol: it solidifies the parent, adds the commitment to
// the managed set and registers its teardown on slot eviction.
func (c *Commitments) initCommitment(commitment *Commitment, slotEvicted reactive.Event) {
	commitment.LogDebug("created", "id", commitment.ID())

	// solidify the parent of the commitment (requesting it from the network if missing) unless the commitment is at or
	// below the current root
	if root := c.Root.Get(); root != nil && commitment.Slot() > root.Slot() {
		c.cachedRequest(commitment.PreviousCommitmentID(), true).OnSuccess(func(parent *Commitment) {
			// toggle the Parent reference while the parent is alive (untoggled again when the parent is evicted)
			parent.IsEvicted.OnTrigger(commitment.Parent.ToggleValue(parent))
		})
	}

	// add commitment to the set of managed commitments
	c.Add(commitment)

	// tear down the commitment once the slot is evicted
	slotEvicted.OnTrigger(func() {
		c.Delete(commitment)

		commitment.IsEvicted.Trigger()
	})
}
// sendRequest sends a commitment request for the given commitment ID to all peers (asynchronously via the worker
// pool, so the requester's tick handler never blocks).
func (c *Commitments) sendRequest(commitmentID iotago.CommitmentID) {
	requestFromPeers := func() {
		c.protocol.Network.RequestSlotCommitment(commitmentID)

		c.LogDebug("request", "commitment", commitmentID)
	}

	c.workerPool.Submit(requestFromPeers)
}
// processRequest processes the given commitment request by loading the requested commitment and sending it back to
// the requesting peer (asynchronously via the worker pool).
func (c *Commitments) processRequest(commitmentID iotago.CommitmentID, from peer.ID) {
	// loadCommitment first consults the in-memory data structure and only falls back to the main engine for
	// commitments at or below the root.
	loadCommitment := func() (*model.Commitment, error) {
		if commitment, err := c.Get(commitmentID); err == nil {
			return commitment.Commitment, nil
		} else if !ierrors.Is(err, ErrorCommitmentNotFound) || commitmentID.Slot() > c.Root.Get().Slot() {
			// anything other than "not found at or below the root" is a real failure
			return nil, ierrors.Wrap(err, "failed to load commitment metadata")
		}

		commitmentAPI, err := c.protocol.Engines.Main.Get().CommitmentAPI(commitmentID)
		if err != nil {
			return nil, ierrors.Wrap(err, "failed to load engine API")
		}

		return commitmentAPI.Commitment()
	}

	loggedWorkerPoolTask(c.workerPool, func() error {
		commitment, err := loadCommitment()
		if err != nil {
			return ierrors.Wrap(err, "failed to load commitment")
		}

		c.protocol.Network.SendSlotCommitment(commitment, from)

		return nil
	}, c, "commitmentID", commitmentID, "fromPeer", from)
}
// processResponse processes the given commitment response: it validates the commitment's protocol version against the
// main engine's expectation for its slot and publishes it into our data structure.
func (c *Commitments) processResponse(commitment *model.Commitment, from peer.ID) {
	c.workerPool.Submit(func() {
		// make sure the main engine is available to process the response (it can be gone during shutdown)
		engineInstance := c.protocol.Engines.Main.Get()
		if engineInstance == nil {
			c.LogError("main engine unavailable for response", "commitment", commitment.ID(), "fromPeer", from)

			return
		}

		// verify the commitment's version corresponds to the protocol version for the slot
		expectedVersion := engineInstance.APIForSlot(commitment.Slot()).Version()
		if receivedVersion := commitment.Commitment().ProtocolVersion; receivedVersion != expectedVersion {
			c.LogDebug("received commitment with invalid protocol version", "commitment", commitment.ID(), "version", receivedVersion, "expectedVersion", expectedVersion, "fromPeer", from)

			return
		}

		publishedCommitment, published, err := c.protocol.Commitments.publishCommitment(commitment)
		if err != nil {
			c.LogError("failed to process commitment", "fromPeer", from, "err", err)

			return
		}

		// only log responses that actually published something new
		if published {
			c.LogTrace("received response", "commitment", publishedCommitment.LogName(), "fromPeer", from)
		}
	})
}