Skip to content

Commit

Permalink
ISPN-9699 Global 0.0f Capacity Factor
Browse files Browse the repository at this point in the history
  • Loading branch information
karesti authored and wburns committed Jan 30, 2019
1 parent e06c4e3 commit a184ba8
Show file tree
Hide file tree
Showing 19 changed files with 268 additions and 62 deletions.
Expand Up @@ -31,6 +31,8 @@
@Scope(Scopes.GLOBAL)
@SurvivesRestarts
public class GlobalConfiguration {
private static final String ZERO_CAPACITY_NODE_FEATURE = "zero-capacity-node";


/**
* Default replication version, from {@link org.infinispan.Version#getVersionShort}.
Expand All @@ -57,6 +59,7 @@ public class GlobalConfiguration {
private final ThreadPoolConfiguration asyncThreadPool;
private final Optional<String> defaultCacheName;
private final Features features;
private final boolean zeroCapacityNode;

GlobalConfiguration(ThreadPoolConfiguration expirationThreadPool,
ThreadPoolConfiguration listenerThreadPool,
Expand All @@ -68,7 +71,10 @@ public class GlobalConfiguration {
TransportConfiguration transport, GlobalSecurityConfiguration security,
SerializationConfiguration serialization, ShutdownConfiguration shutdown,
GlobalStateConfiguration globalState,
List<?> modules, SiteConfiguration site, Optional<String> defaultCacheName, ClassLoader cl, Features features) {
List<?> modules, SiteConfiguration site,
Optional<String> defaultCacheName,
ClassLoader cl, Features features,
boolean zeroCapacityNode) {
this.expirationThreadPool = expirationThreadPool;
this.listenerThreadPool = listenerThreadPool;
this.replicationQueueThreadPool = replicationQueueThreadPool;
Expand All @@ -82,14 +88,15 @@ public class GlobalConfiguration {
this.shutdown = shutdown;
this.globalState = globalState;
Map<Class<?>, Object> moduleMap = new HashMap<>();
for(Object module : modules) {
for (Object module : modules) {
moduleMap.put(module.getClass(), module);
}
this.modules = Collections.unmodifiableMap(moduleMap);
this.site = site;
this.defaultCacheName = defaultCacheName;
this.cl = cl;
this.features = features;
this.zeroCapacityNode = features.isAvailable(ZERO_CAPACITY_NODE_FEATURE) ? zeroCapacityNode : false;
}

/**
Expand Down Expand Up @@ -201,7 +208,7 @@ public GlobalStateConfiguration globalState() {

@SuppressWarnings("unchecked")
public <T> T module(Class<T> moduleClass) {
return (T)modules.get(moduleClass);
return (T) modules.get(moduleClass);
}

public Map<Class<?>, ?> modules() {
Expand Down Expand Up @@ -245,10 +252,15 @@ public String toString() {
", site=" + site +
", defaultCacheName=" + defaultCacheName +
", cl=" + cl +
", zeroCapacityNode=" + zeroCapacityNode +
'}';
}

public boolean isClustered() {
return transport().transport() != null;
}

public boolean isZeroCapacityNode() {
return zeroCapacityNode;
}
}
Expand Up @@ -34,6 +34,7 @@ public class GlobalConfigurationBuilder implements GlobalConfigurationChildBuild
private final Map<Class<?>, Builder<?>> modules;
private final SiteConfigurationBuilder site;
private Optional<String> defaultCacheName;
private boolean zeroCapacityNode;
private Features features;

public GlobalConfigurationBuilder() {
Expand All @@ -57,6 +58,7 @@ public GlobalConfigurationBuilder() {
this.asyncThreadPool = new ThreadPoolConfigurationBuilder(this);
this.modules = new LinkedHashMap();
this.defaultCacheName = Optional.empty();
this.zeroCapacityNode = false;
}

/**
Expand Down Expand Up @@ -165,6 +167,11 @@ public <T> T module(Class<T> moduleClass) {
return (T)modules.get(moduleClass);
}

public GlobalConfigurationBuilder zeroCapacityNode(boolean zeroCapacityNode) {
this.zeroCapacityNode = zeroCapacityNode;
return this;
}

public GlobalConfigurationBuilder clearModules() {
modules.clear();
return this;
Expand Down Expand Up @@ -259,7 +266,8 @@ public GlobalConfiguration build() {
site.create(),
defaultCacheName,
cl,
features);
features,
zeroCapacityNode);
}

public GlobalConfigurationBuilder read(GlobalConfiguration template) {
Expand Down
Expand Up @@ -167,6 +167,7 @@ public enum Attribute {
WAIT_TIME,
WHEN_SPLIT,
WRITE_SKEW_CHECK("write-skew"),
ZERO_CAPACITY_NODE("zero-capacity-node")
;

private final String name;
Expand Down
Expand Up @@ -628,13 +628,17 @@ private void parseContainer(XMLExtendedStreamReader reader, ConfigurationBuilder
break;
}
case STATISTICS: {
builder.globalJmxStatistics().enabled(Boolean.valueOf(value));
builder.globalJmxStatistics().enabled(Boolean.parseBoolean(value));
break;
}
case SHUTDOWN_HOOK: {
builder.shutdown().hookBehavior(ShutdownHookBehavior.valueOf(value));
break;
}
case ZERO_CAPACITY_NODE: {
builder.zeroCapacityNode(Boolean.parseBoolean(value));
break;
}
default: {
throw ParseUtils.unexpectedAttribute(reader, i);
}
Expand Down
Expand Up @@ -94,14 +94,16 @@ public void start() throws Exception {
persistentStateChecksum = Optional.empty();
}

float capacityFactor = globalConfiguration.isZeroCapacityNode() ? 0.0f : configuration.clustering().hash().capacityFactor();

CacheJoinInfo joinInfo = new CacheJoinInfo(pickConsistentHashFactory(globalConfiguration, configuration),
configuration.clustering().hash().hash(),
configuration.clustering().hash().numSegments(),
configuration.clustering().hash().numOwners(),
configuration.clustering().stateTransfer().timeout(),
configuration.transaction().transactionProtocol().isTotalOrder(),
configuration.clustering().cacheMode(),
configuration.clustering().hash().capacityFactor(),
capacityFactor,
localTopologyManager.getPersistentUUID(),
persistentStateChecksum);

Expand Down
5 changes: 5 additions & 0 deletions core/src/main/resources/schema/infinispan-config-10.0.xsd
Expand Up @@ -315,6 +315,11 @@
<xs:documentation>Indicates the default cache for this cache container</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="zero-capacity-node" type="xs:boolean" default="false">
<xs:annotation>
<xs:documentation>If 'true' then no data is stored in this node. Defaults to 'false'.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="start">
<xs:annotation><xs:documentation>Unused XML attribute</xs:documentation></xs:annotation>
</xs:attribute>
Expand Down
@@ -0,0 +1,59 @@
package org.infinispan.distribution;

import static org.testng.AssertJUnit.assertEquals;

import java.util.Map;

import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.impl.DefaultConsistentHash;
import org.infinispan.distribution.group.impl.PartitionerConsistentHash;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.testng.annotations.Test;

/**
* Test the capacity factor for lite instance
*
* @author Katia Aresti
* @since 9.4
*/
@Test(groups = "functional", testName = "distribution.ch.ZeroCapacityNodeTest")
public class ZeroCapacityNodeTest extends MultipleCacheManagersTest {

public static final int NUM_SEGMENTS = 60;

@Override
protected void createCacheManagers() throws Throwable {
// Do nothing here, create the cache managers in the test
}

public void testCapacityFactorContainingAZeroCapacityNode() {

ConfigurationBuilder cb = new ConfigurationBuilder();
cb.clustering().cacheMode(CacheMode.DIST_SYNC);
cb.clustering().hash().numSegments(NUM_SEGMENTS);
cb.clustering().hash().capacityFactor(0.5f);

EmbeddedCacheManager node1 = addClusterEnabledCacheManager(GlobalConfigurationBuilder.defaultClusteredBuilder(), cb);
EmbeddedCacheManager node2 = addClusterEnabledCacheManager(GlobalConfigurationBuilder.defaultClusteredBuilder(), cb);
EmbeddedCacheManager zeroCapacityNode = addClusterEnabledCacheManager(GlobalConfigurationBuilder.defaultClusteredBuilder().zeroCapacityNode(true), cb);

waitForClusterToForm();
assertCapacityFactors(node1, 0.5f);
assertCapacityFactors(node2, 0.5f);
assertCapacityFactors(zeroCapacityNode, 0.0f);
}

private void assertCapacityFactors(EmbeddedCacheManager cm, float expectedCapacityFactors) {
ConsistentHash ch = cache(0).getAdvancedCache().getDistributionManager().getReadConsistentHash();
DefaultConsistentHash dch =
(DefaultConsistentHash) TestingUtil.extractField(PartitionerConsistentHash.class, ch, "ch");
Map<Address, Float> capacityFactors = dch.getCapacityFactors();
assertEquals(expectedCapacityFactors, capacityFactors.get(cm.getAddress()), 0.0);
}
}
Expand Up @@ -5,6 +5,7 @@
import static org.testng.AssertJUnit.assertTrue;
import static org.testng.AssertJUnit.fail;

import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -18,6 +19,7 @@
import org.infinispan.functional.impl.ReadOnlyMapImpl;
import org.infinispan.functional.impl.ReadWriteMapImpl;
import org.infinispan.functional.impl.WriteOnlyMapImpl;
import org.infinispan.util.concurrent.CompletableFutures;

public final class FunctionalTestUtils {

Expand Down Expand Up @@ -48,6 +50,10 @@ public static <T> T await(CompletableFuture<T> cf) {
}
}

public static <T> List<T> await(List<CompletableFuture<T>> cf) {
return await(CompletableFutures.sequence(cf));
}

public static <K> void assertReadOnlyViewEmpty(K k, ReadEntryView<K, ?> ro) {
assertEquals(k, ro.key());
assertFalse(ro.find().isPresent());
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/resources/configs/unified/10.0.xml
Expand Up @@ -97,7 +97,7 @@

<cache-container name="maximal" aliases="alias1 alias2" default-cache="local" async-executor="infinispan-async" expiration-executor="infinispan-expiration"
jndi-name="java:global/infinispan/maximal" state-transfer-executor="infinispan-state-transfer" listener-executor="infinispan-listener"
persistence-executor="infinispan-cached" module="org.infinispan" statistics="true" shutdown-hook="DONT_REGISTER">
persistence-executor="infinispan-cached" module="org.infinispan" statistics="true" shutdown-hook="DONT_REGISTER" zero-capacity-node="false">
<transport cluster="maximal-cluster" executor="infinispan-transport" remote-command-executor="infinispan-cached" lock-timeout="120000" stack="tcp" node-name="a-node" machine="a" rack="b" site="c"
initial-cluster-size="4" initial-cluster-timeout="30000" />
<serialization marshaller="org.infinispan.marshall.TestObjectStreamMarshaller" version="1.0">
Expand Down
1 change: 1 addition & 0 deletions core/src/test/resources/infinispan-features.properties
@@ -0,0 +1 @@
org.infinispan.feature.zero-capacity-node=true
@@ -0,0 +1,20 @@
package org.infinispan.counter;

import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.testng.annotations.Test;

/**
* A simple consistency test for {@link org.infinispan.counter.api.StrongCounter} where some nodes are capacity factor
* 0.
*
* @author Katia Aresti, karesti@redhat.com
* @since 9.4
*/
@Test(groups = "functional", testName = "counter.StrongCounterWithZeroCapacityNodesTest")
public class StrongCounterWithZeroCapacityNodesTest extends StrongCounterTest {

@Override
protected GlobalConfigurationBuilder configure(int nodeId) {
return GlobalConfigurationBuilder.defaultClusteredBuilder().zeroCapacityNode(nodeId % 2 == 0);
}
}
@@ -0,0 +1,20 @@
package org.infinispan.counter;

import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.counter.api.WeakCounter;
import org.testng.annotations.Test;

/**
* A simple consistency test for {@link WeakCounter} with zero capacity nodes.
*
* @author Katia Aresti, karesti@redhat.com
* @since 9.4
*/
@Test(groups = "functional", testName = "counter.WeakCounterWithZeroCapacityNodesTest")
public class WeakCounterWithZeroCapacityNodesTest extends WeakCounterTest {

@Override
protected GlobalConfigurationBuilder configure(int nodeId) {
return GlobalConfigurationBuilder.defaultClusteredBuilder().zeroCapacityNode(nodeId % 2 == 0);
}
}
15 changes: 15 additions & 0 deletions documentation/src/main/asciidoc/user_guide/clustering.adoc
Expand Up @@ -342,6 +342,21 @@ With cross-site replication as well, the "site master" should only deal with for
commands between sites and shouldn't handle user requests, so it makes sense to configure
it with a capacity factor of `0`.

===== Zero Capacity Node
You might need to configure a whole node where the capacity factor is `0` for every cache,
user defined caches and internal caches.
When defining a zero capacity node, the node won't hold any data.
This is how you declare a zero capacity node:

[source,xml]
----
<cache-container zero-capacity-node="true" />
----

[source, java]
----
new GlobalConfigurationBuilder().zeroCapacityNode(true);
----

===== Hashing Configuration
This is how you configure hashing declaratively, via XML:
Expand Down
Expand Up @@ -5,6 +5,7 @@
import java.util.concurrent.CompletableFuture;

import org.infinispan.Cache;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
Expand All @@ -25,6 +26,7 @@
import org.infinispan.lock.impl.lock.ClusteredLockFilter;
import org.infinispan.lock.impl.manager.CacheHolder;
import org.infinispan.lock.impl.manager.EmbeddedClusteredLockManager;
import org.infinispan.lock.logging.Log;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.partitionhandling.PartitionHandling;
import org.infinispan.registry.InternalCacheRegistry;
Expand All @@ -39,6 +41,7 @@
*/
@MetaInfServices(value = ModuleLifecycle.class)
public class ClusteredLockModuleLifecycle implements ModuleLifecycle {
private static final Log log = LogFactory.getLog(ClusteredLockModuleLifecycle.class, Log.class);

public static final String CLUSTERED_LOCK_CACHE_NAME = "org.infinispan.LOCKS";

Expand All @@ -61,8 +64,9 @@ public void cacheManagerStarted(GlobalComponentRegistry gcr) {
final InternalCacheRegistry internalCacheRegistry = gcr.getComponent(InternalCacheRegistry.class);

ClusteredLockManagerConfiguration config = extractConfiguration(gcr);
GlobalConfiguration globalConfig = gcr.getGlobalConfiguration();

internalCacheRegistry.registerInternalCache(CLUSTERED_LOCK_CACHE_NAME, createClusteredLockCacheConfiguration(config),
internalCacheRegistry.registerInternalCache(CLUSTERED_LOCK_CACHE_NAME, createClusteredLockCacheConfiguration(config, globalConfig),
EnumSet.of(InternalCacheRegistry.Flag.EXCLUSIVE));

CompletableFuture<CacheHolder> future = startCaches(cacheManager);
Expand All @@ -75,13 +79,15 @@ private static ClusteredLockManagerConfiguration extractConfiguration(GlobalComp
return config == null ? ClusteredLockManagerConfigurationBuilder.defaultConfiguration() : config;
}

private static Configuration createClusteredLockCacheConfiguration(ClusteredLockManagerConfiguration config) {
private static Configuration createClusteredLockCacheConfiguration(ClusteredLockManagerConfiguration config, GlobalConfiguration globalConfig) {
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL);

if (config.numOwners() > 0) {
builder.clustering().cacheMode(CacheMode.DIST_SYNC)
.hash().numOwners(config.numOwners());
} else if (globalConfig.isZeroCapacityNode()) {
throw log.zeroCapacityNodeError();
} else {
builder.clustering().cacheMode(CacheMode.REPL_SYNC);
}
Expand Down
3 changes: 3 additions & 0 deletions lock/src/main/java/org/infinispan/lock/logging/Log.java
Expand Up @@ -37,4 +37,7 @@ public interface Log extends BasicLogger {

@Message(value = "Invalid scope for tag <clustered-lock>. Expected CACHE_CONTAINER but was %s", id = 29007)
ClusteredLockException invalidScope(String scope);

@Message(value = "When the node is configured as a zero-capacity node, you need to specify the number of owners for the lock", id = 29008)
ClusteredLockException zeroCapacityNodeError();
}

0 comments on commit a184ba8

Please sign in to comment.