Skip to content

Commit

Permalink
Fix reapTime logic in NetworkDB
Browse files Browse the repository at this point in the history
- Added remainingReapTime field in the table event.
  Wihtout it a node that did not have a state for the element
  was marking the element for deletion setting the max reapTime.
  This was creating the possibility to keep the entry being resync
  between nodes forever avoding the purpose of the reap time
  itself.

- On broadcast of the table event the node owner was rewritten
  with the local node name, this was not correct because the owner
  should continue to remain the original one of the message

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
  • Loading branch information
Flavio Crisciani committed Sep 20, 2017
1 parent c34e58a commit 7a0e447
Show file tree
Hide file tree
Showing 8 changed files with 565 additions and 353 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -38,3 +38,4 @@ cmd/dnet/dnet

libnetworkbuild.created
test/networkDb/testMain
test/networkDb/gossipdb
2 changes: 2 additions & 0 deletions networkdb/broadcast.go
Expand Up @@ -134,6 +134,8 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
TableName: tname,
Key: key,
Value: entry.value,
// The duration in second is a float that below would be truncated
ResidualReapTime: int32(entry.reapTime.Seconds()),
}

raw, err := encodeMessage(MessageTypeTableEvent, &tEvent)
Expand Down
13 changes: 8 additions & 5 deletions networkdb/cluster.go
Expand Up @@ -349,11 +349,11 @@ func (nDB *NetworkDB) reapTableEntries() {
nid := params[1]
key := params[2]

if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
okTable, okNetwork := nDB.deleteEntry(nid, tname, key)
if !okTable {
logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
}

if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
if !okNetwork {
logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
}
}
Expand Down Expand Up @@ -406,8 +406,9 @@ func (nDB *NetworkDB) gossip() {
// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
network.qMessagesSent += len(msgs)
if printStats {
logrus.Infof("NetworkDB stats - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d",
nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue qLen:%d netPeers:%d netMsg/s:%d",
nid, network.entriesNumber, broadcastQ.NumQueued(), broadcastQ.NumNodes(),
network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
network.qMessagesSent = 0
}

Expand Down Expand Up @@ -572,6 +573,8 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
TableName: params[1],
Key: params[2],
Value: entry.value,
// The duration in second is a float that below would be truncated
ResidualReapTime: int32(entry.reapTime.Seconds()),
}

msg, err := encodeMessage(MessageTypeTableEvent, &tEvent)
Expand Down
34 changes: 17 additions & 17 deletions networkdb/delegate.go
@@ -1,9 +1,9 @@
package networkdb

import (
"fmt"
"net"
"strings"
"time"

"github.com/gogo/protobuf/proto"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -198,8 +198,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
}

func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
// Update our local clock if the received messages has newer
// time.
// Update our local clock if the received messages has newer time.
nDB.tableClock.Witness(tEvent.LTime)

// Ignore the table events for networks that are in the process of going away
Expand Down Expand Up @@ -235,20 +234,26 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
node: tEvent.NodeName,
value: tEvent.Value,
deleting: tEvent.Type == TableEventTypeDelete,
reapTime: time.Duration(tEvent.ResidualReapTime) * time.Second,
}

if e.deleting {
// All the entries marked for deletion should have a reapTime set greater than 0
// This case can happens if the cluster is running different versions of the engine where the old version does not have the
// field. In both cases we should raise a warning message
if e.deleting && e.reapTime == 0 {
logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent)
e.reapTime = reapInterval
}

nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
nDB.Unlock()

if err != nil && tEvent.Type == TableEventTypeDelete {
// If it is a delete event and we didn't have the entry here don't repropagate
return true
// If it is a delete event and we did not have a state for it, don't propagate to the application
// If the residual reapTime is lower than 1/6 of the total reapTime don't bother broadcasting it around
// most likely the cluster is already aware of it, if not who will sync with this node will catch the state too.
return e.reapTime >= reapPeriod/6
}

var op opType
Expand Down Expand Up @@ -303,22 +308,17 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
nDB.RUnlock()

if !ok {
return
}

broadcastQ := n.tableBroadcasts

if broadcastQ == nil {
// if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present
if !ok || n.leaving || n.tableBroadcasts == nil {
return
}

broadcastQ.QueueBroadcast(&tableEventMessage{
n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
msg: buf,
id: tEvent.NetworkID,
tname: tEvent.TableName,
key: tEvent.Key,
node: nDB.config.NodeName,
node: tEvent.NodeName,
})
}
}
Expand Down
47 changes: 35 additions & 12 deletions networkdb/networkdb.go
Expand Up @@ -141,6 +141,9 @@ type network struct {

// Number of gossip messages sent related to this network during the last stats collection period
qMessagesSent int

// Number of entries on the network
entriesNumber int
}

// Config represents the configuration of the networdb instance and
Expand Down Expand Up @@ -338,8 +341,7 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
}

nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()

return nil
Expand All @@ -365,8 +367,7 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
}

nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()

return nil
Expand Down Expand Up @@ -410,8 +411,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
}

nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()

return nil
Expand Down Expand Up @@ -488,12 +488,10 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
// without doing a delete of all the objects
entry.ltime++
}
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
} else {
// the local node is leaving the network, all the entries of remote nodes can be safely removed
nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
nDB.deleteEntry(nid, tname, key)
}

nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
Expand All @@ -513,8 +511,7 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
nid := params[1]
key := params[2]

nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
nDB.deleteEntry(nid, tname, key)

nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
return false
Expand Down Expand Up @@ -679,3 +676,29 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
n.ltime = ltime
}
}

// createOrUpdateEntry this function handles the creation or update of entries into the local
// tree store. It is also used to keep in sync the entries number of the network (all tables are aggregated)
func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interface{}) (bool, bool) {
_, okTable := nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
_, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
if !okNetwork {
// Add only if it is an insert not an update
n := nDB.networks[nDB.config.NodeName][nid]
n.entriesNumber++
}
return okTable, okNetwork
}

// deleteEntry this function handles the deletion of entries into the local tree store.
// It is also used to keep in sync the entries number of the network (all tables are aggregated)
func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) {
_, okTable := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
_, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
if okNetwork {
// Remove only if the delete is successful
n := nDB.networks[nDB.config.NodeName][nid]
n.entriesNumber--
}
return okTable, okNetwork
}

0 comments on commit 7a0e447

Please sign in to comment.