Skip to content

Commit

Permalink
Make try sync next block execute as many blocks as possible (#1378)
Browse files Browse the repository at this point in the history
<!--
Please read and fill out this form before submitting your PR.

Please make sure you have reviewed our contributors guide before
submitting your
first PR.
-->

## Overview

Closes: #1355 

<!-- 
Please provide an explanation of the PR, including the appropriate
context,
background, goal, and rationale. If there is an issue with this
information,
please provide a tl;dr and link the issue. 
-->

## Checklist

<!-- 
Please complete the checklist to ensure that the PR is ready to be
reviewed.

IMPORTANT:
PRs should be left in Draft until the below checklist is completed.
-->

- [x] New and updated code has appropriate documentation
- [x] New and updated code has new and/or updated testing
- [x] Required CI checks are passing
- [ ] Visual proof for any user facing features like CLI or
documentation updates
- [x] Linked issues closed with keywords


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Implemented a new search functionality with a dedicated search bar for
improved user experience.

- **Bug Fixes**
- Fixed an issue where search results were not properly displayed under
certain conditions.

- **Style**
- Updated the search bar design to align with the latest brand
guidelines.

- **Documentation**
- Added user-facing documentation to guide through the new search
features and enhancements.

- **Refactor**
- Optimized search result fetching to improve performance and reduce
latency.
- Refined the search algorithm to deliver more relevant results based on
user queries.

- **Tests**
- Implemented comprehensive tests for the new search functionality to
ensure reliability and accuracy.

- **Chores**
- Performed routine maintenance and codebase cleanup to ensure smooth
operation of the search feature.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
Manav-Aggarwal committed Dec 8, 2023
1 parent 2b0afcb commit 21e37e9
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 54 deletions.
74 changes: 43 additions & 31 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ const blockInChLength = 10000
// initialBackoff defines initial value for block submission backoff
var initialBackoff = 100 * time.Millisecond

type newBlockEvent struct {
block *types.Block
daHeight uint64
// NewBlockEvent is used to pass block and DA height to blockInCh
type NewBlockEvent struct {
Block *types.Block
DAHeight uint64
}

// Manager is responsible for aggregating transactions into blocks.
Expand All @@ -73,7 +74,7 @@ type Manager struct {
HeaderCh chan *types.SignedHeader
BlockCh chan *types.Block

blockInCh chan newBlockEvent
blockInCh chan NewBlockEvent
blockStore *goheaderstore.Store[*types.Block]

blockCache *BlockCache
Expand Down Expand Up @@ -179,7 +180,7 @@ func NewManager(
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
HeaderCh: make(chan *types.SignedHeader, channelLength),
BlockCh: make(chan *types.Block, channelLength),
blockInCh: make(chan newBlockEvent, blockInChLength),
blockInCh: make(chan NewBlockEvent, blockInChLength),
blockStoreCh: make(chan struct{}, 1),
blockStore: blockStore,
lastStateMtx: new(sync.RWMutex),
Expand Down Expand Up @@ -208,11 +209,28 @@ func (m *Manager) SetDALC(dalc *da.DAClient) {
m.dalc = dalc
}

// SetLastState is used to set lastState used by Manager.
func (m *Manager) SetLastState(state types.State) {
m.lastStateMtx.Lock()
defer m.lastStateMtx.Unlock()
m.lastState = state
}

// GetStoreHeight returns the manager's store height
func (m *Manager) GetStoreHeight() uint64 {
return m.store.Height()
}

// GetBlockInCh returns the manager's blockInCh
func (m *Manager) GetBlockInCh() chan NewBlockEvent {
return m.blockInCh
}

// IsBlockHashSeen returns true if the block with the given hash has been seen.
func (m *Manager) IsBlockHashSeen(blockHash string) bool {
return m.blockCache.isSeen(blockHash)
}

// IsDAIncluded returns true if the block with the given hash has been seen on DA.
func (m *Manager) IsDAIncluded(hash types.Hash) bool {
return m.blockCache.isDAIncluded(hash.String())
Expand Down Expand Up @@ -316,8 +334,8 @@ func (m *Manager) SyncLoop(ctx context.Context, cancel context.CancelFunc) {
m.sendNonBlockingSignalToBlockStoreCh()
case blockEvent := <-m.blockInCh:
// Only validated blocks are sent to blockInCh, so we can safely assume that blockEvent.block is valid
block := blockEvent.block
daHeight := blockEvent.daHeight
block := blockEvent.Block
daHeight := blockEvent.DAHeight
blockHash := block.Hash().String()
blockHeight := uint64(block.Height())
m.logger.Debug("block body retrieved",
Expand Down Expand Up @@ -360,37 +378,32 @@ func (m *Manager) sendNonBlockingSignalToRetrieveCh() {
}
}

// trySyncNextBlock tries to progress one step (one block) in sync process.
// trySyncNextBlock tries to execute as many blocks as possible from the blockCache.
//
// To be able to apply block and height h, we need to have its Commit. It is contained in block at height h+1.
// If block at height h+1 is not available, value of last gossiped commit is checked.
// If commit for block h is available, we proceed with sync process, and remove synced block from sync cache.
// Note: the blockCache contains only valid blocks that are not yet synced
//
// For every block, to be able to apply block at height h, we need to have its Commit. It is contained in block at height h+1.
// If commit for block h+1 is available, we proceed with sync process, and remove synced block from sync cache.
func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
var commit *types.Commit
currentHeight := m.store.Height() // TODO(tzdybal): maybe store a copy in memory

b, ok := m.blockCache.getBlock(currentHeight + 1)
if !ok {
return nil
}

signedHeader := &b.SignedHeader
if signedHeader != nil {
commit = &b.SignedHeader.Commit
}
for {
currentHeight := m.store.Height()
b, ok := m.blockCache.getBlock(currentHeight + 1)
if !ok {
m.logger.Debug("block not found in cache", "height", currentHeight+1)
return nil
}

if b != nil && commit != nil {
bHeight := uint64(b.Height())
m.logger.Info("Syncing block", "height", bHeight)
// Validate the received block before applying
if err := m.executor.Validate(m.lastState, b); err != nil {
return fmt.Errorf("failed to validate block: %w", err)
}
newState, responses, err := m.executor.ApplyBlock(ctx, m.lastState, b)
newState, responses, err := m.applyBlock(ctx, b)
if err != nil {
return fmt.Errorf("failed to ApplyBlock: %w", err)
}
err = m.store.SaveBlock(b, commit)
err = m.store.SaveBlock(b, &b.SignedHeader.Commit)
if err != nil {
return fmt.Errorf("failed to save block: %w", err)
}
Expand All @@ -404,6 +417,7 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
return fmt.Errorf("failed to save block responses: %w", err)
}

// Height gets updated
m.store.SetHeight(bHeight)

if daHeight > newState.DAHeight {
Expand All @@ -415,8 +429,6 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
}
m.blockCache.deleteBlock(currentHeight + 1)
}

return nil
}

// BlockStoreRetrieveLoop is responsible for retrieving blocks from the Block Store.
Expand Down Expand Up @@ -449,7 +461,7 @@ func (m *Manager) BlockStoreRetrieveLoop(ctx context.Context) {
default:
}
m.logger.Debug("block retrieved from p2p block sync", "blockHeight", block.Height(), "daHeight", daHeight)
m.blockInCh <- newBlockEvent{block, daHeight}
m.blockInCh <- NewBlockEvent{block, daHeight}
}
}
lastBlockStoreHeight = blockStoreHeight
Expand Down Expand Up @@ -490,7 +502,7 @@ func (m *Manager) RetrieveLoop(ctx context.Context) {
}
daHeight := atomic.LoadUint64(&m.daHeight)
err := m.processNextDABlock(ctx)
if err != nil {
if err != nil && ctx.Err() == nil {
m.logger.Error("failed to retrieve block from DALC", "daHeight", daHeight, "errors", err.Error())
continue
}
Expand Down Expand Up @@ -542,7 +554,7 @@ func (m *Manager) processNextDABlock(ctx context.Context) error {
return errors.WithMessage(ctx.Err(), "unable to send block to blockInCh, context done")
default:
}
m.blockInCh <- newBlockEvent{block, daHeight}
m.blockInCh <- NewBlockEvent{block, daHeight}
}
}
return nil
Expand Down
55 changes: 55 additions & 0 deletions node/full_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

abci "github.com/cometbft/cometbft/abci/types"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"

testutils "github.com/celestiaorg/utils/test"

"github.com/rollkit/rollkit/block"
"github.com/rollkit/rollkit/mempool"
"github.com/rollkit/rollkit/test/mocks"
"github.com/rollkit/rollkit/types"
)

// simply check that node is starting and stopping without panicking
Expand All @@ -39,13 +42,65 @@ func TestMempoolDirectly(t *testing.T) {
verifyMempoolSize(node, assert)
}

// Tests that the node is able to sync multiple blocks even if blocks arrive out of order
func TestTrySyncNextBlockMultiple(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node := setupTestNode(ctx, t, "full")
fullNode, ok := node.(*FullNode)
require.True(t, ok)
store := fullNode.Store
height := store.Height()
b1, signingKey := types.GetRandomBlockWithKey(height+1, 0)
b2 := types.GetRandomNextBlock(b1, signingKey, []byte{1, 2, 3, 4}, 0)
b2.SignedHeader.AppHash = []byte{1, 2, 3, 4}

// Update state with hashes generated from block
state, err := store.GetState()
require.NoError(t, err)
state.AppHash = b1.SignedHeader.AppHash
state.LastResultsHash = b1.SignedHeader.LastResultsHash

manager := fullNode.blockManager
manager.SetLastState(state)

require.NoError(t, err)

// Add second block to blockInCh
// This should not trigger a sync since b1 hasn't been seen yet
blockInCh := manager.GetBlockInCh()
blockInCh <- block.NewBlockEvent{
Block: b2,
DAHeight: state.DAHeight,
}

err = node.Start()
require.NoError(t, err)
defer cleanUpNode(node, t)

require.NoError(t, waitUntilBlockHashSeen(node, b2.Hash().String()))

newHeight := store.Height()
require.Equal(t, height, newHeight)

// Adding first block to blockInCh should sync both blocks
blockInCh <- block.NewBlockEvent{
Block: b1,
DAHeight: state.DAHeight,
}

require.NoError(t, waitForAtLeastNBlocks(node, 2, Store))
}

// setupMockApplication initializes a mock application
func setupMockApplication() *mocks.Application {
app := &mocks.Application{}
app.On("InitChain", mock.Anything, mock.Anything).Return(&abci.ResponseInitChain{}, nil)
app.On("CheckTx", mock.Anything, mock.Anything).Return(&abci.ResponseCheckTx{}, nil)
app.On("PrepareProposal", mock.Anything, mock.Anything).Return(prepareProposalResponse).Maybe()
app.On("ProcessProposal", mock.Anything, mock.Anything).Return(&abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil)
app.On("FinalizeBlock", mock.Anything, mock.Anything).Return(&abci.ResponseFinalizeBlock{AppHash: []byte{1, 2, 3, 4}}, nil)
app.On("Commit", mock.Anything, mock.Anything).Return(&abci.ResponseCommit{}, nil)
return app
}

Expand Down
16 changes: 16 additions & 0 deletions node/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ func getNodeHeight(node Node, source Source) (uint64, error) {
}
}

