Skip to content

Commit

Permalink
refactor node
Browse files Browse the repository at this point in the history
  • Loading branch information
TATAUFO committed Mar 1, 2020
1 parent 5f419bf commit f62651b
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 93 deletions.
1 change: 1 addition & 0 deletions common/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func String2Hash(s string) (Hash, error) {
return Bytes2Hash(h), nil
}

// CreateHash is used to create a random hash as wave id
func CreateHash() Hash {
return Bytes2Hash(new(big.Int).SetUint64(rand.Uint64()).Bytes())
}
133 changes: 133 additions & 0 deletions node/ask.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2019 The PDU Authors
// This file is part of the PDU library.
//
// The PDU library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The PDU library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the PDU library. If not, see <http://www.gnu.org/licenses/>.

package node

import (
"encoding/json"

"github.com/pdupub/go-pdu/common"
"github.com/pdupub/go-pdu/db"
"github.com/pdupub/go-pdu/galaxy"
)

func (n *Node) askPeers(pid common.Hash) error {
p := n.peers[pid]
localPeerBytes, err := json.Marshal(n.localPeer())
if err != nil {
return err
}
waveID := common.CreateHash()
if err := p.SendQuestion(waveID, galaxy.CmdPeers, localPeerBytes); err != nil {
return err
}
if err := n.recordQuestion(pid, waveID); err != nil {
return err
}
return nil
}

func (n *Node) askPing(pid common.Hash) error {
p := n.peers[pid]
// ping each of peer
waveID := common.CreateHash()
if err := p.SendPing(waveID); err != nil {
n.removePeer(pid)
return err
}
if err := n.recordPing(pid, waveID); err != nil {
return err
}
return nil
}

func (n *Node) askRoots(pid common.Hash) error {
p := n.peers[pid]
waveID := common.CreateHash()
if err := p.SendQuestion(waveID, galaxy.CmdRoots); err != nil {
return err
} else {
if err := n.recordQuestion(pid, waveID); err != nil {
return err
}
}
return nil
}

func (n *Node) askMsg(pid common.Hash) error {
p := n.peers[pid]
// get current last message
lastMsg, err := db.GetLastMsg(n.udb)
var lastMsgID common.Hash
if err != nil && err != db.ErrMessageNotFound {
return err
}
if lastMsg != nil {
lastMsgID = lastMsg.ID()
}

if lastMsg != nil && lastMsgID == n.lastSyncMsg {
return errNoNewMsgSync
}

waveID := common.CreateHash()
if err := p.SendQuestion(waveID, galaxy.CmdMessages, lastMsgID); err != nil {
return err
}
if err := n.recordQuestion(pid, waveID); err != nil {
return err
}
return nil
}

func (n *Node) reAskMsg(waveID common.Hash) error {
if r, ok := n.questionRecord[waveID]; ok {
if left, ok := n.peerSyncCnt[r.pid]; ok && left > 0 {
n.peerSyncCnt[r.pid] = left - 1
if err := n.askMsg(r.pid); err != nil {
n.peerSyncCnt[r.pid] = 0
return err
}
}
}
return nil
}

func (n *Node) delRecord(waveID common.Hash, cmd string) {
if cmd == galaxy.CmdPong {
delete(n.pingpongRecord, waveID)
} else {
delete(n.questionRecord, waveID)
}
}

func (n *Node) recordQuestion(peerID, waveID common.Hash) error {
if _, ok := n.questionRecord[waveID]; !ok {
n.questionRecord[waveID] = &Record{pid: peerID, delay: 0}
} else {
return errDuplicateWaveID
}
return nil
}

func (n *Node) recordPing(peerID, waveID common.Hash) error {
if _, ok := n.pingpongRecord[waveID]; !ok {
n.pingpongRecord[waveID] = &Record{pid: peerID, delay: 0}
} else {
return errDuplicateWaveID
}
return nil
}
103 changes: 10 additions & 93 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (n *Node) checkRecord() {
r.delay++
if r.delay > maxPingPongDelayCnt {
// remove this record
delete(n.pingpongRecord, waveID)
n.delRecord(waveID, galaxy.CmdPong)
// remove this peer
n.removePeer(r.pid)
}
Expand All @@ -319,53 +319,11 @@ func (n *Node) checkRecord() {
if r.delay > maxQuestionDelayCnt {
log.Error("current question", common.Hash2String(waveID), "delay", r.delay)
// remove this record
delete(n.questionRecord, waveID)
n.delRecord(waveID, galaxy.CmdQuestion)
}
}
}

