Skip to content

Commit

Permalink
go/consensus/tendermint: Simplify block context, include time
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Jun 23, 2023
1 parent 1eb7c59 commit b386216
Show file tree
Hide file tree
Showing 37 changed files with 179 additions and 318 deletions.
4 changes: 1 addition & 3 deletions go/consensus/tendermint/abci/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -54,9 +53,8 @@ func (s *testSubscriber) ExecuteMessage(ctx *api.Context, kind, msg interface{})
func TestMessageDispatcher(t *testing.T) {
require := require.New(t)

now := time.Unix(1580461674, 0)
appState := api.NewMockApplicationState(&api.MockApplicationStateConfig{})
ctx := appState.NewContext(api.ContextBeginBlock, now)
ctx := appState.NewContext(api.ContextBeginBlock)
defer ctx.Close()

var md messageDispatcher
Expand Down
54 changes: 28 additions & 26 deletions go/consensus/tendermint/abci/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,6 @@ type abciMux struct {
appsByLexOrder []api.Application
appBlessed api.Application

currentTime time.Time

haltOnce sync.Once
haltHooks []consensus.HaltHook

Expand Down Expand Up @@ -293,7 +291,13 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
panic("mux: invalid genesis application state")
}

mux.currentTime = st.Time
// Reset block context -- we don't really have a block but we need the time.
mux.state.blockLock.Lock()
mux.state.blockTime = st.Time
mux.state.blockCtx = api.NewBlockContext(api.BlockInfo{
Time: st.Time,
})
mux.state.blockLock.Unlock()

if st.Height != req.InitialHeight || uint64(st.Height) != mux.state.initialHeight {
panic(fmt.Errorf("mux: inconsistent initial height (genesis: %d abci: %d state: %d)", st.Height, req.InitialHeight, mux.state.initialHeight))
Expand Down Expand Up @@ -324,7 +328,7 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
// Call InitChain() on all applications.
mux.logger.Debug("InitChain: initializing applications")

ctx := mux.state.NewContext(api.ContextInitChain, mux.currentTime)
ctx := mux.state.NewContext(api.ContextInitChain)
defer ctx.Close()

for _, app := range mux.appsByLexOrder {
Expand Down Expand Up @@ -358,7 +362,7 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
panic(fmt.Errorf("mux: failed to set consensus parameters: %w", err))
}
// Since InitChain does not have a commit step, perform some state updates here.
if err = mux.state.doInitChain(st.Time); err != nil {
if err = mux.state.doInitChain(); err != nil {
panic(fmt.Errorf("mux: failed to init chain state: %w", err))
}

Expand Down Expand Up @@ -558,24 +562,25 @@ func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginB
"block_height", blockHeight,
)

mux.currentTime = req.Header.Time

params := mux.state.ConsensusParameters()

// Create empty block context.
mux.state.blockCtx = api.NewBlockContext()
// Create BeginBlock context.
ctx := mux.state.NewContext(api.ContextBeginBlock, mux.currentTime)
defer ctx.Close()

// Reset block context for the new block.
blockCtx := api.NewBlockContext(api.BlockInfo{
Time: req.Header.Time,
ProposerAddress: req.Header.ProposerAddress,
LastCommitInfo: req.LastCommitInfo,
ValidatorMisbehavior: req.ByzantineValidators,
})
if params.MaxBlockGas > 0 {
api.SetBlockGasAccountant(ctx, api.NewGasAccountant(params.MaxBlockGas))
blockCtx.GasAccountant = api.NewGasAccountant(params.MaxBlockGas)
} else {
api.SetBlockGasAccountant(ctx, api.NewNopGasAccountant())
blockCtx.GasAccountant = api.NewNopGasAccountant()
}
api.SetBlockProposer(ctx, req.Header.ProposerAddress)
api.SetLastCommitInfo(ctx, req.LastCommitInfo)
api.SetValidatorMisbehavior(ctx, req.ByzantineValidators)
mux.state.blockCtx = blockCtx

// Create BeginBlock context.
ctx := mux.state.NewContext(api.ContextBeginBlock)
defer ctx.Close()

currentEpoch, err := mux.state.GetCurrentEpoch(ctx)
if err != nil {
Expand Down Expand Up @@ -656,7 +661,7 @@ func (mux *abciMux) notifyInvalidatedCheckTx(txHash hash.Hash, err error) {
}

func (mux *abciMux) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
ctx := mux.state.NewContext(api.ContextCheckTx, mux.currentTime)
ctx := mux.state.NewContext(api.ContextCheckTx)
defer ctx.Close()

if err := mux.executeTx(ctx, req.Tx); err != nil {
Expand Down Expand Up @@ -703,7 +708,7 @@ func (mux *abciMux) DeliverTx(req types.RequestDeliverTx) types.ResponseDeliverT
return *resp
}

ctx := mux.state.NewContext(api.ContextDeliverTx, mux.currentTime)
ctx := mux.state.NewContext(api.ContextDeliverTx)
defer ctx.Close()

if err := mux.executeTx(ctx, req.Tx); err != nil {
Expand Down Expand Up @@ -751,7 +756,7 @@ func (mux *abciMux) EndBlock(req types.RequestEndBlock) types.ResponseEndBlock {
"block_height", mux.state.BlockHeight(),
)

ctx := mux.state.NewContext(api.ContextEndBlock, mux.currentTime)
ctx := mux.state.NewContext(api.ContextEndBlock)
defer ctx.Close()

// Dispatch EndBlock to all applications.
Expand Down Expand Up @@ -794,14 +799,11 @@ func (mux *abciMux) EndBlock(req types.RequestEndBlock) types.ResponseEndBlock {
},
}

// Clear block context.
mux.state.blockCtx = nil

return resp
}

func (mux *abciMux) Commit() types.ResponseCommit {
lastRetainedVersion, err := mux.state.doCommit(mux.currentTime)
lastRetainedVersion, err := mux.state.doCommit()
if err != nil {
mux.logger.Error("Commit failed",
"err", err,
Expand Down Expand Up @@ -903,7 +905,7 @@ func (mux *abciMux) finishInitialization() error {

// Notify applications that state has been synced. This is used to make sure that things
// like pending upgrade descriptors are refreshed immediately.
ctx := mux.state.NewContext(api.ContextEndBlock, mux.currentTime)
ctx := mux.state.NewContext(api.ContextEndBlock)
defer ctx.Close()

if _, err := mux.md.Publish(ctx, api.MessageStateSyncCompleted, nil); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/consensus/tendermint/abci/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (mux *abciMux) ApplySnapshotChunk(req types.RequestApplySnapshotChunk) type
mux.state.resetProposal()
defer mux.state.closeProposal()

ctx := mux.state.NewContext(api.ContextEndBlock, mux.currentTime)
ctx := mux.state.NewContext(api.ContextEndBlock)
defer ctx.Close()

if _, err = mux.md.Publish(ctx, api.MessageStateSyncCompleted, nil); err != nil {
Expand Down
30 changes: 18 additions & 12 deletions go/consensus/tendermint/abci/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,16 @@ type applicationState struct { // nolint: maligned
metricsClosedCh chan struct{}
}

func (s *applicationState) NewContext(mode api.ContextMode, now time.Time) *api.Context {
func (s *applicationState) NewContext(mode api.ContextMode) *api.Context {
s.blockLock.RLock()
defer s.blockLock.RUnlock()

var blockCtx *api.BlockContext
var state mkvs.OverlayTree
var (
blockCtx *api.BlockContext
state mkvs.OverlayTree
)
blockHeight := int64(s.stateRoot.Version)
now := s.blockTime
switch mode {
case api.ContextInitChain:
state = mkvs.NewOverlayWrapper(s.canonicalState)
Expand All @@ -185,11 +188,11 @@ func (s *applicationState) NewContext(mode api.ContextMode, now time.Time) *api.
case api.ContextDeliverTx, api.ContextBeginBlock, api.ContextEndBlock:
state = s.proposal.tree
blockCtx = s.blockCtx
now = blockCtx.Time
case api.ContextSimulateTx:
// Since simulation is running in parallel to any changes to the database, we make sure
// to create a separate in-memory tree at the given block height.
state = mkvs.NewOverlayWrapper(mkvs.NewWithRoot(nil, s.storage.NodeDB(), s.stateRoot, mkvs.WithoutWriteLog()))
now = s.blockTime
default:
panic(fmt.Errorf("context: invalid mode: %s (%d)", mode, mode))
}
Expand Down Expand Up @@ -342,7 +345,7 @@ func (s *applicationState) Upgrader() upgrade.Backend {
return s.upgrader
}

func (s *applicationState) doInitChain(now time.Time) error {
func (s *applicationState) doInitChain() error {
s.blockLock.Lock()
defer s.blockLock.Unlock()

Expand All @@ -356,7 +359,7 @@ func (s *applicationState) doInitChain(now time.Time) error {
s.stateRoot.Hash = stateRootHash
s.stateRoot.Version = s.initialHeight - 1

return s.doCommitOrInitChainLocked(now)
return s.doCommitOrInitChainLocked()
}

func (s *applicationState) doApplyStateSync(root storage.Root) error {
Expand All @@ -370,14 +373,14 @@ func (s *applicationState) doApplyStateSync(root storage.Root) error {
s.checkState.Close()
s.checkState = mkvs.NewWithRoot(nil, s.storage.NodeDB(), root, mkvs.WithoutWriteLog())

if err := s.doCommitOrInitChainLocked(time.Time{}); err != nil {
if err := s.doCommitOrInitChainLocked(); err != nil {
return err
}

return nil
}

func (s *applicationState) doCommit(now time.Time) (uint64, error) {
func (s *applicationState) doCommit() (uint64, error) {
s.blockLock.Lock()
defer s.blockLock.Unlock()

Expand All @@ -404,10 +407,12 @@ func (s *applicationState) doCommit(now time.Time) (uint64, error) {
s.stateRoot.Hash = stateRootHash
s.stateRoot.Version++

if err := s.doCommitOrInitChainLocked(now); err != nil {
if err := s.doCommitOrInitChainLocked(); err != nil {
return 0, err
}

// Reset block context.
s.blockCtx = nil
// Switch the check tree to the newly committed version. Note that this is safe as CometBFT
// holds the mempool lock while commit is in progress so no CheckTx can take place.
s.checkState.Close()
Expand All @@ -426,8 +431,8 @@ func (s *applicationState) doCommit(now time.Time) (uint64, error) {
}

// Guarded by s.blockLock.
func (s *applicationState) doCommitOrInitChainLocked(now time.Time) error {
s.blockTime = now
func (s *applicationState) doCommitOrInitChainLocked() error {
s.blockTime = s.blockCtx.Time

// Update cache of consensus parameters (the only places where consensus parameters can be
// changed are InitChain and EndBlock, so we can safely update the cache here).
Expand Down Expand Up @@ -685,6 +690,7 @@ func newApplicationState(ctx context.Context, upgrader upgrade.Backend, cfg *App
prunerNotifyCh: channels.NewRingChannel(1),
pruneInterval: cfg.Pruning.PruneInterval,
upgrader: upgrader,
blockCtx: api.NewBlockContext(api.BlockInfo{}),
haltEpoch: cfg.HaltEpoch,
haltHeight: cfg.HaltHeight,
minGasPrice: minGasPrice,
Expand All @@ -695,7 +701,7 @@ func newApplicationState(ctx context.Context, upgrader upgrade.Backend, cfg *App

// Refresh consensus parameters when loading state if we are past genesis.
if latestVersion >= s.initialHeight {
if err = s.doCommitOrInitChainLocked(time.Time{}); err != nil {
if err = s.doCommitOrInitChainLocked(); err != nil {
return nil, fmt.Errorf("state: failed to run initial state commit hook: %w", err)
}
}
Expand Down
6 changes: 1 addition & 5 deletions go/consensus/tendermint/abci/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"encoding/base64"
"fmt"
"math"
"sync/atomic"
"time"

"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
Expand Down Expand Up @@ -152,9 +150,7 @@ func (mux *abciMux) EstimateGas(caller signature.PublicKey, tx *transaction.Tran

// As opposed to other transaction dispatch entry points (CheckTx/DeliverTx), this method can
// be called in parallel to the consensus layer and to other invocations.
//
// For simulation mode, time will be filled in by NewContext from last block time.
ctx := mux.state.NewContext(api.ContextSimulateTx, time.Time{})
ctx := mux.state.NewContext(api.ContextSimulateTx)
defer ctx.Close()

// Modify transaction to include maximum possible gas in order to estimate the upper limit on
Expand Down
83 changes: 39 additions & 44 deletions go/consensus/tendermint/api/block.go
Original file line number Diff line number Diff line change
@@ -1,63 +1,58 @@
package api

import (
"time"

"github.com/cometbft/cometbft/abci/types"
)

// blockProposerKey is the block context key for storing the block proposer address.
type blockProposerKey struct{}

// NewDefault returns a new default value for the given key.
func (bpk blockProposerKey) NewDefault() interface{} {
// This should never be called as a block proposer must always be created by the application
// multiplexer.
panic("no proposer address in block context")
}

// SetBlockProposer sets the block proposer address.
func SetBlockProposer(ctx *Context, proposer []byte) {
ctx.BlockContext().Set(blockProposerKey{}, proposer)
}
// BlockInfo contains information about a block which is always present in block context.
type BlockInfo struct {
Time time.Time
ProposerAddress []byte
LastCommitInfo types.CommitInfo
ValidatorMisbehavior []types.Misbehavior

// GetBlockProposer returns the block proposer address.
func GetBlockProposer(ctx *Context) []byte {
return ctx.BlockContext().Get(blockProposerKey{}).([]byte)
GasAccountant GasAccountant
}

// lastCommitInfoKey is the block context key for storing the last commit info.
type lastCommitInfoKey struct{}

// NewDefault returns a new default value for the given key.
func (bpk lastCommitInfoKey) NewDefault() interface{} {
// This should never be called as it must always be created by the application multiplexer.
panic("no last commit info in block context")
// BlockContextKey is an interface for a block context key.
type BlockContextKey interface {
// NewDefault returns a new default value for the given key.
NewDefault() interface{}
}

// SetLastCommitInfo sets the last commit info.
func SetLastCommitInfo(ctx *Context, lastCommitInfo types.CommitInfo) {
ctx.BlockContext().Set(lastCommitInfoKey{}, lastCommitInfo)
}
// BlockContext can be used to store arbitrary key/value pairs for state that
// is needed while processing a block.
//
// When a block is committed, this context is automatically reset.
type BlockContext struct {
BlockInfo

// GetLastCommitInfo returns the last commit info.
func GetLastCommitInfo(ctx *Context) types.CommitInfo {
return ctx.BlockContext().Get(lastCommitInfoKey{}).(types.CommitInfo)
storage map[BlockContextKey]interface{}
}

// validatorMisbehaviorKey is the block context key for storing the validator misbehavior info.
type validatorMisbehaviorKey struct{}

// NewDefault returns a new default value for the given key.
func (bpk validatorMisbehaviorKey) NewDefault() interface{} {
// This should never be called as it must always be created by the application multiplexer.
panic("no validator misbehavior info in block context")
// Get returns the value stored under the given key (if any). If no value
// currently exists, the NewDefault method is called on the key to produce a
// default value and that value is stored.
func (bc *BlockContext) Get(key BlockContextKey) interface{} {
v, ok := bc.storage[key]
if !ok {
v = key.NewDefault()
bc.storage[key] = v
}
return v
}

// SetValidatorMisbehavior sets the validator misbehavior info.
func SetValidatorMisbehavior(ctx *Context, misbehavior []types.Misbehavior) {
ctx.BlockContext().Set(validatorMisbehaviorKey{}, misbehavior)
// Set overwrites the value stored under the given key.
func (bc *BlockContext) Set(key BlockContextKey, value interface{}) {
bc.storage[key] = value
}

// GetValidatorMisbehavior returns the validator misbehavior info.
func GetValidatorMisbehavior(ctx *Context) []types.Misbehavior {
return ctx.BlockContext().Get(validatorMisbehaviorKey{}).([]types.Misbehavior)
// NewBlockContext creates a new block context.
func NewBlockContext(blockInfo BlockInfo) *BlockContext {
return &BlockContext{
BlockInfo: blockInfo,
storage: make(map[BlockContextKey]interface{}),
}
}

0 comments on commit b386216

Please sign in to comment.