Skip to content

Commit

Permalink
Add networkDB queue length stats
Browse files Browse the repository at this point in the history
This can be handy to identify the throughput of the node per network
especially to identify if there is an unbalance between nodes that can
point to an MTU asymmetry

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
  • Loading branch information
Flavio Crisciani committed Jul 14, 2017
1 parent c7c610a commit ae47a6c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 6 deletions.
20 changes: 17 additions & 3 deletions networkdb/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ func (nDB *NetworkDB) RemoveKey(key []byte) {
}

func (nDB *NetworkDB) clusterInit() error {
nDB.lastStatsTimestamp = time.Now()

config := memberlist.DefaultLANConfig()
config.Name = nDB.config.NodeName
config.BindAddr = nDB.config.BindAddr
Expand Down Expand Up @@ -200,9 +202,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error {
mlist := nDB.memberlist

if _, err := mlist.Join(members); err != nil {
// Incase of failure, keep retrying join until it succeeds or the cluster is shutdown.
// In case of failure, keep retrying join until it succeeds or the cluster is shutdown.
go nDB.retryJoin(members, nDB.stopCh)

return fmt.Errorf("could not join node to memberlist: %v", err)
}

Expand Down Expand Up @@ -374,6 +375,7 @@ func (nDB *NetworkDB) gossip() {
networkNodes[nid] = nDB.networkNodes[nid]

}
printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod
nDB.RUnlock()

for nid, nodes := range networkNodes {
Expand All @@ -399,6 +401,14 @@ func (nDB *NetworkDB) gossip() {
}

msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
network.qMessagesSent += len(msgs)
if printStats {
logrus.Infof("queue stats net:%s qLen:%d netPeers:%d netMsg/s:%d",
nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
network.qMessagesSent = 0
}

if len(msgs) == 0 {
continue
}
Expand All @@ -421,6 +431,10 @@ func (nDB *NetworkDB) gossip() {
}
}
}
// Reset the stats
if printStats {
nDB.lastStatsTimestamp = time.Now()
}
}

func (nDB *NetworkDB) bulkSyncTables() {
Expand Down Expand Up @@ -608,7 +622,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
case <-t.C:
logrus.Errorf("Bulk sync to node %s timed out", node)
case <-ch:
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime))
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Since(startTime))
}
t.Stop()
}
Expand Down
17 changes: 14 additions & 3 deletions networkdb/networkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ type NetworkDB struct {
// bootStrapIP is the list of IPs that can be used to bootstrap
// the gossip.
bootStrapIP []net.IP

// lastStatsTimestamp is the last timestamp when the stats got printed
lastStatsTimestamp time.Time
}

// PeerInfo represents the peer (gossip cluster) nodes of a network
Expand Down Expand Up @@ -127,6 +130,9 @@ type network struct {
// The broadcast queue for table event gossip. This is only
// initialized for this node's network attachment entries.
tableBroadcasts *memberlist.TransmitLimitedQueue

// Number of gossip messages sent related to this network during the last stats collection period
qMessagesSent int
}

// Config represents the configuration of the networdb instance and
Expand Down Expand Up @@ -157,6 +163,10 @@ type Config struct {
// depending on your network's MTU (Maximum Transmission Unit) you may
// be able to increase this to get more content into each gossip packet.
PacketBufferSize int

// StatsPrintPeriod the period to use to print queue stats
// Default is 5sec
StatsPrintPeriod time.Duration
}

// entry defines a table entry
Expand Down Expand Up @@ -186,6 +196,7 @@ func DefaultConfig() *Config {
NodeName: hostname,
BindAddr: "0.0.0.0",
PacketBufferSize: 1400,
StatsPrintPeriod: 5 * time.Second,
}
}

Expand Down Expand Up @@ -218,6 +229,7 @@ func New(c *Config) (*NetworkDB, error) {
// instances passed by the caller in the form of addr:port
func (nDB *NetworkDB) Join(members []string) error {
nDB.Lock()
nDB.bootStrapIP = make([]net.IP, 0, len(members))
for _, m := range members {
nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
}
Expand Down Expand Up @@ -499,9 +511,8 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
nDB.RLock()
num := len(nDB.networkNodes[nid])
nDB.RUnlock()
return num
defer nDB.RUnlock()
return len(nDB.networkNodes[nid])
},
RetransmitMult: 4,
}
Expand Down

0 comments on commit ae47a6c

Please sign in to comment.