Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NetworkDB to honor the Network Control Plane MTU #1839

Merged
merged 1 commit into from
Jul 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
listen := clusterProvider.GetListenAddress()
listenAddr, _, _ := net.SplitHostPort(listen)

logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v",
listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList)
logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v MTU=%d",
listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList, c.Config().Daemon.NetworkControlPlaneMTU)
if advAddr != "" && agent == nil {
if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
logrus.Errorf("error in agentInit: %v", err)
Expand Down Expand Up @@ -286,12 +286,19 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
logrus.Info("Gossip cluster hostname ", nodeName)

nDB, err := networkdb.New(&networkdb.Config{
BindAddr: listenAddr,
AdvertiseAddr: advertiseAddr,
NodeName: nodeName,
Keys: keys,
})
netDBConf := networkdb.DefaultConfig()
netDBConf.NodeName = nodeName
netDBConf.BindAddr = listenAddr
netDBConf.AdvertiseAddr = advertiseAddr
netDBConf.Keys = keys
if c.Config().Daemon.NetworkControlPlaneMTU != 0 {
// Consider the MTU remove the IP hdr (IPv4 or IPv6) and the TCP/UDP hdr.
// To be on the safe side let's cut 100 bytes
netDBConf.PacketBufferSize = (c.Config().Daemon.NetworkControlPlaneMTU - 100)
logrus.Debugf("Control plane MTU: %d will initialize NetworkDB with: %d",
c.Config().Daemon.NetworkControlPlaneMTU, netDBConf.PacketBufferSize)
}
nDB, err := networkdb.New(netDBConf)

if err != nil {
return err
Expand Down
35 changes: 23 additions & 12 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ type Config struct {

// DaemonCfg represents libnetwork core configuration
type DaemonCfg struct {
Debug bool
Experimental bool
DataDir string
DefaultNetwork string
DefaultDriver string
Labels []string
DriverCfg map[string]interface{}
ClusterProvider cluster.Provider
Debug bool
Experimental bool
DataDir string
DefaultNetwork string
DefaultDriver string
Labels []string
DriverCfg map[string]interface{}
ClusterProvider cluster.Provider
NetworkControlPlaneMTU int
}

// ClusterCfg represents cluster configuration
Expand Down Expand Up @@ -221,6 +222,19 @@ func OptionExperimental(exp bool) Option {
}
}

// OptionNetworkControlPlaneMTU function returns an option setter for control plane MTU
func OptionNetworkControlPlaneMTU(exp int) Option {
return func(c *Config) {
logrus.Debugf("Network Control Plane MTU: %d", exp)
if exp < 1500 {
// if exp == 0 the value won't be used
logrus.Warnf("Received a MTU of %d, this value is very low,",
"the network control plane can misbehave", exp)
}
c.Daemon.NetworkControlPlaneMTU = exp
}
}

// ProcessOptions processes options and stores it in config
func (c *Config) ProcessOptions(options ...Option) {
for _, opt := range options {
Expand All @@ -232,10 +246,7 @@ func (c *Config) ProcessOptions(options ...Option) {

// IsValidName validates configuration objects supported by libnetwork
func IsValidName(name string) bool {
if strings.TrimSpace(name) == "" {
return false
}
return true
return strings.TrimSpace(name) != ""
}

// OptionLocalKVProvider function returns an option setter for kvstore provider
Expand Down
37 changes: 31 additions & 6 deletions networkdb/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,14 @@ func (nDB *NetworkDB) RemoveKey(key []byte) {
}

func (nDB *NetworkDB) clusterInit() error {
nDB.lastStatsTimestamp = time.Now()
nDB.lastHealthTimestamp = nDB.lastStatsTimestamp

config := memberlist.DefaultLANConfig()
config.Name = nDB.config.NodeName
config.BindAddr = nDB.config.BindAddr
config.AdvertiseAddr = nDB.config.AdvertiseAddr
config.UDPBufferSize = nDB.config.PacketBufferSize

if nDB.config.BindPort != 0 {
config.BindPort = nDB.config.BindPort
Expand Down Expand Up @@ -199,9 +203,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error {
mlist := nDB.memberlist

if _, err := mlist.Join(members); err != nil {
// Incase of failure, keep retrying join until it succeeds or the cluster is shutdown.
// In case of failure, keep retrying join until it succeeds or the cluster is shutdown.
go nDB.retryJoin(members, nDB.stopCh)

return fmt.Errorf("could not join node to memberlist: %v", err)
}

Expand Down Expand Up @@ -372,11 +375,21 @@ func (nDB *NetworkDB) gossip() {
networkNodes[nid] = nDB.networkNodes[nid]

}
printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod
printHealth := time.Since(nDB.lastHealthTimestamp) >= nDB.config.HealthPrintPeriod
nDB.RUnlock()

if printHealth {
healthScore := nDB.memberlist.GetHealthScore()
if healthScore != 0 {
logrus.Warnf("NetworkDB stats - healthscore:%d (connectivity issues)", healthScore)
}
nDB.lastHealthTimestamp = time.Now()
}

for nid, nodes := range networkNodes {
mNodes := nDB.mRandomNodes(3, nodes)
bytesAvail := udpSendBuf - compoundHeaderOverhead
bytesAvail := nDB.config.PacketBufferSize - compoundHeaderOverhead

nDB.RLock()
network, ok := thisNodeNetworks[nid]
Expand All @@ -397,6 +410,14 @@ func (nDB *NetworkDB) gossip() {
}

msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
network.qMessagesSent += len(msgs)
if printStats {
logrus.Infof("NetworkDB stats - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d",
nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
network.qMessagesSent = 0
}

if len(msgs) == 0 {
continue
}
Expand All @@ -414,11 +435,15 @@ func (nDB *NetworkDB) gossip() {
}

// Send the compound message
if err := nDB.memberlist.SendToUDP(&mnode.Node, compound); err != nil {
if err := nDB.memberlist.SendBestEffort(&mnode.Node, compound); err != nil {
logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
}
}
}
// Reset the stats
if printStats {
nDB.lastStatsTimestamp = time.Now()
}
}

func (nDB *NetworkDB) bulkSyncTables() {
Expand Down Expand Up @@ -589,7 +614,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
nDB.bulkSyncAckTbl[node] = ch
nDB.Unlock()

err = nDB.memberlist.SendToTCP(&mnode.Node, buf)
err = nDB.memberlist.SendReliable(&mnode.Node, buf)
if err != nil {
nDB.Lock()
delete(nDB.bulkSyncAckTbl, node)
Expand All @@ -606,7 +631,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
case <-t.C:
logrus.Errorf("Bulk sync to node %s timed out", node)
case <-ch:
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime))
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Since(startTime))
}
t.Stop()
}
Expand Down
4 changes: 0 additions & 4 deletions networkdb/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@ package networkdb
import "github.com/gogo/protobuf/proto"

const (
// Max udp message size chosen to avoid network packet
// fragmentation.
udpSendBuf = 1400

// Compound message header overhead 1 byte(message type) + 4
// bytes (num messages)
compoundHeaderOverhead = 5
Expand Down
43 changes: 40 additions & 3 deletions networkdb/networkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package networkdb
import (
"fmt"
"net"
"os"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -93,6 +94,12 @@ type NetworkDB struct {
// bootStrapIP is the list of IPs that can be used to bootstrap
// the gossip.
bootStrapIP []net.IP

// lastStatsTimestamp is the last timestamp when the stats got printed
lastStatsTimestamp time.Time

// lastHealthTimestamp is the last timestamp when the health score got printed
lastHealthTimestamp time.Time
}

// PeerInfo represents the peer (gossip cluster) nodes of a network
Expand Down Expand Up @@ -126,6 +133,9 @@ type network struct {
// The broadcast queue for table event gossip. This is only
// initialized for this node's network attachment entries.
tableBroadcasts *memberlist.TransmitLimitedQueue

// Number of gossip messages sent related to this network during the last stats collection period
qMessagesSent int
}

// Config represents the configuration of the networdb instance and
Expand All @@ -149,6 +159,21 @@ type Config struct {
// Keys to be added to the Keyring of the memberlist. Key at index
// 0 is the primary key
Keys [][]byte

// PacketBufferSize is the maximum number of bytes that memberlist will
// put in a packet (this will be for UDP packets by default with a NetTransport).
// A safe value for this is typically 1400 bytes (which is the default). However,
// depending on your network's MTU (Maximum Transmission Unit) you may
// be able to increase this to get more content into each gossip packet.
PacketBufferSize int

// StatsPrintPeriod the period to use to print queue stats
// Default is 5min
StatsPrintPeriod time.Duration

// HealthPrintPeriod the period to use to print the health score
// Default is 1min
HealthPrintPeriod time.Duration
}

// entry defines a table entry
Expand All @@ -171,6 +196,18 @@ type entry struct {
reapTime time.Duration
}

// DefaultConfig returns a NetworkDB config with default values
func DefaultConfig() *Config {
hostname, _ := os.Hostname()
return &Config{
NodeName: hostname,
BindAddr: "0.0.0.0",
PacketBufferSize: 1400,
StatsPrintPeriod: 5 * time.Minute,
HealthPrintPeriod: 1 * time.Minute,
}
}

// New creates a new instance of NetworkDB using the Config passed by
// the caller.
func New(c *Config) (*NetworkDB, error) {
Expand Down Expand Up @@ -200,6 +237,7 @@ func New(c *Config) (*NetworkDB, error) {
// instances passed by the caller in the form of addr:port
func (nDB *NetworkDB) Join(members []string) error {
nDB.Lock()
nDB.bootStrapIP = make([]net.IP, 0, len(members))
for _, m := range members {
nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
}
Expand Down Expand Up @@ -481,9 +519,8 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
nDB.RLock()
num := len(nDB.networkNodes[nid])
nDB.RUnlock()
return num
defer nDB.RUnlock()
return len(nDB.networkNodes[nid])
},
RetransmitMult: 4,
}
Expand Down
8 changes: 4 additions & 4 deletions networkdb/networkdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ func TestMain(m *testing.M) {
func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*NetworkDB {
var dbs []*NetworkDB
for i := 0; i < num; i++ {
db, err := New(&Config{
NodeName: fmt.Sprintf("%s%d", namePrefix, i+1),
BindPort: int(atomic.AddInt32(&dbPort, 1)),
})
conf := DefaultConfig()
conf.NodeName = fmt.Sprintf("%s%d", namePrefix, i+1)
conf.BindPort = int(atomic.AddInt32(&dbPort, 1))
db, err := New(conf)
require.NoError(t, err)

if i != 0 {
Expand Down