Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
TATAUFO committed Mar 1, 2020
1 parent 157531b commit bb864d7
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 61 deletions.
8 changes: 4 additions & 4 deletions node/ask.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ func (n *Node) askRoots(pid common.Hash) error {
waveID := common.CreateHash()
if err := p.SendQuestion(waveID, galaxy.CmdRoots); err != nil {
return err
} else {
if err := n.recordQuestion(pid, waveID); err != nil {
return err
}
}
if err := n.recordQuestion(pid, waveID); err != nil {
return err
}

return nil
}

Expand Down
119 changes: 62 additions & 57 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,22 @@ type Node struct {
wsAcceptMsg bool
peerSyncCnt map[common.Hash]int
lastSyncMsg common.Hash
standardLoopCnt map[common.Hash]uint64
}

// New is used to create new node
func New(udb db.UDB) (node *Node, err error) {
node = &Node{
udb: udb,
tpInterval: uint64(1),
localPort: DefaultLocalPort,
peers: make(map[common.Hash]*peer.Peer),
pingpongRecord: make(map[common.Hash]*Record),
questionRecord: make(map[common.Hash]*Record),
wsAcceptMsg: false,
peerSyncCnt: make(map[common.Hash]int),
lastSyncMsg: common.Hash{},
udb: udb,
tpInterval: uint64(1),
localPort: DefaultLocalPort,
peers: make(map[common.Hash]*peer.Peer),
pingpongRecord: make(map[common.Hash]*Record),
questionRecord: make(map[common.Hash]*Record),
wsAcceptMsg: false,
peerSyncCnt: make(map[common.Hash]int),
lastSyncMsg: common.Hash{},
standardLoopCnt: make(map[common.Hash]uint64),
}
rand.Seed(time.Now().UnixNano())
if err := node.loadUniverse(); err != nil {
Expand Down Expand Up @@ -324,63 +326,67 @@ func (n *Node) checkRecord() {
}
}

func (n *Node) standardLoop(chanWave chan<- galaxy.Wave, chanWSig chan<- common.Hash) {
for k, p := range n.peers {
if !p.Connected() {
if err := p.Dial(); err != nil {
log.Error(err)
n.removePeer(k)
continue
}
if err := n.askPeers(k); err != nil {
log.Error(err)
continue
}
go n.serveReceiveWave(p.Conn, k, chanWave, chanWSig)
} else {
if loopCnt, ok := n.standardLoopCnt[k]; !ok || loopCnt >= maxPeerLoopCnt {
n.standardLoopCnt[k] = 0
}
n.standardLoopCnt[k]++

if err := n.askPing(k); err != nil {
log.Error(err)
continue
}

// get roots if universe not exist, so break if not err
if n.initStep < db.StepRootsSaved {
if err := n.askRoots(k); err != nil {
log.Error(err)
continue
}
break // done for this loop
}

// sync from peers,
if n.standardLoopCnt[k] == 1 {
log.Trace("Start to sync from other peer ")
n.peerSyncCnt[k] = syncMsgLoopCnt
if err := n.askMsg(k); err != nil {
log.Error(err)
continue
}
} else {
n.peerSyncCnt[k] = 0
}

}
}
}

func (n *Node) runNode(sig <-chan struct{}, wait chan<- struct{}) {
// run node
chanWave := make(chan galaxy.Wave)
chanWSig := make(chan common.Hash)
peerLoopCnt := make(map[common.Hash]uint64)
n.standardLoopCnt = make(map[common.Hash]uint64)

for {
select {
case <-time.After(time.Second * time.Duration(checkPeerInterval)):
log.Info("Update information from peers")
n.checkRecord()
for k, p := range n.peers {
if !p.Connected() {
if err := p.Dial(); err != nil {
log.Error(err)
n.removePeer(k)
continue
}
if err := n.askPeers(k); err != nil {
log.Error(err)
continue
}
go n.serveReceiveWave(p.Conn, k, chanWave, chanWSig)
} else {
if loopCnt, ok := peerLoopCnt[k]; !ok || loopCnt >= maxPeerLoopCnt {
peerLoopCnt[k] = 0
}
peerLoopCnt[k]++

if err := n.askPing(k); err != nil {
log.Error(err)
continue
}

// get roots if universe not exist, so break if not err
if n.initStep < db.StepRootsSaved {
if err := n.askRoots(k); err != nil {
log.Error(err)
continue
}
break // done for this loop
}

// sync from peers,
if peerLoopCnt[k] == 1 {
log.Trace("Start to sync from other peer ")
n.peerSyncCnt[k] = syncMsgLoopCnt
if err := n.askMsg(k); err != nil {
log.Error(err)
continue
}
} else {
n.peerSyncCnt[k] = 0
}

}
}
n.standardLoop(chanWave, chanWSig)
case k := <-chanWSig:
n.removePeer(k)
case <-sig:
Expand All @@ -395,7 +401,6 @@ func (n *Node) runNode(sig <-chan struct{}, wait chan<- struct{}) {
n.reAskMsg(waveID)
}
n.delRecord(waveID, w.Command())

}
}
}
Expand Down

0 comments on commit bb864d7

Please sign in to comment.