Skip to content

Commit

Permalink
ISPN-5567 CreateCacheCommand waiting for a number of members should b…
Browse files Browse the repository at this point in the history
…e optional

* Don't wait for any members when CreateCacheCommand has a size of 0
  Fixes ClusteredCLITest
* Reduce the state transfer timeout in DistributedIntermediate[Shared]CacheFourNodesMapReduceTest
* General cleanup
  • Loading branch information
danberindei authored and tristantarrant committed Jun 24, 2015
1 parent 22e0540 commit 35f82d3
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 55 deletions.
Expand Up @@ -17,7 +17,6 @@
import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.rpc.RpcManager; import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.util.TimeService; import org.infinispan.util.TimeService;
import org.infinispan.util.logging.LogFactory; import org.infinispan.util.logging.LogFactory;


Expand Down
Expand Up @@ -405,10 +405,9 @@ <KOut, VOut> ReduceCommand<KOut, VOut> buildReduceCommand(String taskId,
/** /**
* Builds a CreateCacheCommand used to create/start cache around Infinispan cluster * Builds a CreateCacheCommand used to create/start cache around Infinispan cluster
* *
* @param start if true, then this command also makes sure that the cache is started on all the nodes in the cluster. * @param size If {@code size > 0}, the command will wait until the cache runs on at least {@code size} nodes.
* @param size the expected number of nodes where this node runs
*/ */
CreateCacheCommand buildCreateCacheCommand(String tmpCacheName, String defaultTmpCacheConfigurationName, boolean start, int size); CreateCacheCommand buildCreateCacheCommand(String tmpCacheName, String defaultTmpCacheConfigurationName, int size);


/** /**
* Builds CancelCommandCommand used to cancel other commands executing on Infinispan cluster * Builds CancelCommandCommand used to cancel other commands executing on Infinispan cluster
Expand Down
Expand Up @@ -580,8 +580,8 @@ public CreateCacheCommand buildCreateCacheCommand(String cacheNameToCreate, Stri
} }


@Override @Override
public CreateCacheCommand buildCreateCacheCommand(String cacheNameToCreate, String cacheConfigurationName, boolean start, int size) { public CreateCacheCommand buildCreateCacheCommand(String cacheNameToCreate, String cacheConfigurationName, int size) {
return new CreateCacheCommand(cacheName, cacheNameToCreate, cacheConfigurationName, start, size); return new CreateCacheCommand(cacheName, cacheNameToCreate, cacheConfigurationName, size);
} }


@Override @Override
Expand Down
24 changes: 10 additions & 14 deletions core/src/main/java/org/infinispan/commands/CreateCacheCommand.java
Expand Up @@ -32,8 +32,7 @@ public class CreateCacheCommand extends BaseRpcCommand {
private EmbeddedCacheManager cacheManager; private EmbeddedCacheManager cacheManager;
private String cacheNameToCreate; private String cacheNameToCreate;
private String cacheConfigurationName; private String cacheConfigurationName;
private boolean start; private int expectedMembers;
private int size;


private CreateCacheCommand() { private CreateCacheCommand() {
super(null); super(null);
Expand All @@ -44,15 +43,15 @@ public CreateCacheCommand(String ownerCacheName) {
} }


public CreateCacheCommand(String ownerCacheName, String cacheNameToCreate, String cacheConfigurationName) { public CreateCacheCommand(String ownerCacheName, String cacheNameToCreate, String cacheConfigurationName) {
this(ownerCacheName, cacheNameToCreate, cacheConfigurationName, false, 0); this(ownerCacheName, cacheNameToCreate, cacheConfigurationName, 0);
} }


public CreateCacheCommand(String cacheName, String cacheNameToCreate, String cacheConfigurationName, boolean start, int size) { public CreateCacheCommand(String cacheName, String cacheNameToCreate, String cacheConfigurationName,
int expectedMembers) {
super(cacheName); super(cacheName);
this.cacheNameToCreate = cacheNameToCreate; this.cacheNameToCreate = cacheNameToCreate;
this.cacheConfigurationName = cacheConfigurationName; this.cacheConfigurationName = cacheConfigurationName;
this.start = start; this.expectedMembers = expectedMembers;
this.size = size;
} }


public void init(EmbeddedCacheManager cacheManager) { public void init(EmbeddedCacheManager cacheManager) {
Expand Down Expand Up @@ -95,9 +94,8 @@ protected void waitForCacheToStabilize(Cache<Object, Object> cache, Configuratio


long endTime = timeService.expectedEndTime(cacheConfig.clustering().stateTransfer().timeout(), long endTime = timeService.expectedEndTime(cacheConfig.clustering().stateTransfer().timeout(),
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
int expectedSize = cacheManager.getTransport().getMembers().size();
CacheTopology cacheTopology = stateTransferManager.getCacheTopology(); CacheTopology cacheTopology = stateTransferManager.getCacheTopology();
while (cacheTopology.getMembers().size() != expectedSize || cacheTopology.getPendingCH() != null) { while (cacheTopology.getMembers().size() < expectedMembers || cacheTopology.getPendingCH() != null) {
long remainingTime = timeService.remainingTime(endTime, TimeUnit.NANOSECONDS); long remainingTime = timeService.remainingTime(endTime, TimeUnit.NANOSECONDS);
try { try {
stateTransferLock.waitForTopology(cacheTopology.getTopologyId() + 1, remainingTime, stateTransferLock.waitForTopology(cacheTopology.getTopologyId() + 1, remainingTime,
Expand All @@ -116,7 +114,7 @@ public byte getCommandId() {


@Override @Override
public Object[] getParameters() { public Object[] getParameters() {
return new Object[] {cacheNameToCreate, cacheConfigurationName, start, size}; return new Object[]{cacheNameToCreate, cacheConfigurationName, expectedMembers};
} }


@Override @Override
Expand All @@ -127,8 +125,7 @@ public void setParameters(int commandId, Object[] parameters) {
int i = 0; int i = 0;
cacheNameToCreate = (String) parameters[i++]; cacheNameToCreate = (String) parameters[i++];
cacheConfigurationName = (String) parameters[i++]; cacheConfigurationName = (String) parameters[i++];
start = (Boolean) parameters[i++]; expectedMembers = (Integer) parameters[i];
size = (Integer) parameters[i];
} }


@Override @Override
Expand Down Expand Up @@ -167,7 +164,7 @@ public boolean equals(Object obj) {
} else if (!cacheNameToCreate.equals(other.cacheNameToCreate)) { } else if (!cacheNameToCreate.equals(other.cacheNameToCreate)) {
return false; return false;
} }
return this.start == other.start && this.size == other.size; return this.expectedMembers == other.expectedMembers;
} }


@Override @Override
Expand All @@ -176,8 +173,7 @@ public String toString() {
"cacheManager=" + cacheManager + "cacheManager=" + cacheManager +
", cacheNameToCreate='" + cacheNameToCreate + '\'' + ", cacheNameToCreate='" + cacheNameToCreate + '\'' +
", cacheConfigurationName='" + cacheConfigurationName + '\'' + ", cacheConfigurationName='" + cacheConfigurationName + '\'' +
", start=" + start + '\'' + ", expectedMembers=" + expectedMembers +
", size=" + size +
'}'; '}';
} }


Expand Down
Expand Up @@ -27,7 +27,6 @@
import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.Address;
import org.infinispan.security.AuthorizationManager; import org.infinispan.security.AuthorizationManager;
import org.infinispan.security.AuthorizationPermission; import org.infinispan.security.AuthorizationPermission;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.util.logging.Log; import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory; import org.infinispan.util.logging.LogFactory;


Expand Down Expand Up @@ -159,7 +158,6 @@ public class MapReduceTask<KIn, VIn, KOut, VOut> {
protected RpcOptionsBuilder rpcOptionsBuilder; protected RpcOptionsBuilder rpcOptionsBuilder;
protected String customIntermediateCacheName; protected String customIntermediateCacheName;
protected String intermediateCacheConfigurationName = DEFAULT_TMP_CACHE_CONFIGURATION_NAME; protected String intermediateCacheConfigurationName = DEFAULT_TMP_CACHE_CONFIGURATION_NAME;
private StateTransferManager stateTransferManager;
private static final int MAX_COLLECTOR_SIZE = 1000; private static final int MAX_COLLECTOR_SIZE = 1000;


/** /**
Expand Down Expand Up @@ -218,7 +216,6 @@ public MapReduceTask(Cache<KIn, VIn> masterCacheNode, boolean distributeReducePh
this.marshaller = componentRegistry.getComponent(StreamingMarshaller.class, CACHE_MARSHALLER); this.marshaller = componentRegistry.getComponent(StreamingMarshaller.class, CACHE_MARSHALLER);
this.mapReduceManager = componentRegistry.getComponent(MapReduceManager.class); this.mapReduceManager = componentRegistry.getComponent(MapReduceManager.class);
this.cancellationService = componentRegistry.getComponent(CancellationService.class); this.cancellationService = componentRegistry.getComponent(CancellationService.class);
this.stateTransferManager = componentRegistry.getComponent(StateTransferManager.class);
this.taskId = UUID.randomUUID(); this.taskId = UUID.randomUUID();
if (useIntermediateSharedCache) { if (useIntermediateSharedCache) {
this.customIntermediateCacheName = DEFAULT_TMP_CACHE_CONFIGURATION_NAME; this.customIntermediateCacheName = DEFAULT_TMP_CACHE_CONFIGURATION_NAME;
Expand Down Expand Up @@ -531,7 +528,9 @@ protected void executeTaskInit(String tmpCacheName) throws MapReduceException {
CommandsFactory factory = cache.getComponentRegistry().getComponent(CommandsFactory.class); CommandsFactory factory = cache.getComponentRegistry().getComponent(CommandsFactory.class);


//first create tmp caches on all nodes //first create tmp caches on all nodes
final CreateCacheCommand ccc = factory.buildCreateCacheCommand(tmpCacheName, intermediateCacheConfigurationName, true, rpc.getMembers().size()); final CreateCacheCommand ccc = factory.buildCreateCacheCommand(tmpCacheName,
intermediateCacheConfigurationName,
rpc.getMembers().size());
log.debugf("Invoking %s across members %s ", ccc, cache.getRpcManager().getMembers()); log.debugf("Invoking %s across members %s ", ccc, cache.getRpcManager().getMembers());


// invoke remotely // invoke remotely
Expand All @@ -541,12 +540,8 @@ protected void executeTaskInit(String tmpCacheName) throws MapReduceException {
// invoke locally // invoke locally
try { try {
ccc.init(cache.getCacheManager()); ccc.init(cache.getCacheManager());
try { ccc.perform(null);
ccc.perform(null); } catch (Throwable e) {
} catch (Throwable e) {
throw new MapReduceException("Could not initialize temporary caches for MapReduce task on remote nodes ", e);
}
} catch (Exception e) {
throw new MapReduceException(e); throw new MapReduceException(e);
} }


Expand Down
@@ -1,13 +1,12 @@
package org.infinispan.distexec.mapreduce; package org.infinispan.distexec.mapreduce;




import java.util.Iterator; import java.util.concurrent.TimeUnit;


import org.infinispan.Cache; import org.infinispan.Cache;
import org.infinispan.commons.CacheException; import org.infinispan.commons.CacheException;
import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.testng.annotations.Test; import org.testng.annotations.Test;




Expand Down Expand Up @@ -52,18 +51,17 @@ protected MapReduceTask<String, String, String, Integer> createMapReduceTask(Cac
@Test(expectedExceptions={CacheException.class}) @Test(expectedExceptions={CacheException.class})
public void testIntermediateCacheNotCreatedOnAllNodes() throws Exception { public void testIntermediateCacheNotCreatedOnAllNodes() throws Exception {
String cacheNameConfig = "notCreatedOnAllNodes"; String cacheNameConfig = "notCreatedOnAllNodes";
Cache c = cache(0, cacheName()); Cache<String, String> c = cache(0, cacheName());
MapReduceTask<String, String, String, Integer> t = new MapReduceTask<String, String, String, Integer>(c, true, MapReduceTask<String, String, String, Integer> t = new MapReduceTask<>(c, true, false);
false);
ConfigurationBuilder cacheConfig = new ConfigurationBuilder(); ConfigurationBuilder cacheConfig = new ConfigurationBuilder();
cacheConfig.unsafe().unreliableReturnValues(true).clustering().cacheMode(CacheMode.DIST_SYNC).hash().numOwners(2) cacheConfig.unsafe().unreliableReturnValues(true)
.sync(); .clustering().cacheMode(CacheMode.DIST_SYNC).hash().numOwners(2)
.stateTransfer().timeout(5, TimeUnit.SECONDS);


//define configuration only on first node //define configuration only on first node
Iterator<EmbeddedCacheManager> iterator = getCacheManagers().iterator(); manager(0).defineConfiguration(cacheNameConfig, cacheConfig.build());
iterator.next().defineConfiguration(cacheNameConfig, cacheConfig.build());


t.usingIntermediateCache(cacheNameConfig); t.usingSharedIntermediateCache("irrelevant", cacheNameConfig);
t.mappedWith(new WordCountMapper()).reducedWith(new WordCountReducer()); t.mappedWith(new WordCountMapper()).reducedWith(new WordCountReducer());
t.execute(); t.execute();
} }
Expand Down
@@ -1,15 +1,14 @@
package org.infinispan.distexec.mapreduce; package org.infinispan.distexec.mapreduce;




import java.util.Iterator;

import org.infinispan.Cache; import org.infinispan.Cache;
import org.infinispan.commons.CacheException; import org.infinispan.commons.CacheException;
import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.testng.annotations.Test; import org.testng.annotations.Test;


import java.util.concurrent.TimeUnit;



/** /**
* DistributedIntermediateSharedCacheFourNodesMapReduceTest tests Map/Reduce functionality using four Infinispan nodes, * DistributedIntermediateSharedCacheFourNodesMapReduceTest tests Map/Reduce functionality using four Infinispan nodes,
Expand Down Expand Up @@ -39,8 +38,7 @@ protected void createCacheManagers() throws Throwable {
@Override @Override
protected MapReduceTask<String, String, String, Integer> createMapReduceTask(Cache c) { protected MapReduceTask<String, String, String, Integer> createMapReduceTask(Cache c) {
//run distributed reduce with per task cache - cache specified by the user //run distributed reduce with per task cache - cache specified by the user
MapReduceTask<String, String, String, Integer> t = new MapReduceTask<String, String, String, Integer>(c, true, MapReduceTask<String, String, String, Integer> t = new MapReduceTask<>(c, true, false);
false);
ConfigurationBuilder cacheConfig = new ConfigurationBuilder(); ConfigurationBuilder cacheConfig = new ConfigurationBuilder();
cacheConfig.unsafe().unreliableReturnValues(true) cacheConfig.unsafe().unreliableReturnValues(true)
.clustering().cacheMode(CacheMode.DIST_SYNC).hash().numOwners(2).sync(); .clustering().cacheMode(CacheMode.DIST_SYNC).hash().numOwners(2).sync();
Expand All @@ -53,19 +51,18 @@ protected MapReduceTask<String, String, String, Integer> createMapReduceTask(Cac
@Test(expectedExceptions={CacheException.class}) @Test(expectedExceptions={CacheException.class})
public void testIntermediateCacheNotCreatedOnAllNodes() throws Exception { public void testIntermediateCacheNotCreatedOnAllNodes() throws Exception {
String cacheNameConfig = "notCreatedOnAllNodes"; String cacheNameConfig = "notCreatedOnAllNodes";
Cache c = cache(0, cacheName()); Cache<String, String> c = cache(0, cacheName());
MapReduceTask<String, String, String, Integer> t = new MapReduceTask<String, String, String, Integer>(c, true, MapReduceTask<String, String, String, Integer> t = new MapReduceTask<>(c, true, false);
false);
ConfigurationBuilder cacheConfig = new ConfigurationBuilder(); ConfigurationBuilder cacheConfig = new ConfigurationBuilder();
cacheConfig.unsafe().unreliableReturnValues(true).clustering().cacheMode(CacheMode.DIST_SYNC).hash().numOwners(2) cacheConfig.unsafe().unreliableReturnValues(true)
.sync(); .clustering().cacheMode(CacheMode.DIST_SYNC).hash().numOwners(2)
.stateTransfer().timeout(5, TimeUnit.SECONDS);


//define configuration only on first node //define configuration only on first node
Iterator<EmbeddedCacheManager> iterator = getCacheManagers().iterator(); manager(0).defineConfiguration(cacheNameConfig, cacheConfig.build());
iterator.next().defineConfiguration(cacheNameConfig, cacheConfig.build());


t.usingSharedIntermediateCache("irrelevant", cacheNameConfig); t.usingSharedIntermediateCache("irrelevant", cacheNameConfig);
t.mappedWith(new WordCountMapper()).reducedWith(new WordCountReducer()); t.mappedWith(new WordCountMapper()).reducedWith(new WordCountReducer());
t.execute(); t.execute();
} }
} }
Expand Up @@ -347,8 +347,8 @@ public CancelCommand buildCancelCommandCommand(UUID commandUUID) {
} }


@Override @Override
public CreateCacheCommand buildCreateCacheCommand(String tmpCacheName, String defaultTmpCacheConfigurationName, boolean start, int size) { public CreateCacheCommand buildCreateCacheCommand(String tmpCacheName, String defaultTmpCacheConfigurationName, int size) {
return actual.buildCreateCacheCommand(tmpCacheName, defaultTmpCacheConfigurationName, start, size); return actual.buildCreateCacheCommand(tmpCacheName, defaultTmpCacheConfigurationName, size);
} }


@Override @Override
Expand Down

0 comments on commit 35f82d3

Please sign in to comment.