Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support Index service recovery and indexHeight in Status api #3847

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 23 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ type Node struct {
rpcListeners []net.Listener // rpc servers
txIndexer txindex.TxIndexer
indexerService *txindex.IndexerService
indexHub *sm.IndexHub
prometheusSrv *http.Server
}

Expand Down Expand Up @@ -254,6 +255,18 @@ func createAndStartIndexerService(config *cfg.Config, dbProvider DBProvider,
return indexerService, txIndexer, nil
}

func createAndStartIndexHub(initialHeight int64, stateDB dbm.DB, blockStore sm.BlockStore, eventBus types.BlockEventPublisher, logger log.Logger, metrics *sm.Metrics, indexSvcs ...sm.IndexService) (*sm.IndexHub, error) {
indexHub := sm.NewIndexHub(initialHeight, stateDB, blockStore, eventBus, sm.IndexHubWithMetrics(metrics))
for _, svc := range indexSvcs {
indexHub.RegisterIndexSvc(svc)
}
indexHub.SetLogger(logger.With("module", "indexer_hub"))
if err := indexHub.Start(); err != nil {
return nil, err
}
return indexHub, nil
}

func doHandshake(stateDB dbm.DB, state sm.State, blockStore sm.BlockStore,
genDoc *types.GenesisDoc, eventBus *types.EventBus, proxyApp proxy.AppConns, consensusLogger log.Logger) error {

Expand Down Expand Up @@ -567,6 +580,13 @@ func NewNode(config *cfg.Config,
return nil, err
}

csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)

indexHub, err := createAndStartIndexHub(state.LastBlockHeight, stateDB, blockStore, eventBus, logger, smMetrics, indexerService)
if err != nil {
return nil, err
}

// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
consensusLogger := logger.With("module", "consensus")
Expand Down Expand Up @@ -595,8 +615,6 @@ func NewNode(config *cfg.Config,
// We don't fast-sync when the only validator is us.
fastSync := config.FastSyncMode && !onlyValidatorIsUs(state, privValidator)

csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)

// Make MempoolReactor
mempoolReactor, mempool := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger)