func isBlockHashSeen(node Node, blockHash string) bool {
if fn, ok := node.(*FullNode); ok {
return fn.blockManager.IsBlockHashSeen(blockHash)
}
return false
}

func getNodeHeightFromHeader(node Node) (uint64, error) {
if fn, ok := node.(*FullNode); ok {
return fn.hSyncService.HeaderStore().Height(), nil
Expand Down Expand Up @@ -126,3 +133,12 @@ func waitForAtLeastNBlocks(node Node, n int, source Source) error {
return fmt.Errorf("expected height > %v, got %v", n, nHeight)
})
}

func waitUntilBlockHashSeen(node Node, blockHash string) error {
return testutils.Retry(300, 100*time.Millisecond, func() error {
if isBlockHashSeen(node, blockHash) {
return nil
}
return fmt.Errorf("block hash %v not seen", blockHash)
})
}
2 changes: 1 addition & 1 deletion state/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (e *BlockExecutor) updateState(state types.State, block *types.Block, final
},
ConsensusParams: state.ConsensusParams,
LastHeightConsensusParamsChanged: state.LastHeightConsensusParamsChanged,
AppHash: make(types.Hash, 32),
AppHash: finalizeBlockResponse.AppHash,
}
copy(s.LastResultsHash[:], cmtypes.NewResults(finalizeBlockResponse.TxResults).Hash())

Expand Down

0 comments on commit 21e37e9

Please sign in to comment.