Skip to content

Commit

Permalink
ISPN-8962 Prefer the newer topology when members are overlapping
Browse files Browse the repository at this point in the history
  • Loading branch information
danberindei authored and ryanemerson committed Apr 24, 2018
1 parent 47492a1 commit 20938b4
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void onClusterViewChange(AvailabilityStrategyContext context, List<Addres
context.queueRebalance(newMembers);
}

protected void checkForLostData(String cacheName, CacheTopology stableTopology, List<Address> newMembers) {
private void checkForLostData(String cacheName, CacheTopology stableTopology, List<Address> newMembers) {
List<Address> stableMembers = stableTopology.getMembers();
List<Address> lostMembers = new ArrayList<>(stableMembers);
lostMembers.removeAll(newMembers);
Expand Down Expand Up @@ -150,13 +150,11 @@ public void onPartitionMerge(AvailabilityStrategyContext context,
int mergeRebalanceId = 0;
for (Partition p : partitions) {
CacheTopology topology = p.topology;
if (topology != null) {
if (mergeTopologyId <= topology.getTopologyId()) {
mergeTopologyId = topology.getTopologyId() + 1;
}
if (mergeRebalanceId <= topology.getRebalanceId()) {
mergeRebalanceId = topology.getRebalanceId() + 1;
}
if (mergeTopologyId <= topology.getTopologyId()) {
mergeTopologyId = topology.getTopologyId() + 1;
}
if (mergeRebalanceId <= topology.getRebalanceId()) {
mergeRebalanceId = topology.getRebalanceId() + 1;
}
}

Expand Down Expand Up @@ -228,7 +226,7 @@ private Partition selectPreferredPartition(List<Partition> partitions) {
for (Partition p : partitions) {
// TODO Investigate comparing the number of segments owned by the senders +
// the number of the number of segments for partition(senders includes owner) agrees
if (p.isPreferable(preferredPartition)) {
if (!p.isConflictResolutionOnly() && p.isPreferable(preferredPartition)) {
preferredPartition = p;
}
}
Expand Down Expand Up @@ -317,6 +315,7 @@ private List<Partition> computePartitions(Map<Address, CacheStatusResponse> stat
if (trace)
log.tracef("Cache %s partition of %s overlaps with partition of %s but possibly holds extra entries",
cacheName, p.senders, referencePartition.senders);
p.setConflictResolutionOnly();
}
}
if (fold) {
Expand Down Expand Up @@ -345,6 +344,7 @@ private static class Partition {
final ConsistentHash readCH;
final List<Address> actualMembers;
final List<Address> senders = new ArrayList<>();
private boolean conflictResolutionOnly;

Partition(Address sender, CacheTopology topology, CacheTopology stableTopology, ConsistentHash readCH) {
this.topology = topology;
Expand All @@ -370,5 +370,13 @@ private boolean isPreferable(Partition other) {
}
return false;
}

boolean isConflictResolutionOnly() {
return conflictResolutionOnly;
}

void setConflictResolutionOnly() {
conflictResolutionOnly = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -361,23 +361,22 @@ public void testMerge1Paused2StableAfterLosingAnotherNode() {
when(context.getCacheName()).thenReturn(CACHE_NAME);
when(context.resolveConflictsOnMerge()).thenReturn(conflicts.resolve());
if (conflicts.resolve()) {
when(context.calculateConflictHash(cacheA.readConsistentHash(),
when(context.calculateConflictHash(cacheC.readConsistentHash(),
setOf(cacheA.readConsistentHash(), cacheC.readConsistentHash())))
.thenReturn(conflictResolutionConsistentHash(cacheA, cacheC));
.thenReturn(conflictResolutionConsistentHash(cacheC, cacheA));
}

strategy.onPartitionMerge(context, statusResponses);

TestClusterCacheStatus expectedCache = cacheA.copy();
expectedCache.updateActualMembers(mergeMembers);
TestClusterCacheStatus expectedCache = cacheC.copy();
if (conflicts.resolve()) {
expectedCache.startConflictResolution(conflictResolutionConsistentHash(cacheA, cacheC), A, C);
expectedCache.startConflictResolution(conflictResolutionConsistentHash(cacheC, cacheA), A, C);
}
expectedCache.incrementIdsIfNeeded(cacheC);
verify(context).updateTopologiesAfterMerge(expectedCache.topology(), expectedCache.stableTopology(), null,
conflicts.resolve());
if (!conflicts.resolve()) {
verify(context).updateCurrentTopology(mergeMembers);
verify(context).updateCurrentTopology(singletonList(C));
}
verify(context).queueRebalance(mergeMembers);
verifyNoMoreInteractions(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public void testNewTopologySentAfterCleanMerge() {
partition(0).merge(partition(1));
eventuallyExpectCompleteTopology(client, initialTopology + 7);
// Check that we got the number of topology updates to NO_REBALANCE right
// T+2: NO_REBALANCE in partition [B] before merge
// T+3: CONFLICT_RESOLUTION, preferred CH: owners = (1) [test-NodeA-22368: 256+0]
// T+4:READ_OLD (rebalance starts), T+5:READ_ALL, T+6:READ_NEW, T+7: NO_REBALANCE
LocalizedCacheTopology newTopology = advancedCache(0).getDistributionManager().getCacheTopology();
assertEquals(CacheTopology.Phase.NO_REBALANCE, newTopology.getPhase());
assertEquals(initialTopology + 7, newTopology.getTopologyId());
Expand All @@ -110,11 +113,12 @@ public void testNewTopologySentAfterOverlappingMerge() {
eventuallyExpectPartialTopology(client, initialTopology + 1);

partition(0).merge(partition(1));
eventuallyExpectCompleteTopology(client, initialTopology + 3);
eventuallyExpectCompleteTopology(client, initialTopology + 6);
// Check that we got the number of topology updates to NO_REBALANCE right
// T+2: CONFLICT_RESOLUTION, preferred CH: owners = (1) [test-NodeA-22368: 256+0]
// T+3:READ_OLD (rebalance starts), T+4:READ_ALL, T+5:READ_NEW, T+6: NO_REBALANCE
LocalizedCacheTopology newTopology = advancedCache(0).getDistributionManager().getCacheTopology();
assertEquals(CacheTopology.Phase.NO_REBALANCE, newTopology.getPhase());
assertEquals(initialTopology + 3, newTopology.getTopologyId());
}


Expand Down

0 comments on commit 20938b4

Please sign in to comment.