Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

ISPN-2897 NPE in DefaultConsistentHash

If any partition is rebalancing during a merge, the merge coordinator
should end that rebalance and start a new one with all the members.
  • Loading branch information...
commit 5731ed9a4566851eed71b8d9ed1b07cd3f1f46f0 1 parent 2937d82
Dan Berindei danberindei authored Mircea Markus committed
2  core/src/main/java/org/infinispan/topology/ClusterCacheStatus.java
@@ -142,7 +142,7 @@ public boolean updateClusterMembers(List<Address> newClusterMembers) {
142 142 synchronized (this) {
143 143 if (newClusterMembers.containsAll(members)) {
144 144 if (trace) log.tracef("Cluster members updated for cache %s, no leavers detected: " +
145   - "cache members = %s", members, newClusterMembers);
  145 + "cache members = %s", cacheName, newClusterMembers);
146 146 return false;
147 147 }
148 148
81 core/src/main/java/org/infinispan/topology/ClusterTopologyManagerImpl.java
@@ -233,7 +233,7 @@ protected void handleNewView(List<Address> newMembers, boolean mergeView, int ne
233 233 for (Map.Entry<String, List<CacheTopology>> e : clusterCacheMap.entrySet()) {
234 234 String cacheName = e.getKey();
235 235 List<CacheTopology> topologyList = e.getValue();
236   - updateCacheStatusAfterMerge(cacheName, topologyList);
  236 + updateCacheStatusAfterMerge(cacheName, newMembers, topologyList);
237 237 }
238 238 } catch (InterruptedException e) {
239 239 log.tracef("Cluster state recovery interrupted because the coordinator is shutting down");
@@ -270,7 +270,8 @@ private ClusterCacheStatus initCacheStatusIfAbsent(String cacheName, CacheJoinIn
270 270 return cacheStatus;
271 271 }
272 272
273   - public void updateCacheStatusAfterMerge(String cacheName, List<CacheTopology> partitionTopologies)
  273 + public void updateCacheStatusAfterMerge(String cacheName, List<Address> clusterMembers,
  274 + List<CacheTopology> partitionTopologies)
274 275 throws Exception {
275 276 log.tracef("Initializing rebalance policy for cache %s, pre-existing partitions are %s",
276 277 cacheName, partitionTopologies);
@@ -278,61 +279,47 @@ public void updateCacheStatusAfterMerge(String cacheName, List<CacheTopology> pa
278 279 if (partitionTopologies.isEmpty())
279 280 return;
280 281
281   - int unionTopologyId = 0;
282   - ConsistentHash currentCHUnion = null;
283   - ConsistentHash pendingCHUnion = null;
284   - ConsistentHashFactory chFactory = cacheStatus.getJoinInfo().getConsistentHashFactory();
285   - for (CacheTopology topology : partitionTopologies) {
286   - if (topology.getTopologyId() > unionTopologyId) {
287   - unionTopologyId = topology.getTopologyId();
288   - }
289   - if (currentCHUnion == null) {
290   - currentCHUnion = topology.getCurrentCH();
291   - } else {
292   - currentCHUnion = chFactory.union(currentCHUnion, topology.getCurrentCH());
293   - }
  282 + synchronized (cacheStatus) {
  283 + int unionTopologyId = 0;
  284 + // We only use the currentCH, we ignore any ongoing rebalance in the partitions
  285 + ConsistentHash currentCHUnion = null;
  286 + ConsistentHashFactory chFactory = cacheStatus.getJoinInfo().getConsistentHashFactory();
  287 + for (CacheTopology topology : partitionTopologies) {
  288 + if (topology.getTopologyId() > unionTopologyId) {
  289 + unionTopologyId = topology.getTopologyId();
  290 + }
294 291
295   - if (pendingCHUnion == null) {
296   - pendingCHUnion = topology.getPendingCH();
297   - } else {
298   - if (topology.getPendingCH() != null)
299   - pendingCHUnion = chFactory.union(pendingCHUnion, topology.getPendingCH());
  292 + if (currentCHUnion == null) {
  293 + currentCHUnion = topology.getCurrentCH();
  294 + } else {
  295 + currentCHUnion = chFactory.union(currentCHUnion, topology.getCurrentCH());
  296 + }
300 297 }
301   - }
302 298
303   - // We have added each node to the cache status when we received its status response
304   - List<Address> members = cacheStatus.getMembers();
305   - if (currentCHUnion != null) {
306   - currentCHUnion = chFactory.updateMembers(currentCHUnion, members);
307   - }
308   - if (pendingCHUnion != null) {
309   - pendingCHUnion = chFactory.updateMembers(pendingCHUnion, members);
310   - }
  299 + // We have added each node to the cache status when we received its status response
  300 + List<Address> members = cacheStatus.getMembers();
  301 + // Filter out any nodes that aren't members of the cluster any more
  302 + cacheStatus.updateClusterMembers(clusterMembers);
  303 + if (currentCHUnion != null) {
  304 + currentCHUnion = chFactory.updateMembers(currentCHUnion, members);
  305 + }
311 306
312   - // Make sure the topology id is higher than any topology id we had before in the cluster
313   - unionTopologyId += 2;
314   - CacheTopology cacheTopology = new CacheTopology(unionTopologyId, currentCHUnion, pendingCHUnion);
315   - boolean wasRebalanceInProgress = pendingCHUnion != null;
  307 + // Make sure the topology id is higher than any topology id we had before in the cluster
  308 + unionTopologyId += 2;
  309 + CacheTopology cacheTopology = new CacheTopology(unionTopologyId, currentCHUnion, null);
316 310
317   - synchronized (cacheStatus) {
318   - // TODO Deal with members had joined in a partition, but which did not start receiving data yet
319   - // (i.e. they weren't in the current or in the pending CH)
320   - cacheStatus.setMembers(cacheTopology.getMembers());
321   - if (wasRebalanceInProgress) {
322   - cacheStatus.startRebalance(cacheTopology);
323   - } else {
324   - cacheStatus.updateCacheTopology(cacheTopology);
  311 + // End any running rebalance
  312 + if (cacheStatus.isRebalanceInProgress()) {
  313 + cacheStatus.endRebalance();
325 314 }
  315 + cacheStatus.updateCacheTopology(cacheTopology);
326 316 }
327 317
  318 + // End any rebalance that was running in the other partitions
328 319 broadcastConsistentHashUpdate(cacheName, cacheStatus);
329 320
330   - if (wasRebalanceInProgress) {
331   - broadcastRebalanceStart(cacheName, cacheStatus);
332   - } else {
333   - // Trigger another rebalance in case the CH is not balanced (even though there was no rebalance in progress)
334   - triggerRebalance(cacheName);
335   - }
  321 + // Trigger another rebalance in case the CH is not balanced
  322 + triggerRebalance(cacheName);
336 323 }
337 324
338 325 private void broadcastConsistentHashUpdate(String cacheName, ClusterCacheStatus cacheStatus) throws Exception {
204 core/src/test/java/org/infinispan/statetransfer/ClusterTopologyManagerTest.java
@@ -22,33 +22,59 @@
22 22 */
23 23 package org.infinispan.statetransfer;
24 24
  25 +import java.util.ArrayList;
  26 +import java.util.Collections;
  27 +import java.util.HashMap;
  28 +import java.util.List;
  29 +import java.util.Map;
  30 +import java.util.concurrent.Callable;
  31 +import java.util.concurrent.Future;
  32 +import java.util.concurrent.TimeUnit;
  33 +import java.util.concurrent.TimeoutException;
  34 +import java.util.concurrent.locks.Condition;
  35 +import java.util.concurrent.locks.Lock;
  36 +import java.util.concurrent.locks.ReentrantLock;
  37 +
25 38 import org.infinispan.Cache;
26 39 import org.infinispan.configuration.cache.CacheMode;
27 40 import org.infinispan.configuration.cache.ConfigurationBuilder;
28   -import org.infinispan.configuration.global.GlobalConfigurationBuilder;
  41 +import org.infinispan.manager.EmbeddedCacheManager;
  42 +import org.infinispan.remoting.transport.Address;
29 43 import org.infinispan.test.MultipleCacheManagersTest;
30 44 import org.infinispan.test.TestingUtil;
31 45 import org.infinispan.test.fwk.CleanupAfterMethod;
32 46 import org.infinispan.test.fwk.TransportFlags;
  47 +import org.infinispan.topology.CacheTopology;
  48 +import org.infinispan.topology.LocalTopologyManager;
33 49 import org.infinispan.util.Util;
34 50 import org.jgroups.protocols.DISCARD;
  51 +import org.mockito.invocation.InvocationOnMock;
  52 +import org.mockito.stubbing.Answer;
35 53 import org.testng.annotations.Test;
36 54
  55 +import static org.mockito.Matchers.any;
  56 +import static org.mockito.Matchers.anyInt;
  57 +import static org.mockito.Matchers.eq;
  58 +import static org.mockito.Mockito.doAnswer;
  59 +import static org.mockito.Mockito.spy;
  60 +
37 61 @Test(groups = "functional", testName = "statetransfer.ClusterTopologyManagerTest")
38 62 @CleanupAfterMethod
39 63 public class ClusterTopologyManagerTest extends MultipleCacheManagersTest {
40 64
  65 + public static final String CACHE_NAME = "cache";
  66 + private ConfigurationBuilder defaultConfig;
41 67 Cache c1, c2, c3;
42 68 DISCARD d1, d2, d3;
43 69
44 70 @Override
45 71 protected void createCacheManagers() throws Throwable {
46   - ConfigurationBuilder defaultConfig = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false);
  72 + defaultConfig = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false);
47 73 createClusteredCaches(3, defaultConfig, new TransportFlags().withFD(true).withMerge(true));
48 74
49   - c1 = cache(0, "cache");
50   - c2 = cache(1, "cache");
51   - c3 = cache(2, "cache");
  75 + c1 = cache(0, CACHE_NAME);
  76 + c2 = cache(1, CACHE_NAME);
  77 + c3 = cache(2, CACHE_NAME);
52 78 d1 = TestingUtil.getDiscardForCache(c1);
53 79 d1.setExcludeItself(true);
54 80 d2 = TestingUtil.getDiscardForCache(c2);
@@ -91,7 +117,7 @@ public void testNodeAbruptLeave() throws Exception {
91 117 // Check that a new node can join
92 118 ConfigurationBuilder defaultConfig = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false);
93 119 addClusterEnabledCacheManager(defaultConfig, new TransportFlags().withFD(true).withMerge(true));
94   - Cache<Object, Object> c4 = cache(3, "cache");
  120 + Cache<Object, Object> c4 = cache(3, CACHE_NAME);
95 121 TestingUtil.blockUntilViewsReceived(30000, true, c1, c2, c4);
96 122 TestingUtil.waitForRehashToComplete(c1, c2, c4);
97 123
@@ -124,7 +150,7 @@ public void testClusterRecoveryAfterCoordLeave() throws Exception {
124 150 // Check that a new node can join
125 151 ConfigurationBuilder defaultConfig = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false);
126 152 addClusterEnabledCacheManager(defaultConfig, new TransportFlags().withFD(true).withMerge(true));
127   - Cache<Object, Object> c4 = cache(3, "cache");
  153 + Cache<Object, Object> c4 = cache(3, CACHE_NAME);
128 154 TestingUtil.blockUntilViewsReceived(30000, true, c2, c3, c4);
129 155 TestingUtil.waitForRehashToComplete(c2, c3, c4);
130 156 }
@@ -162,7 +188,7 @@ public void testClusterRecoveryAfterThreeWaySplit() throws Exception {
162 188 // Check that a new node can join
163 189 ConfigurationBuilder defaultConfig = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false);
164 190 addClusterEnabledCacheManager(defaultConfig, new TransportFlags().withFD(true).withMerge(true));
165   - Cache<Object, Object> c4 = cache(3, "cache");
  191 + Cache<Object, Object> c4 = cache(3, CACHE_NAME);
166 192 TestingUtil.blockUntilViewsReceived(30000, true, c1, c2, c3, c4);
167 193 TestingUtil.waitForRehashToComplete(c1, c2, c3, c4);
168 194 }
@@ -202,8 +228,168 @@ public void testClusterRecoveryAfterSplitAndCoordLeave() throws Exception {
202 228 // Check that a new node can join
203 229 ConfigurationBuilder defaultConfig = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false);
204 230 addClusterEnabledCacheManager(defaultConfig, new TransportFlags().withFD(true).withMerge(true));
205   - Cache<Object, Object> c4 = cache(3, "cache");
  231 + Cache<Object, Object> c4 = cache(3, CACHE_NAME);
206 232 TestingUtil.blockUntilViewsReceived(30000, true, c2, c3, c4);
207 233 TestingUtil.waitForRehashToComplete(c2, c3, c4);
208 234 }
  235 +
  236 + public void testClusterRecoveryWithRebalance() throws Exception {
  237 + // Compute the merge coordinator by sorting the JGroups addresses, the same way MERGE2/3 do
  238 + List<Address> members = new ArrayList<Address>(manager(0).getMembers());
  239 + Collections.sort(members);
  240 + Address mergeCoordAddress = members.get(0);
  241 + log.debugf("The merge coordinator will be %s", mergeCoordAddress);
  242 + EmbeddedCacheManager mergeCoordManager = manager(mergeCoordAddress);
  243 + int mergeCoordIndex = cacheManagers.indexOf(mergeCoordManager);
  244 +
  245 + // create the partitions
  246 + log.debugf("Splitting the cluster in three");
  247 + d1.setDiscardAll(true);
  248 + d2.setDiscardAll(true);
  249 + d3.setDiscardAll(true);
  250 +
  251 + // wait for the coordinator to be separated (don't care about the others)
  252 + TestingUtil.blockUntilViewsReceived(30000, false, c1);
  253 + TestingUtil.blockUntilViewsReceived(30000, false, c2);
  254 + TestingUtil.blockUntilViewsReceived(30000, false, c3);
  255 + TestingUtil.waitForRehashToComplete(c1);
  256 + TestingUtil.waitForRehashToComplete(c2);
  257 + TestingUtil.waitForRehashToComplete(c3);
  258 +
  259 + // Disable DISCARD *only* on the merge coordinator
  260 + if (mergeCoordIndex == 0) d1.setDiscardAll(false);
  261 + if (mergeCoordIndex == 1) d2.setDiscardAll(false);
  262 + if (mergeCoordIndex == 2) d3.setDiscardAll(false);
  263 +
  264 + int viewIdAfterSplit = mergeCoordManager.getTransport().getViewId();
  265 + final LocalTopologyManager localTopologyManager = TestingUtil.extractGlobalComponent(mergeCoordManager,
  266 + LocalTopologyManager.class);
  267 + final CheckPoint checkpoint = new CheckPoint();
  268 + LocalTopologyManager spyLocalTopologyManager = spy(localTopologyManager);
  269 + doAnswer(new Answer<Object>() {
  270 + @Override
  271 + public Object answer(InvocationOnMock invocation) throws Throwable {
  272 + int viewId = (Integer) invocation.getArguments()[2];
  273 + checkpoint.trigger("rebalance" + viewId);
  274 + log.debugf("Blocking the REBALANCE_START command on the merge coordinator");
  275 + checkpoint.awaitStrict("merge", 10, TimeUnit.SECONDS);
  276 + return invocation.callRealMethod();
  277 + }
  278 + }).when(spyLocalTopologyManager).handleRebalance(eq(CACHE_NAME), any(CacheTopology.class), anyInt());
  279 + TestingUtil.replaceComponent(mergeCoordManager, LocalTopologyManager.class, spyLocalTopologyManager, true);
  280 +
  281 + final EmbeddedCacheManager cm4 = addClusterEnabledCacheManager(defaultConfig, new TransportFlags().withFD(true).withMerge(true));
  282 + Future<Cache<Object,Object>> cacheFuture = fork(new Callable<Cache<Object, Object>>() {
  283 + @Override
  284 + public Cache<Object, Object> call() throws Exception {
  285 + return cm4.getCache(CACHE_NAME);
  286 + }
  287 + });
  288 +
  289 + log.debugf("Waiting for the REBALANCE_START command to reach the merge coordinator");
  290 + checkpoint.awaitStrict("rebalance" + (viewIdAfterSplit + 1), 10, TimeUnit.SECONDS);
  291 +
  292 + // merge the partitions
  293 + log.debugf("Merging the cluster partitions");
  294 + d1.setDiscardAll(false);
  295 + d2.setDiscardAll(false);
  296 + d3.setDiscardAll(false);
  297 +
  298 + // wait for the JGroups merge
  299 + long startTime = System.currentTimeMillis();
  300 + TestingUtil.blockUntilViewsReceived(30000, cacheManagers);
  301 +
  302 + // unblock the REBALANCE_START command
  303 + log.debugf("Unblocking the REBALANCE_START command on the coordinator");
  304 + checkpoint.triggerForever("merge");
  305 +
  306 + // wait for the 4th cache to finish joining
  307 + Cache<Object, Object> c4 = cacheFuture.get(30, TimeUnit.SECONDS);
  308 + TestingUtil.waitForRehashToComplete(c1, c2, c3, c4);
  309 +
  310 + long endTime = System.currentTimeMillis();
  311 + log.debugf("Merge took %s", Util.prettyPrintTime(endTime - startTime));
  312 + assert endTime - startTime < 30000 : "Merge took too long: " + Util.prettyPrintTime(endTime - startTime);
  313 +
  314 + // Check that another node can join
  315 + ConfigurationBuilder defaultConfig = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false);
  316 + EmbeddedCacheManager cm5 = addClusterEnabledCacheManager(defaultConfig, new TransportFlags().withFD(true).withMerge(true));
  317 + Cache<Object, Object> c5 = cm5.getCache(CACHE_NAME);
  318 + TestingUtil.blockUntilViewsReceived(30000, true, c1, c2, c3, c4, c5);
  319 + TestingUtil.waitForRehashToComplete(c1, c2, c3, c4, c5);
  320 + }
  321 +
  322 +}
  323 +
/**
 * Simple named-event synchronization aid for tests: threads block waiting for a named event
 * until other threads have triggered it the requested number of times. Successful waits
 * consume the triggered occurrences.
 * <p>
 * Thread-safe: all state is guarded by a single lock.
 */
class CheckPoint {
   private final Lock lock = new ReentrantLock();
   private final Condition unblockCondition = lock.newCondition();
   // event name -> outstanding trigger count; a negative value marks an event a waiter timed out on
   private final Map<String, Integer> events = new HashMap<String, Integer>();

   /**
    * Waits for one occurrence of {@code event}.
    *
    * @throws TimeoutException if the event is not triggered within the timeout
    */
   public void awaitStrict(String event, long timeout, TimeUnit unit)
         throws InterruptedException, TimeoutException {
      awaitStrict(event, 1, timeout, unit);
   }

   /** Waits for one occurrence of {@code event}; returns {@code false} on timeout. */
   public boolean await(String event, long timeout, TimeUnit unit) throws InterruptedException {
      return await(event, 1, timeout, unit);
   }

   /**
    * Waits for {@code count} occurrences of {@code event}.
    *
    * @throws TimeoutException if the event is not triggered {@code count} times within the timeout
    */
   public void awaitStrict(String event, int count, long timeout, TimeUnit unit)
         throws InterruptedException, TimeoutException {
      if (!await(event, count, timeout, unit)) {
         throw new TimeoutException("Timed out waiting for event " + event);
      }
   }

   /**
    * Waits until {@code event} has been triggered at least {@code count} times, consuming the
    * occurrences on success.
    *
    * @return {@code true} if the occurrences arrived in time, {@code false} on timeout
    */
   public boolean await(String event, int count, long timeout, TimeUnit unit) throws InterruptedException {
      lock.lock();
      try {
         long waitNanos = unit.toNanos(timeout);
         while (true) {
            // Check the count before deciding we timed out: a trigger that arrived just as the
            // last awaitNanos() expired must not be lost, and a zero timeout must still succeed
            // when the event was already triggered.
            Integer currentCount = events.get(event);
            if (currentCount != null && currentCount >= count) {
               events.put(event, currentCount - count);
               return true;
            }
            if (waitNanos <= 0) {
               // let the triggering thread know that we timed out
               events.put(event, -1);
               return false;
            }
            waitNanos = unblockCondition.awaitNanos(waitNanos);
         }
      } finally {
         lock.unlock();
      }
   }

   /** Triggers one occurrence of {@code event}. */
   public void trigger(String event) {
      trigger(event, 1);
   }

   /** Triggers {@code event} enough times that no future waiter will ever block on it. */
   public void triggerForever(String event) {
      trigger(event, Integer.MAX_VALUE);
   }

   /**
    * Triggers {@code count} occurrences of {@code event} and wakes any waiters.
    *
    * @throws IllegalStateException if a waiter already timed out on this event
    */
   public void trigger(String event, int count) {
      lock.lock();
      try {
         Integer currentCount = events.get(event);
         if (currentCount == null) {
            currentCount = 0;
         } else if (currentCount < 0) {
            throw new IllegalStateException("Thread already timed out waiting for event " + event);
         }

         // Saturate instead of overflowing, so triggerForever() may be called more than once.
         long newCount = (long) currentCount + count;
         events.put(event, (int) Math.min(newCount, Integer.MAX_VALUE));
         unblockCondition.signalAll();
      } finally {
         lock.unlock();
      }
   }
}
9 core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java
@@ -425,6 +425,15 @@ protected EmbeddedCacheManager manager(int i) {
425 425 return cacheManagers.get(i);
426 426 }
427 427
  428 + public EmbeddedCacheManager manager(Address a) {
  429 + for (EmbeddedCacheManager cm : cacheManagers) {
  430 + if (cm.getAddress().equals(a)) {
  431 + return cm;
  432 + }
  433 + }
  434 + throw new IllegalArgumentException(a + " is not a valid cache manager address!");
  435 + }
  436 +
428 437 protected <K, V> Cache<K, V> cache(int managerIndex, String cacheName) {
429 438 return manager(managerIndex).getCache(cacheName);
430 439 }

0 comments on commit 5731ed9

Please sign in to comment.
Something went wrong with that request. Please try again.