From b38872971885c0e287b5bdc31b5f0c1d4054ef1d Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Tue, 19 Sep 2017 17:36:00 -0700 Subject: [PATCH] Changed ReapTable logic - Changed the loop per network. Previous implementation was taking a ReadLock to update the reapTime but now with the residualReapTime also the bulkSync is using the same ReadLock creating possible issues in concurrent read and update of the value. The new logic fetches the list of networks and proceed to the cleanup network by network locking the database and releasing it after each network. This should ensure a fair locking avoiding to keep the database blocked for too much time. Note: The ticker does not guarantee that the reap logic runs precisely every reapTimePeriod, actually documentation says that if the routine is too long will skip ticks. In case of slowdown of the process itself it is possible that the lifetime of the deleted entries increases, it still should not be a huge problem because now the residual reaptime is propagated among all the nodes a slower node will let the deleted entry being repropagate multiple times but the state will still remain consistent. Signed-off-by: Flavio Crisciani --- networkdb/cluster.go | 74 +++++++++++++++------------- networkdb/delegate.go | 8 +-- networkdb/networkdb.go | 42 +++++++++++----- networkdb/networkdb_test.go | 6 +++ test/networkDb/README | 2 +- test/networkDb/dbclient/ndbClient.go | 10 ++-- 6 files changed, 85 insertions(+), 57 deletions(-) diff --git a/networkdb/cluster.go b/networkdb/cluster.go index eb20524cc5..ca1a959480 100644 --- a/networkdb/cluster.go +++ b/networkdb/cluster.go @@ -321,43 +321,49 @@ func (nDB *NetworkDB) reapNetworks() { } func (nDB *NetworkDB) reapTableEntries() { - var paths []string - + var nodeNetworks []string + // This is best effort, if the list of network changes will be picked up in the next cycle nDB.RLock() - nDB.indexes[byTable].Walk(func(path string, v interface{}) bool { - entry, ok := v.(*entry) - if !ok { - return false - } - - if !entry.deleting { - return false - } - if entry.reapTime > 0 { - entry.reapTime -= reapPeriod - return false - } - paths = append(paths, path) - return false - }) + for nid := range nDB.networks[nDB.config.NodeName] { + nodeNetworks = append(nodeNetworks, nid) + } nDB.RUnlock() - nDB.Lock() - for _, path := range paths { - params := strings.Split(path[1:], "/") - tname := params[0] - nid := params[1] - key := params[2] - - okTable, okNetwork := nDB.deleteEntry(nid, tname, key) - if !okTable { - logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key) - } - if !okNetwork { - logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key) - } + cycleStart := time.Now() + // In order to avoid blocking the database for a long time, apply the garbage collection logic by network + // The lock is taken at the beginning of the cycle and the deletion is inline + for _, nid := range nodeNetworks { + nDB.Lock() + nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool { + // timeCompensation compensate in case the lock took some time to be released + timeCompensation := time.Since(cycleStart) + entry, ok := v.(*entry) + if !ok || !entry.deleting { + return false + } + + if entry.reapTime > reapPeriod+timeCompensation { + entry.reapTime -= reapPeriod + timeCompensation + return false + } + + params := strings.Split(path[1:], "/") + nid := params[0] + tname := params[1] + key := params[2] + + okTable, okNetwork := nDB.deleteEntry(nid, tname, key) + if !okTable { + logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key) + } + if !okNetwork { + logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key) + } + + return false + }) + nDB.Unlock() } - nDB.Unlock() } func (nDB *NetworkDB) gossip() { @@ -406,7 +412,7 @@ func (nDB *NetworkDB) gossip() { // Collect stats and print the queue info, note this code is here also to have a view of the queues empty network.qMessagesSent += len(msgs) if printStats { - logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue qLen:%d netPeers:%d netMsg/s:%d", + logrus.Infof("NetworkDB stats - net:%s Entries:%d Queue qLen:%d netPeers:%d netMsg/s:%d", nid, network.entriesNumber, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second))) network.qMessagesSent = 0 diff --git a/networkdb/delegate.go b/networkdb/delegate.go index 9f97e4d3ee..bcddc9014b 100644 --- a/networkdb/delegate.go +++ b/networkdb/delegate.go @@ -238,8 +238,8 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { } // All the entries marked for deletion should have a reapTime set greater than 0 - // This case can happens if the cluster is running different versions of the engine where the old version does not have the - // field. In both cases we should raise a warning message + // This case can happen if the cluster is running different versions of the engine where the old version does not have the + // field. If that is not the case, this can be a BUG if e.deleting && e.reapTime == 0 { logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent) e.reapTime = reapInterval @@ -251,9 +251,9 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { if err != nil && tEvent.Type == TableEventTypeDelete { // If it is a delete event and we did not have a state for it, don't propagate to the application - // If the residual reapTime is lower than 1/6 of the total reapTime don't bother broadcasting it around + // If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around // most likely the cluster is already aware of it, if not who will sync with this node will catch the state too. - return e.reapTime >= reapPeriod/6 + return e.reapTime > reapPeriod/6 } var op opType diff --git a/networkdb/networkdb.go b/networkdb/networkdb.go index 8894abc860..95576f0eab 100644 --- a/networkdb/networkdb.go +++ b/networkdb/networkdb.go @@ -473,7 +473,7 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { entry := &entry{ ltime: oldEntry.ltime, - node: node, + node: oldEntry.node, value: oldEntry.value, deleting: true, reapTime: reapInterval, @@ -555,14 +555,26 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { nodeNetworks = make(map[string]*network) nDB.networks[nDB.config.NodeName] = nodeNetworks } - nodeNetworks[nid] = &network{id: nid, ltime: ltime} - nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{ - NumNodes: func() int { - nDB.RLock() - defer nDB.RUnlock() - return len(nDB.networkNodes[nid]) - }, - RetransmitMult: 4, + n, ok := nodeNetworks[nid] + if !ok { + n = &network{ + id: nid, + ltime: ltime, + tableBroadcasts: &memberlist.TransmitLimitedQueue{ + NumNodes: func() int { + nDB.RLock() + defer nDB.RUnlock() + return len(nDB.networkNodes[nid]) + }, + RetransmitMult: 4, + }, + } + nodeNetworks[nid] = n + } else { + // The network object is already present, it is possible that is still waiting to be garbage collected + n.leaving = false + n.ltime = ltime + n.reapTime = 0 } nDB.addNetworkNode(nid, nDB.config.NodeName) networkNodes := nDB.networkNodes[nid] @@ -684,8 +696,10 @@ func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interfac _, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) if !okNetwork { // Add only if it is an insert not an update - n := nDB.networks[nDB.config.NodeName][nid] - n.entriesNumber++ + n, ok := nDB.networks[nDB.config.NodeName][nid] + if ok { + n.entriesNumber++ + } } return okTable, okNetwork } @@ -697,8 +711,10 @@ func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) { _, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)) if okNetwork { // Remove only if the delete is successful - n := nDB.networks[nDB.config.NodeName][nid] - n.entriesNumber-- + n, ok := nDB.networks[nDB.config.NodeName][nid] + if ok { + n.entriesNumber-- + } } return okTable, okNetwork } diff --git a/networkdb/networkdb_test.go b/networkdb/networkdb_test.go index 326aaadecd..3b290e3e09 100644 --- a/networkdb/networkdb_test.go +++ b/networkdb/networkdb_test.go @@ -473,6 +473,9 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) { if len(dbs[0].networkNodes["network1"]) != 2 { t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"]) } + if n, ok := dbs[0].networks[dbs[0].config.NodeName]["network1"]; !ok || n.leaving { + t.Fatalf("The network should not be marked as leaving:%t", n.leaving) + } // Wait for the propagation on db[1] for i := 0; i < maxRetry; i++ { @@ -484,6 +487,9 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) { if len(dbs[1].networkNodes["network1"]) != 2 { t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"]) } + if n, ok := dbs[1].networks[dbs[1].config.NodeName]["network1"]; !ok || n.leaving { + t.Fatalf("The network should not be marked as leaving:%t", n.leaving) + } // Try a quick leave/join err = dbs[0].LeaveNetwork("network1") diff --git a/test/networkDb/README b/test/networkDb/README index 72a08cee5c..dbc2d1884d 100644 --- a/test/networkDb/README +++ b/test/networkDb/README @@ -1,7 +1,7 @@ SERVER cd test/networkdb -env GOOS=linux go build -v server/testMain.go && docker build -t fcrisciani/networkdb-test . +env GOOS=linux go build -v testMain.go && docker build -t fcrisciani/networkdb-test . (only for testkit case) docker push fcrisciani/networkdb-test Run server: docker service create --name testdb --network net1 --replicas 3 --env TASK_ID="{{.Task.ID}}" -p mode=host,target=8000 fcrisciani/networkdb-test server 8000 diff --git a/test/networkDb/dbclient/ndbClient.go b/test/networkDb/dbclient/ndbClient.go index beeee2a4c6..aaf2dec4d8 100644 --- a/test/networkDb/dbclient/ndbClient.go +++ b/test/networkDb/dbclient/ndbClient.go @@ -567,7 +567,7 @@ func doWriteWaitLeaveJoin(ips []string, args []string) { tableName := args[1] parallelWriters, _ := strconv.Atoi(args[2]) writeTimeSec, _ := strconv.Atoi(args[3]) - parallerlLeaver, _ := strconv.Atoi(args[4]) + parallelLeaver, _ := strconv.Atoi(args[4]) // Start parallel writers that will create and delete unique keys doneCh := make(chan resultTuple, parallelWriters) @@ -586,23 +586,23 @@ func doWriteWaitLeaveJoin(ips []string, args []string) { keysExpected := keyMap[totalWrittenKeys] // The Leavers will leave the network - for i := 0; i < parallerlLeaver; i++ { + for i := 0; i < parallelLeaver; i++ { logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i]) go leaveNetwork(ips[i], servicePort, networkName, doneCh) // Once a node leave all the keys written previously will be deleted, so the expected keys will consider that as removed keysExpected -= keyMap[ips[i]] } - waitWriters(parallerlLeaver, false, doneCh) + waitWriters(parallelLeaver, false, doneCh) // Give some time time.Sleep(100 * time.Millisecond) // The writers will join the network - for i := 0; i < parallerlLeaver; i++ { + for i := 0; i < parallelLeaver; i++ { logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i]) go joinNetwork(ips[i], servicePort, networkName, doneCh) } - waitWriters(parallerlLeaver, false, doneCh) + waitWriters(parallelLeaver, false, doneCh) // check table entries for 2 minutes ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)