Commit 0f08d31

Merge pull request #1944 from fcrisciani/netdb-fix-reap

Fix reapTime logic in NetworkDB

mavenugo authored Sep 22, 2017
2 parents c34e58a + fbba555 commit 0f08d31

Showing 10 changed files with 644 additions and 400 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -38,3 +38,4 @@ cmd/dnet/dnet

libnetworkbuild.created
test/networkDb/testMain
test/networkDb/gossipdb
2 changes: 2 additions & 0 deletions networkdb/broadcast.go
@@ -134,6 +134,8 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
TableName: tname,
Key: key,
Value: entry.value,
// The duration in seconds is a float64; the conversion below truncates it to whole seconds
ResidualReapTime: int32(entry.reapTime.Seconds()),
}

raw, err := encodeMessage(MessageTypeTableEvent, &tEvent)
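The new ResidualReapTime field carries an entry's remaining lifetime over the wire as whole seconds. A minimal standalone sketch (plain Go, not part of the diff) of the round trip: Seconds() returns a float64, the int32 conversion truncates it, and the receiver widens it back into a duration:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Sender side: Seconds() is a float64, so the int32 conversion
	// truncates any fractional second.
	reapTime := 29*time.Minute + 59*time.Second + 900*time.Millisecond
	residual := int32(reapTime.Seconds()) // 1799; the 0.9s is dropped

	// Receiver side: the residual is widened back into a duration.
	restored := time.Duration(residual) * time.Second
	fmt.Println(residual, restored) // 1799 29m59s
}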
96 changes: 56 additions & 40 deletions networkdb/cluster.go
@@ -17,11 +17,15 @@ import (
)

const (
reapInterval = 30 * time.Minute
reapPeriod = 5 * time.Second
retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour
nodeReapPeriod = 2 * time.Hour
// The garbage collection logic for entries leverages the presence of the network.
// For this reason the expiration time of the network is set slightly higher than the entry expiration,
// leaving at least 5 extra reap cycles to make sure that all the entries are deleted before the network itself.
reapEntryInterval = 30 * time.Minute
reapNetworkInterval = reapEntryInterval + 5*reapPeriod
reapPeriod = 5 * time.Second
retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour
nodeReapPeriod = 2 * time.Hour
)
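The arithmetic behind the comment above can be checked in isolation; a quick sketch using the same constant values shows that entries get 360 reap cycles and the network five more:

package main

import (
	"fmt"
	"time"
)

const (
	reapPeriod          = 5 * time.Second
	reapEntryInterval   = 30 * time.Minute
	reapNetworkInterval = reapEntryInterval + 5*reapPeriod
)

func main() {
	// Entries are reaped over 360 cycles; the network survives 5 cycles
	// longer, so entry GC always completes before the network is deleted.
	entryCycles := int(reapEntryInterval / reapPeriod)
	extraCycles := int((reapNetworkInterval - reapEntryInterval) / reapPeriod)
	fmt.Println(entryCycles, extraCycles) // 360 5
}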

type logWriter struct{}
@@ -300,8 +304,9 @@ func (nDB *NetworkDB) reconnectNode() {
// the reaper runs. NOTE nDB.reapTableEntries updates the reapTime with a readlock. This
// is safe as long as no other concurrent path touches the reapTime field.
func (nDB *NetworkDB) reapState() {
nDB.reapNetworks()
// reapTableEntries leverages the presence of the network, so garbage collect the entries first
nDB.reapTableEntries()
nDB.reapNetworks()
}

func (nDB *NetworkDB) reapNetworks() {
@@ -321,43 +326,51 @@ }
}

func (nDB *NetworkDB) reapTableEntries() {
var paths []string

var nodeNetworks []string
// This is best effort; any changes to the list of networks will be picked up in the next cycle
nDB.RLock()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
entry, ok := v.(*entry)
if !ok {
return false
}

if !entry.deleting {
return false
}
if entry.reapTime > 0 {
entry.reapTime -= reapPeriod
return false
}
paths = append(paths, path)
return false
})
for nid := range nDB.networks[nDB.config.NodeName] {
nodeNetworks = append(nodeNetworks, nid)
}
nDB.RUnlock()

nDB.Lock()
for _, path := range paths {
params := strings.Split(path[1:], "/")
tname := params[0]
nid := params[1]
key := params[2]

if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
}
cycleStart := time.Now()
// In order to avoid blocking the database for a long time, apply the garbage collection logic by network
// The lock is taken at the beginning of the cycle and the deletion is inline
for _, nid := range nodeNetworks {
nDB.Lock()
nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool {
// timeCompensation compensates for any time spent waiting for the lock to be released
timeCompensation := time.Since(cycleStart)
entry, ok := v.(*entry)
if !ok || !entry.deleting {
return false
}

if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
}
// This check adds an extra second to guarantee that, when the number is truncated to int32 to fit the
// tableEvent packet, the residual value is always strictly > 1 and never 0
if entry.reapTime > reapPeriod+timeCompensation+time.Second {
entry.reapTime -= reapPeriod + timeCompensation
return false
}

params := strings.Split(path[1:], "/")
nid := params[0]
tname := params[1]
key := params[2]

okTable, okNetwork := nDB.deleteEntry(nid, tname, key)
if !okTable {
logrus.Errorf("Table tree delete failed, entry with key:%s does not exists in the table:%s network:%s", key, tname, nid)
}
if !okNetwork {
logrus.Errorf("Network tree delete failed, entry with key:%s does not exists in the network:%s table:%s", key, nid, tname)
}

return false
})
nDB.Unlock()
}
nDB.Unlock()
}
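The decrement rule inside the walk is the core of the fix: each cycle subtracts reapPeriod plus the lock-wait compensation, but only while more than reapPeriod + compensation + 1s of headroom remains, so a residual later truncated to int32 seconds can never reach 0. A standalone sketch of just that rule (hypothetical decrement helper, same constants):

package main

import (
	"fmt"
	"time"
)

const reapPeriod = 5 * time.Second

// decrement applies one reap cycle to a residual reap time. It refuses to
// go below reapPeriod+compensation+1s; past that point the entry is instead
// reported as due for deletion, so any residual sent in a table event
// always truncates to at least 1 second.
func decrement(reapTime, compensation time.Duration) (time.Duration, bool) {
	if reapTime > reapPeriod+compensation+time.Second {
		return reapTime - (reapPeriod + compensation), false
	}
	return reapTime, true
}

func main() {
	rt := 12 * time.Second
	for {
		next, due := decrement(rt, 500*time.Millisecond)
		fmt.Println(rt, due) // 12s false, then 6.5s true
		if due {
			break
		}
		rt = next
	}
}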

func (nDB *NetworkDB) gossip() {
@@ -406,8 +419,9 @@ func (nDB *NetworkDB) gossip() {
// Collect stats and print the queue info; this code is also here to give visibility on whether the queues empty
network.qMessagesSent += len(msgs)
if printStats {
logrus.Infof("NetworkDB stats - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d",
nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
logrus.Infof("NetworkDB stats - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d",
nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber, broadcastQ.NumQueued(),
network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
network.qMessagesSent = 0
}

@@ -572,6 +586,8 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
TableName: params[1],
Key: params[2],
Value: entry.value,
// The duration in seconds is a float64; the conversion below truncates it to whole seconds
ResidualReapTime: int32(entry.reapTime.Seconds()),
}

msg, err := encodeMessage(MessageTypeTableEvent, &tEvent)
38 changes: 19 additions & 19 deletions networkdb/delegate.go
@@ -1,9 +1,9 @@
package networkdb

import (
"fmt"
"net"
"strings"
"time"

"github.com/gogo/protobuf/proto"
"github.com/sirupsen/logrus"
@@ -165,7 +165,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
n.ltime = nEvent.LTime
n.leaving = nEvent.Type == NetworkEventTypeLeave
if n.leaving {
n.reapTime = reapInterval
n.reapTime = reapNetworkInterval

// The remote node is leaving the network, but not the gossip cluster.
// Mark all its entries in deleted state, this will guarantee that
@@ -198,8 +198,7 @@ }
}

func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
// Update our local clock if the received messages has newer
// time.
// Update our local clock if the received message has newer time.
nDB.tableClock.Witness(tEvent.LTime)

// Ignore the table events for networks that are in the process of going away
@@ -235,20 +234,26 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
node: tEvent.NodeName,
value: tEvent.Value,
deleting: tEvent.Type == TableEventTypeDelete,
reapTime: time.Duration(tEvent.ResidualReapTime) * time.Second,
}

if e.deleting {
e.reapTime = reapInterval
// All the entries marked for deletion should have a reapTime set greater than 0.
// This case can happen if the cluster is running different versions of the engine and the old version does not
// have the ResidualReapTime field. If that is not the case, this can be a BUG.
if e.deleting && e.reapTime == 0 {
logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent)
e.reapTime = reapEntryInterval
}

nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
nDB.Unlock()

if err != nil && tEvent.Type == TableEventTypeDelete {
// If it is a delete event and we didn't have the entry here don't repropagate
return true
// If it is a delete event and we did not have a state for it, don't propagate it to the application.
// If the residual reapTime is lower than or equal to 1/6 of the total reapTime, don't bother broadcasting it;
// most likely the cluster is already aware of it, and any node that syncs with this one will catch the state anyway.
return e.reapTime > reapEntryInterval/6
}

var op opType
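With reapEntryInterval at 30 minutes, the 1/6 threshold works out to 5 minutes: a delete whose residual life is at or below that is assumed to have already propagated and is not rebroadcast, leaving bulk sync to cover any node that missed it. A small sketch of the decision (hypothetical helper name):

package main

import (
	"fmt"
	"time"
)

const reapEntryInterval = 30 * time.Minute

// shouldRebroadcast mirrors the return value above: only deletes that still
// have more than 1/6 of their total lifetime left are repropagated.
func shouldRebroadcast(residual time.Duration) bool {
	return residual > reapEntryInterval/6 // threshold: 5 minutes
}

func main() {
	fmt.Println(shouldRebroadcast(20 * time.Minute)) // true
	fmt.Println(shouldRebroadcast(4 * time.Minute))  // false
}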
@@ -303,22 +308,17 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
nDB.RUnlock()

if !ok {
return
}

broadcastQ := n.tableBroadcasts

if broadcastQ == nil {
// If the network is not there anymore, OR we are leaving the network, OR the broadcast queue is not present
if !ok || n.leaving || n.tableBroadcasts == nil {
return
}

broadcastQ.QueueBroadcast(&tableEventMessage{
n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
msg: buf,
id: tEvent.NetworkID,
tname: tEvent.TableName,
key: tEvent.Key,
node: nDB.config.NodeName,
node: tEvent.NodeName,
})
}
}
70 changes: 53 additions & 17 deletions networkdb/networkdb.go
@@ -141,6 +141,11 @@ type network struct {

// Number of gossip messages sent related to this network during the last stats collection period
qMessagesSent int

// Number of entries in the network. This value is the sum of the entries across all the tables of a specific network.
// It is used for statistics purposes, keeps track of the database size, and is printed per network every
// StatsPrintPeriod interval.
entriesNumber int
}

// Config represents the configuration of the networkdb instance and
@@ -338,8 +343,7 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
}

nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()

return nil
@@ -365,8 +369,7 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
}

nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()

return nil
@@ -402,16 +405,15 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
node: nDB.config.NodeName,
value: value,
deleting: true,
reapTime: reapInterval,
reapTime: reapEntryInterval,
}

if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
return fmt.Errorf("cannot send table delete event: %v", err)
}

nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()

return nil
@@ -473,10 +475,10 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {

entry := &entry{
ltime: oldEntry.ltime,
node: node,
node: oldEntry.node,
value: oldEntry.value,
deleting: true,
reapTime: reapInterval,
reapTime: reapEntryInterval,
}

// we arrived at this point in 2 cases:
@@ -488,12 +490,10 @@
// without doing a delete of all the objects
entry.ltime++
}
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
} else {
// the local node is leaving the network, all the entries of remote nodes can be safely removed
nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
nDB.deleteEntry(nid, tname, key)
}

nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
@@ -513,8 +513,7 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
nid := params[1]
key := params[2]

nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
nDB.deleteEntry(nid, tname, key)

nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
return false
@@ -558,7 +557,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
nodeNetworks = make(map[string]*network)
nDB.networks[nDB.config.NodeName] = nodeNetworks
}
nodeNetworks[nid] = &network{id: nid, ltime: ltime}
n, ok := nodeNetworks[nid]
var entries int
if ok {
entries = n.entriesNumber
}
nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries}
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
nDB.RLock()
@@ -567,6 +571,7 @@
},
RetransmitMult: 4,
}

nDB.addNetworkNode(nid, nDB.config.NodeName)
networkNodes := nDB.networkNodes[nid]
nDB.Unlock()
@@ -614,8 +619,9 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
return fmt.Errorf("could not find network %s while trying to leave", nid)
}

logrus.Debugf("%s: leaving network %s", nDB.config.NodeName, nid)
n.ltime = ltime
n.reapTime = reapInterval
n.reapTime = reapNetworkInterval
n.leaving = true
return nil
}
@@ -679,3 +685,33 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
n.ltime = ltime
}
}

// createOrUpdateEntry handles the creation or update of entries in the local
// tree store. It also keeps the network's entry count in sync (all tables are aggregated).
func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interface{}) (bool, bool) {
_, okTable := nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
_, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
if !okNetwork {
// Increment only if this is a new insert, not an update of an existing entry
n, ok := nDB.networks[nDB.config.NodeName][nid]
if ok {
n.entriesNumber++
}
}
return okTable, okNetwork
}

// deleteEntry handles the deletion of entries from the local tree store.
// It also keeps the network's entry count in sync (all tables are aggregated).
func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) {
_, okTable := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
_, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
if okNetwork {
// Decrement only if the delete actually removed an entry
n, ok := nDB.networks[nDB.config.NodeName][nid]
if ok {
n.entriesNumber--
}
}
return okTable, okNetwork
}
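Both helpers lean on the radix tree's return values: Insert reports whether the key already existed and Delete reports whether anything was removed, so entriesNumber moves only on true creates and true deletes. A map-based sketch of the same bookkeeping (stand-in types, not the NetworkDB code):

package main

import "fmt"

// index is a toy stand-in for one radix index plus the per-network counter.
type index struct {
	m       map[string]interface{}
	entries int
}

// insert reports whether the key already existed (an update); the counter
// is incremented only on a fresh insert.
func (ix *index) insert(key string, v interface{}) bool {
	_, updated := ix.m[key]
	ix.m[key] = v
	if !updated {
		ix.entries++
	}
	return updated
}

// delete reports whether a key was actually removed; the counter is
// decremented only on a successful delete.
func (ix *index) delete(key string) bool {
	if _, ok := ix.m[key]; !ok {
		return false
	}
	delete(ix.m, key)
	ix.entries--
	return true
}

func main() {
	ix := &index{m: map[string]interface{}{}}
	ix.insert("/net1/table1/key1", "v1") // create: counter -> 1
	ix.insert("/net1/table1/key1", "v2") // update: counter stays 1
	ix.delete("/net1/table1/key1")       // delete: counter -> 0
	fmt.Println(ix.entries)              // 0
}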