Skip to content

Commit

Permalink
Merge branch 'master' into minor_sync_issue
Browse files Browse the repository at this point in the history
  • Loading branch information
ggoranov committed Nov 26, 2018
2 parents d035b55 + 6f607c0 commit dbee71c
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 65 deletions.
3 changes: 1 addition & 2 deletions examples/getting_started/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ func main() {

keys := ed25519.RandomKeyPair()

log.Info().Str("private_key", keys.PrivateKeyHex()).Msg("")
log.Info().Str("public_key", keys.PublicKeyHex()).Msg("")
log.Info().Str("private_key", keys.PrivateKeyHex()).Str("public_key", keys.PublicKeyHex()).Msg("Generated keypair.")

builder := network.NewBuilder()
builder.SetKeys(keys)
Expand Down
6 changes: 3 additions & 3 deletions examples/request_benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
)

const (
defaultNumNodes = 5
defaultNumReqPerNode = 50
defaultNumNodes = 8
defaultNumReqPerNode = 100
host = "localhost"
startPort = 23000
)
Expand Down Expand Up @@ -109,7 +109,7 @@ func setupNetworks(host string, startPort int, numNodes int) []*network.Network
}

// Wait for all nodes to finish discovering other peers.
time.Sleep(1 * time.Second)
time.Sleep(5 * time.Second)

return nodes
}
Expand Down
2 changes: 1 addition & 1 deletion log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

var (
logger = zerolog.New(os.Stderr).With().Timestamp().Caller().Logger()
logger = zerolog.New(os.Stderr).With().Timestamp().Logger()
)

