
Fix reapTime logic in NetworkDB #1944

Merged · 3 commits · Sep 22, 2017
1 change: 1 addition & 0 deletions .gitignore
@@ -38,3 +38,4 @@ cmd/dnet/dnet

libnetworkbuild.created
test/networkDb/testMain
test/networkDb/gossipdb
2 changes: 2 additions & 0 deletions networkdb/broadcast.go
@@ -134,6 +134,8 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
TableName: tname,
Key: key,
Value: entry.value,
// The duration in seconds is a float that gets truncated below when converted to int32
ResidualReapTime: int32(entry.reapTime.Seconds()),
}

raw, err := encodeMessage(MessageTypeTableEvent, &tEvent)
79 changes: 45 additions & 34 deletions networkdb/cluster.go
@@ -321,43 +321,51 @@ func (nDB *NetworkDB) reapNetworks() {
}

func (nDB *NetworkDB) reapTableEntries() {
var paths []string

var nodeNetworks []string
// This is best effort; if the list of networks changes, it will be picked up in the next cycle
nDB.RLock()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
entry, ok := v.(*entry)
if !ok {
return false
}

if !entry.deleting {
return false
}
if entry.reapTime > 0 {
entry.reapTime -= reapPeriod
return false
}
paths = append(paths, path)
return false
})
for nid := range nDB.networks[nDB.config.NodeName] {
nodeNetworks = append(nodeNetworks, nid)
}
nDB.RUnlock()

nDB.Lock()
for _, path := range paths {
params := strings.Split(path[1:], "/")
tname := params[0]
nid := params[1]
key := params[2]

if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
}
cycleStart := time.Now()
// In order to avoid blocking the database for a long time, apply the garbage collection logic network by network
// The lock is taken at the beginning of the cycle and the deletion is inline
for _, nid := range nodeNetworks {
nDB.Lock()
nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool {
// timeCompensation compensates for the time the lock may have taken to be released
timeCompensation := time.Since(cycleStart)
entry, ok := v.(*entry)
if !ok || !entry.deleting {
return false
}

if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
}
// In this check we add an extra second to guarantee that, when the value is truncated to an int32 to fit the packet
// for the tableEvent, the number is always strictly > 1 and never 0
if entry.reapTime > reapPeriod+timeCompensation+time.Second {
entry.reapTime -= reapPeriod + timeCompensation
return false
}

params := strings.Split(path[1:], "/")
Contributor: Are we 100% sure the path will have 4 / in it?

Author: Why 4? It is actually 3, like: fmt.Sprintf("/%s/%s/%s", nid, tname, key)
Also, the insertion and deletion are now contained in single functions: createOrUpdateEntry and deleteEntry

Contributor: Sorry, I meant 3

Contributor: Yes, this is the current expectation in the code and, as @fcrisciani mentioned, it is all organized now under the same functions, so this assumption is valid.

nid := params[0]
tname := params[1]
key := params[2]

okTable, okNetwork := nDB.deleteEntry(nid, tname, key)
if !okTable {
logrus.Errorf("Table tree delete failed, entry with key:%s does not exists in the table:%s network:%s", key, tname, nid)
}
if !okNetwork {
logrus.Errorf("Network tree delete failed, entry with key:%s does not exists in the network:%s table:%s", key, nid, tname)
}

return false
})
nDB.Unlock()
}
nDB.Unlock()
}

func (nDB *NetworkDB) gossip() {
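For reference, the byNetwork index paths walked in reapTableEntries above have the form /<nid>/<tname>/<key>, so the split discussed in the review thread yields exactly three components. A small standalone Go sketch of that layout; the "net1"/"table1"/"key1" values are illustrative only, not taken from the patch:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// byNetwork keys have the form /<nid>/<tname>/<key>, so stripping the
	// leading "/" and splitting yields exactly three components.
	path := fmt.Sprintf("/%s/%s/%s", "net1", "table1", "key1")
	params := strings.Split(path[1:], "/")
	nid, tname, key := params[0], params[1], params[2]
	fmt.Println(nid, tname, key) // net1 table1 key1
}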
@@ -406,8 +414,9 @@ func (nDB *NetworkDB) gossip() {
// Collect stats and print the queue info; note this code is also here to get a view of the queues when they are empty
network.qMessagesSent += len(msgs)
if printStats {
logrus.Infof("NetworkDB stats - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d",
nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue qLen:%d netPeers:%d netMsg/s:%d",
nid, network.entriesNumber, broadcastQ.NumQueued(), broadcastQ.NumNodes(),
network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
network.qMessagesSent = 0
}

@@ -572,6 +581,8 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
TableName: params[1],
Key: params[2],
Value: entry.value,
// The duration in seconds is a float that gets truncated below when converted to int32
ResidualReapTime: int32(entry.reapTime.Seconds()),
}

msg, err := encodeMessage(MessageTypeTableEvent, &tEvent)
34 changes: 17 additions & 17 deletions networkdb/delegate.go
@@ -1,9 +1,9 @@
package networkdb

import (
"fmt"
"net"
"strings"
"time"

"github.com/gogo/protobuf/proto"
"github.com/sirupsen/logrus"
@@ -198,8 +198,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
}

func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
// Update our local clock if the received messages has newer
// time.
// Update our local clock if the received message has a newer time.
nDB.tableClock.Witness(tEvent.LTime)

// Ignore the table events for networks that are in the process of going away
@@ -235,20 +234,26 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
node: tEvent.NodeName,
value: tEvent.Value,
deleting: tEvent.Type == TableEventTypeDelete,
reapTime: time.Duration(tEvent.ResidualReapTime) * time.Second,
}

if e.deleting {
// All the entries marked for deletion should have a reapTime set greater than 0
// This case can happen if the cluster is running different versions of the engine where the old version does not have the
// field. If that is not the case, this can be a BUG
if e.deleting && e.reapTime == 0 {
logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent)
e.reapTime = reapInterval
Contributor: Is this change of behavior expected here?
Shouldn't we log when e.reapTime == 0 but still do e.reapTime = reapInterval if it is not?

Author: Nope, this is actually the point of this patch: the reapTime should be the one passed in the proto (line 237).
If that is 0 and the entry is marked for deletion, we are either in front of a bug (nodes do not set the value properly) or we are dealing with an old client that does not set it because it does not have the new proto.

The old behavior was that, no matter when you got the notification, the clock was being reset to the max value of 30 min.

Contributor: Except in that very unfortunate circumstance where the ResidualReapTime populated by the peer has in fact gone down to exactly 0 seconds at the time of populating the message (https://github.com/docker/libnetwork/pull/1944/files#diff-ced1ed36c3141cb15bdf5acafed78d67R583). If that happens in such a rare situation, we will reset this back to 30 seconds and the event will get cleaned up in 30 seconds (or, in an even rarer occasion, we get another genuine 0-second ResidualReapTime).

@fcrisciani is that correct?

Contributor: BTW, yes. This is the actual point of this patch and I think it is a good idea to get this done.

Author: @mavenugo your point is correct only for the case of the truncation of the number. I will add an extra check that the truncated number is still strictly greater than 0.
The improvement here is that the lock is taken only once, instead of following the previous pattern of saving the keys to delete and then doing another semaphore race to delete them. So right now the only case in this patch that turns out to be 0 is when the residual time in seconds is 0.xxx, which truncated becomes 0.
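To make the truncation point above concrete, here is a small standalone Go sketch; the millisecond values are illustrative only, not code from the patch:

package main

import (
	"fmt"
	"time"
)

func main() {
	// A residual reap time below one second truncates to 0 when encoded
	// as int32 seconds in the TableEvent (ResidualReapTime field).
	residual := 800 * time.Millisecond
	fmt.Println(int32(residual.Seconds())) // 0

	// Keeping at least one full second of residual time, as the extra
	// one-second guard in reapTableEntries does, avoids advertising 0.
	residual = 1500 * time.Millisecond
	fmt.Println(int32(residual.Seconds())) // 1
}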

}

nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
nDB.Unlock()

if err != nil && tEvent.Type == TableEventTypeDelete {
// If it is a delete event and we didn't have the entry here don't repropagate
return true
// If it is a delete event and we did not have a state for it, don't propagate it to the application
// If the residual reapTime is lower than or equal to 1/6 of the total reapTime, don't bother broadcasting it around;
// most likely the cluster is already aware of it. If not, whoever syncs with this node will catch up on the state anyway.
return e.reapTime > reapPeriod/6
}

var op opType
@@ -303,22 +308,17 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
nDB.RUnlock()

if !ok {
return
}

broadcastQ := n.tableBroadcasts

if broadcastQ == nil {
// if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present
if !ok || n.leaving || n.tableBroadcasts == nil {
return
}

broadcastQ.QueueBroadcast(&tableEventMessage{
n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
msg: buf,
id: tEvent.NetworkID,
tname: tEvent.TableName,
key: tEvent.Key,
node: nDB.config.NodeName,
node: tEvent.NodeName,
Contributor: Being a rebroadcast event, and based on the current logic of depending on the residual reap timer, what is the purpose of this change?

Contributor: If I understood Flavio's explanation correctly, it's not strictly related to this PR; it's an issue he saw while he was looking at the code.

Without this change, when the event was rebroadcast, the original node emitting the event was replaced with the one rebroadcasting it. So you could receive an event saying it originated from one node when it actually came from another one.

Author: The node is simply rebroadcasting the message of another node; at the top of the function there is a check that skips messages where the owner matches the current node.
Without this change the ownership of the entry gets messed up and can cause issues on deletion.

Contributor: OK, this is a bit subtle and I think good test coverage will help mitigate the risk.
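For context, a rough sketch of the origin check the author refers to at the top of the handler; the exact placement and wording are assumptions, since that code is not shown in this diff (tEvent and nDB as in the surrounding handler):

// Skip events that this node itself originated; only events owned by other
// nodes are applied and rebroadcast, so tEvent.NodeName stays the true owner.
if tEvent.NodeName == nDB.config.NodeName {
	return
}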

})
}
}
63 changes: 49 additions & 14 deletions networkdb/networkdb.go
@@ -141,6 +141,11 @@ type network struct {

// Number of gossip messages sent related to this network during the last stats collection period
qMessagesSent int

// Number of entries on the network. This value is the sum of all the entries of all the tables of a specific network.
// Its use is for statistics purposes. It keeps track of the database size and is printed per network every StatsPrintPeriod
// interval
entriesNumber int
Contributor: I guess this is used only for debugging purposes... It would be good to write a comment reflecting the purpose, and we should not be tempted to use this as a reference counter for any logic in the future.

Author: Will add extra comments.

}

// Config represents the configuration of the networkdb instance and
@@ -338,8 +343,7 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
}

nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()

return nil
@@ -365,8 +369,7 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
}

nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()

return nil
@@ -410,8 +413,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
}

nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()

return nil
@@ -473,7 +475,7 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {

entry := &entry{
ltime: oldEntry.ltime,
node: node,
node: oldEntry.node,
value: oldEntry.value,
deleting: true,
reapTime: reapInterval,
@@ -488,12 +490,10 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
// without doing a delete of all the objects
entry.ltime++
}
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
} else {
// the local node is leaving the network, all the entries of remote nodes can be safely removed
nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
nDB.deleteEntry(nid, tname, key)
}

nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
@@ -513,8 +513,7 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
nid := params[1]
key := params[2]

nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
nDB.deleteEntry(nid, tname, key)

nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
return false
@@ -558,7 +557,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
nodeNetworks = make(map[string]*network)
nDB.networks[nDB.config.NodeName] = nodeNetworks
}
nodeNetworks[nid] = &network{id: nid, ltime: ltime}
n, ok := nodeNetworks[nid]
var entries int
if ok {
entries = n.entriesNumber
}
nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries}
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
nDB.RLock()
@@ -567,6 +571,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
},
RetransmitMult: 4,
}

nDB.addNetworkNode(nid, nDB.config.NodeName)
networkNodes := nDB.networkNodes[nid]
nDB.Unlock()
@@ -679,3 +684,33 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
n.ltime = ltime
}
}

// createOrUpdateEntry handles the creation or update of entries in the local
// tree store. It is also used to keep the network's entry count in sync (all tables are aggregated)
func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interface{}) (bool, bool) {
_, okTable := nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
_, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
if !okNetwork {
// Add only if it is an insert not an update
n, ok := nDB.networks[nDB.config.NodeName][nid]
if ok {
n.entriesNumber++
}
}
return okTable, okNetwork
}

// deleteEntry handles the deletion of entries from the local tree store.
// It is also used to keep the network's entry count in sync (all tables are aggregated)
func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) {
_, okTable := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
_, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
if okNetwork {
// Remove only if the delete is successful
n, ok := nDB.networks[nDB.config.NodeName][nid]
if ok {
n.entriesNumber--
}
}
return okTable, okNetwork
}
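As in the CreateEntry/UpdateEntry/DeleteEntry changes above, callers are expected to hold the write lock around these helpers; a minimal usage sketch (illustrative fragment, not taken verbatim from the patch):

nDB.Lock()
nDB.createOrUpdateEntry(nid, tname, key, entry) // increments entriesNumber only on a real insert
nDB.Unlock()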