func (n *Node) askPeers(pid common.Hash) error {
p := n.peers[pid]
localPeerBytes, err := json.Marshal(n.localPeer())
if err != nil {
return err
}
waveID := common.CreateHash()
if err := p.SendQuestion(waveID, galaxy.CmdPeers, localPeerBytes); err != nil {
return err
}
if err := n.recordQuestion(pid, waveID); err != nil {
return err
}
return nil
}

func (n *Node) askMsg(pid common.Hash) error {
p := n.peers[pid]
// get current last message
lastMsg, err := db.GetLastMsg(n.udb)
var lastMsgID common.Hash
if err != nil && err != db.ErrMessageNotFound {
return err
}
if lastMsg != nil {
lastMsgID = lastMsg.ID()
}

if lastMsg != nil && lastMsgID == n.lastSyncMsg {
return errNoNewMsgSync
}

waveID := common.CreateHash()
if err := p.SendQuestion(waveID, galaxy.CmdMessages, lastMsgID); err != nil {
return err
}
if err := n.recordQuestion(pid, waveID); err != nil {
return err
}
return nil
}

func (n *Node) runNode(sig <-chan struct{}, wait chan<- struct{}) {
// run node
chanWave := make(chan galaxy.Wave)
Expand Down Expand Up @@ -394,32 +352,22 @@ func (n *Node) runNode(sig <-chan struct{}, wait chan<- struct{}) {
peerLoopCnt[k] = 0
}
peerLoopCnt[k]++
// ping each of peer
waveID := common.CreateHash()
if err := p.SendPing(waveID); err != nil {
n.removePeer(k)
continue
}
if err := n.recordPing(k, waveID); err != nil {

if err := n.askPing(k); err != nil {
log.Error(err)
continue
}

// get roots if universe not exist, so break if not err
if n.initStep < db.StepRootsSaved {
waveID = common.CreateHash()
if err := p.SendQuestion(waveID, galaxy.CmdRoots); err != nil {
if err := n.askRoots(k); err != nil {
log.Error(err)
continue
} else {
if err := n.recordQuestion(k, waveID); err != nil {
log.Error(err)
}
break
}
break // done for this loop
}

// sync from peers,
// todo : set n.wsAcceptMsg
if peerLoopCnt[k] == 1 {
log.Trace("Start to sync from other peer ")
n.peerSyncCnt[k] = syncMsgLoopCnt
Expand All @@ -443,46 +391,15 @@ func (n *Node) runNode(sig <-chan struct{}, wait chan<- struct{}) {
waveID, err := n.handleWave(nil, w, true)
if err != nil {
//log.Trace("Peer handler fail", err, "waveID", common.Hash2String(waveID))
} else {
// sync msg if n.peerSyncCnt > 0
if r, ok := n.questionRecord[waveID]; ok && w.Command() == galaxy.CmdMessages {
if left, ok := n.peerSyncCnt[r.pid]; ok && left > 0 {
n.peerSyncCnt[r.pid] = left - 1
if err := n.askMsg(r.pid); err != nil {
log.Error(err)
n.peerSyncCnt[r.pid] = 0
}
}
}
} else if w.Command() == galaxy.CmdMessages {
n.reAskMsg(waveID)
}
n.delRecord(waveID, w.Command())

if w.Command() == galaxy.CmdPong {
delete(n.pingpongRecord, waveID)
} else {
delete(n.questionRecord, waveID)
}
}
}
}

func (n *Node) recordQuestion(peerID, waveID common.Hash) error {
if _, ok := n.questionRecord[waveID]; !ok {
n.questionRecord[waveID] = &Record{pid: peerID, delay: 0}
} else {
return errDuplicateWaveID
}
return nil
}

func (n *Node) recordPing(peerID, waveID common.Hash) error {
if _, ok := n.pingpongRecord[waveID]; !ok {
n.pingpongRecord[waveID] = &Record{pid: peerID, delay: 0}
} else {
return errDuplicateWaveID
}
return nil
}

func (n *Node) runTimeProof(sig <-chan struct{}, wait chan<- struct{}) {
for {
select {
Expand Down

0 comments on commit f62651b

Please sign in to comment.