-
Notifications
You must be signed in to change notification settings - Fork 175
/
epoch_lookup.go
282 lines (244 loc) · 9.41 KB
/
epoch_lookup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
package epochs
import (
"fmt"
"sync"
"go.uber.org/atomic"
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/state/protocol/events"
)
// epochRange describes a single epoch by its counter together with the span of
// views it covers. Both firstView and finalView are inclusive bounds.
type epochRange struct {
	counter, firstView, finalView uint64
}
// exists reports whether the epochRange holds anything other than the zero value
// of the struct. It is handy for detecting unused slots while walking an epochRangeCache.
func (er epochRange) exists() bool {
	var unset epochRange
	return er != unset
}
// epochRangeCache stores at most the 3 latest epoch ranges.
// Ranges are ordered by counter (ascending) and right-aligned: the most recently
// added range always occupies the last slot and unused slots sit at the front.
// For example, if we only have one epoch cached, `epochRangeCache[0]` and `epochRangeCache[1]`
// hold the zero-value epochRange (for which `exists` returns false).
// Not safe for concurrent use.
type epochRangeCache [3]epochRange
// latest returns the most recently added epoch range, which is always kept in the
// last slot of the cache. When nothing has been cached yet, the result is the
// zero-value epochRange, for which exists() reports false.
func (cache *epochRangeCache) latest() epochRange {
	return cache[len(cache)-1]
}
// combinedRange returns the endpoints of the view range spanned by all cached
// epochs taken together: the firstView of the oldest cached epoch and the
// finalView of the latest cached epoch.
// Callers are expected to have cached at least one epoch beforehand; with an
// empty cache both returned values are zero.
func (cache *epochRangeCache) combinedRange() (firstView uint64, finalView uint64) {
	// the cache is right-aligned, so the first populated slot is the oldest epoch
	for i := range cache {
		if cache[i].exists() {
			return cache[i].firstView, cache.latest().finalView
		}
	}
	// nothing cached: the low end stays at zero, the high end comes from the (empty) last slot
	return 0, cache.latest().finalView
}
// add inserts an epoch range into the cache.
// The new range must directly follow the latest cached one: its counter must be
// exactly one higher and its firstView must immediately succeed the cached finalView.
// Re-adding the range that is already the latest entry is a no-op.
// The ordering and right-alignment properties of epochRangeCache are preserved.
// No errors are expected during normal operation.
func (cache *epochRangeCache) add(epoch epochRange) error {
	// sanity check: a zero-value range is indistinguishable from an empty slot, so
	// it must never be stored; tripping this indicates a serious problem elsewhere
	if !epoch.exists() {
		return fmt.Errorf("sanity check failed: caller attempted to cache invalid zero epoch")
	}

	newest := cache.latest()
	switch {
	case !newest.exists():
		// initial case - nothing stored yet, fill the right-most slot
		cache[2] = epoch
		return nil
	case newest == epoch:
		// identical to the latest entry - nothing to do
		return nil
	case epoch.counter != newest.counter+1:
		return fmt.Errorf("non-sequential epoch counters: adding epoch %d when latest cached epoch is %d", epoch.counter, newest.counter)
	case epoch.firstView != newest.finalView+1:
		return fmt.Errorf("non-sequential epoch view ranges: adding range [%d,%d] when latest cached range is [%d,%d]",
			epoch.firstView, epoch.finalView, newest.firstView, newest.finalView)
	}

	// typical case - shift everything one slot to the left (dropping the oldest
	// entry) and place the new range in the right-most slot
	cache[0], cache[1], cache[2] = cache[1], cache[2], epoch
	return nil
}
// EpochLookup implements the module.EpochLookup interface using protocol state to match views to epochs.
// It keeps a small cache of epoch view ranges (see epochRangeCache) which is seeded on
// construction and extended by a worker routine as new epochs are committed.
// CAUTION: EpochLookup should only be used for querying the previous, current, or next epoch.
type EpochLookup struct {
// protocol state, used to seed the cache and to resolve newly committed epochs
state protocol.State
// guards epochs
mu sync.RWMutex
// cached view ranges of the most recent epochs; access only while holding mu
epochs epochRangeCache
committedEpochsCh chan *flow.Header // headers received via EpochCommittedPhaseStarted, consumed by the worker routine which caches the epoch that is "next" as of that block
epochFallbackIsTriggered *atomic.Bool // true when epoch fallback is triggered
events.Noop // implements protocol.Consumer
component.Component
}
// Compile-time checks that *EpochLookup satisfies the interfaces it is used as.
var (
	_ protocol.Consumer  = (*EpochLookup)(nil)
	_ module.EpochLookup = (*EpochLookup)(nil)
)
// NewEpochLookup instantiates a new EpochLookup and seeds its epoch cache from
// the latest finalized snapshot of the given protocol state:
//   - the previous epoch, if one exists
//   - the current epoch (always)
//   - the next epoch, if the snapshot is in the EpochPhaseCommitted phase
//
// It also records whether epoch fallback has already been triggered.
// The returned instance is a component with a single worker (handleProtocolEvents)
// that caches further epochs as they are committed.
//
// Any error encountered while reading from the protocol state or populating the
// cache is wrapped with context and returned; in that case the returned
// *EpochLookup is nil.
func NewEpochLookup(state protocol.State) (*EpochLookup, error) {
	lookup := &EpochLookup{
		state:                    state,
		committedEpochsCh:        make(chan *flow.Header, 1),
		epochFallbackIsTriggered: atomic.NewBool(false),
	}

	lookup.Component = component.NewComponentManagerBuilder().
		AddWorker(lookup.handleProtocolEvents).
		Build()

	final := state.Final()

	// we cache the previous epoch, if one exists
	exists, err := protocol.PreviousEpochExists(final)
	if err != nil {
		return nil, fmt.Errorf("could not check previous epoch exists: %w", err)
	}
	if exists {
		if err := lookup.cacheEpoch(final.Epochs().Previous()); err != nil {
			return nil, fmt.Errorf("could not prepare previous epoch: %w", err)
		}
	}

	// we always cache the current epoch
	if err := lookup.cacheEpoch(final.Epochs().Current()); err != nil {
		return nil, fmt.Errorf("could not prepare current epoch: %w", err)
	}

	// we cache the next epoch, if it is committed
	phase, err := final.Phase()
	if err != nil {
		return nil, fmt.Errorf("could not check epoch phase: %w", err)
	}
	if phase == flow.EpochPhaseCommitted {
		if err := lookup.cacheEpoch(final.Epochs().Next()); err != nil {
			// report the epoch that actually failed (next, not previous)
			return nil, fmt.Errorf("could not prepare next epoch: %w", err)
		}
	}

	// if epoch fallback was triggered, note it here
	triggered, err := state.Params().EpochFallbackTriggered()
	if err != nil {
		return nil, fmt.Errorf("could not check epoch fallback: %w", err)
	}
	if triggered {
		lookup.epochFallbackIsTriggered.Store(true)
	}

	return lookup, nil
}
// cacheEpoch reads the counter, first view and final view of the given epoch and
// stores them in the epoch cache while holding the write lock.
// Must only be called with committed epochs.
// Errors from querying the epoch are returned as-is; a failure to insert into
// the cache is wrapped. No errors are expected during normal operation.
func (lookup *EpochLookup) cacheEpoch(epoch protocol.Epoch) error {
	var (
		entry epochRange
		err   error
	)
	if entry.counter, err = epoch.Counter(); err != nil {
		return err
	}
	if entry.firstView, err = epoch.FirstView(); err != nil {
		return err
	}
	if entry.finalView, err = epoch.FinalView(); err != nil {
		return err
	}

	// keep the critical section limited to the cache mutation itself
	lookup.mu.Lock()
	err = lookup.epochs.add(entry)
	lookup.mu.Unlock()

	if err != nil {
		return fmt.Errorf("could not add epoch %d: %w", entry.counter, err)
	}
	return nil
}
// EpochForViewWithFallback returns the counter of the epoch that the input view belongs to.
// If epoch fallback has been triggered, any view beyond the view range of the last
// committed epoch is attributed to that last committed epoch, in perpetuity.
// For example, if we trigger epoch fallback during epoch 10, and reach the final
// view of epoch 10 before epoch 11 has finished being setup, this function will
// return 10 even for input views beyond the final view of epoch 10.
// Views preceding the oldest cached epoch are never resolved, fallback or not.
//
// Returns model.ErrViewForUnknownEpoch if the input does not fall within the range of a known epoch.
func (lookup *EpochLookup) EpochForViewWithFallback(view uint64) (uint64, error) {
	lookup.mu.RLock()
	defer lookup.mu.RUnlock()

	lowest, highest := lookup.epochs.combinedRange()

	// LEGEND:
	// *      -> view argument
	// [----| -> epoch view range
	switch {
	case view < lowest:
		// view is before any known epochs
		// ---*---[----|----|----]-------
		return 0, model.ErrViewForUnknownEpoch
	case view > highest:
		// view is after any known epochs
		// -------[----|----|----]---*---
		if !lookup.epochFallbackIsTriggered.Load() {
			// we are still waiting for the epoch including this view to be committed
			return 0, model.ErrViewForUnknownEpoch
		}
		// epoch fallback is triggered: treat this view as part of the last committed epoch
		return lookup.epochs.latest().counter, nil
	}

	// view is within the combined range, so one of the populated slots must contain it
	for i := range lookup.epochs {
		cached := lookup.epochs[i]
		if cached.exists() && cached.firstView <= view && view <= cached.finalView {
			return cached.counter, nil
		}
	}

	// reaching this point indicates a corrupted state or internal bug
	return 0, fmt.Errorf("sanity check failed: cached epochs (%v) does not contain input view %d", lookup.epochs, view)
}
// handleProtocolEvents processes the headers queued by `EpochCommittedPhaseStarted`.
// This function permanently utilizes a worker routine until the context is done.
// For every queued header, it looks up the epoch that is "next" as of that block
// and caches its view range. A failure to cache the epoch is reported through
// the signaler context's Throw.
// Epoch emergency fallback is not handled here: `EpochEmergencyFallbackTriggered`
// records it directly in an atomic flag.
func (lookup *EpochLookup) handleProtocolEvents(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
	ready()

	for {
		select {
		case <-ctx.Done():
			return
		case header := <-lookup.committedEpochsCh:
			snapshot := lookup.state.AtBlockID(header.ID())
			if err := lookup.cacheEpoch(snapshot.Epochs().Next()); err != nil {
				ctx.Throw(err)
			}
		}
	}
}
// EpochCommittedPhaseStarted is the protocol event callback invoked when the block starting
// the Epoch Committed Phase has been finalized. It hands the given header to the worker
// routine via committedEpochsCh; the worker then caches the newly committed epoch.
// The first argument is ignored.
// NOTE(review): this is a plain (blocking) send on a channel created with capacity 1, so the
// caller blocks while an earlier header is still queued — confirm this is acceptable for the emitter.
func (lookup *EpochLookup) EpochCommittedPhaseStarted(_ uint64, first *flow.Header) {
lookup.committedEpochsCh <- first
}
// EpochEmergencyFallbackTriggered is the protocol event callback for epoch emergency fallback.
// It records the event by setting the atomic epochFallbackIsTriggered flag, which
// EpochForViewWithFallback reads when resolving views beyond the last cached epoch.
// The flag is set directly by the caller; the worker routine is not involved.
func (lookup *EpochLookup) EpochEmergencyFallbackTriggered() {
lookup.epochFallbackIsTriggered.Store(true)
}