Skip to content

Commit

Permalink
NetworkDB allow setting PacketSize
Browse files Browse the repository at this point in the history
- Introduce the possibility to specify the max buffer length
  in network DB. This will allow to use the whole MTU limit of
  the interface

- Add queue stats per network, it can be handy to identify the
  node's throughput per network and identify unbalance between
  nodes that can point to an MTU missconfiguration

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
  • Loading branch information
Flavio Crisciani committed Jul 25, 2017
1 parent f81e09a commit 9bf8207
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 37 deletions.
23 changes: 15 additions & 8 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
listen := clusterProvider.GetListenAddress()
listenAddr, _, _ := net.SplitHostPort(listen)

logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v",
listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList)
logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v MTU=%d",
listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList, c.Config().Daemon.NetworkControlPlaneMTU)
if advAddr != "" && agent == nil {
if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
logrus.Errorf("error in agentInit: %v", err)
Expand Down Expand Up @@ -286,12 +286,19 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
logrus.Info("Gossip cluster hostname ", nodeName)

nDB, err := networkdb.New(&networkdb.Config{
BindAddr: listenAddr,
AdvertiseAddr: advertiseAddr,
NodeName: nodeName,
Keys: keys,
})
netDBConf := networkdb.DefaultConfig()
netDBConf.NodeName = nodeName
netDBConf.BindAddr = listenAddr
netDBConf.AdvertiseAddr = advertiseAddr
netDBConf.Keys = keys
if c.Config().Daemon.NetworkControlPlaneMTU != 0 {
// Consider the MTU remove the IP hdr (IPv4 or IPv6) and the TCP/UDP hdr.
// To be on the safe side let's cut 100 bytes
netDBConf.PacketBufferSize = (c.Config().Daemon.NetworkControlPlaneMTU - 100)
logrus.Debugf("Control plane MTU: %d will initialize NetworkDB with: %d",
c.Config().Daemon.NetworkControlPlaneMTU, netDBConf.PacketBufferSize)
}
nDB, err := networkdb.New(netDBConf)

if err != nil {
return err
Expand Down
35 changes: 23 additions & 12 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ type Config struct {

// DaemonCfg represents libnetwork core configuration
type DaemonCfg struct {
Debug bool
Experimental bool
DataDir string
DefaultNetwork string
DefaultDriver string
Labels []string
DriverCfg map[string]interface{}
ClusterProvider cluster.Provider
Debug bool
Experimental bool
DataDir string
DefaultNetwork string
DefaultDriver string
Labels []string
DriverCfg map[string]interface{}
ClusterProvider cluster.Provider
NetworkControlPlaneMTU int
}

// ClusterCfg represents cluster configuration
Expand Down Expand Up @@ -221,6 +222,19 @@ func OptionExperimental(exp bool) Option {
}
}

// OptionNetworkControlPlaneMTU function returns an option setter for control plane MTU
func OptionNetworkControlPlaneMTU(exp int) Option {
return func(c *Config) {
logrus.Debugf("Network Control Plane MTU: %d", exp)
if exp < 1500 {
// if exp == 0 the value won't be used
logrus.Warnf("Received a Network Control Plane MTU of %d, this value is very LOW,",
"the database can misbehave, 0 value will be ignored", exp)
}
c.Daemon.NetworkControlPlaneMTU = exp
}
}

// ProcessOptions processes options and stores it in config
func (c *Config) ProcessOptions(options ...Option) {
for _, opt := range options {
Expand All @@ -232,10 +246,7 @@ func (c *Config) ProcessOptions(options ...Option) {

// IsValidName validates configuration objects supported by libnetwork
func IsValidName(name string) bool {
if strings.TrimSpace(name) == "" {
return false
}
return true
return strings.TrimSpace(name) != ""
}

// OptionLocalKVProvider function returns an option setter for kvstore provider
Expand Down
35 changes: 29 additions & 6 deletions networkdb/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,13 @@ func (nDB *NetworkDB) RemoveKey(key []byte) {
}

func (nDB *NetworkDB) clusterInit() error {
nDB.lastStatsTimestamp = time.Now()

config := memberlist.DefaultLANConfig()
config.Name = nDB.config.NodeName
config.BindAddr = nDB.config.BindAddr
config.AdvertiseAddr = nDB.config.AdvertiseAddr
config.UDPBufferSize = nDB.config.PacketBufferSize

if nDB.config.BindPort != 0 {
config.BindPort = nDB.config.BindPort
Expand Down Expand Up @@ -199,9 +202,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error {
mlist := nDB.memberlist

if _, err := mlist.Join(members); err != nil {
// Incase of failure, keep retrying join until it succeeds or the cluster is shutdown.
// In case of failure, keep retrying join until it succeeds or the cluster is shutdown.
go nDB.retryJoin(members, nDB.stopCh)

return fmt.Errorf("could not join node to memberlist: %v", err)
}

Expand Down Expand Up @@ -372,11 +374,20 @@ func (nDB *NetworkDB) gossip() {
networkNodes[nid] = nDB.networkNodes[nid]

}
printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod
nDB.RUnlock()

if printStats {
healthScore := nDB.memberlist.GetHealthScore()
if healthScore == 0 {
logrus.Infof("NetworkDB healthscore:%d Node totally healthy", healthScore)
} else {
logrus.Warnf("NetworkDB healthscore:%d Node having connectivity issues", healthScore)
}
}
for nid, nodes := range networkNodes {
mNodes := nDB.mRandomNodes(3, nodes)
bytesAvail := udpSendBuf - compoundHeaderOverhead
bytesAvail := nDB.config.PacketBufferSize - compoundHeaderOverhead

nDB.RLock()
network, ok := thisNodeNetworks[nid]
Expand All @@ -397,6 +408,14 @@ func (nDB *NetworkDB) gossip() {
}

msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
network.qMessagesSent += len(msgs)
if printStats {
logrus.Infof("queue stats net:%s qLen:%d netPeers:%d netMsg/s:%d",
nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
network.qMessagesSent = 0
}

if len(msgs) == 0 {
continue
}
Expand All @@ -414,11 +433,15 @@ func (nDB *NetworkDB) gossip() {
}

// Send the compound message
if err := nDB.memberlist.SendToUDP(&mnode.Node, compound); err != nil {
if err := nDB.memberlist.SendBestEffort(&mnode.Node, compound); err != nil {
logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
}
}
}
// Reset the stats
if printStats {
nDB.lastStatsTimestamp = time.Now()
}
}

func (nDB *NetworkDB) bulkSyncTables() {
Expand Down Expand Up @@ -589,7 +612,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
nDB.bulkSyncAckTbl[node] = ch
nDB.Unlock()

err = nDB.memberlist.SendToTCP(&mnode.Node, buf)
err = nDB.memberlist.SendReliable(&mnode.Node, buf)
if err != nil {
nDB.Lock()
delete(nDB.bulkSyncAckTbl, node)
Expand All @@ -606,7 +629,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
case <-t.C:
logrus.Errorf("Bulk sync to node %s timed out", node)
case <-ch:
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime))
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Since(startTime))
}
t.Stop()
}
Expand Down
4 changes: 0 additions & 4 deletions networkdb/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@ package networkdb
import "github.com/gogo/protobuf/proto"

const (
// Max udp message size chosen to avoid network packet
// fragmentation.
udpSendBuf = 1400

// Compound message header overhead 1 byte(message type) + 4
// bytes (num messages)
compoundHeaderOverhead = 5
Expand Down
35 changes: 32 additions & 3 deletions networkdb/networkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package networkdb
import (
"fmt"
"net"
"os"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -93,6 +94,9 @@ type NetworkDB struct {
// bootStrapIP is the list of IPs that can be used to bootstrap
// the gossip.
bootStrapIP []net.IP

// lastStatsTimestamp is the last timestamp when the stats got printed
lastStatsTimestamp time.Time
}

// PeerInfo represents the peer (gossip cluster) nodes of a network
Expand Down Expand Up @@ -126,6 +130,9 @@ type network struct {
// The broadcast queue for table event gossip. This is only
// initialized for this node's network attachment entries.
tableBroadcasts *memberlist.TransmitLimitedQueue

// Number of gossip messages sent related to this network during the last stats collection period
qMessagesSent int
}

// Config represents the configuration of the networdb instance and
Expand All @@ -149,6 +156,17 @@ type Config struct {
// Keys to be added to the Keyring of the memberlist. Key at index
// 0 is the primary key
Keys [][]byte

// PacketBufferSize is the maximum number of bytes that memberlist will
// put in a packet (this will be for UDP packets by default with a NetTransport).
// A safe value for this is typically 1400 bytes (which is the default). However,
// depending on your network's MTU (Maximum Transmission Unit) you may
// be able to increase this to get more content into each gossip packet.
PacketBufferSize int

// StatsPrintPeriod the period to use to print queue stats
// Default is 5sec
StatsPrintPeriod time.Duration
}

// entry defines a table entry
Expand All @@ -171,6 +189,17 @@ type entry struct {
reapTime time.Duration
}

// DefaultConfig returns a NetworkDB config with default values
func DefaultConfig() *Config {
hostname, _ := os.Hostname()
return &Config{
NodeName: hostname,
BindAddr: "0.0.0.0",
PacketBufferSize: 1400,
StatsPrintPeriod: 5 * time.Second,
}
}

// New creates a new instance of NetworkDB using the Config passed by
// the caller.
func New(c *Config) (*NetworkDB, error) {
Expand Down Expand Up @@ -200,6 +229,7 @@ func New(c *Config) (*NetworkDB, error) {
// instances passed by the caller in the form of addr:port
func (nDB *NetworkDB) Join(members []string) error {
nDB.Lock()
nDB.bootStrapIP = make([]net.IP, 0, len(members))
for _, m := range members {
nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
}
Expand Down Expand Up @@ -481,9 +511,8 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
nDB.RLock()
num := len(nDB.networkNodes[nid])
nDB.RUnlock()
return num
defer nDB.RUnlock()
return len(nDB.networkNodes[nid])
},
RetransmitMult: 4,
}
Expand Down
8 changes: 4 additions & 4 deletions networkdb/networkdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ func TestMain(m *testing.M) {
func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*NetworkDB {
var dbs []*NetworkDB
for i := 0; i < num; i++ {
db, err := New(&Config{
NodeName: fmt.Sprintf("%s%d", namePrefix, i+1),
BindPort: int(atomic.AddInt32(&dbPort, 1)),
})
conf := DefaultConfig()
conf.NodeName = fmt.Sprintf("%s%d", namePrefix, i+1)
conf.BindPort = int(atomic.AddInt32(&dbPort, 1))
db, err := New(conf)
require.NoError(t, err)

if i != 0 {
Expand Down

0 comments on commit 9bf8207

Please sign in to comment.