Skip to content

Commit

Permalink
Merge pull request #5773 from onflow/yurii/5724-epoch-extension-injec…
Browse files Browse the repository at this point in the history
…tion

[EFM Recovery] Dynamic Protocol State injects `EpochExtension`s
  • Loading branch information
durkmurder committed May 9, 2024
2 parents 83ef8ad + 4e20394 commit 2ab36aa
Show file tree
Hide file tree
Showing 6 changed files with 479 additions and 19 deletions.
50 changes: 48 additions & 2 deletions model/flow/protocol_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package flow
import (
"fmt"

clone "github.com/huandu/go-clone/generic"
"golang.org/x/exp/slices"
)

Expand Down Expand Up @@ -59,6 +60,21 @@ type EpochStateContainer struct {
// epoch are only allowed to listen to the network but not actively contribute. Such
// nodes are _not_ part of `Identities`.
ActiveIdentities DynamicIdentityEntryList

// EpochExtensions contains potential EFM-extensions of this epoch. In the happy path
// it is nil or empty. An Epoch in which Epoch-Fallback-Mode [EFM] is triggered, will
// have at least one extension. By convention, the initial extension must satisfy
// EpochSetup.FinalView + 1 = EpochExtensions[0].FirstView
// and each consecutive pair of slice elements must obey
// EpochExtensions[i].FinalView+1 = EpochExtensions[i+1].FirstView
EpochExtensions []EpochExtension
}

// EpochExtension describes one contiguous range of views that is appended to
// an epoch, prolonging it beyond its originally scheduled final view.
type EpochExtension struct {
	// FirstView and FinalView are the first and the last view of the range
	// covered by this extension.
	FirstView, FinalView uint64
	// TargetEndTime is the target end time of this extension.
	// NOTE(review): unit and reference point are not established here — confirm.
	TargetEndTime uint64
}

// ID returns an identifier for this EpochStateContainer by hashing internal fields.
Expand Down Expand Up @@ -90,6 +106,7 @@ func (c *EpochStateContainer) Copy() *EpochStateContainer {
SetupID: c.SetupID,
CommitID: c.CommitID,
ActiveIdentities: c.ActiveIdentities.Copy(),
EpochExtensions: clone.Clone(c.EpochExtensions),
}
}

