Skip to content

Commit

Permalink
Add networkDB queue length stats
Browse files Browse the repository at this point in the history
This can be handy to identify the throughput of the node per network
especially to identify if there is an unbalance between nodes that can
point to an MTU asymmetry

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
  • Loading branch information
Flavio Crisciani committed Jul 17, 2017
1 parent c7c610a commit d61fe86
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 24 deletions.
8 changes: 4 additions & 4 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
listenAddr, _, _ := net.SplitHostPort(listen)

logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v MTU=%d",
listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList, c.Config().Daemon.ControlPlaneMTU)
listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList, c.Config().Daemon.NetworkControlPlaneMTU)
if advAddr != "" && agent == nil {
if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
logrus.Errorf("error in agentInit: %v", err)
Expand Down Expand Up @@ -291,12 +291,12 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
netDBConf.BindAddr = listenAddr
netDBConf.AdvertiseAddr = advertiseAddr
netDBConf.Keys = keys
if c.Config().Daemon.ControlPlaneMTU != 0 {
if c.Config().Daemon.NetworkControlPlaneMTU != 0 {
// Consider the MTU remove the IP hdr (IPv4 or IPv6) and the TCP/UDP hdr.
// To be on the safe side let's cut 100 bytes
netDBConf.PacketBufferSize = (c.Config().Daemon.ControlPlaneMTU - 100)
netDBConf.PacketBufferSize = (c.Config().Daemon.NetworkControlPlaneMTU - 100)
logrus.Debugf("Control plane MTU: %d will initialize NetworkDB with: %d",
c.Config().Daemon.ControlPlaneMTU, netDBConf.PacketBufferSize)
c.Config().Daemon.NetworkControlPlaneMTU, netDBConf.PacketBufferSize)
}
nDB, err := networkdb.New(netDBConf)

Expand Down
28 changes: 14 additions & 14 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ type Config struct {

// DaemonCfg represents libnetwork core configuration
type DaemonCfg struct {
Debug bool
Experimental bool
DataDir string
DefaultNetwork string
DefaultDriver string
Labels []string
DriverCfg map[string]interface{}
ClusterProvider cluster.Provider
ControlPlaneMTU int
Debug bool
Experimental bool
DataDir string
DefaultNetwork string
DefaultDriver string
Labels []string
DriverCfg map[string]interface{}
ClusterProvider cluster.Provider
NetworkControlPlaneMTU int
}

// ClusterCfg represents cluster configuration
Expand Down Expand Up @@ -222,15 +222,15 @@ func OptionExperimental(exp bool) Option {
}
}

// OptionControlPlaneMTU function returns an option setter for control plane MTU
func OptionControlPlaneMTU(exp int) Option {
// OptionNetworkControlPlaneMTU function returns an option setter for control plane MTU
func OptionNetworkControlPlaneMTU(exp int) Option {
return func(c *Config) {
logrus.Debugf("Control Plane MTU: %d", exp)
logrus.Debugf("Network Control Plane MTU: %d", exp)
if exp < 1500 {
logrus.Warnf("Configured a Control Plane MTU of %d, this value is too LOW, fallback to default", exp)
logrus.Warnf("Configured a Network Control Plane MTU of %d, this value is too LOW, fallback to default", exp)
exp = 1500
}
c.Daemon.ControlPlaneMTU = exp
c.Daemon.NetworkControlPlaneMTU = exp
}
}

Expand Down
20 changes: 17 additions & 3 deletions networkdb/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ func (nDB *NetworkDB) RemoveKey(key []byte) {
}

func (nDB *NetworkDB) clusterInit() error {
nDB.lastStatsTimestamp = time.Now()

config := memberlist.DefaultLANConfig()
config.Name = nDB.config.NodeName
config.BindAddr = nDB.config.BindAddr
Expand Down Expand Up @@ -200,9 +202,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error {
mlist := nDB.memberlist

if _, err := mlist.Join(members); err != nil {
// Incase of failure, keep retrying join until it succeeds or the cluster is shutdown.
// In case of failure, keep retrying join until it succeeds or the cluster is shutdown.
go nDB.retryJoin(members, nDB.stopCh)

return fmt.Errorf("could not join node to memberlist: %v", err)
}

Expand Down Expand Up @@ -374,6 +375,7 @@ func (nDB *NetworkDB) gossip() {
networkNodes[nid] = nDB.networkNodes[nid]

}
printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod
nDB.RUnlock()

for nid, nodes := range networkNodes {
Expand All @@ -399,6 +401,14 @@ func (nDB *NetworkDB) gossip() {
}

msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
network.qMessagesSent += len(msgs)
if printStats {
logrus.Infof("queue stats net:%s qLen:%d netPeers:%d netMsg/s:%d",
nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
network.qMessagesSent = 0
}

if len(msgs) == 0 {
continue
}
Expand All @@ -421,6 +431,10 @@ func (nDB *NetworkDB) gossip() {
}
}
}
// Reset the stats
if printStats {
nDB.lastStatsTimestamp = time.Now()
}
}

func (nDB *NetworkDB) bulkSyncTables() {
Expand Down Expand Up @@ -608,7 +622,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
case <-t.C:
logrus.Errorf("Bulk sync to node %s timed out", node)
case <-ch:
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime))
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Since(startTime))
}
t.Stop()
}
Expand Down
17 changes: 14 additions & 3 deletions networkdb/networkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ type NetworkDB struct {
// bootStrapIP is the list of IPs that can be used to bootstrap
// the gossip.
bootStrapIP []net.IP

// lastStatsTimestamp is the last timestamp when the stats got printed
lastStatsTimestamp time.Time
}

// PeerInfo represents the peer (gossip cluster) nodes of a network
Expand Down Expand Up @@ -127,6 +130,9 @@ type network struct {
// The broadcast queue for table event gossip. This is only
// initialized for this node's network attachment entries.
tableBroadcasts *memberlist.TransmitLimitedQueue

// Number of gossip messages sent related to this network during the last stats collection period
qMessagesSent int
}

// Config represents the configuration of the networdb instance and
Expand Down Expand Up @@ -157,6 +163,10 @@ type Config struct {
// depending on your network's MTU (Maximum Transmission Unit) you may
// be able to increase this to get more content into each gossip packet.
PacketBufferSize int

// StatsPrintPeriod the period to use to print queue stats
// Default is 5sec
StatsPrintPeriod time.Duration
}

// entry defines a table entry
Expand Down Expand Up @@ -186,6 +196,7 @@ func DefaultConfig() *Config {
NodeName: hostname,
BindAddr: "0.0.0.0",
PacketBufferSize: 1400,
StatsPrintPeriod: 5 * time.Second,
}
}

Expand Down Expand Up @@ -218,6 +229,7 @@ func New(c *Config) (*NetworkDB, error) {
// instances passed by the caller in the form of addr:port
func (nDB *NetworkDB) Join(members []string) error {
nDB.Lock()
nDB.bootStrapIP = make([]net.IP, 0, len(members))
for _, m := range members {
nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
}
Expand Down Expand Up @@ -499,9 +511,8 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
nDB.RLock()
num := len(nDB.networkNodes[nid])
nDB.RUnlock()
return num
defer nDB.RUnlock()
return len(nDB.networkNodes[nid])
},
RetransmitMult: 4,
}
Expand Down

0 comments on commit d61fe86

Please sign in to comment.