-
Notifications
You must be signed in to change notification settings - Fork 881
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix reapTime logic in NetworkDB #1944
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,3 +38,4 @@ cmd/dnet/dnet | |
|
||
libnetworkbuild.created | ||
test/networkDb/testMain | ||
test/networkDb/gossipdb |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,9 @@ | ||
package networkdb | ||
|
||
import ( | ||
"fmt" | ||
"net" | ||
"strings" | ||
"time" | ||
|
||
"github.com/gogo/protobuf/proto" | ||
"github.com/sirupsen/logrus" | ||
|
@@ -165,7 +165,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { | |
n.ltime = nEvent.LTime | ||
n.leaving = nEvent.Type == NetworkEventTypeLeave | ||
if n.leaving { | ||
n.reapTime = reapInterval | ||
n.reapTime = reapNetworkInterval | ||
|
||
// The remote node is leaving the network, but not the gossip cluster. | ||
// Mark all its entries in deleted state, this will guarantee that | ||
|
@@ -198,8 +198,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { | |
} | ||
|
||
func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { | ||
// Update our local clock if the received messages has newer | ||
// time. | ||
// Update our local clock if the received messages has newer time. | ||
nDB.tableClock.Witness(tEvent.LTime) | ||
|
||
// Ignore the table events for networks that are in the process of going away | ||
|
@@ -235,20 +234,26 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { | |
node: tEvent.NodeName, | ||
value: tEvent.Value, | ||
deleting: tEvent.Type == TableEventTypeDelete, | ||
reapTime: time.Duration(tEvent.ResidualReapTime) * time.Second, | ||
} | ||
|
||
if e.deleting { | ||
e.reapTime = reapInterval | ||
// All the entries marked for deletion should have a reapTime set greater than 0 | ||
// This case can happen if the cluster is running different versions of the engine where the old version does not have the | ||
// field. If that is not the case, this can be a BUG | ||
if e.deleting && e.reapTime == 0 { | ||
logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent) | ||
e.reapTime = reapEntryInterval | ||
} | ||
|
||
nDB.Lock() | ||
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e) | ||
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e) | ||
nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e) | ||
nDB.Unlock() | ||
|
||
if err != nil && tEvent.Type == TableEventTypeDelete { | ||
// If it is a delete event and we didn't have the entry here don't repropagate | ||
return true | ||
// If it is a delete event and we did not have a state for it, don't propagate to the application | ||
// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around | ||
// most likely the cluster is already aware of it, if not who will sync with this node will catch the state too. | ||
return e.reapTime > reapPeriod/6 | ||
} | ||
|
||
var op opType | ||
|
@@ -303,22 +308,17 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) { | |
n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID] | ||
nDB.RUnlock() | ||
|
||
if !ok { | ||
return | ||
} | ||
|
||
broadcastQ := n.tableBroadcasts | ||
|
||
if broadcastQ == nil { | ||
// if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present | ||
if !ok || n.leaving || n.tableBroadcasts == nil { | ||
return | ||
} | ||
|
||
broadcastQ.QueueBroadcast(&tableEventMessage{ | ||
n.tableBroadcasts.QueueBroadcast(&tableEventMessage{ | ||
msg: buf, | ||
id: tEvent.NetworkID, | ||
tname: tEvent.TableName, | ||
key: tEvent.Key, | ||
node: nDB.config.NodeName, | ||
node: tEvent.NodeName, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Being a rebroadcast event and based on the current logic of depending on the residual reaptimer, what is the purpose of this change ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if I understood correctly flavio's explanation, it's not strictly related to this PR. it's an issue he saw while he was looking at the code. Without this change, when the event was rebroadcasted, the node original node emitting the event was replaced with the one rebroadcasting it. So you could receive event saying it originated from one node, but actually came from another one. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The node is simply rebroadcasting the message of another node, at the top of the function there is the check that skips messages where the owner matches with the current node. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok. this is a bit subtle and I think a good testing coverage will help mitigate the risk. |
||
}) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we 100% sure the path will have 4
/
in it ?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why 4? it is actually 3 like:
fmt.Sprintf("/%s/%s/%s", nid, tname, key)
Also the insertion and deletion is now contained in single functions: createOrUpdateEntry and deleteEntry
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry I meant 3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. this is the current expectation in the code and as @fcrisciani mentioned it is all organized now under the same functions and this assumption is valid.