Skip to content

Commit

Permalink
Reduplicative ations with same hash shows in action list 1386 (#1390)
Browse files Browse the repository at this point in the history
  • Loading branch information
coderbradlee authored and zjshen14 committed Jul 22, 2019
1 parent 6717829 commit 51cf380
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 17 deletions.
49 changes: 34 additions & 15 deletions blockchain/indexbuilder.go
Expand Up @@ -44,10 +44,16 @@ type IndexBuilder struct {
cancelChan chan interface{}
timerFactory *prometheustimer.TimerFactory
dao *blockDAO
reindex bool
}

type actionDelta struct {
senderDelta map[hash.Hash160]uint64
recipientDelta map[hash.Hash160]uint64
}

// NewIndexBuilder instantiates an index builder
func NewIndexBuilder(chain Blockchain) (*IndexBuilder, error) {
func NewIndexBuilder(chain Blockchain, reindex bool) (*IndexBuilder, error) {
bc, ok := chain.(*blockchain)
if !ok {
log.S().Panic("unexpected blockchain implementation")
Expand All @@ -67,6 +73,7 @@ func NewIndexBuilder(chain Blockchain) (*IndexBuilder, error) {
cancelChan: make(chan interface{}),
timerFactory: timerFactory,
dao: bc.dao,
reindex: reindex,
}, nil
}

Expand Down Expand Up @@ -147,14 +154,16 @@ func (ib *IndexBuilder) getStartHeightAndIndex() (startHeight, startIndex uint64
}
return
}
func (ib *IndexBuilder) commitBatchAndClear(tipIndex, tipHeight uint64, batch db.KVStoreBatch) error {
func (ib *IndexBuilder) commitBatchAndClear(tipIndex, tipHeight uint64, batch db.KVStoreBatch, actDelta *actionDelta) error {
tipIndexBytes := byteutil.Uint64ToBytes(tipIndex)
batch.Put(blockActionBlockMappingNS, indexActionsTipIndexKey, tipIndexBytes, "failed to put tip index of actions")
tipHeightBytes := byteutil.Uint64ToBytes(tipHeight)
batch.Put(blockActionBlockMappingNS, indexActionsTipHeightKey, tipHeightBytes, "failed to put tip height")
if err := ib.store.Commit(batch); err != nil {
return err
}
actDelta.senderDelta = make(map[hash.Hash160]uint64)
actDelta.recipientDelta = make(map[hash.Hash160]uint64)
return nil
}
func (ib *IndexBuilder) initAndLoadActions() error {
Expand All @@ -166,12 +175,19 @@ func (ib *IndexBuilder) initAndLoadActions() error {
if err != nil {
return err
}
startHeight, startIndex, err := ib.getStartHeightAndIndex()
if err != nil {
return err
startHeight, startIndex := uint64(1), uint64(0)
if !ib.reindex {
startHeight, startIndex, err = ib.getStartHeightAndIndex()
if err != nil {
return err
}
}
zap.L().Info("Loading actions", zap.Uint64("startHeight", startHeight), zap.Uint64("startIndex", startIndex))
batch := db.NewBatch()
actDelta := &actionDelta{
senderDelta: make(map[hash.Hash160]uint64),
recipientDelta: make(map[hash.Hash160]uint64),
}
i := startHeight
for ; i <= tipHeight; i++ {
hash, err := ib.dao.getBlockHash(i)
Expand All @@ -185,7 +201,7 @@ func (ib *IndexBuilder) initAndLoadActions() error {
blk := &block.Block{
Body: *body,
}
err = indexBlockHash(startIndex, hash, ib.store, blk, batch)
err = indexBlockHash(startIndex, hash, ib.store, blk, batch, actDelta)
if err != nil {
return err
}
Expand All @@ -198,7 +214,7 @@ func (ib *IndexBuilder) initAndLoadActions() error {
startIndex += uint64(len(blk.Actions))
// commit once every 10000 heights
if i%10000 == 0 || i == tipHeight {
if err := ib.commitBatchAndClear(startIndex, i, batch); err != nil {
if err := ib.commitBatchAndClear(startIndex, i, batch, actDelta); err != nil {
return err
}
}
Expand Down Expand Up @@ -226,7 +242,7 @@ func getNextHeight(store db.KVStore) (uint64, error) {
return nextHeight, nil
}
func indexBlock(store db.KVStore, blk *block.Block, batch db.KVStoreBatch) error {
hash := blk.HashBlock()
hashBlock := blk.HashBlock()
// get index that already builded
startIndex, err := getNextIndex(store)
if err != nil {
Expand All @@ -237,7 +253,11 @@ func indexBlock(store db.KVStore, blk *block.Block, batch db.KVStoreBatch) error
if err != nil {
return err
}
if err = indexBlockHash(startIndex, hash, store, blk, batch); err != nil {
actDelta := &actionDelta{
senderDelta: make(map[hash.Hash160]uint64),
recipientDelta: make(map[hash.Hash160]uint64),
}
if err = indexBlockHash(startIndex, hashBlock, store, blk, batch, actDelta); err != nil {
return err
}
tipIndexBytes := byteutil.Uint64ToBytes(startIndex + uint64(len(blk.Actions)))
Expand All @@ -246,21 +266,20 @@ func indexBlock(store db.KVStore, blk *block.Block, batch db.KVStoreBatch) error
batch.Put(blockActionBlockMappingNS, indexActionsTipHeightKey, tipHeightBytes, "failed to put tip height")
return nil
}
func indexBlockHash(startActionsNum uint64, blkHash hash.Hash256, store db.KVStore, blk *block.Block, batch db.KVStoreBatch) error {
func indexBlockHash(startActionsNum uint64, blkHash hash.Hash256, store db.KVStore, blk *block.Block, batch db.KVStoreBatch, actDelta *actionDelta) error {
for i, elp := range blk.Actions {
actHash := elp.Hash()
batch.Put(blockActionBlockMappingNS, actHash[hashOffset:], blkHash[:], "failed to put action hash %x", actHash)
indexActionsBytes := byteutil.Uint64ToBytes(startActionsNum + uint64(i))
batch.Put(blockActionBlockMappingNS, indexActionsBytes, actHash[:], "failed to put index of actions %x", actHash)
}

return putActions(store, blk, batch)
return putActions(store, blk, batch, actDelta)
}

func putActions(store db.KVStore, blk *block.Block, batch db.KVStoreBatch) error {
senderDelta := make(map[hash.Hash160]uint64)
recipientDelta := make(map[hash.Hash160]uint64)

func putActions(store db.KVStore, blk *block.Block, batch db.KVStoreBatch, actDelta *actionDelta) error {
senderDelta := actDelta.senderDelta
recipientDelta := actDelta.recipientDelta
for _, selp := range blk.Actions {
actHash := selp.Hash()
callerAddrBytes := hash.BytesToHash160(selp.SrcPubkey().Hash())
Expand Down
2 changes: 1 addition & 1 deletion chainservice/chainservice.go
Expand Up @@ -140,7 +140,7 @@ func New(

var indexBuilder *blockchain.IndexBuilder
if _, ok := cfg.Plugins[config.GatewayPlugin]; ok && cfg.Chain.EnableAsyncIndexWrite {
if indexBuilder, err = blockchain.NewIndexBuilder(chain); err != nil {
if indexBuilder, err = blockchain.NewIndexBuilder(chain, cfg.Reindex); err != nil {
return nil, errors.Wrap(err, "failed to create index builder")
}
if err := chain.AddSubscriber(indexBuilder); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion config/config.go
Expand Up @@ -35,6 +35,7 @@ func init() {
flag.StringVar(&_secretPath, "secret-path", "", "Secret path")
flag.StringVar(&_subChainPath, "sub-config-path", "", "Sub chain Config path")
flag.Var(&_plugins, "plugin", "Plugin of the node")
flag.BoolVar(&_reindex, "reindex", false, "reindex actions")
}

var (
Expand All @@ -44,6 +45,7 @@ var (
_secretPath string
_subChainPath string
_plugins strs
_reindex bool
)

const (
Expand Down Expand Up @@ -166,6 +168,7 @@ var (
},
},
Genesis: genesis.Default,
Reindex: false,
}

// ErrInvalidCfg indicates the invalid config value
Expand Down Expand Up @@ -351,6 +354,7 @@ type (
Log log.GlobalConfig `yaml:"log"`
SubLogs map[string]log.GlobalConfig `yaml:"subLogs"`
Genesis genesis.Genesis `yaml:"genesis"`
Reindex bool `yaml:"reindex"`
}

// Validate is the interface of validating the config
Expand Down Expand Up @@ -379,7 +383,7 @@ func New(validates ...Validate) (Config, error) {
if err := yaml.Get(uconfig.Root).Populate(&cfg); err != nil {
return Config{}, errors.Wrap(err, "failed to unmarshal YAML config to struct")
}

cfg.Reindex = _reindex
// set network master key to private key
if cfg.Network.MasterKey == "" {
cfg.Network.MasterKey = cfg.Chain.ProducerPrivKey
Expand Down

0 comments on commit 51cf380

Please sign in to comment.