NetworkDB backport from master
Aligned networkDB folder to master code (commit:fbba5555)

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
Flavio Crisciani committed Sep 22, 2017
1 parent b93fb93 commit 2966d33
Showing 9 changed files with 753 additions and 431 deletions.
36 changes: 16 additions & 20 deletions agent.go
@@ -165,13 +165,13 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
a.networkDB.SetKey(added)
}

key, tag, err := c.getPrimaryKeyTag(subsysGossip)
key, _, err := c.getPrimaryKeyTag(subsysGossip)
if err != nil {
return err
}
a.networkDB.SetPrimaryKey(key)

key, tag, err = c.getPrimaryKeyTag(subsysIPSec)
key, tag, err := c.getPrimaryKeyTag(subsysIPSec)
if err != nil {
return err
}
@@ -286,12 +286,12 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d
nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
logrus.Info("Gossip cluster hostname ", nodeName)

nDB, err := networkdb.New(&networkdb.Config{
BindAddr: listenAddr,
AdvertiseAddr: advertiseAddr,
NodeName: nodeName,
Keys: keys,
})
netDBConf := networkdb.DefaultConfig()
netDBConf.NodeName = nodeName
netDBConf.BindAddr = listenAddr
netDBConf.AdvertiseAddr = advertiseAddr
netDBConf.Keys = keys
nDB, err := networkdb.New(netDBConf)

if err != nil {
return err
@@ -383,15 +383,11 @@ func (c *controller) agentClose() {

agent.Lock()
for _, cancelFuncs := range agent.driverCancelFuncs {
for _, cancel := range cancelFuncs {
cancelList = append(cancelList, cancel)
}
cancelList = append(cancelList, cancelFuncs...)
}

// Add also the cancel functions for the network db
for _, cancel := range agent.coreCancelFuncs {
cancelList = append(cancelList, cancel)
}
cancelList = append(cancelList, agent.coreCancelFuncs...)
agent.Unlock()

for _, cancel := range cancelList {
@@ -885,31 +881,31 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
}

if isAdd {
logrus.Debugf("handleEpTableEvent ADD %s R:%v", isAdd, eid, epRec)
logrus.Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec)
if svcID != "" {
// This is a remote task part of a service
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
logrus.Errorf("failed adding service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
} else {
// This is a remote container simply attached to an attachable network
if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
logrus.Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err)
}
}
} else {
logrus.Debugf("handleEpTableEvent DEL %s R:%v", isAdd, eid, epRec)
logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
if svcID != "" {
// This is a remote task part of a service
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true); err != nil {
logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err)
logrus.Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
} else {
// This is a remote container simply attached to an attachable network
if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
logrus.Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err)
}
}
}
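For context, the agent.go hunk above switches from a hand-built networkdb.Config literal to the DefaultConfig() helper, so fields the agent does not set explicitly (for example PacketBufferSize and the stats/health print periods used later in cluster.go) pick up library defaults. Below is a minimal, self-contained sketch of that construction pattern; it is not part of the commit, and the node name and addresses are placeholders:

	package main

	import (
		"log"

		"github.com/docker/libnetwork/networkdb"
	)

	func main() {
		// Start from the library defaults and override only what the caller knows.
		conf := networkdb.DefaultConfig()
		conf.NodeName = "node-1"            // placeholder node name
		conf.BindAddr = "0.0.0.0"           // placeholder listen address
		conf.AdvertiseAddr = "192.168.0.10" // placeholder advertise address
		// conf.Keys can also be populated with the gossip encryption keys, as in the diff.

		nDB, err := networkdb.New(conf)
		if err != nil {
			log.Fatalf("failed to create NetworkDB: %v", err)
		}
		defer nDB.Close()
	}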
2 changes: 2 additions & 0 deletions networkdb/broadcast.go
@@ -134,6 +134,8 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
TableName: tname,
Key: key,
Value: entry.value,
// The duration in seconds is a float that is truncated to an int32 below
ResidualReapTime: int32(entry.reapTime.Seconds()),
}

raw, err := encodeMessage(MessageTypeTableEvent, &tEvent)
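The new ResidualReapTime field in the table event carries how much of the entry's reap time is still left, taken from entry.reapTime. As the comment notes, Duration.Seconds() returns a float64 and the conversion to int32 truncates the fractional part; a tiny standalone illustration (not part of the commit):

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		// entry.reapTime is a time.Duration; Seconds() yields a float64 and the
		// int32 conversion drops the fraction, which is what the diff comment refers to.
		remaining := 29*time.Minute + 59*time.Second + 900*time.Millisecond
		fmt.Println(int32(remaining.Seconds())) // prints 1799; the 0.9s is truncated
	}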
129 changes: 85 additions & 44 deletions networkdb/cluster.go
@@ -17,11 +17,15 @@ import (
)

const (
reapInterval = 30 * time.Minute
reapPeriod = 5 * time.Second
retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour
nodeReapPeriod = 2 * time.Hour
// The garbage collection logic for entries leverages the presence of the network.
// For this reason the expiration time of the network is set slightly higher than the entry expiration, so that
// there are at least 5 extra cycles to make sure that all the entries are properly deleted before the network is deleted.
reapEntryInterval = 30 * time.Minute
reapNetworkInterval = reapEntryInterval + 5*reapPeriod
reapPeriod = 5 * time.Second
retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour
nodeReapPeriod = 2 * time.Hour
)

type logWriter struct{}
@@ -98,10 +102,14 @@ func (nDB *NetworkDB) RemoveKey(key []byte) {
}

func (nDB *NetworkDB) clusterInit() error {
nDB.lastStatsTimestamp = time.Now()
nDB.lastHealthTimestamp = nDB.lastStatsTimestamp

config := memberlist.DefaultLANConfig()
config.Name = nDB.config.NodeName
config.BindAddr = nDB.config.BindAddr
config.AdvertiseAddr = nDB.config.AdvertiseAddr
config.UDPBufferSize = nDB.config.PacketBufferSize

if nDB.config.BindPort != 0 {
config.BindPort = nDB.config.BindPort
@@ -199,9 +207,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error {
mlist := nDB.memberlist

if _, err := mlist.Join(members); err != nil {
// Incase of failure, keep retrying join until it succeeds or the cluster is shutdown.
// In case of failure, keep retrying join until it succeeds or the cluster is shutdown.
go nDB.retryJoin(members, nDB.stopCh)

return fmt.Errorf("could not join node to memberlist: %v", err)
}

@@ -297,8 +304,9 @@ func (nDB *NetworkDB) reconnectNode() {
// the reaper runs. NOTE nDB.reapTableEntries updates the reapTime with a readlock. This
// is safe as long as no other concurrent path touches the reapTime field.
func (nDB *NetworkDB) reapState() {
nDB.reapNetworks()
// reapTableEntries leverages the presence of the network, so garbage collect entries first
nDB.reapTableEntries()
nDB.reapNetworks()
}

func (nDB *NetworkDB) reapNetworks() {
@@ -318,43 +326,51 @@
}

func (nDB *NetworkDB) reapTableEntries() {
var paths []string

var nodeNetworks []string
// This is best effort; if the list of networks changes it will be picked up in the next cycle
nDB.RLock()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
entry, ok := v.(*entry)
if !ok {
return false
}

if !entry.deleting {
return false
}
if entry.reapTime > 0 {
entry.reapTime -= reapPeriod
return false
}
paths = append(paths, path)
return false
})
for nid := range nDB.networks[nDB.config.NodeName] {
nodeNetworks = append(nodeNetworks, nid)
}
nDB.RUnlock()

nDB.Lock()
for _, path := range paths {
params := strings.Split(path[1:], "/")
tname := params[0]
nid := params[1]
key := params[2]

if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
}
cycleStart := time.Now()
// In order to avoid blocking the database for a long time, apply the garbage collection logic one network at a time
// The lock is taken at the beginning of the cycle and the deletion is done inline
for _, nid := range nodeNetworks {
nDB.Lock()
nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool {
// timeCompensation compensates in case the lock took some time to be released
timeCompensation := time.Since(cycleStart)
entry, ok := v.(*entry)
if !ok || !entry.deleting {
return false
}

if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
}
// In this check we add an extra 1 second to guarantee that, when the number is truncated to int32 to fit the packet
// for the tableEvent, the number is always strictly > 1 and never 0
if entry.reapTime > reapPeriod+timeCompensation+time.Second {
entry.reapTime -= reapPeriod + timeCompensation
return false
}

params := strings.Split(path[1:], "/")
nid := params[0]
tname := params[1]
key := params[2]

okTable, okNetwork := nDB.deleteEntry(nid, tname, key)
if !okTable {
logrus.Errorf("Table tree delete failed, entry with key:%s does not exists in the table:%s network:%s", key, tname, nid)
}
if !okNetwork {
logrus.Errorf("Network tree delete failed, entry with key:%s does not exists in the network:%s table:%s", key, nid, tname)
}

return false
})
nDB.Unlock()
}
nDB.Unlock()
}

func (nDB *NetworkDB) gossip() {
@@ -365,11 +381,21 @@
networkNodes[nid] = nDB.networkNodes[nid]

}
printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod
printHealth := time.Since(nDB.lastHealthTimestamp) >= nDB.config.HealthPrintPeriod
nDB.RUnlock()

if printHealth {
healthScore := nDB.memberlist.GetHealthScore()
if healthScore != 0 {
logrus.Warnf("NetworkDB stats - healthscore:%d (connectivity issues)", healthScore)
}
nDB.lastHealthTimestamp = time.Now()
}

for nid, nodes := range networkNodes {
mNodes := nDB.mRandomNodes(3, nodes)
bytesAvail := udpSendBuf - compoundHeaderOverhead
bytesAvail := nDB.config.PacketBufferSize - compoundHeaderOverhead

nDB.RLock()
network, ok := thisNodeNetworks[nid]
Expand All @@ -390,6 +416,15 @@ func (nDB *NetworkDB) gossip() {
}

msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
// Collect stats and print the queue info; this code also sits here so the queues are visible even when they are empty
network.qMessagesSent += len(msgs)
if printStats {
logrus.Infof("NetworkDB stats - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d",
nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber, broadcastQ.NumQueued(),
network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
network.qMessagesSent = 0
}

if len(msgs) == 0 {
continue
}
Expand All @@ -407,11 +442,15 @@ func (nDB *NetworkDB) gossip() {
}

// Send the compound message
if err := nDB.memberlist.SendToUDP(&mnode.Node, compound); err != nil {
if err := nDB.memberlist.SendBestEffort(&mnode.Node, compound); err != nil {
logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
}
}
}
// Reset the stats
if printStats {
nDB.lastStatsTimestamp = time.Now()
}
}

func (nDB *NetworkDB) bulkSyncTables() {
@@ -547,6 +586,8 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
TableName: params[1],
Key: params[2],
Value: entry.value,
// The duration in seconds is a float that is truncated to an int32 below
ResidualReapTime: int32(entry.reapTime.Seconds()),
}

msg, err := encodeMessage(MessageTypeTableEvent, &tEvent)
@@ -582,7 +623,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
nDB.bulkSyncAckTbl[node] = ch
nDB.Unlock()

err = nDB.memberlist.SendToTCP(&mnode.Node, buf)
err = nDB.memberlist.SendReliable(&mnode.Node, buf)
if err != nil {
nDB.Lock()
delete(nDB.bulkSyncAckTbl, node)
@@ -599,7 +640,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
case <-t.C:
logrus.Errorf("Bulk sync to node %s timed out", node)
case <-ch:
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime))
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Since(startTime))
}
t.Stop()
}
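To make the new reaper timing in cluster.go concrete: reapEntryInterval is 30 minutes and reapNetworkInterval adds five reap cycles (5 × 5 s = 25 s) on top of it, so a network that has been left is reaped only after its entries have had at least five extra passes of the entry reaper. A small standalone check of that arithmetic (not part of the commit):

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		// Mirrors the constants introduced in the diff above.
		const (
			reapPeriod          = 5 * time.Second
			reapEntryInterval   = 30 * time.Minute
			reapNetworkInterval = reapEntryInterval + 5*reapPeriod
		)
		fmt.Println(reapNetworkInterval - reapEntryInterval) // 25s of slack for the entry reaper
	}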