Permalink
Browse files

ISPN-2825 ClusterTopologyManagerImpl should not hold a lock while invoking an RPC

If the REBALANCE_START command takes a long time to be processed on
one of the nodes, the REBALANCE_CONFIRM commands from the rest of
the nodes are blocked and end up filling the OOB thread pool.

Add two new tests for large clusters.
Move CacheCreationStressTest to the profiling group.
  • Loading branch information...
1 parent 56a2c43 commit ce86510e9385bb8469cd7fb8af5b9f3b32339c46 @danberindei danberindei committed with Mircea Markus Mar 5, 2013
@@ -0,0 +1,86 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2013 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+
+package org.infinispan.client.hotrod.stress;
+
+import java.io.UnsupportedEncodingException;
+import java.net.SocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHash;
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.client.hotrod.test.MultiHotRodServersTest;
+import org.infinispan.configuration.cache.CacheMode;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+/**
+ * Test the performance of ConsistentHashV1/V2.
+ *
+ * @author Dan Berindei
+ * @since 5.3
+ */
+@Test(groups = "profiling", testName = "client.hotrod.ClientConsistentHashPerfTest")
+public class ClientConsistentHashPerfTest extends MultiHotRodServersTest {
+
+   // 64 servers makes the client-side CH routing table large enough for the
+   // lookup cost to dominate; ITERATIONS/NUM_KEYS size the measurement loop.
+   private static final int NUM_SERVERS = 64;
+   private static final int ITERATIONS = 10000000;
+   private static final int NUM_KEYS = 100000;
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      ConfigurationBuilder config = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false);
+      createHotRodServers(NUM_SERVERS, config);
+   }
+
+   public void testConsistentHashPerf() throws Exception {
+      RemoteCacheManager rcm = client(0);
+      RemoteCache<Object, Object> cache = rcm.getCache();
+      // This will initialize the consistent hash
+      cache.put("k", "v");
+
+      // Reach into the client internals to obtain the CH under test.
+      TcpTransportFactory transportFactory = (TcpTransportFactory) TestingUtil.extractField(rcm, "transportFactory");
+      ConsistentHash ch = transportFactory.getConsistentHash();
+      byte[][] keys = new byte[NUM_KEYS][];
+
+      // Pre-encode the keys so key generation cost stays out of the timed loop.
+      for (int i = 0; i < NUM_KEYS; i++) {
+         keys[i] = String.valueOf(i).getBytes("UTF-8");
+      }
+
+      // aServer is a data sink: keeping the result "live" stops the JIT from
+      // eliminating the getServer() calls as dead code.
+      SocketAddress aServer = null;
+      // warm-up
+      for (int i = 0; i < ITERATIONS/10; i++) {
+         SocketAddress server = ch.getServer(keys[i % keys.length]);
+         if (server != null) aServer = server;
+      }
+
+      long startNanos = System.nanoTime();
+      for (int i = 0; i < ITERATIONS; i++) {
+         SocketAddress server = ch.getServer(keys[i % keys.length]);
+         if (server != null) aServer = server;
+      }
+      // duration is in nanoseconds; divide by 1e9 for seconds, by ITERATIONS for ns/lookup.
+      double duration = System.nanoTime() - startNanos;
+      log.infof("Test took %.3f s, average CH lookup was %.3f ns", duration / 1000000000L, duration / ITERATIONS);
+   }
+}
@@ -79,7 +79,7 @@
private final ConcurrentMap<String, ClusterCacheStatus> cacheStatusMap = ConcurrentMapFactory.makeConcurrentMap();
- private ClusterTopologyManagerImpl.ClusterViewListener viewListener;
+ private ClusterViewListener viewListener;
@Inject
public void inject(Transport transport, RebalancePolicy rebalancePolicy,
@@ -336,18 +336,13 @@ public void updateCacheStatusAfterMerge(String cacheName, List<CacheTopology> pa
}
private void broadcastConsistentHashUpdate(String cacheName, ClusterCacheStatus cacheStatus) throws Exception {
- // Serialize CH update commands, so that they don't arrive on the other members out-of-order.
- // We are ok with sending the same CH update twice, we just don't want the other members to receive
- // an older CH after they got the latest CH.
- synchronized (cacheStatus) {
- CacheTopology cacheTopology = cacheStatus.getCacheTopology();
- log.debugf("Updating cluster-wide consistent hash for cache %s, topology = %s",
- cacheName, cacheTopology);
- ReplicableCommand command = new CacheTopologyControlCommand(cacheName,
- CacheTopologyControlCommand.Type.CH_UPDATE, transport.getAddress(), cacheTopology,
- transport.getViewId());
- executeOnClusterSync(command, getGlobalTimeout());
- }
+ CacheTopology cacheTopology = cacheStatus.getCacheTopology();
+ log.debugf("Updating cluster-wide consistent hash for cache %s, topology = %s",
+ cacheName, cacheTopology);
+ ReplicableCommand command = new CacheTopologyControlCommand(cacheName,
+ CacheTopologyControlCommand.Type.CH_UPDATE, transport.getAddress(), cacheTopology,
+ transport.getViewId());
+ executeOnClusterSync(command, getGlobalTimeout());
}
private void startRebalance(String cacheName) throws Exception {
@@ -401,15 +396,13 @@ private void startRebalance(String cacheName) throws Exception {
}
private void broadcastRebalanceStart(String cacheName, ClusterCacheStatus cacheStatus) throws Exception {
- synchronized (cacheStatus) {
- CacheTopology cacheTopology = cacheStatus.getCacheTopology();
- log.debugf("Starting cluster-wide rebalance for cache %s, topology = %s",
- cacheName, cacheTopology);
- ReplicableCommand command = new CacheTopologyControlCommand(cacheName,
- CacheTopologyControlCommand.Type.REBALANCE_START, transport.getAddress(), cacheTopology,
- transport.getViewId());
- executeOnClusterSync(command, getGlobalTimeout());
- }
+ CacheTopology cacheTopology = cacheStatus.getCacheTopology();
+ log.debugf("Starting cluster-wide rebalance for cache %s, topology = %s",
+ cacheName, cacheTopology);
+ ReplicableCommand command = new CacheTopologyControlCommand(cacheName,
+ CacheTopologyControlCommand.Type.REBALANCE_START, transport.getAddress(), cacheTopology,
+ transport.getViewId());
+ executeOnClusterSync(command, getGlobalTimeout());
}
private void endRebalance(String cacheName, ClusterCacheStatus cacheStatus) {
@@ -174,9 +174,10 @@ public void handleConsistentHashUpdate(String cacheName, CacheTopology cacheTopo
}
synchronized (cacheStatus) {
- if (cacheStatus.getTopology() != null && cacheStatus.getTopology().getTopologyId() > cacheTopology.getTopologyId()){
+ CacheTopology existingTopology = cacheStatus.getTopology();
+ if (existingTopology != null && cacheTopology.getTopologyId() < existingTopology.getTopologyId()){
log.tracef("Ignoring consistent hash update %s for cache %s, we have already received a newer topology %s",
- cacheTopology.getTopologyId(), cacheName, cacheStatus.getTopology().getTopologyId());
+ cacheTopology.getTopologyId(), cacheName, existingTopology.getTopologyId());
return;
}
@@ -189,9 +190,17 @@ public void handleConsistentHashUpdate(String cacheName, CacheTopology cacheTopo
}
CacheTopologyHandler handler = cacheStatus.getHandler();
- CacheTopology unionTopology = new CacheTopology(cacheTopology.getTopologyId(), cacheTopology.getCurrentCH(), unionCH);
+ CacheTopology unionTopology = new CacheTopology(cacheTopology.getTopologyId(),
+ cacheTopology.getCurrentCH(), unionCH);
unionTopology.logRoutingTableInformation();
- handler.updateConsistentHash(unionTopology);
+ if ((existingTopology == null || existingTopology.getPendingCH() == null) && unionCH != null) {
+ // This CH_UPDATE command was sent after a REBALANCE_START command, but arrived first.
+ // We will start the rebalance now and ignore the REBALANCE_START command when it arrives.
+ log.tracef("This topology update has a pending CH, starting the rebalance now");
+ handler.rebalance(unionTopology);
+ } else {
+ handler.updateConsistentHash(unionTopology);
+ }
}
}
@@ -223,12 +232,12 @@ public void handleRebalance(String cacheName, CacheTopology cacheTopology, int v
log.debugf("Starting local rebalance for cache %s, topology = %s", cacheName, cacheTopology);
cacheTopology.logRoutingTableInformation();
cacheStatus.setTopology(cacheTopology);
- }
- ConsistentHash unionCH = cacheStatus.getJoinInfo().getConsistentHashFactory().union(
- cacheTopology.getCurrentCH(), cacheTopology.getPendingCH());
- CacheTopologyHandler handler = cacheStatus.getHandler();
- handler.rebalance(new CacheTopology(cacheTopology.getTopologyId(), cacheTopology.getCurrentCH(), unionCH));
+ ConsistentHash unionCH = cacheStatus.getJoinInfo().getConsistentHashFactory().union(
+ cacheTopology.getCurrentCH(), cacheTopology.getPendingCH());
+ CacheTopologyHandler handler = cacheStatus.getHandler();
+ handler.rebalance(new CacheTopology(cacheTopology.getTopologyId(), cacheTopology.getCurrentCH(), unionCH));
+ }
}
@Override
@@ -39,7 +39,7 @@
* @author Galder Zamarreño
* @since 4.2
*/
-@Test(groups = "functional", testName = "profiling.CacheCreationStressTest")
+@Test(groups = "profiling", testName = "profiling.CacheCreationStressTest")
public class CacheCreationStressTest extends AbstractInfinispanTest {
public void testCreateCachesFromSameContainer() {
@@ -126,6 +126,15 @@ public Object call() throws Exception {
// Allow command forwarded from c2 to proceed on c3.
// StateTransferInterceptor will then forward the command to c1 and c4.
+ // NB: This will fail if the number of core threads in the regular thread pool is < 3
+ // and queueing is enabled because of a deadlock:
+ // a) an Incoming thread on c3 is blocked in DelayInterceptor, waiting for the unblock(1) call
+ // b) another Incoming thread on c3 is blocked in DelayInterceptor, waiting for the unblock(2) call
+ // c) the OOB thread on c3 processing the REBALANCE_START is blocked in LocalTopologyManagerImpl,
+ // waiting for the new JGroups view to be installed on c3
+ // d) the VIEW message is queued by the regular thread pool on c3, waiting for a) or b) to finish
+ // e) the main thread is blocked waiting for the rebalance to finish, so it won't call unblock(1)
+ // until c) is done
log.tracef("Forwarding the command from %s", c3);
di3.unblock(1);
@@ -1,48 +0,0 @@
-/*
- * Copyright 2011 Red Hat, Inc. and/or its affiliates.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA
- */
-
-package org.infinispan.stress;
-
-import org.infinispan.Cache;
-import org.infinispan.test.CacheManagerCallable;
-import org.infinispan.test.fwk.TestCacheManagerFactory;
-import org.testng.annotations.Test;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.infinispan.test.TestingUtil.withCacheManager;
-
-@Test(groups = {"performance", "manual"})
-public class CreateThousandCachesTest {
- public void doTest() {
- System.out.println("Starting... ");
- withCacheManager(new CacheManagerCallable(TestCacheManagerFactory.createCacheManager()) {
- @Override
- public void call() {
- List<Cache<?, ?>> thousandCaches = new LinkedList<Cache<?, ?>>();
- long start = System.currentTimeMillis();
- for (int i = 0; i < 1000; i++) {
- thousandCaches.add(cm.getCache("cache" + i));
- }
- System.out.println("Created 1000 basic caches in " + (System.currentTimeMillis() - start) + " millis");
- }
- });
- }
-}
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2013 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+
+package org.infinispan.stress;
+
+import org.infinispan.Cache;
+import org.infinispan.configuration.cache.CacheMode;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.testng.annotations.Test;
+
+/**
+ * Test that we're able to start a large cluster in a single JVM.
+ *
+ * @author Dan Berindei
+ * @since 5.3
+ */
+@Test(groups = "stress", testName = "statetransfer.LargeClusterStressTest")
+public class LargeClusterStressTest extends MultipleCacheManagersTest {
+
+   // Number of cluster nodes started inside this single JVM.
+   private static final int NUM_NODES = 64;
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      // start the cache managers in the test itself
+   }
+
+   public void testLargeCluster() {
+      ConfigurationBuilder distConfig = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false);
+      ConfigurationBuilder replConfig = getDefaultClusteredCacheConfig(CacheMode.REPL_SYNC, false);
+      // Nodes join one at a time; waiting for the cluster to form after each
+      // join exercises a state transfer/rebalance at every cluster size up to NUM_NODES.
+      for (int i = 0; i < NUM_NODES; i++) {
+         defineConfigurationOnAllManagers("dist", distConfig);
+         defineConfigurationOnAllManagers("repl", replConfig);
+         EmbeddedCacheManager cm = addClusterEnabledCacheManager();
+         Cache<Object,Object> replCache = cm.getCache("repl");
+         Cache<Object, Object> distCache = cm.getCache("dist");
+
+         // A write on every join forces real traffic through the new topology.
+         replCache.put(cm.getAddress(), "bla");
+
+         waitForClusterToForm("repl");
+         waitForClusterToForm("dist");
+      }
+   }
+}
@@ -187,15 +187,18 @@ private static String replaceMCastAddressAndPort(JGroupsProtocolCfg jgroupsCfg)
private static String replaceTcpStartPort(JGroupsProtocolCfg jgroupsCfg, TransportFlags transportFlags) {
Map<String, String> props = jgroupsCfg.getProtocol(TCP).getProperties();
Integer startPort = threadTcpStartPort.get();
+ int portRange = TCP_PORT_RANGE_PER_THREAD;
if (transportFlags.isSiteIndexSpecified()) {
- int sitePortRange = 10;
- int maxIndex = TCP_PORT_RANGE_PER_THREAD / sitePortRange - 1;
+ portRange = 10;
+ int maxIndex = TCP_PORT_RANGE_PER_THREAD / portRange - 1;
if (transportFlags.siteIndex() > maxIndex) {
throw new IllegalStateException("Currently we only support " + (maxIndex + 1) + " sites!");
}
- startPort += transportFlags.siteIndex() * sitePortRange;
+ startPort += transportFlags.siteIndex() * portRange;
}
props.put("bind_port", startPort.toString());
+ // In JGroups, the port_range is inclusive
+ props.put("port_range", String.valueOf(portRange - 1));
return replaceProperties(jgroupsCfg, props, TCP);
}

0 comments on commit ce86510

Please sign in to comment.