Skip to content

Commit

Permalink
Move consensus status into db (#1457)
Browse files Browse the repository at this point in the history
1. added endorsementpb and consensusDB implementation 

2. changed AddVoteFunc / Cleanup func in endorsementmanager

2. moved endorsement manager initialization to rolldpos context

3. added unit tests to convert eManagerpb / refactor
  • Loading branch information
koseoyoung committed Aug 28, 2019
1 parent c2d2f09 commit a7e79fe
Show file tree
Hide file tree
Showing 14 changed files with 610 additions and 93 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Expand Up @@ -129,6 +129,7 @@ var (
},
ToleratedOvertime: 2 * time.Second,
Delay: 5 * time.Second,
ConsensusDBPath: "./consensus.db",
},
},
BlockSync: BlockSync{
Expand Down Expand Up @@ -251,6 +252,7 @@ type (
FSM consensusfsm.Config `yaml:"fsm"`
ToleratedOvertime time.Duration `yaml:"toleratedOvertime"`
Delay time.Duration `yaml:"delay"`
ConsensusDBPath string `yaml:"consensusDBPath"`
}

// Dispatcher is the dispatcher config
Expand Down
196 changes: 184 additions & 12 deletions consensus/scheme/rolldpos/endorsementmanager.go
Expand Up @@ -10,18 +10,30 @@ import (
"encoding/hex"
"time"

"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/iotexproject/iotex-core/blockchain/block"
"github.com/iotexproject/iotex-core/consensus/scheme/rolldpos/endorsementpb"
"github.com/iotexproject/iotex-core/db"
"github.com/iotexproject/iotex-core/endorsement"
"github.com/iotexproject/iotex-core/pkg/log"
)

const (
eManagerNS = "edm"
)

var (
// ErrExpiredEndorsement indicates that the endorsement is expired
ErrExpiredEndorsement = errors.New("the endorsement has been replaced or expired")
statusKey = []byte("status")
)

//EndorsedByMajorityFunc defines a function to give an information of consensus status
type EndorsedByMajorityFunc func(blockHash []byte, topics []ConsensusVoteTopic) bool

type endorserEndorsementCollection struct {
endorsements map[ConsensusVoteTopic]*endorsement.Endorsement
}
Expand All @@ -32,6 +44,32 @@ func newEndorserEndorsementCollection() *endorserEndorsementCollection {
}
}

func (ee *endorserEndorsementCollection) fromProto(endorserPro *endorsementpb.EndorserEndorsementCollection) error {
ee.endorsements = make(map[ConsensusVoteTopic]*endorsement.Endorsement)
for index := range endorserPro.Topics {
endorse := &endorsement.Endorsement{}
if err := endorse.LoadProto(endorserPro.Endorsements[index]); err != nil {
return err
}
ee.endorsements[ConsensusVoteTopic(endorserPro.Topics[index])] = endorse
}
return nil
}

func (ee *endorserEndorsementCollection) toProto(endorser string) (*endorsementpb.EndorserEndorsementCollection, error) {
eeProto := &endorsementpb.EndorserEndorsementCollection{}
eeProto.Endorser = endorser
for topic, endorse := range ee.endorsements {
eeProto.Topics = append(eeProto.Topics, uint32(topic))
ioEndorsement, err := endorse.Proto()
if err != nil {
return nil, err
}
eeProto.Endorsements = append(eeProto.Endorsements, ioEndorsement)
}
return eeProto, nil
}

func (ee *endorserEndorsementCollection) AddEndorsement(
topic ConsensusVoteTopic,
en *endorsement.Endorsement,
Expand Down Expand Up @@ -82,6 +120,43 @@ func newBlockEndorsementCollection(blk *block.Block) *blockEndorsementCollection
}
}

func (bc *blockEndorsementCollection) fromProto(blockPro *endorsementpb.BlockEndorsementCollection) error {
bc.endorsers = make(map[string]*endorserEndorsementCollection)
if blockPro.Blk == nil {
bc.blk = nil
} else {
blk := &block.Block{}
if err := blk.ConvertFromBlockPb(blockPro.Blk); err != nil {
return err
}
bc.blk = blk
}
for _, endorsement := range blockPro.BlockMap {
ee := &endorserEndorsementCollection{}
if err := ee.fromProto(endorsement); err != nil {
return err
}
bc.endorsers[endorsement.Endorser] = ee
}
return nil
}

