Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network queues #606

Merged
merged 4 commits into from
Jan 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions pkg/network/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/CityOfZion/neo-go/pkg/core/storage"
"github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/crypto/keys"
"github.com/CityOfZion/neo-go/pkg/io"
"github.com/CityOfZion/neo-go/pkg/network/payload"
"github.com/CityOfZion/neo-go/pkg/util"
"github.com/CityOfZion/neo-go/pkg/vm"
Expand Down Expand Up @@ -177,14 +178,27 @@ func (p *localPeer) RemoteAddr() net.Addr {
func (p *localPeer) PeerAddr() net.Addr {
return &p.netaddr
}
func (p *localPeer) StartProtocol() {}
func (p *localPeer) Disconnect(err error) {}
func (p *localPeer) WriteMsg(msg *Message) error {
p.messageHandler(p.t, msg)
return nil

func (p *localPeer) EnqueueMessage(msg *Message) error {
b, err := msg.Bytes()
if err != nil {
return err
}
return p.EnqueuePacket(b)
}
func (p *localPeer) EnqueuePacket(m []byte) error {
return p.EnqueueHPPacket(m)
}
func (p *localPeer) Done() chan error {
done := make(chan error)
return done
func (p *localPeer) EnqueueHPPacket(m []byte) error {
msg := &Message{}
r := io.NewBinReaderFromBuf(m)
err := msg.Decode(r)
if err == nil {
p.messageHandler(p.t, msg)
}
return nil
}
func (p *localPeer) Version() *payload.Version {
return p.version
Expand All @@ -200,10 +214,12 @@ func (p *localPeer) HandleVersion(v *payload.Version) error {
return nil
}
func (p *localPeer) SendVersion(m *Message) error {
return p.WriteMsg(m)
_ = p.EnqueueMessage(m)
return nil
}
func (p *localPeer) SendVersionAck(m *Message) error {
return p.WriteMsg(m)
_ = p.EnqueueMessage(m)
return nil
}
func (p *localPeer) HandleVersionAck() error {
p.handshaked = true
Expand Down
12 changes: 12 additions & 0 deletions pkg/network/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,18 @@ func (m *Message) Encode(br *io.BinWriter) error {
return nil
}

// Bytes serializes a Message into the new allocated buffer and returns it.
func (m *Message) Bytes() ([]byte, error) {
w := io.NewBufBinWriter()
if err := m.Encode(w.BinWriter); err != nil {
return nil, err
}
if w.Err != nil {
return nil, w.Err
}
return w.Bytes(), nil
}

// convert a command (string) to a byte slice filled with 0 bytes till
// size 12.
func cmdToByteArray(cmd CommandType) [cmdSize]byte {
Expand Down
21 changes: 19 additions & 2 deletions pkg/network/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,31 @@ type Peer interface {
// before that it returns the same address as RemoteAddr.
PeerAddr() net.Addr
Disconnect(error)
WriteMsg(msg *Message) error
Done() chan error

// EnqueueMessage is a temporary wrapper that sends a message via
// EnqueuePacket if there is no error in serializing it.
EnqueueMessage(*Message) error

// EnqueuePacket is a blocking packet enqueuer, it doesn't return until
// it puts given packet into the queue. It accepts a slice of bytes that
// can be shared with other queues (so that message marshalling can be
// done once for all peers). Does nothing is the peer is not yet
// completed handshaking.
EnqueuePacket([]byte) error

// EnqueueHPPacket is a blocking high priority packet enqueuer, it
// doesn't return until it puts given packet into the high-priority
// queue.
EnqueueHPPacket([]byte) error
Version() *payload.Version
LastBlockIndex() uint32
UpdateLastBlockIndex(lbIndex uint32)
Handshaked() bool
SendVersion(*Message) error
SendVersionAck(*Message) error
// StartProtocol is a goroutine to be run after the handshake. It
// implements basic peer-related protocol handling.
StartProtocol()
HandleVersion(*payload.Version) error
HandleVersionAck() error
GetPingSent() int
Expand Down
135 changes: 39 additions & 96 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,71 +305,6 @@ func (s *Server) HandshakedPeersCount() int {
return count
}

// startProtocol starts a long running background loop that interacts
// every ProtoTickInterval with the peer.
func (s *Server) startProtocol(p Peer) {
var err error

s.log.Info("started protocol",
zap.Stringer("addr", p.RemoteAddr()),
zap.ByteString("userAgent", p.Version().UserAgent),
zap.Uint32("startHeight", p.Version().StartHeight),
zap.Uint32("id", p.Version().Nonce))

s.discovery.RegisterGoodAddr(p.PeerAddr().String())
if s.chain.HeaderHeight() < p.LastBlockIndex() {
err = s.requestHeaders(p)
if err != nil {
p.Disconnect(err)
return
}
}

timer := time.NewTimer(s.ProtoTickInterval)
pingTimer := time.NewTimer(s.PingTimeout)
for {
select {
case err = <-p.Done():
// time to stop
case m := <-s.addrReq:
err = p.WriteMsg(m)
case <-timer.C:
// Try to sync in headers and block with the peer if his block height is higher then ours.
if p.LastBlockIndex() > s.chain.BlockHeight() {
err = s.requestBlocks(p)
}
if err == nil {
timer.Reset(s.ProtoTickInterval)
}
if s.chain.HeaderHeight() >= p.LastBlockIndex() {
block, errGetBlock := s.chain.GetBlock(s.chain.CurrentBlockHash())
if errGetBlock != nil {
err = errGetBlock
} else {
diff := uint32(time.Now().UTC().Unix()) - block.Timestamp
if diff > uint32(s.PingInterval/time.Second) {
p.UpdatePingSent(p.GetPingSent() + 1)
err = p.WriteMsg(NewMessage(s.Net, CMDPing, payload.NewPing(s.id, s.chain.HeaderHeight())))
}
}
}
case <-pingTimer.C:
if p.GetPingSent() > defaultPingLimit {
err = errors.New("ping/pong timeout")
} else {
pingTimer.Reset(s.PingTimeout)
p.UpdatePingSent(0)
}
}
if err != nil {
s.unregister <- peerDrop{p, err}
timer.Stop()
p.Disconnect(err)
return
}
}
}

// When a peer connects to the server, we will send our version immediately.
func (s *Server) sendVersion(p Peer) error {
payload := payload.NewVersion(
Expand Down Expand Up @@ -429,7 +364,7 @@ func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {

// handlePing processes ping request.
func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
return p.WriteMsg(NewMessage(s.Net, CMDPong, payload.NewPing(s.id, s.chain.BlockHeight())))
return p.EnqueueMessage(NewMessage(s.Net, CMDPong, payload.NewPing(s.id, s.chain.BlockHeight())))
}

// handlePing processes pong request.
Expand Down Expand Up @@ -465,43 +400,49 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
}
}
if len(reqHashes) > 0 {
payload := payload.NewInventory(inv.Type, reqHashes)
return p.WriteMsg(NewMessage(s.Net, CMDGetData, payload))
msg := NewMessage(s.Net, CMDGetData, payload.NewInventory(inv.Type, reqHashes))
pkt, err := msg.Bytes()
if err != nil {
return err
}
if inv.Type == payload.ConsensusType {
return p.EnqueueHPPacket(pkt)
}
return p.EnqueuePacket(pkt)
}
return nil
}

// handleInvCmd processes the received inventory.
func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error {
switch inv.Type {
case payload.TXType:
for _, hash := range inv.Hashes {
for _, hash := range inv.Hashes {
var msg *Message

switch inv.Type {
case payload.TXType:
tx, _, err := s.chain.GetTransaction(hash)
if err == nil {
err = p.WriteMsg(NewMessage(s.Net, CMDTX, tx))
if err != nil {
return err
}

msg = NewMessage(s.Net, CMDTX, tx)
}
}
case payload.BlockType:
for _, hash := range inv.Hashes {
case payload.BlockType:
b, err := s.chain.GetBlock(hash)
if err == nil {
err = p.WriteMsg(NewMessage(s.Net, CMDBlock, b))
if err != nil {
return err
}
msg = NewMessage(s.Net, CMDBlock, b)
}
}
case payload.ConsensusType:
for _, hash := range inv.Hashes {
case payload.ConsensusType:
if cp := s.consensus.GetPayload(hash); cp != nil {
if err := p.WriteMsg(NewMessage(s.Net, CMDConsensus, cp)); err != nil {
return err
}
msg = NewMessage(s.Net, CMDConsensus, cp)
}
}
if msg != nil {
pkt, err := msg.Bytes()
if err != nil {
return err
}
if inv.Type == payload.ConsensusType {
return p.EnqueueHPPacket(pkt)
}
return p.EnqueuePacket(pkt)
}
}
return nil
Expand Down Expand Up @@ -533,7 +474,8 @@ func (s *Server) handleGetBlocksCmd(p Peer, gb *payload.GetBlocks) error {
return nil
}
payload := payload.NewInventory(payload.BlockType, blockHashes)
return p.WriteMsg(NewMessage(s.Net, CMDInv, payload))
msg := NewMessage(s.Net, CMDInv, payload)
return p.EnqueueMessage(msg)
}

// handleGetHeadersCmd processes the getheaders request.
Expand Down Expand Up @@ -562,7 +504,8 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlocks) error {
if len(resp.Hdrs) == 0 {
return nil
}
return p.WriteMsg(NewMessage(s.Net, CMDHeaders, &resp))
msg := NewMessage(s.Net, CMDHeaders, &resp)
return p.EnqueueMessage(msg)
}

// handleConsensusCmd processes received consensus payload.
Expand Down Expand Up @@ -603,15 +546,15 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
netaddr, _ := net.ResolveTCPAddr("tcp", addr)
alist.Addrs[i] = payload.NewAddressAndTime(netaddr, ts)
}
return p.WriteMsg(NewMessage(s.Net, CMDAddr, alist))
return p.EnqueueMessage(NewMessage(s.Net, CMDAddr, alist))
}

// requestHeaders sends a getheaders message to the peer.
// The peer will respond with headers op to a count of 2000.
func (s *Server) requestHeaders(p Peer) error {
start := []util.Uint256{s.chain.CurrentHeaderHash()}
payload := payload.NewGetBlocks(start, util.Uint256{})
return p.WriteMsg(NewMessage(s.Net, CMDGetHeaders, payload))
return p.EnqueueMessage(NewMessage(s.Net, CMDGetHeaders, payload))
}

// requestBlocks sends a getdata message to the peer
Expand All @@ -630,7 +573,7 @@ func (s *Server) requestBlocks(p Peer) error {
}
if len(hashes) > 0 {
payload := payload.NewInventory(payload.BlockType, hashes)
return p.WriteMsg(NewMessage(s.Net, CMDGetData, payload))
return p.EnqueueMessage(NewMessage(s.Net, CMDGetData, payload))
} else if s.chain.HeaderHeight() < p.Version().StartHeight {
return s.requestHeaders(p)
}
Expand Down Expand Up @@ -701,7 +644,7 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
if err != nil {
return err
}
go s.startProtocol(peer)
go peer.StartProtocol()

s.tryStartConsensus()
default:
Expand Down Expand Up @@ -732,7 +675,7 @@ func (s *Server) relayInventoryCmd(cmd CommandType, t payload.InventoryType, has
continue
}
// Who cares about these messages anyway?
_ = peer.WriteMsg(msg)
_ = peer.EnqueueMessage(msg)
}
}

Expand Down
Loading