From cb50c3b90a837c5de251485ce4fa03c08738bd28 Mon Sep 17 00:00:00 2001 From: JunXi Xie Date: Mon, 25 Mar 2019 17:28:15 +0800 Subject: [PATCH] optimize consensus code,modify return type when no error msg --- consensus/vbft/block_pool.go | 19 ++--- consensus/vbft/event_timer.go | 92 +++++++++----------- consensus/vbft/event_timer_test.go | 18 ++-- consensus/vbft/msg_builder.go | 26 +++--- consensus/vbft/msg_pool.go | 27 ++---- consensus/vbft/node_sync.go | 31 ++----- consensus/vbft/node_utils.go | 3 +- consensus/vbft/peer_pool.go | 19 ++--- consensus/vbft/peer_pool_test.go | 18 +--- consensus/vbft/service.go | 133 +++++++++-------------------- consensus/vbft/state_mgmt.go | 29 ++----- consensus/vbft/state_mgmt_test.go | 10 +-- 12 files changed, 141 insertions(+), 284 deletions(-) diff --git a/consensus/vbft/block_pool.go b/consensus/vbft/block_pool.go index a1b7a02321..ed0a429c21 100644 --- a/consensus/vbft/block_pool.go +++ b/consensus/vbft/block_pool.go @@ -249,40 +249,39 @@ func (pool *BlockPool) setProposalEndorsed(proposal *blockProposalMsg, forEmpty return nil } -func (pool *BlockPool) addBlockEndorsementLocked(blkNum uint32, endorser uint32, eSig *CandidateEndorseSigInfo) error { +func (pool *BlockPool) addBlockEndorsementLocked(blkNum uint32, endorser uint32, eSig *CandidateEndorseSigInfo) { candidate := pool.getCandidateInfoLocked(blkNum) if eSigs, present := candidate.EndorseSigs[endorser]; present { for _, eSig := range eSigs { if eSig.ForEmpty { // has endorsed for empty, ignore new endorsement - return nil + return } } if eSig.ForEmpty { // add empty endorsement candidate.EndorseSigs[endorser] = append(eSigs, eSig) - return nil + return } // check dup endorsement for _, esig := range eSigs { if esig.EndorsedProposer == eSig.EndorsedProposer { - return nil + return } } candidate.EndorseSigs[endorser] = append(eSigs, eSig) } else { candidate.EndorseSigs[endorser] = []*CandidateEndorseSigInfo{eSig} } - - return nil + return } // // add endorsement msg to CandidateInfo // -func (pool *BlockPool) newBlockEndorsement(msg *blockEndorseMsg) error { +func (pool *BlockPool) newBlockEndorsement(msg *blockEndorseMsg) { pool.lock.Lock() defer pool.lock.Unlock() @@ -291,7 +290,7 @@ func (pool *BlockPool) newBlockEndorsement(msg *blockEndorseMsg) error { Signature: msg.EndorserSig, ForEmpty: msg.EndorseForEmpty, } - return pool.addBlockEndorsementLocked(msg.GetBlockNum(), msg.Endorser, eSig) + pool.addBlockEndorsementLocked(msg.GetBlockNum(), msg.Endorser, eSig) } // @@ -462,9 +461,7 @@ func (pool *BlockPool) newBlockCommitment(msg *blockCommitMsg) error { Signature: sig, ForEmpty: msg.CommitForEmpty, } - if err := pool.addBlockEndorsementLocked(blkNum, endorser, eSig); err != nil { - return fmt.Errorf("failed to verify endorse sig from %d: %s", endorser, err) - } + pool.addBlockEndorsementLocked(blkNum, endorser, eSig) } // add committer sig diff --git a/consensus/vbft/event_timer.go b/consensus/vbft/event_timer.go index 54b67343e3..314af6e89a 100644 --- a/consensus/vbft/event_timer.go +++ b/consensus/vbft/event_timer.go @@ -119,7 +119,7 @@ func (self *EventTimer) stop() { self.normalTimers = make(map[uint32]*time.Timer) } -func (self *EventTimer) StartTimer(Idx uint32, timeout time.Duration) error { +func (self *EventTimer) StartTimer(Idx uint32, timeout time.Duration) { self.lock.Lock() defer self.lock.Unlock() @@ -139,11 +139,9 @@ func (self *EventTimer) StartTimer(Idx uint32, timeout time.Duration) error { blockNum: Idx, } }) - - return nil } -func (self *EventTimer) CancelTimer(idx uint32) error { +func (self *EventTimer) CancelTimer(idx uint32) { self.lock.Lock() defer self.lock.Unlock() @@ -151,7 +149,6 @@ func (self *EventTimer) CancelTimer(idx uint32) error { t.Stop() delete(self.normalTimers, idx) } - return nil } func (self *EventTimer) getEventTimeout(evtType TimerEventType) time.Duration { @@ -190,7 +187,7 @@ func (self *EventTimer) getEventTimeout(evtType TimerEventType) time.Duration { // // internal helper, should call with lock held // -func (self *EventTimer) startEventTimer(evtType TimerEventType, blockNum uint32) error { +func (self *EventTimer) startEventTimer(evtType TimerEventType, blockNum uint32) { timers := self.eventTimers[evtType] if t, present := timers[blockNum]; present { t.Stop() @@ -208,124 +205,120 @@ func (self *EventTimer) startEventTimer(evtType TimerEventType, blockNum uint32) blockNum: blockNum, } }) - - return nil } // // internal helper, should call with lock held // -func (self *EventTimer) cancelEventTimer(evtType TimerEventType, blockNum uint32) error { +func (self *EventTimer) cancelEventTimer(evtType TimerEventType, blockNum uint32) { timers := self.eventTimers[evtType] if t, present := timers[blockNum]; present { t.Stop() delete(timers, blockNum) } - - return nil } -func (self *EventTimer) StartProposalTimer(blockNum uint32) error { +func (self *EventTimer) StartProposalTimer(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() log.Infof("server %d started proposal timer for blk %d", self.server.Index, blockNum) - return self.startEventTimer(EventProposeBlockTimeout, blockNum) + self.startEventTimer(EventProposeBlockTimeout, blockNum) } -func (self *EventTimer) CancelProposalTimer(blockNum uint32) error { +func (self *EventTimer) CancelProposalTimer(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() - return self.cancelEventTimer(EventProposeBlockTimeout, blockNum) + self.cancelEventTimer(EventProposeBlockTimeout, blockNum) } -func (self *EventTimer) StartEndorsingTimer(blockNum uint32) error { +func (self *EventTimer) StartEndorsingTimer(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() log.Infof("server %d started endorsing timer for blk %d", self.server.Index, blockNum) - return self.startEventTimer(EventEndorseBlockTimeout, blockNum) + self.startEventTimer(EventEndorseBlockTimeout, blockNum) } -func (self *EventTimer) CancelEndorseMsgTimer(blockNum uint32) error { +func (self *EventTimer) CancelEndorseMsgTimer(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() - return self.cancelEventTimer(EventEndorseBlockTimeout, blockNum) + self.cancelEventTimer(EventEndorseBlockTimeout, blockNum) } -func (self *EventTimer) StartEndorseEmptyBlockTimer(blockNum uint32) error { +func (self *EventTimer) StartEndorseEmptyBlockTimer(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() log.Infof("server %d started empty endorsing timer for blk %d", self.server.Index, blockNum) - return self.startEventTimer(EventEndorseEmptyBlockTimeout, blockNum) + self.startEventTimer(EventEndorseEmptyBlockTimeout, blockNum) } -func (self *EventTimer) CancelEndorseEmptyBlockTimer(blockNum uint32) error { +func (self *EventTimer) CancelEndorseEmptyBlockTimer(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() - return self.cancelEventTimer(EventEndorseEmptyBlockTimeout, blockNum) + self.cancelEventTimer(EventEndorseEmptyBlockTimeout, blockNum) } -func (self *EventTimer) StartCommitTimer(blockNum uint32) error { +func (self *EventTimer) StartCommitTimer(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() log.Infof("server %d started commit timer for blk %d", self.server.Index, blockNum) - return self.startEventTimer(EventCommitBlockTimeout, blockNum) + self.startEventTimer(EventCommitBlockTimeout, blockNum) } -func (self *EventTimer) CancelCommitMsgTimer(blockNum uint32) error { +func (self *EventTimer) CancelCommitMsgTimer(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() - return self.cancelEventTimer(EventCommitBlockTimeout, blockNum) + self.cancelEventTimer(EventCommitBlockTimeout, blockNum) } -func (self *EventTimer) StartProposalBackoffTimer(blockNum uint32) error { +func (self *EventTimer) StartProposalBackoffTimer(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() - return self.startEventTimer(EventProposalBackoff, blockNum) + self.startEventTimer(EventProposalBackoff, blockNum) } -func (self *EventTimer) CancelProposalBackoffTimer(blockNum uint32) error { +func (self *EventTimer) CancelProposalBackoffTimer(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() - return self.cancelEventTimer(EventProposalBackoff, blockNum) + self.cancelEventTimer(EventProposalBackoff, blockNum) } -func (self *EventTimer) StartBackoffTimer(blockNum uint32) error { +func (self *EventTimer) StartBackoffTimer(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() - return self.startEventTimer(EventRandomBackoff, blockNum) + self.startEventTimer(EventRandomBackoff, blockNum) } -func (self *EventTimer) CancelBackoffTimer(blockNum uint32) error { +func (self *EventTimer) CancelBackoffTimer(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() - return self.cancelEventTimer(EventRandomBackoff, blockNum) + self.cancelEventTimer(EventRandomBackoff, blockNum) } -func (self *EventTimer) Start2ndProposalTimer(blockNum uint32) error { +func (self *EventTimer) Start2ndProposalTimer(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() - return self.startEventTimer(EventPropose2ndBlockTimeout, blockNum) + self.startEventTimer(EventPropose2ndBlockTimeout, blockNum) } -func (self *EventTimer) Cancel2ndProposalTimer(blockNum uint32) error { +func (self *EventTimer) Cancel2ndProposalTimer(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() - return self.cancelEventTimer(EventPropose2ndBlockTimeout, blockNum) + self.cancelEventTimer(EventPropose2ndBlockTimeout, blockNum) } func (self *EventTimer) onBlockSealed(blockNum uint32) { @@ -334,25 +327,22 @@ func (self *EventTimer) onBlockSealed(blockNum uint32) { // clear event timers for i := 0; i < int(EventMax); i++ { - if err := self.cancelEventTimer(TimerEventType(i), blockNum); err != nil { - log.Errorf("server %d, failed to stop timer %d on sealing", - self.server.Index, i) - } + self.cancelEventTimer(TimerEventType(i), blockNum) } } -func (self *EventTimer) StartTxBlockTimeout(blockNum uint32) error { +func (self *EventTimer) StartTxBlockTimeout(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() - return self.startEventTimer(EventTxBlockTimeout, blockNum) + self.startEventTimer(EventTxBlockTimeout, blockNum) } -func (self *EventTimer) CancelTxBlockTimeout(blockNum uint32) error { +func (self *EventTimer) CancelTxBlockTimeout(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() - return self.cancelEventTimer(EventTxBlockTimeout, blockNum) + self.cancelEventTimer(EventTxBlockTimeout, blockNum) } func (self *EventTimer) startPeerTicker(peerIdx uint32) error { @@ -387,18 +377,18 @@ func (self *EventTimer) stopPeerTicker(peerIdx uint32) error { return nil } -func (self *EventTimer) startTxTicker(blockNum uint32) error { +func (self *EventTimer) startTxTicker(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() - return self.startEventTimer(EventTxPool, blockNum) + self.startEventTimer(EventTxPool, blockNum) } -func (self *EventTimer) stopTxTicker(blockNum uint32) error { +func (self *EventTimer) stopTxTicker(blockNum uint32) { self.lock.Lock() defer self.lock.Unlock() - return self.cancelEventTimer(EventTxPool, blockNum) + self.cancelEventTimer(EventTxPool, blockNum) } /////////////////////////////////////////////////////////// diff --git a/consensus/vbft/event_timer_test.go b/consensus/vbft/event_timer_test.go index e6b868889c..cdd1502ed7 100644 --- a/consensus/vbft/event_timer_test.go +++ b/consensus/vbft/event_timer_test.go @@ -27,28 +27,22 @@ func constructEventTimer() *EventTimer { func TestStartTimer(t *testing.T) { eventtimer := constructEventTimer() - err := eventtimer.StartTimer(1, 10) - t.Logf("TestStartTimer: %v", err) + eventtimer.StartTimer(1, 10) } func TestCancelTimer(t *testing.T) { eventtimer := constructEventTimer() - err := eventtimer.StartTimer(1, 10) - t.Logf("TestStartTimer: %v", err) - err = eventtimer.CancelTimer(1) - t.Logf("TestCancelTimer: %v", err) + eventtimer.StartTimer(1, 10) + eventtimer.CancelTimer(1) } func TestStartEventTimer(t *testing.T) { eventtimer := constructEventTimer() - err := eventtimer.startEventTimer(EventProposeBlockTimeout, 1) - t.Logf("TestStartEventTimer: %v", err) + eventtimer.startEventTimer(EventProposeBlockTimeout, 1) } func TestCancelEventTimer(t *testing.T) { eventtimer := constructEventTimer() - err := eventtimer.startEventTimer(EventProposeBlockTimeout, 1) - t.Logf("startEventTimer: %v", err) - err = eventtimer.cancelEventTimer(EventProposeBlockTimeout, 1) - t.Logf("cancelEventTimer: %v", err) + eventtimer.startEventTimer(EventProposeBlockTimeout, 1) + eventtimer.cancelEventTimer(EventProposeBlockTimeout, 1) } diff --git a/consensus/vbft/msg_builder.go b/consensus/vbft/msg_builder.go index cb0ccfb8a2..abecfc6125 100644 --- a/consensus/vbft/msg_builder.go +++ b/consensus/vbft/msg_builder.go @@ -357,41 +357,35 @@ func (self *Server) constructCommitMsg(proposal *blockProposalMsg, endorses []*b return msg, nil } -func (self *Server) constructBlockFetchMsg(blkNum uint32) (*blockFetchMsg, error) { - msg := &blockFetchMsg{ +func (self *Server) constructBlockFetchMsg(blkNum uint32) *blockFetchMsg { + return &blockFetchMsg{ BlockNum: blkNum, } - return msg, nil } -func (self *Server) constructBlockFetchRespMsg(blkNum uint32, blk *Block, blkHash common.Uint256) (*BlockFetchRespMsg, error) { - msg := &BlockFetchRespMsg{ +func (self *Server) constructBlockFetchRespMsg(blkNum uint32, blk *Block, blkHash common.Uint256) *BlockFetchRespMsg { + return &BlockFetchRespMsg{ BlockNumber: blkNum, BlockHash: blkHash, BlockData: blk, } - return msg, nil } -func (self *Server) constructBlockInfoFetchMsg(startBlkNum uint32) (*BlockInfoFetchMsg, error) { - - msg := &BlockInfoFetchMsg{ +func (self *Server) constructBlockInfoFetchMsg(startBlkNum uint32) *BlockInfoFetchMsg { + return &BlockInfoFetchMsg{ StartBlockNum: startBlkNum, } - return msg, nil } -func (self *Server) constructBlockInfoFetchRespMsg(blockInfos []*BlockInfo_) (*BlockInfoFetchRespMsg, error) { - msg := &BlockInfoFetchRespMsg{ +func (self *Server) constructBlockInfoFetchRespMsg(blockInfos []*BlockInfo_) *BlockInfoFetchRespMsg { + return &BlockInfoFetchRespMsg{ Blocks: blockInfos, } - return msg, nil } -func (self *Server) constructProposalFetchMsg(blkNum uint32, proposer uint32) (*proposalFetchMsg, error) { - msg := &proposalFetchMsg{ +func (self *Server) constructProposalFetchMsg(blkNum uint32, proposer uint32) *proposalFetchMsg { + return &proposalFetchMsg{ ProposerID: proposer, BlockNum: blkNum, } - return msg, nil } diff --git a/consensus/vbft/msg_pool.go b/consensus/vbft/msg_pool.go index b22bd9851a..f62860b242 100644 --- a/consensus/vbft/msg_pool.go +++ b/consensus/vbft/msg_pool.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/ontio/ontology/common" - "github.com/ontio/ontology/common/log" ) var errDropFarFutureMsg = errors.New("msg pool dropped msg for far future") @@ -51,15 +50,14 @@ func newConsensusRound(num uint32) *ConsensusRound { return r } -func (self *ConsensusRound) addMsg(msg ConsensusMsg, msgHash common.Uint256) error { +func (self *ConsensusRound) addMsg(msg ConsensusMsg, msgHash common.Uint256) { if _, present := self.msgHashs[msgHash]; present { - return nil + return } msgs := self.msgs[msg.Type()] self.msgs[msg.Type()] = append(msgs, msg) self.msgHashs[msgHash] = msg - return nil } func (self *ConsensusRound) dropMsg(msg ConsensusMsg) { @@ -79,11 +77,11 @@ func (self *ConsensusRound) dropMsg(msg ConsensusMsg) { } } -func (self *ConsensusRound) hasMsg(msg ConsensusMsg, msgHash common.Uint256) (bool, error) { +func (self *ConsensusRound) hasMsg(msg ConsensusMsg, msgHash common.Uint256) bool { if _, present := self.msgHashs[msgHash]; present { - return present, nil + return present } - return false, nil + return false } type MsgPool struct { @@ -125,7 +123,8 @@ func (pool *MsgPool) AddMsg(msg ConsensusMsg, msgHash common.Uint256) error { // TODO: limit #history rounds to historyLen // Note: we accept msg for future rounds - return pool.rounds[blkNum].addMsg(msg, msgHash) + pool.rounds[blkNum].addMsg(msg, msgHash) + return nil } func (pool *MsgPool) DropMsg(msg ConsensusMsg) { @@ -144,22 +143,12 @@ func (pool *MsgPool) HasMsg(msg ConsensusMsg, msgHash common.Uint256) bool { if roundMsgs, present := pool.rounds[msg.GetBlockNum()]; !present { return false } else { - if present, err := roundMsgs.hasMsg(msg, msgHash); err != nil { - log.Errorf("msgpool failed to check msg avail: %s", err) - return false - } else { - return present - } + return roundMsgs.hasMsg(msg, msgHash) } return false } -func (pool *MsgPool) Persist() error { - // TODO - return nil -} - func (pool *MsgPool) GetProposalMsgs(blocknum uint32) []ConsensusMsg { pool.lock.RLock() defer pool.lock.RUnlock() diff --git a/consensus/vbft/node_sync.go b/consensus/vbft/node_sync.go index d9deb68e81..4a0fb893b3 100644 --- a/consensus/vbft/node_sync.go +++ b/consensus/vbft/node_sync.go @@ -146,9 +146,7 @@ func (self *Syncer) run() { if req.startBlockNum > req.targetBlockNum { continue } - if err := self.onNewBlockSyncReq(req); err != nil { - log.Errorf("server %d failed to handle new block sync req: %s", self.server.Index, err) - } + self.onNewBlockSyncReq(req) case syncMsg := <-self.syncMsgC: if p, present := self.peers[syncMsg.fromPeer]; present { @@ -290,7 +288,7 @@ func (self *Syncer) isActive() bool { return self.nextReqBlkNum <= self.targetBlkNum } -func (self *Syncer) startPeerSyncer(syncer *PeerSyncer, targetBlkNum uint32) error { +func (self *Syncer) startPeerSyncer(syncer *PeerSyncer, targetBlkNum uint32) { syncer.lock.Lock() defer syncer.lock.Unlock() @@ -304,30 +302,26 @@ func (self *Syncer) startPeerSyncer(syncer *PeerSyncer, targetBlkNum uint32) err syncer.run() }() } - - return nil } -func (self *Syncer) cancelFetcherForPeer(peer *PeerSyncer) error { +func (self *Syncer) cancelFetcherForPeer(peer *PeerSyncer) { if peer == nil { - return nil + return } peer.lock.Lock() defer peer.lock.Unlock() // TODO - - return nil } -func (self *Syncer) onNewBlockSyncReq(req *BlockSyncReq) error { +func (self *Syncer) onNewBlockSyncReq(req *BlockSyncReq) { if req.startBlockNum < self.nextReqBlkNum { log.Errorf("server %d new blockSyncReq startblkNum %d vs %d", self.server.Index, req.startBlockNum, self.nextReqBlkNum) } if req.targetBlockNum <= self.targetBlkNum { - return nil + return } if self.nextReqBlkNum == 1 { self.nextReqBlkNum = req.startBlockNum @@ -360,8 +354,6 @@ func (self *Syncer) onNewBlockSyncReq(req *BlockSyncReq) error { p := self.peers[peerIdx] self.startPeerSyncer(p, self.targetBlkNum) } - - return nil } ///////////////////////////////////////////////////////////////////// @@ -450,10 +442,7 @@ func (self *PeerSyncer) stop(force bool) bool { } func (self *PeerSyncer) requestBlock(blkNum uint32) (*Block, error) { - msg, err := self.server.constructBlockFetchMsg(blkNum) - if err != nil { - return nil, err - } + msg := self.server.constructBlockFetchMsg(blkNum) self.server.msgSendC <- &SendMsgEvent{ ToPeer: self.peerIdx, Msg: msg, @@ -484,11 +473,7 @@ func (self *PeerSyncer) requestBlock(blkNum uint32) (*Block, error) { } func (self *PeerSyncer) requestBlockInfo(startBlkNum uint32) ([]*BlockInfo_, error) { - msg, err := self.server.constructBlockInfoFetchMsg(startBlkNum) - if err != nil { - return nil, err - } - + msg := self.server.constructBlockInfoFetchMsg(startBlkNum) self.server.msgSendC <- &SendMsgEvent{ ToPeer: self.peerIdx, Msg: msg, diff --git a/consensus/vbft/node_utils.go b/consensus/vbft/node_utils.go index ea71dfb95e..d5724f0ca4 100644 --- a/consensus/vbft/node_utils.go +++ b/consensus/vbft/node_utils.go @@ -433,12 +433,11 @@ func (self *Server) sendToPeer(peerIdx uint32, data []byte) error { return nil } -func (self *Server) broadcast(msg ConsensusMsg) error { +func (self *Server) broadcast(msg ConsensusMsg) { self.msgSendC <- &SendMsgEvent{ ToPeer: math.MaxUint32, Msg: msg, } - return nil } func (self *Server) broadcastToAll(data []byte) error { diff --git a/consensus/vbft/peer_pool.go b/consensus/vbft/peer_pool.go index 1f20e2bbfc..1d0bc4ba7c 100644 --- a/consensus/vbft/peer_pool.go +++ b/consensus/vbft/peer_pool.go @@ -115,10 +115,10 @@ func (pool *PeerPool) getActivePeerCount() int { return n } -func (pool *PeerPool) waitPeerConnected(peerIdx uint32) error { +func (pool *PeerPool) waitPeerConnected(peerIdx uint32) { if !pool.isNewPeer(peerIdx) { // peer already connected - return nil + return } var C chan struct{} @@ -132,10 +132,9 @@ func (pool *PeerPool) waitPeerConnected(peerIdx uint32) error { pool.lock.Unlock() <-C - return nil } -func (pool *PeerPool) peerConnected(peerIdx uint32) error { +func (pool *PeerPool) peerConnected(peerIdx uint32) { pool.lock.Lock() defer pool.lock.Unlock() @@ -150,10 +149,9 @@ func (pool *PeerPool) peerConnected(peerIdx uint32) error { delete(pool.peerConnectionWaitings, peerIdx) close(C) } - return nil } -func (pool *PeerPool) peerDisconnected(peerIdx uint32) error { +func (pool *PeerPool) peerDisconnected(peerIdx uint32) { pool.lock.Lock() defer pool.lock.Unlock() @@ -168,10 +166,9 @@ func (pool *PeerPool) peerDisconnected(peerIdx uint32) error { LastUpdateTime: lastUpdateTime, connected: false, } - return nil } -func (pool *PeerPool) peerHandshake(peerIdx uint32, msg *peerHandshakeMsg) error { +func (pool *PeerPool) peerHandshake(peerIdx uint32, msg *peerHandshakeMsg) { pool.lock.Lock() defer pool.lock.Unlock() @@ -183,11 +180,9 @@ func (pool *PeerPool) peerHandshake(peerIdx uint32, msg *peerHandshakeMsg) error LastUpdateTime: time.Now(), connected: true, } - - return nil } -func (pool *PeerPool) peerHeartbeat(peerIdx uint32, msg *peerHeartbeatMsg) error { +func (pool *PeerPool) peerHeartbeat(peerIdx uint32, msg *peerHeartbeatMsg) { pool.lock.Lock() defer pool.lock.Unlock() @@ -205,8 +200,6 @@ func (pool *PeerPool) peerHeartbeat(peerIdx uint32, msg *peerHeartbeatMsg) error LastUpdateTime: time.Now(), connected: true, } - - return nil } func (pool *PeerPool) getNeighbours() []*Peer { diff --git a/consensus/vbft/peer_pool_test.go b/consensus/vbft/peer_pool_test.go index 292a14f2da..713672fe08 100644 --- a/consensus/vbft/peer_pool_test.go +++ b/consensus/vbft/peer_pool_test.go @@ -65,18 +65,6 @@ func TestGetActivePeerCount(t *testing.T) { t.Logf("TestGetActivePeerCount count:%v", count) } -func TestPeerConnected(t *testing.T) { - peerpool := constructPeerPool(false) - err := peerpool.peerConnected(uint32(1)) - t.Logf("TestPeerConnected :%v", err) -} - -func TestPeerDisconnected(t *testing.T) { - peerpool := constructPeerPool(true) - err := peerpool.peerDisconnected(uint32(1)) - t.Logf("TestPeerDisconnected :%v", err) -} - func TestPeerHandshake(t *testing.T) { nodeId := "120202c924ed1a67fd1719020ce599d723d09d48362376836e04b0be72dfe825e24d81" peerconfig := &vconfig.PeerConfig{ @@ -90,8 +78,7 @@ func TestPeerHandshake(t *testing.T) { CommittedBlockHash: common.Uint256{}, CommittedBlockLeader: uint32(1), } - err := peerpool.peerHandshake(uint32(1), handshakemsg) - t.Logf("TestPeerHandshake :%v", err) + peerpool.peerHandshake(uint32(1), handshakemsg) } func TestPeerHeartbeat(t *testing.T) { @@ -108,8 +95,7 @@ func TestPeerHeartbeat(t *testing.T) { CommittedBlockLeader: uint32(1), ChainConfigView: uint32(1), } - err := peerpool.peerHeartbeat(uint32(1), heartbeatmsg) - t.Logf("TestPeerHeartbeat: %v", err) + peerpool.peerHeartbeat(uint32(1), heartbeatmsg) } func TestGetNeighbours(t *testing.T) { diff --git a/consensus/vbft/service.go b/consensus/vbft/service.go index 9e09eeaa40..af63d9f7e4 100644 --- a/consensus/vbft/service.go +++ b/consensus/vbft/service.go @@ -521,7 +521,7 @@ func (self *Server) start() error { return nil } -func (self *Server) stop() error { +func (self *Server) stop() { self.incrValidator.Clean() self.sub.Unsubscribe(message.TOPIC_SAVE_BLOCK_COMPLETE) @@ -536,8 +536,6 @@ func (self *Server) stop() error { self.blockPool.clean() self.chainStore.close() self.peerPool.clean() - - return nil } // @@ -554,9 +552,7 @@ func (self *Server) run(peerPubKey keypair.PublicKey) error { self.heartbeat() // wait remote msgs - if err := self.peerPool.waitPeerConnected(peerIdx); err != nil { - return err - } + self.peerPool.waitPeerConnected(peerIdx) defer func() { // TODO: handle peer disconnection here @@ -633,17 +629,15 @@ func (self *Server) updateParticipantConfig() error { if block.Info.NewChainConfig != nil { chainconfig = block.Info.NewChainConfig } - var err error self.metaLock.Lock() - if cfg, err := self.buildParticipantConfig(blkNum, block, chainconfig); err == nil { + cfg, err := self.buildParticipantConfig(blkNum, block, chainconfig) + if err == nil { self.currentParticipantConfig = cfg } self.metaLock.Unlock() - if err != nil { return fmt.Errorf("failed to build participant config (%d): %s", blkNum, err) } - // TODO: if server is not in new config, self.stop() return nil } @@ -683,10 +677,7 @@ func (self *Server) startNewRound() error { if msg == nil { continue } - if err := self.blockPool.newBlockEndorsement(msg); err != nil { - log.Infof("starting new round, failed to add endorse, blk %d, endorse for %d: %s", - blkNum, msg.EndorsedProposer, err) - } + self.blockPool.newBlockEndorsement(msg) } } @@ -723,7 +714,7 @@ func (self *Server) startNewRound() error { return nil } -func (self *Server) startNewProposal(blkNum uint32) error { +func (self *Server) startNewProposal(blkNum uint32) { // make proposal if self.isProposer(blkNum, self.Index) { log.Infof("server %d, proposer for block %d", self.Index, blkNum) @@ -740,7 +731,7 @@ func (self *Server) startNewProposal(blkNum uint32) error { // TODO: if new round block proposal has received, go endorsing/committing directly - return self.timer.StartProposalTimer(blkNum) + self.timer.StartProposalTimer(blkNum) } // verify consensus messsage, then send msg to processMsgEvent @@ -926,9 +917,7 @@ func (self *Server) onConsensusMsg(peerIdx uint32, msg ConsensusMsg, msgHash com log.Errorf("invalid msg with heartbeat msg type") return } - if err := self.processHeartbeatMsg(peerIdx, pMsg); err != nil { - log.Errorf("server %d, failed to process heartbeat %d: %s", self.Index, peerIdx, err) - } + self.processHeartbeatMsg(peerIdx, pMsg) if pMsg.CommittedBlockNumber+MAX_SYNCING_CHECK_BLK_NUM < self.GetCommittedBlockNo() { // delayed peer detected, response heartbeat with our chain Info self.timer.C <- &TimerEvent{ @@ -983,17 +972,12 @@ func (self *Server) onConsensusMsg(peerIdx uint32, msg ConsensusMsg, msgHash com return } blk, blkHash := self.blockPool.getSealedBlock(pMsg.BlockNum) - msg, err := self.constructBlockFetchRespMsg(pMsg.BlockNum, blk, blkHash) - if err != nil { - log.Errorf("server %d, failed to handle blockfetch %d from %d: %s", - self.Index, pMsg.BlockNum, peerIdx, err) - } else { - log.Infof("server %d, handle blockfetch %d from %d", - self.Index, pMsg.BlockNum, peerIdx) - self.msgSendC <- &SendMsgEvent{ - ToPeer: peerIdx, - Msg: msg, - } + msg := self.constructBlockFetchRespMsg(pMsg.BlockNum, blk, blkHash) + log.Infof("server %d, handle blockfetch %d from %d", + self.Index, pMsg.BlockNum, peerIdx) + self.msgSendC <- &SendMsgEvent{ + ToPeer: peerIdx, + Msg: msg, } case BlockFetchRespMessage: @@ -1025,17 +1009,12 @@ func (self *Server) onConsensusMsg(peerIdx uint32, msg ConsensusMsg, msgHash com break } } - msg, err := self.constructBlockInfoFetchRespMsg(blkInfos) - if err != nil { - log.Errorf("server %d, failed to handle blockinfo fetch %d to %d: %s", - self.Index, pMsg.StartBlockNum, peerIdx, err) - } else { - log.Infof("server %d, response blockinfo fetch to %d, blk %d, len %d", - self.Index, peerIdx, pMsg.StartBlockNum, len(blkInfos)) - self.msgSendC <- &SendMsgEvent{ - ToPeer: peerIdx, - Msg: msg, - } + msg := self.constructBlockInfoFetchRespMsg(blkInfos) + log.Infof("server %d, response blockinfo fetch to %d, blk %d, len %d", + self.Index, peerIdx, pMsg.StartBlockNum, len(blkInfos)) + self.msgSendC <- &SendMsgEvent{ + ToPeer: peerIdx, + Msg: msg, } case BlockInfoFetchRespMessage: @@ -1188,10 +1167,7 @@ func (self *Server) processMsgEvent() error { } // stop proposal timer - if err := self.timer.CancelProposalTimer(msgBlkNum); err != nil { - log.Errorf("failed to cancel proposal timer, blockNum %d, err: %s", msgBlkNum, err) - } - + self.timer.CancelProposalTimer(msgBlkNum) if self.isEndorser(msgBlkNum, self.Index) { if err := self.endorseBlock(pMsg, false); err != nil { log.Errorf("failed to endorse block proposal (%d): %s", msgBlkNum, err) @@ -1230,10 +1206,7 @@ func (self *Server) processMsgEvent() error { if msgBlkNum == self.GetCurrentBlockNo() { // add endorse to block-pool - if err := self.blockPool.newBlockEndorsement(pMsg); err != nil { - log.Errorf("failed to add endorsement (%d): %s", msgBlkNum, err) - return nil - } + self.blockPool.newBlockEndorsement(pMsg) log.Infof("server %d received endorse from %d, for proposer %d, block %d, empty: %t", self.Index, pMsg.Endorser, pMsg.EndorsedProposer, msgBlkNum, pMsg.EndorseForEmpty) @@ -1247,14 +1220,9 @@ func (self *Server) processMsgEvent() error { // TODO: should only count endorsements from endorsers if proposer, forEmpty, done := self.blockPool.endorseDone(msgBlkNum, self.config.C); done { // stop endorse timer - if err := self.timer.CancelEndorseMsgTimer(msgBlkNum); err != nil { - log.Errorf("failed to cancel endorse timer, blockNum %d, err: %s", msgBlkNum, err) - } + self.timer.CancelEndorseMsgTimer(msgBlkNum) // stop empty endorse timer - if err := self.timer.CancelEndorseEmptyBlockTimer(msgBlkNum); err != nil { - log.Errorf("failed to cancel empty endorse timer, blockNum %d, err: %s", msgBlkNum, err) - } - + self.timer.CancelEndorseEmptyBlockTimer(msgBlkNum) proposal := self.findBlockProposal(msgBlkNum, proposer, forEmpty) if proposal == nil { log.Infof("server %d endorse %d done, waiting proposal from %d", self.Index, msgBlkNum, proposer) @@ -1321,9 +1289,7 @@ func (self *Server) processMsgEvent() error { } // stop commit timer - if err := self.timer.CancelCommitMsgTimer(msgBlkNum); err != nil { - log.Errorf("failed to cancel commit timer, blockNum: %d, err: %s", msgBlkNum, err) - } + self.timer.CancelCommitMsgTimer(msgBlkNum) if err := self.makeSealed(proposal, forEmpty); err != nil { log.Errorf("failed to seal block %d, err: %s", msgBlkNum, err) @@ -1739,9 +1705,7 @@ func (self *Server) processTimerEvent(evt *TimerEvent) error { } } } else { - if err := self.timer.StartEndorseEmptyBlockTimer(evt.blockNum); err != nil { - return fmt.Errorf("failed to start empty endorse timer (%d): %s", evt.blockNum, err) - } + self.timer.StartEndorseEmptyBlockTimer(evt.blockNum) } } return nil @@ -1803,9 +1767,7 @@ func (self *Server) processTimerEvent(evt *TimerEvent) error { } func (self *Server) processHandshakeMsg(peerIdx uint32, msg *peerHandshakeMsg) error { - if err := self.peerPool.peerHandshake(peerIdx, msg); err != nil { - return fmt.Errorf("failed to update peer %d: %s", peerIdx, err) - } + self.peerPool.peerHandshake(peerIdx, msg) self.stateMgr.StateEventC <- &StateEvent{ Type: UpdatePeerConfig, peerState: &PeerState{ @@ -1819,11 +1781,8 @@ func (self *Server) processHandshakeMsg(peerIdx uint32, msg *peerHandshakeMsg) e return nil } -func (self *Server) processHeartbeatMsg(peerIdx uint32, msg *peerHeartbeatMsg) error { - - if err := self.peerPool.peerHeartbeat(peerIdx, msg); err != nil { - return fmt.Errorf("failed to update peer %d: %s", peerIdx, err) - } +func (self *Server) processHeartbeatMsg(peerIdx uint32, msg *peerHeartbeatMsg) { + self.peerPool.peerHeartbeat(peerIdx, msg) log.Debugf("server %d received heartbeat from peer %d, chainview %d, blkNum %d", self.Index, peerIdx, msg.ChainConfigView, msg.CommittedBlockNumber) self.stateMgr.StateEventC <- &StateEvent{ @@ -1835,8 +1794,6 @@ func (self *Server) processHeartbeatMsg(peerIdx uint32, msg *peerHeartbeatMsg) e committedBlockNum: msg.CommittedBlockNumber, }, } - - return nil } func (self *Server) endorseBlock(proposal *blockProposalMsg, forEmpty bool) error { @@ -1877,21 +1834,16 @@ func (self *Server) endorseBlock(proposal *blockProposalMsg, forEmpty bool) erro log.Infof("endorser %d, endorsed block %d, from server %d", self.Index, blkNum, proposal.Block.getProposer()) // broadcast my endorsement - return self.broadcast(endorseMsg) + self.broadcast(endorseMsg) + return nil } // start endorsing timer // TODO: endorsing may have reached consensus before received proposal, handle this if !forEmpty { - if err := self.timer.StartEndorsingTimer(blkNum); err != nil { - return fmt.Errorf("server %d failed to start endorser timer, blockNum %d, err: %s", - self.Index, blkNum, err) - } + self.timer.StartEndorsingTimer(blkNum) } else { - if err := self.timer.StartEndorseEmptyBlockTimer(blkNum); err != nil { - return fmt.Errorf("server %d failed to start empty endorse timer (%d): %s", - self.Index, blkNum, err) - } + self.timer.StartEndorseEmptyBlockTimer(blkNum) } return nil @@ -1943,14 +1895,13 @@ func (self *Server) commitBlock(proposal *blockProposalMsg, forEmpty bool) error log.Infof("committer %d, set block %d committed, from server %d", self.Index, blkNum, proposal.Block.getProposer()) // broadcast my commitment - return self.broadcast(commitMsg) + self.broadcast(commitMsg) + return nil } // start commit timer // TODO: committing may have reached consensus before received endorsement, handle this - if err := self.timer.StartCommitTimer(blkNum); err != nil { - return fmt.Errorf("failed to start commit timer (%d): %s", blkNum, err) - } + self.timer.StartCommitTimer(blkNum) return nil } @@ -2182,7 +2133,8 @@ func (self *Server) makeProposal(blkNum uint32, forEmpty bool) error { h, _ := HashMsg(proposal) self.msgPool.AddMsg(proposal, h) self.processProposalMsg(proposal) - return self.broadcast(proposal) + self.broadcast(proposal) + return nil } func (self *Server) makeCommitment(proposal *blockProposalMsg, blkNum uint32, forEmpty bool) error { @@ -2249,10 +2201,7 @@ func (self *Server) reBroadcastCurrentRoundMsgs() error { } func (self *Server) fetchProposal(blkNum uint32, proposer uint32) error { - msg, err := self.constructProposalFetchMsg(blkNum, proposer) - if err != nil { - return nil - } + msg := self.constructProposalFetchMsg(blkNum, proposer) self.msgSendC <- &SendMsgEvent{ ToPeer: math.MaxUint32, Msg: msg, @@ -2285,9 +2234,7 @@ func (self *Server) handleProposalTimeout(evt *TimerEvent) error { if err := self.makeProposal(evt.blockNum, true); err != nil { return fmt.Errorf("failed to propose empty block: %s", err) } - if err := self.timer.Start2ndProposalTimer(evt.blockNum); err != nil { - return fmt.Errorf("failed to start 2nd proposal timer: %s", err) - } + self.timer.Start2ndProposalTimer(evt.blockNum) log.Infof("server %d proposed empty block for blk %d", self.Index, evt.blockNum) } return nil diff --git a/consensus/vbft/state_mgmt.go b/consensus/vbft/state_mgmt.go index f13cbedfd7..a826f8071d 100644 --- a/consensus/vbft/state_mgmt.go +++ b/consensus/vbft/state_mgmt.go @@ -148,13 +148,9 @@ func (self *StateMgr) run() { } case UpdatePeerState: if evt.peerState.connected { - if err := self.onPeerUpdate(evt.peerState); err != nil { - log.Errorf("statemgr process peer (%d) err: %s", evt.peerState.peerIdx, err) - } + self.onPeerUpdate(evt.peerState) } else { - if err := self.onPeerDisconnected(evt.peerState.peerIdx); err != nil { - log.Errorf("statmgr process peer (%d) disconn err: %s", evt.peerState.peerIdx, err) - } + self.onPeerDisconnected(evt.peerState.peerIdx) } case SyncDone: @@ -176,7 +172,7 @@ func (self *StateMgr) run() { } } -func (self *StateMgr) onPeerUpdate(peerState *PeerState) error { +func (self *StateMgr) onPeerUpdate(peerState *PeerState) { peerIdx := peerState.peerIdx newPeer := false if _, present := self.peers[peerIdx]; !present { @@ -193,10 +189,8 @@ func (self *StateMgr) onPeerUpdate(peerState *PeerState) error { if isActive(self.currentState) && peerState.committedBlockNum > self.server.GetCurrentBlockNo()+MAX_SYNCING_CHECK_BLK_NUM { log.Warnf("server %d seems lost sync: %d(%d) vs %d", self.server.Index, peerState.committedBlockNum, peerState.peerIdx, self.server.GetCurrentBlockNo()) - if err := self.checkStartSyncing(self.server.GetCommittedBlockNo()+MAX_SYNCING_CHECK_BLK_NUM, false); err != nil { - log.Errorf("server %d start syncing check failed", self.server.Index) - } - return nil + self.checkStartSyncing(self.server.GetCommittedBlockNo()+MAX_SYNCING_CHECK_BLK_NUM, false) + return } } @@ -259,14 +253,12 @@ func (self *StateMgr) onPeerUpdate(peerState *PeerState) error { self.checkStartSyncing(self.server.GetCommittedBlockNo()+MAX_SYNCING_CHECK_BLK_NUM, false) } } - - return nil } -func (self *StateMgr) onPeerDisconnected(peerIdx uint32) error { +func (self *StateMgr) onPeerDisconnected(peerIdx uint32) { if _, present := self.peers[peerIdx]; !present { - return nil + return } delete(self.peers, peerIdx) @@ -277,7 +269,6 @@ func (self *StateMgr) onPeerDisconnected(peerIdx uint32) error { } } - return nil } func (self *StateMgr) onLiveTick(evt *StateEvent) error { @@ -379,11 +370,11 @@ func (self *StateMgr) setSyncedReady() error { return nil } -func (self *StateMgr) checkStartSyncing(startBlkNum uint32, forceSync bool) error { +func (self *StateMgr) checkStartSyncing(startBlkNum uint32, forceSync bool) { if self.server.nonConsensusNode() { // non-consensus node, block-syncer do the syncing - return nil + return } var maxCommitted uint32 peers := make(map[uint32][]uint32) @@ -422,8 +413,6 @@ func (self *StateMgr) checkStartSyncing(startBlkNum uint32, forceSync bool) erro log.Infof("server %d, start syncing check %v, %d", self.server.Index, peers, self.server.GetCurrentBlockNo()) self.currentState = SyncingCheck } - - return nil } // return 0 if consensus not reached yet diff --git a/consensus/vbft/state_mgmt_test.go b/consensus/vbft/state_mgmt_test.go index 3ebbd4d9f4..70ce24068c 100644 --- a/consensus/vbft/state_mgmt_test.go +++ b/consensus/vbft/state_mgmt_test.go @@ -161,9 +161,7 @@ func TestStateMgr_onPeerUpdate(t *testing.T) { liveTicker: tt.fields.liveTicker, lastTickChainHeight: tt.fields.lastTickChainHeight, } - if err := self.onPeerUpdate(tt.args.peerState); (err != nil) != tt.wantErr { - t.Errorf("StateMgr.onPeerUpdate() error = %v, wantErr %v", err, tt.wantErr) - } + self.onPeerUpdate(tt.args.peerState) }) } } @@ -184,11 +182,7 @@ func constructPeerState() *StateMgr { } func TestStateMgr_onPeerDisconnected(t *testing.T) { statemgr := constructPeerState() - err := statemgr.onPeerDisconnected(1) - if err != nil { - t.Errorf("TestonPeerDisconnected failed:%v", err) - return - } + statemgr.onPeerDisconnected(1) t.Logf("TestonPeerDisconnected succ") }