Expand Down Expand Up @@ -699,6 +717,7 @@ func NewNode(config *cfg.Config,
proxyApp: proxyApp,
txIndexer: txIndexer,
indexerService: indexerService,
indexHub: indexHub,
eventBus: eventBus,
}
node.BaseService = *cmn.NewBaseService(logger, "Node", node)
Expand Down Expand Up @@ -776,6 +795,7 @@ func (n *Node) OnStop() {
// first stop the non-reactor services
n.eventBus.Stop()
n.indexerService.Stop()
n.indexHub.Stop()

// now stop the reactors
n.sw.Stop()
Expand Down Expand Up @@ -827,6 +847,7 @@ func (n *Node) ConfigureRPC() {
rpccore.SetAddrBook(n.addrBook)
rpccore.SetProxyAppQuery(n.proxyApp.Query())
rpccore.SetTxIndexer(n.txIndexer)
rpccore.SetIndexHub(n.indexHub)
rpccore.SetConsensusReactor(n.consensusReactor)
rpccore.SetEventBus(n.eventBus)
rpccore.SetLogger(n.Logger.With("module", "rpc"))
Expand Down
5 changes: 5 additions & 0 deletions rpc/core/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ var (
consensusReactor *consensus.ConsensusReactor
eventBus *types.EventBus // thread safe
mempool mempl.Mempool
indexerHub *sm.IndexHub

logger log.Logger

Expand Down Expand Up @@ -141,6 +142,10 @@ func SetEventBus(b *types.EventBus) {
eventBus = b
}

func SetIndexHub(hub *sm.IndexHub) {
indexerHub = hub
}

// SetConfig sets an RPCConfig.
func SetConfig(c cfg.RPCConfig) {
config = c
Expand Down
1 change: 1 addition & 0 deletions rpc/core/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
LatestBlockHeight: latestHeight,
LatestBlockTime: latestBlockTime,
CatchingUp: consensusReactor.FastSync(),
IndexHeight: indexerHub.GetHeight(),
},
ValidatorInfo: ctypes.ValidatorInfo{
Address: pubKey.Address(),
Expand Down
1 change: 1 addition & 0 deletions rpc/core/types/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type SyncInfo struct {
LatestBlockHeight int64 `json:"latest_block_height"`
LatestBlockTime time.Time `json:"latest_block_time"`
CatchingUp bool `json:"catching_up"`
IndexHeight int64 `json:"index_height"`
}

// Info about the node's validator
Expand Down
177 changes: 177 additions & 0 deletions state/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package state

import (
"sync"

cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tm-cmn/db"
)

const (
// Actually the max lag is 2, use 10 for tolerance.
MaxIndexLag = 10
)

var indexHeight = []byte("indexHeight")

type IndexService interface {
SetOnIndex(callback func(int64))
}

type IndexHub struct {
cmn.BaseService
mtx sync.Mutex

stateHeight int64
expectHeight int64

// the total registered index service
numIdxSvc int
indexTaskCounter map[int64]int
indexTaskEvents chan int64

stateDB dbm.DB
blockStore BlockStore
eventBus types.BlockEventPublisher

metrics *Metrics
}

func NewIndexHub(initialHeight int64, stateDB dbm.DB, blockStore BlockStore, eventBus types.BlockEventPublisher, options ...IndexHubOption) *IndexHub {
ih := &IndexHub{
stateHeight: initialHeight,
indexTaskCounter: make(map[int64]int),
indexTaskEvents: make(chan int64, MaxIndexLag),
stateDB: stateDB,
blockStore: blockStore,
eventBus: eventBus,
metrics: NopMetrics(),
}
indexedHeight := ih.GetIndexedHeight()
if indexedHeight < 0 {
// no indexedHeight found, will do no recover
ih.expectHeight = ih.stateHeight + 1
} else {
ih.expectHeight = indexedHeight + 1
}
for _, option := range options {
option(ih)
}
ih.BaseService = *cmn.NewBaseService(nil, "indexHub", ih)
return ih
}

type IndexHubOption func(*IndexHub)

func IndexHubWithMetrics(metrics *Metrics) IndexHubOption {
return func(ih *IndexHub) {
ih.metrics = metrics
}
}

func (ih *IndexHub) OnStart() error {
// start listen routine before recovering.
go ih.indexRoutine()
ih.recoverIndex()
return nil
}

func (ih *IndexHub) recoverIndex() {
for h := ih.expectHeight; h <= ih.stateHeight; h++ {
ih.Logger.Info("try to recover index", "height", h)
block := ih.blockStore.LoadBlock(h)
if block == nil {
ih.Logger.Error("index skip since the the block is missing", "height", h)
} else {
abciResponses, err := LoadABCIResponses(ih.stateDB, h)
Copy link
Contributor

@melekes melekes Aug 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we already do that as a part of the replay process?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this part is before replay. replay process will only replay the saved block but still do not play the state, but index replay will refire the event of played block but unindexed block

if err != nil {
ih.Logger.Error("failed to load ABCIResponse, will use default")
abciResponses = NewABCIResponses(block)
}
abciValUpdates := abciResponses.EndBlock.ValidatorUpdates
validatorUpdates, err := types.PB2TM.ValidatorUpdates(abciValUpdates)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ineffectual assignment to err (from ineffassign)

if err != nil {
ih.Logger.Error("failed to load validatorUpdates, will use nil by default")
}
fireEvents(ih.Logger, ih.eventBus, block, abciResponses, validatorUpdates)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might shoot us in the leg one day. I think this function is only expected to be called from state/execution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any suggestions?

}
}
}

func (ih *IndexHub) indexRoutine() {
for {
select {
case <-ih.Quit():
return
case h := <-ih.indexTaskEvents:
ih.Logger.Info("finish index", "height", h)
ih.SetIndexedHeight(h)
ih.metrics.IndexHeight.Set(float64(h))
}
}
}

func (ih *IndexHub) RegisterIndexSvc(idx IndexService) {
ih.mtx.Lock()
defer ih.mtx.Unlock()
if ih.IsRunning() {
panic("can't RegisterIndexSvc when IndexHub is running")
}
idx.SetOnIndex(ih.CountDownAt)
ih.numIdxSvc++
}

// `CountDownAt` is a callback in index service, keep it simple and fast.
func (ih *IndexHub) CountDownAt(height int64) {
ih.mtx.Lock()
defer ih.mtx.Unlock()
count, exist := ih.indexTaskCounter[height]
if exist {
count = count - 1
} else {
count = ih.numIdxSvc - 1
}
// The higher block won't finish index before lower one.
if count == 0 && height == ih.expectHeight {
if exist {
delete(ih.indexTaskCounter, height)
}
ih.expectHeight = ih.expectHeight + 1
ih.indexTaskEvents <- height
} else {
ih.indexTaskCounter[height] = count
}
}

// set and get won't happen in the same time, won't lock
func (ih *IndexHub) SetIndexedHeight(h int64) {
rawHeight, err := cdc.MarshalBinaryBare(h)
if err != nil {
panic(err)
}
ih.stateDB.Set(indexHeight, rawHeight)
}

// if never store `indexHeight` in index db, will return -1.
func (ih *IndexHub) GetIndexedHeight() int64 {
rawHeight := ih.stateDB.Get(indexHeight)
if rawHeight == nil {
return -1
} else {
var height int64
err := cdc.UnmarshalBinaryBare(rawHeight, &height)
if err != nil {
// should not happen
panic(err)
}
return height
}
}

// get indexed height from memory to save time for RPC
func (ih *IndexHub) GetHeight() int64 {
ih.mtx.Lock()
defer ih.mtx.Unlock()
return ih.expectHeight - 1
}
84 changes: 84 additions & 0 deletions state/index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package state

import (
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/state/txindex/kv"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tm-cmn/db"
)

func TestSetHeight(t *testing.T) {

indexDb := db.NewMemDB()
indexHub := NewIndexHub(0, indexDb, nil, nil)
indexHub.SetLogger(log.TestingLogger())

realHeightAtFirst := indexHub.GetIndexedHeight()
assert.Equal(t, int64(-1), realHeightAtFirst)
height := int64(1024)
indexHub.SetIndexedHeight(height)
realHeight := indexHub.GetIndexedHeight()
assert.Equal(t, height, realHeight)
}

func TestCountDown(t *testing.T) {
// event bus
eventBus := types.NewEventBus()
eventBus.SetLogger(log.TestingLogger())
err := eventBus.Start()
require.NoError(t, err)
defer eventBus.Stop()

indexDb := db.NewMemDB()

// start tx index
txIndexer := kv.NewTxIndex(indexDb, kv.IndexAllTags())
txIndexSvc := txindex.NewIndexerService(txIndexer, eventBus)
txIndexSvc.SetLogger(log.TestingLogger())
err = txIndexSvc.Start()
require.NoError(t, err)
defer txIndexSvc.Stop()

// start index hub
indexHub := NewIndexHub(0, indexDb, nil, eventBus)
indexHub.SetLogger(log.TestingLogger())
indexHub.RegisterIndexSvc(txIndexSvc)
err = indexHub.Start()
assert.NoError(t, err)

// publish block with txs
for h := int64(1); h < 10; h++ {
numTxs := rand.Int63n(5)
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
Header: types.Header{Height: h, NumTxs: numTxs},
})
for i := int64(0); i < numTxs; i++ {
txResult := &types.TxResult{
Height: h,
Index: uint32(i),
Tx: types.Tx("foo"),
Result: abci.ResponseDeliverTx{Code: 0},
}
eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult})
}
// In test case, 100ms is far enough for index
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(h), indexHub.GetIndexedHeight())
assert.Equal(t, int64(h), indexHub.GetHeight())
// test no memory leak
assert.Equal(t, len(indexHub.indexTaskCounter), 0)
}
}

func TestCountDownRegisterIndexSvc(t *testing.T) {

}