Skip to content

Commit

Permalink
Merge pull request #1704 from sanimej/cphard
Browse files Browse the repository at this point in the history
control-plane hardening: cleanup local state on peer leaving a network
  • Loading branch information
aboch authored Apr 10, 2017
2 parents 6c94115 + 4b2279d commit b09185d
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
16 changes: 15 additions & 1 deletion networkdb/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,25 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
}

func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
var flushEntries bool
// Update our local clock if the received messages has newer
// time.
nDB.networkClock.Witness(nEvent.LTime)

nDB.Lock()
defer nDB.Unlock()
defer func() {
nDB.Unlock()
// When a node leaves a network on the last task removal cleanup the
// local entries for this network & node combination. When the tasks
// on a network are removed we could have missed the gossip updates.
// Not doing this cleanup can leave stale entries because bulksyncs
// from the node will no longer include this network state.
//
// deleteNodeNetworkEntries takes nDB lock.
if flushEntries {
nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName)
}
}()

if nEvent.NodeName == nDB.config.NodeName {
return false
Expand Down Expand Up @@ -121,6 +134,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
n.leaving = nEvent.Type == NetworkEventTypeLeave
if n.leaving {
n.reapTime = reapInterval
flushEntries = true
}

nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
Expand Down
31 changes: 31 additions & 0 deletions networkdb/networkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,37 @@ func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
nDB.Unlock()
}

func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
nDB.Lock()
nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid),
func(path string, v interface{}) bool {
oldEntry := v.(*entry)
params := strings.Split(path[1:], "/")
nid := params[0]
tname := params[1]
key := params[2]

if oldEntry.node != node {
return false
}

entry := &entry{
ltime: oldEntry.ltime,
node: node,
value: oldEntry.value,
deleting: true,
reapTime: reapInterval,
}

nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)

nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
return false
})
nDB.Unlock()
}

func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
nDB.Lock()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
Expand Down

0 comments on commit b09185d

Please sign in to comment.