func (bc *blockEndorsementCollection) toProto() (*endorsementpb.BlockEndorsementCollection, error) {
bcProto := &endorsementpb.BlockEndorsementCollection{}
if bc.blk != nil {
bcProto.Blk = bc.blk.ConvertToBlockPb()
}

for s, endorse := range bc.endorsers {
ioEndorsement, err := endorse.toProto(s)
if err != nil {
return nil, err
}
bcProto.BlockMap = append(bcProto.BlockMap, ioEndorsement)
}
return bcProto, nil
}

func (bc *blockEndorsementCollection) SetBlock(blk *block.Block) error {
bc.blk = blk
return nil
Expand Down Expand Up @@ -143,22 +218,97 @@ func (bc *blockEndorsementCollection) Endorsements(
}

type endorsementManager struct {
collections map[string]*blockEndorsementCollection
isMajorityFunc EndorsedByMajorityFunc
eManagerDB db.KVStore
collections map[string]*blockEndorsementCollection
}

func newEndorsementManager() *endorsementManager {
return &endorsementManager{
collections: map[string]*blockEndorsementCollection{},
func newEndorsementManager(eManagerDB db.KVStore) (*endorsementManager, error) {
if eManagerDB == nil {
return &endorsementManager{
eManagerDB: nil,
collections: map[string]*blockEndorsementCollection{},
}, nil
}
bytes, err := eManagerDB.Get(eManagerNS, statusKey)
switch errors.Cause(err) {
case nil:
// Get from DB
manager := &endorsementManager{eManagerDB: eManagerDB}
managerProto := &endorsementpb.EndorsementManager{}
if err = proto.Unmarshal(bytes, managerProto); err != nil {
return nil, err
}
if err = manager.fromProto(managerProto); err != nil {
return nil, err
}
manager.eManagerDB = eManagerDB
return manager, nil
case db.ErrNotExist:
// If DB doesn't have any information
log.L().Info("First initializing DB")
return &endorsementManager{
eManagerDB: eManagerDB,
collections: map[string]*blockEndorsementCollection{},
}, nil
default:
return nil, err
}
}

func (m *endorsementManager) PutEndorsementManagerToDB() error {
managerProto, err := m.toProto()
if err != nil {
return err
}
valBytes, err := proto.Marshal(managerProto)
if err != nil {
return err
}
err = m.eManagerDB.Put(eManagerNS, statusKey, valBytes)
if err != nil {
return err
}
return nil
}

func (m *endorsementManager) SetIsMarjorityFunc(isMajorityFunc EndorsedByMajorityFunc) {
m.isMajorityFunc = isMajorityFunc
return
}

func (m *endorsementManager) fromProto(managerPro *endorsementpb.EndorsementManager) error {
m.collections = make(map[string]*blockEndorsementCollection)
for i, block := range managerPro.BlockEndorsements {
bc := &blockEndorsementCollection{}
if err := bc.fromProto(block); err != nil {
return err
}
m.collections[managerPro.BlkHash[i]] = bc
}
return nil
}

func (m *endorsementManager) toProto() (*endorsementpb.EndorsementManager, error) {
mc := &endorsementpb.EndorsementManager{}
for encodedBlockHash, block := range m.collections {
ioBlockEndorsement, err := block.toProto()
if err != nil {
return nil, err
}
mc.BlkHash = append(mc.BlkHash, encodedBlockHash)
mc.BlockEndorsements = append(mc.BlockEndorsements, ioBlockEndorsement)
}
return mc, nil
}

func (m *endorsementManager) CollectionByBlockHash(blkHash []byte) *blockEndorsementCollection {
encodedBlockHash := encodeToString(blkHash)
c, exists := m.collections[encodedBlockHash]
collections, exists := m.collections[encodedBlockHash]
if !exists {
return nil
}
return c
return collections
}

func (m *endorsementManager) Size() int {
Expand All @@ -183,13 +333,20 @@ func (m *endorsementManager) RegisterBlock(blk *block.Block) error {
}
m.collections[encodedBlockHash] = newBlockEndorsementCollection(blk)

if m.eManagerDB != nil {
return m.PutEndorsementManagerToDB()
}
return nil
}

func (m *endorsementManager) AddVoteEndorsement(
vote *ConsensusVote,
en *endorsement.Endorsement,
) error {
var beforeVote, afterVote bool
if m.isMajorityFunc != nil {
beforeVote = m.isMajorityFunc(vote.BlockHash(), []ConsensusVoteTopic{vote.Topic()})
}
encoded := encodeToString(vote.BlockHash())
c, exists := m.collections[encoded]
if !exists {
Expand All @@ -200,16 +357,31 @@ func (m *endorsementManager) AddVoteEndorsement(
}
m.collections[encoded] = c

if m.eManagerDB != nil && m.isMajorityFunc != nil {
afterVote = m.isMajorityFunc(vote.BlockHash(), []ConsensusVoteTopic{vote.Topic()})
if !beforeVote && afterVote {
//put into DB only it changes the status of consensus
return m.PutEndorsementManagerToDB()
}
}
return nil
}

func (m *endorsementManager) Cleanup(timestamp time.Time) *endorsementManager {
cleaned := newEndorsementManager()
for encoded, c := range m.collections {
cleaned.collections[encoded] = c.Cleanup(timestamp)
func (m *endorsementManager) Cleanup(timestamp time.Time) error {
cleaned := &endorsementManager{
eManagerDB: m.eManagerDB,
collections: map[string]*blockEndorsementCollection{},
}

return cleaned
if !timestamp.IsZero() {
for encoded, c := range m.collections {
cleaned.collections[encoded] = c.Cleanup(timestamp)
}
}
m = cleaned
if m.eManagerDB != nil {
return m.PutEndorsementManagerToDB()
}
return nil
}

func (m *endorsementManager) Log(
Expand Down
56 changes: 46 additions & 10 deletions consensus/scheme/rolldpos/endorsementmanager_test.go
Expand Up @@ -87,20 +87,22 @@ func TestBlockEndorsementCollection(t *testing.T) {
require.Equal(1, len(ends))
require.Equal(end, ends[0])

cleaned := ec.Cleanup(time.Now().Add(time.Second * 10 * -1))
require.Equal(1, len(cleaned.endorsers))
require.Equal(1, len(cleaned.endorsers[b.PublicKey().HexString()].endorsements))
require.Equal(end, cleaned.endorsers[b.PublicKey().HexString()].Endorsement(PROPOSAL))
ec = ec.Cleanup(time.Now().Add(time.Second * 10 * -1))
require.Equal(1, len(ec.endorsers))
require.Equal(1, len(ec.endorsers[b.PublicKey().HexString()].endorsements))
require.Equal(end, ec.endorsers[b.PublicKey().HexString()].Endorsement(PROPOSAL))
}

func TestEndorsementManager(t *testing.T) {
require := require.New(t)
em := newEndorsementManager()
em, err := newEndorsementManager(nil)
require.Nil(err)
require.NotNil(em)
require.Equal(0, em.Size())
require.Equal(0, em.SizeWithBlock())

b := getBlock(t)

require.NoError(em.RegisterBlock(&b))

require.Panics(func() {
Expand All @@ -123,12 +125,46 @@ func TestEndorsementManager(t *testing.T) {
l := em.Log(log.L(), nil)
require.NotNil(l)
l.Info("test output")
cleaned := em.Cleanup(time.Now().Add(time.Second * 10 * -1))
require.NotNil(cleaned)
require.Equal(1, len(cleaned.collections))
err = em.Cleanup(time.Now().Add(time.Second * 10 * -1))
require.Nil(err)
require.NotNil(em)
require.Equal(1, len(em.collections))
encoded := encodeToString(cv.BlockHash())
require.Equal(1, len(cleaned.collections[encoded].endorsers))
require.Equal(1, len(em.collections[encoded].endorsers))

collection := cleaned.collections[encoded].endorsers[end.Endorser().HexString()]
collection := em.collections[encoded].endorsers[end.Endorser().HexString()]
require.Equal(end, collection.endorsements[PROPOSAL])
}

func TestEndorsementManagerProto(t *testing.T) {
require := require.New(t)
em, err := newEndorsementManager(nil)
require.Nil(err)
require.NotNil(em)

b := getBlock(t)

require.NoError(em.RegisterBlock(&b))
blkHash := b.HashBlock()
cv := NewConsensusVote(blkHash[:], PROPOSAL)
require.NotNil(cv)
end := endorsement.NewEndorsement(time.Now(), b.PublicKey(), []byte("123"))
require.NoError(em.AddVoteEndorsement(cv, end))

//test converting endorsement pb
endProto, err := end.Proto()
require.Nil(err)
end2 := &endorsement.Endorsement{}
require.NoError(end2.LoadProto(endProto))
require.Equal(end, end2)

//test converting emanager pb
emProto, err := em.toProto()
require.Nil(err)
em2, err := newEndorsementManager(nil)
require.NoError(em2.fromProto(emProto))

require.Equal(len(em.collections), len(em2.collections))
encoded := encodeToString(cv.BlockHash())
require.Equal(em.collections[encoded].endorsers, em2.collections[encoded].endorsers)
}

0 comments on commit a7e79fe

Please sign in to comment.