Skip to content

Commit

Permalink
Changed ReapTable logic
Browse files Browse the repository at this point in the history
- Changed the loop per network. Previous implementation was taking a
  ReadLock to update the reapTime but now with the residualReapTime
  also the bulkSync is using the same ReadLock creating possible
  issues in concurrent read and update of the value.
  The new logic fetches the list of networks and proceed to the
  cleanup network by network locking the database and releasing it
  after each network. This should ensure a fair locking avoiding
  to keep the database blocked for too much time.

  Note: The ticker does not guarantee that the reap logic runs
  precisely every reapTimePeriod, actually documentation says that
  if the routine is too long will skip ticks. In case of slowdown
  of the process itself it is possible that the lifetime of the
  deleted entries increases, it still should not be a huge problem
  because now the residual reaptime is propagated among all the nodes
  a slower node will let the deleted entry being repropagate multiple
  times but the state will still remain consistent.

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
  • Loading branch information
Flavio Crisciani committed Sep 20, 2017
1 parent 7a0e447 commit f3a3aca
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 40 deletions.
74 changes: 40 additions & 34 deletions networkdb/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,43 +321,49 @@ func (nDB *NetworkDB) reapNetworks() {
}

func (nDB *NetworkDB) reapTableEntries() {
var paths []string

var nodeNetworks []string
// This is best effort, if the list of network changes will be picked up in the next cycle
nDB.RLock()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
entry, ok := v.(*entry)
if !ok {
return false
}

if !entry.deleting {
return false
}
if entry.reapTime > 0 {
entry.reapTime -= reapPeriod
return false
}
paths = append(paths, path)
return false
})
for nid := range nDB.networks[nDB.config.NodeName] {
nodeNetworks = append(nodeNetworks, nid)
}
nDB.RUnlock()

nDB.Lock()
for _, path := range paths {
params := strings.Split(path[1:], "/")
tname := params[0]
nid := params[1]
key := params[2]

okTable, okNetwork := nDB.deleteEntry(nid, tname, key)
if !okTable {
logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
}
if !okNetwork {
logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
}
cycleStart := time.Now()
// In order to avoid blocking the database for a long time, apply the garbage collection logic by network
// The lock is taken at the beginning of the cycle and the deletion is inline
for _, nid := range nodeNetworks {
nDB.Lock()
nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool {
// timeCompensation compensate in case the lock took some time to be released
timeCompensation := time.Since(cycleStart)
entry, ok := v.(*entry)
if !ok || !entry.deleting {
return false
}

if entry.reapTime > reapPeriod+timeCompensation {
entry.reapTime -= reapPeriod + timeCompensation
return false
}

params := strings.Split(path[1:], "/")
nid := params[0]
tname := params[1]
key := params[2]

okTable, okNetwork := nDB.deleteEntry(nid, tname, key)
if !okTable {
logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
}
if !okNetwork {
logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
}

return false
})
nDB.Unlock()
}
nDB.Unlock()
}

func (nDB *NetworkDB) gossip() {
Expand Down Expand Up @@ -406,7 +412,7 @@ func (nDB *NetworkDB) gossip() {
// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
network.qMessagesSent += len(msgs)
if printStats {
logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue qLen:%d netPeers:%d netMsg/s:%d",
logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue qLen:%d netPeers:%d netMsg/s:%d",
nid, network.entriesNumber, broadcastQ.NumQueued(), broadcastQ.NumNodes(),
network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
network.qMessagesSent = 0
Expand Down
4 changes: 2 additions & 2 deletions networkdb/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,9 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {

if err != nil && tEvent.Type == TableEventTypeDelete {
// If it is a delete event and we did not have a state for it, don't propagate to the application
// If the residual reapTime is lower than 1/6 of the total reapTime don't bother broadcasting it around
// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
// most likely the cluster is already aware of it, if not who will sync with this node will catch the state too.
return e.reapTime >= reapPeriod/6
return e.reapTime > reapPeriod/6
}

var op opType
Expand Down
18 changes: 14 additions & 4 deletions networkdb/networkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
nodeNetworks = make(map[string]*network)
nDB.networks[nDB.config.NodeName] = nodeNetworks
}
_, ok = nodeNetworks[nid]
if ok {
// Already part of this network nothing to do
nDB.Unlock()
return nil
}
nodeNetworks[nid] = &network{id: nid, ltime: ltime}
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
Expand Down Expand Up @@ -684,8 +690,10 @@ func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interfac
_, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
if !okNetwork {
// Add only if it is an insert not an update
n := nDB.networks[nDB.config.NodeName][nid]
n.entriesNumber++
n, ok := nDB.networks[nDB.config.NodeName][nid]
if ok {
n.entriesNumber++
}
}
return okTable, okNetwork
}
Expand All @@ -697,8 +705,10 @@ func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) {
_, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
if okNetwork {
// Remove only if the delete is successful
n := nDB.networks[nDB.config.NodeName][nid]
n.entriesNumber--
n, ok := nDB.networks[nDB.config.NodeName][nid]
if ok {
n.entriesNumber--
}
}
return okTable, okNetwork
}

0 comments on commit f3a3aca

Please sign in to comment.