Expand Down Expand Up @@ -149,12 +166,19 @@ func NewRichProtocolStateEntry(

// If previous epoch is specified: ensure respective epoch service events are not nil and consistent with commitments in `ProtocolStateEntry.PreviousEpoch`
if protocolState.PreviousEpoch != nil {
if protocolState.PreviousEpoch.SetupID != previousEpochSetup.ID() { // calling ID() will panic is EpochSetup event is nil
if protocolState.PreviousEpoch.SetupID != previousEpochSetup.ID() { // calling ID() will panic if EpochSetup event is nil
return nil, fmt.Errorf("supplied previous epoch's setup event (%x) does not match commitment (%x) in ProtocolStateEntry", previousEpochSetup.ID(), protocolState.PreviousEpoch.SetupID)
}
if protocolState.PreviousEpoch.CommitID != previousEpochCommit.ID() { // calling ID() will panic is EpochCommit event is nil
if protocolState.PreviousEpoch.CommitID != previousEpochCommit.ID() { // calling ID() will panic if EpochCommit event is nil
return nil, fmt.Errorf("supplied previous epoch's commit event (%x) does not match commitment (%x) in ProtocolStateEntry", previousEpochCommit.ID(), protocolState.PreviousEpoch.CommitID)
}
} else {
if previousEpochSetup != nil {
return nil, fmt.Errorf("no previous epoch but gotten non-nil EpochSetup event")
}
if previousEpochCommit != nil {
return nil, fmt.Errorf("no previous epoch but gotten non-nil EpochCommit event")
}
}

// For current epoch: ensure respective epoch service events are not nil and consistent with commitments in `ProtocolStateEntry.CurrentEpoch`
Expand All @@ -175,6 +199,13 @@ func NewRichProtocolStateEntry(
var err error
nextEpoch := protocolState.NextEpoch
if nextEpoch == nil { // in staking phase: build full identity table for current epoch according to (1)
if nextEpochSetup != nil {
return nil, fmt.Errorf("no next epoch but gotten non-nil EpochSetup event")
}
if nextEpochCommit != nil {
return nil, fmt.Errorf("no next epoch but gotten non-nil EpochCommit event")
}

var previousEpochIdentitySkeletons IdentitySkeletonList
var previousEpochDynamicIdentities DynamicIdentityEntryList
if previousEpochSetup != nil {
Expand All @@ -200,6 +231,10 @@ func NewRichProtocolStateEntry(
if nextEpoch.CommitID != nextEpochCommit.ID() {
return nil, fmt.Errorf("supplied next epoch's commit event (%x) does not match commitment (%x) in ProtocolStateEntry", nextEpoch.CommitID, nextEpochCommit.ID())
}
} else {
if nextEpochCommit != nil {
return nil, fmt.Errorf("next epoch not yet committed but got EpochCommit event")
}
}

result.CurrentEpochIdentityTable, err = BuildIdentityTable(
Expand Down Expand Up @@ -280,6 +315,17 @@ func (e *RichProtocolStateEntry) Copy() *RichProtocolStateEntry {
}
}

// CurrentEpochFinalView reports the last view of the current epoch, with epoch
// extensions taken into account. When the current epoch has no extensions, this
// is the final view declared by the current epoch's setup event. Otherwise, it
// is the final view of the most recently appended extension.
func (e *RichProtocolStateEntry) CurrentEpochFinalView() uint64 {
	extensions := e.CurrentEpoch.EpochExtensions
	if len(extensions) == 0 {
		// No extensions: the epoch ends where the setup event says it does.
		return e.CurrentEpochSetup.FinalView
	}
	// The last slice element is the latest extension.
	return extensions[len(extensions)-1].FinalView
}

// EpochPhase returns the current epoch phase.
// The receiver ProtocolStateEntry must be properly constructed.
func (e *ProtocolStateEntry) EpochPhase() EpochPhase {
Expand Down
7 changes: 5 additions & 2 deletions state/protocol/badger/mutator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1975,9 +1975,12 @@ func TestEmergencyEpochFallback(t *testing.T) {
require.NoError(t, err)
assertEpochEmergencyFallbackTriggered(t, state, true) // triggered after finalization

// block 5 is the first block past the current epoch boundary
block4 := unittest.BlockWithParentProtocolState(block3)
// block 4 is the first block past the current epoch boundary
block4 := unittest.BlockWithParentFixture(block3.Header)
block4.Header.View = epoch1Setup.FinalView + 1
block4.SetPayload(flow.Payload{
ProtocolStateID: calculateExpectedStateId(t, mutableState)(block4.Header, nil),
})
err = state.Extend(context.Background(), block4)
require.NoError(t, err)
err = state.Finalize(context.Background(), block4.ID())
Expand Down
2 changes: 1 addition & 1 deletion state/protocol/protocol_state/epochs/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (f *EpochStateMachineFactory) Create(candidateView uint64, parentID flow.Id
return NewHappyPathStateMachine(candidateView, parentState)
},
func(candidateView uint64, parentState *flow.RichProtocolStateEntry) (StateMachine, error) {
return NewFallbackStateMachine(candidateView, parentState), nil
return NewFallbackStateMachine(f.params, candidateView, parentState)
},
)
}
63 changes: 60 additions & 3 deletions state/protocol/protocol_state/epochs/fallback_statemachine.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
package epochs

import (
"fmt"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/state/protocol"
)

// DefaultEpochExtensionViewCount is the default length of an epoch extension in views, approximately 1 day.
// TODO(efm-recovery): replace this with value from KV store or protocol.GlobalParams
const DefaultEpochExtensionViewCount = 100_000

// FallbackStateMachine is a special structure that encapsulates logic for processing service events
// when protocol is in epoch fallback mode. The FallbackStateMachine ignores EpochSetup and EpochCommit
// events but still processes ejection events.
Expand All @@ -18,16 +25,66 @@ var _ StateMachine = (*FallbackStateMachine)(nil)

// NewFallbackStateMachine constructs a state machine for epoch fallback. It automatically sets
// InvalidEpochTransitionAttempted to true, thereby recording that we have entered epoch fallback mode.
func NewFallbackStateMachine(view uint64, parentState *flow.RichProtocolStateEntry) *FallbackStateMachine {
// No errors are expected during normal operations.
func NewFallbackStateMachine(params protocol.GlobalParams, view uint64, parentState *flow.RichProtocolStateEntry) (*FallbackStateMachine, error) {
state := parentState.ProtocolStateEntry.Copy()
state.InvalidEpochTransitionAttempted = true
return &FallbackStateMachine{
nextEpochCommitted := state.EpochPhase() == flow.EpochPhaseCommitted
// we are entering fallback mode, this logic needs to be executed only once
if !state.InvalidEpochTransitionAttempted {
// the next epoch has not been committed, but possibly setup, make sure it is cleared
if !nextEpochCommitted {
state.NextEpoch = nil
}
state.InvalidEpochTransitionAttempted = true
}

sm := &FallbackStateMachine{
baseStateMachine: baseStateMachine{
parentState: parentState,
state: state,
view: view,
},
}

if !nextEpochCommitted && view+params.EpochCommitSafetyThreshold() >= parentState.CurrentEpochFinalView() {
// we have reached safety threshold and we are still in the fallback mode
// prepare a new extension for the current epoch.
err := sm.extendCurrentEpoch(flow.EpochExtension{
FirstView: parentState.CurrentEpochFinalView() + 1,
FinalView: parentState.CurrentEpochFinalView() + DefaultEpochExtensionViewCount, // TODO: replace with EpochExtensionLength
TargetEndTime: 0, // TODO: calculate and set target end time
})
if err != nil {
return nil, err
}
}

return sm, nil
}

// extendCurrentEpoch adds the given epoch extension to the end of the current
// epoch's extension list in the state being evolved.
//
// Before appending, two sanity checks are performed, in this order:
//  1. Contiguity: the extension must start exactly one view after the final view
//     of the last existing extension or, if there is none yet, one view after the
//     final view of the current epoch's setup event (taken from the parent state).
//  2. Absence of a next epoch: extensions are only permitted while the state has
//     no next epoch.
//
// No errors are expected during normal operation.
func (m *FallbackStateMachine) extendCurrentEpoch(epochExtension flow.EpochExtension) error {
	extensions := m.state.CurrentEpoch.EpochExtensions

	if count := len(extensions); count == 0 {
		// First extension: it must directly follow the epoch as originally set up.
		if m.parentState.CurrentEpochSetup.FinalView+1 != epochExtension.FirstView {
			return fmt.Errorf("first epoch extension is not contiguous with current epoch")
		}
	} else if extensions[count-1].FinalView+1 != epochExtension.FirstView {
		// Subsequent extension: it must directly follow the previous one.
		return fmt.Errorf("epoch extension is not contiguous with the last extension")
	}

	if m.state.NextEpoch != nil {
		return fmt.Errorf("cannot extend current epoch when next epoch is present")
	}

	m.state.CurrentEpoch.EpochExtensions = append(extensions, epochExtension)
	return nil
}

// ProcessEpochSetup processes epoch setup service events. In epoch fallback mode, this event is ignored.
Expand Down
Loading

0 comments on commit 2ab36aa

Please sign in to comment.