func init() {
Expand Down
6 changes: 2 additions & 4 deletions network/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@ import (
"github.com/pkg/errors"
)

const (
defaultRequestTimeout = 5 * time.Second
)

// PeerClient represents a single incoming peers client.
type PeerClient struct {
sync.Once

Network *Network

ID *peer.ID
Expand Down
8 changes: 4 additions & 4 deletions network/discovery/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (state *Plugin) Receive(ctx *network.PluginContext) error {

log.Info().
Strs("peers", state.Routes.GetPeerAddresses()).
Msg("bootstrapped w/ peer(s)")
Msg("Bootstrapped w/ peer(s).")
case *protobuf.LookupNodeRequest:
if state.DisableLookup {
break
Expand All @@ -84,7 +84,7 @@ func (state *Plugin) Receive(ctx *network.PluginContext) error {

log.Info().
Strs("peers", state.Routes.GetPeerAddresses()).
Msg("connected to peer(s)")
Msg("Connected to peer(s).")
}

return nil
Expand All @@ -100,10 +100,10 @@ func (state *Plugin) PeerDisconnect(client *network.PeerClient) {
if state.Routes.PeerExists(*client.ID) {
state.Routes.RemovePeer(*client.ID)

log.Info().
log.Debug().
Str("address", client.Network.ID.Address).
Str("peer_address", client.ID.Address).
Msg("peer has disconnected")
Msg("Peer has disconnected.")
}
}
}
74 changes: 23 additions & 51 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package network
import (
"bufio"
"context"
"fmt"
"math/rand"
"net"
"sync"
Expand Down Expand Up @@ -138,15 +137,15 @@ func (n *Network) dispatchMessage(client *PeerClient, msg *protobuf.Message) {
code := opcode.Opcode(msg.Opcode)
switch code {
case opcode.BytesCode:
ptr = &protobuf.Bytes{}
ptr = new(protobuf.Bytes)
case opcode.PingCode:
ptr = &protobuf.Ping{}
ptr = new(protobuf.Ping)
case opcode.PongCode:
ptr = &protobuf.Pong{}
ptr = new(protobuf.Pong)
case opcode.LookupNodeRequestCode:
ptr = &protobuf.LookupNodeRequest{}
ptr = new(protobuf.LookupNodeRequest)
case opcode.LookupNodeResponseCode:
ptr = &protobuf.LookupNodeResponse{}
ptr = new(protobuf.LookupNodeResponse)
case opcode.UnregisteredCode:
log.Error().Msg("network: message received had no opcode")
return
Expand All @@ -158,9 +157,10 @@ func (n *Network) dispatchMessage(client *PeerClient, msg *protobuf.Message) {
return
}
}

if len(msg.Message) > 0 {
if err := proto.Unmarshal(msg.Message, ptr); err != nil {
log.Error().Err(err).Msg("")
log.Error().Msgf("%v", err)
return
}
}
Expand Down Expand Up @@ -200,7 +200,6 @@ func (n *Network) dispatchMessage(client *PeerClient, msg *protobuf.Message) {

// Listen starts listening for peers on a port.
func (n *Network) Listen() {

// Handle 'network starts listening' callback for plugins.
n.plugins.Each(func(plugin PluginInterface) {
plugin.Startup(n)
Expand Down Expand Up @@ -234,7 +233,7 @@ func (n *Network) Listen() {

log.Info().
Str("address", n.Address).
Msg("listening for peers")
Msg("Listening for peers.")

// handle server shutdowns
go func() {
Expand All @@ -249,23 +248,21 @@ func (n *Network) Listen() {
for {
if conn, err := listener.Accept(); err == nil {
go n.Accept(conn)

} else {
// if the Shutdown flag is set, no need to continue with the for loop
select {
case <-n.kill:
log.Info().Msgf("shutting down server on %s.", n.Address)
log.Info().Msgf("Shutting down server %s.", n.Address)
return
default:
log.Error().Err(err).Msg("")
log.Error().Msgf("%v", err)
}
}
}
}

// getOrSetPeerClient either returns a cached peer client or creates a new one given a net.Conn
// or dials the client if no net.Conn is provided.
func (n *Network) getOrSetPeerClient(address string, conn net.Conn) (*PeerClient, error) {
// Client either creates or returns a cached peer client given its host address.
func (n *Network) Client(address string) (*PeerClient, error) {
address, err := ToUnifiedAddress(address)
if err != nil {
return nil, err
Expand All @@ -275,23 +272,6 @@ func (n *Network) getOrSetPeerClient(address string, conn net.Conn) (*PeerClient
return nil, errors.New("network: peer should not dial itself")
}

// if conn is not nil, check that the sender host matches the net.Conn remote host address
if conn != nil {
addrInfo, err := ParseAddress(address)
if err != nil {
return nil, err
}

remoteAddrInfo, err := ParseAddress(fmt.Sprintf("%s://%s", conn.RemoteAddr().Network(), conn.RemoteAddr().String()))
if err != nil {
return nil, err
}

if addrInfo.Host != remoteAddrInfo.Host {
return nil, errors.New("network: sender address did not match connection remote address")
}
}

clientNew, err := createPeerClient(n, address)
if err != nil {
return nil, err
Expand All @@ -313,12 +293,10 @@ func (n *Network) getOrSetPeerClient(address string, conn net.Conn) (*PeerClient
client.setOutgoingReady()
}()

if conn == nil {
conn, err = n.Dial(address)
if err != nil {
n.peers.Delete(address)
return nil, err
}
conn, err := n.Dial(address)
if err != nil {
n.peers.Delete(address)
return nil, err
}

n.connections.Store(address, &ConnState{
Expand All @@ -332,11 +310,6 @@ func (n *Network) getOrSetPeerClient(address string, conn net.Conn) (*PeerClient
return client, nil
}

// Client either creates or returns a cached peer client given its host address.
func (n *Network) Client(address string) (*PeerClient, error) {
return n.getOrSetPeerClient(address, nil)
}

// ConnectionStateExists returns true if network has a connection on a given address.
func (n *Network) ConnectionStateExists(address string) bool {
_, ok := n.connections.Load(address)
Expand Down Expand Up @@ -414,16 +387,12 @@ func (n *Network) Dial(address string) (net.Conn, error) {
return nil, err
}

// use the connection for also receiving messages
go n.Accept(conn)

return conn, nil
}

// Accept handles peer registration and processes incoming message streams.
func (n *Network) Accept(incoming net.Conn) {
var client *PeerClient
var clientInit sync.Once

recvWindow := NewRecvWindow(n.opts.recvWindowSize)

Expand All @@ -444,18 +413,21 @@ func (n *Network) Accept(incoming net.Conn) {
msg, err := n.receiveMessage(incoming)
if err != nil {
if err != errEmptyMsg {
log.Error().Err(err).Msg("")
log.Error().Msgf("%v", err)
}
break
}

// Initialize client if not exists.
clientInit.Do(func() {
client, err = n.getOrSetPeerClient(msg.Sender.Address, incoming)
if client == nil {
client, err = n.Client(msg.Sender.Address)

if err != nil {
return
}
}

client.Do(func() {
client.ID = (*peer.ID)(msg.Sender)

if !n.ConnectionStateExists(client.ID.Address) {
Expand All @@ -476,7 +448,7 @@ func (n *Network) Accept(incoming net.Conn) {
log.Error().
Interface("peer_id", peer.ID(*msg.Sender)).
Interface("client_id", client.ID).
Msg("message signed by peer does not match client ID")
Msg("Message signed by peer does not match client ID.")
return
}

Expand Down

0 comments on commit dbee71c

Please sign in to comment.