From c2d77839f77757ad7987004003c03e141ba89167 Mon Sep 17 00:00:00 2001 From: Pete Muir Date: Wed, 18 May 2011 15:44:46 +0100 Subject: [PATCH] ISPN-312: Grouping support * Add support for @Group for keys which the application owns * Add support for groupers which can group keys outside the application * Group support needs to be explicitly enabled via Configuration (due to refl overhead) * XML config is missing for now * Javadoc and package Javadoc Groups currently have one side effect, which is to override location of keys. If grouping is enabled and a group is specified this will be used to determine key owner, rather than key hash. --- .../org/infinispan/config/Configuration.java | 38 ++++++ .../config/FluentConfiguration.java | 23 ++++ .../ch/AbstractConsistentHash.java | 19 +++ .../distribution/ch/ConsistentHashHelper.java | 25 +++- .../ch/DefaultConsistentHash.java | 4 +- .../ch/TopologyAwareConsistentHash.java | 4 +- .../infinispan/distribution/group/Group.java | 58 +++++++++ .../distribution/group/GroupManager.java | 19 +++ .../distribution/group/GroupManagerImpl.java | 121 ++++++++++++++++++ .../distribution/group/Grouper.java | 64 +++++++++ .../distribution/group/package-info.java | 12 ++ .../org/infinispan/util/ReflectionUtil.java | 4 +- .../distribution/BaseDistFunctionalTest.java | 8 ++ .../distribution/groups/GroupedKey.java | 33 +++++ .../groups/GroupsChFunctionalTest.java | 112 ++++++++++++++++ .../groups/GroupsDistAsyncFuncTest.java | 43 +++++++ .../groups/GroupsDistSyncUnsafeFuncTest.java | 44 +++++++ .../distribution/groups/KXGrouper.java | 34 +++++ 18 files changed, 656 insertions(+), 9 deletions(-) create mode 100644 core/src/main/java/org/infinispan/distribution/group/Group.java create mode 100644 core/src/main/java/org/infinispan/distribution/group/GroupManager.java create mode 100644 core/src/main/java/org/infinispan/distribution/group/GroupManagerImpl.java create mode 100644 core/src/main/java/org/infinispan/distribution/group/Grouper.java create mode 100644 core/src/main/java/org/infinispan/distribution/group/package-info.java create mode 100644 core/src/test/java/org/infinispan/distribution/groups/GroupedKey.java create mode 100644 core/src/test/java/org/infinispan/distribution/groups/GroupsChFunctionalTest.java create mode 100644 core/src/test/java/org/infinispan/distribution/groups/GroupsDistAsyncFuncTest.java create mode 100644 core/src/test/java/org/infinispan/distribution/groups/GroupsDistSyncUnsafeFuncTest.java create mode 100644 core/src/test/java/org/infinispan/distribution/groups/KXGrouper.java diff --git a/core/src/main/java/org/infinispan/config/Configuration.java b/core/src/main/java/org/infinispan/config/Configuration.java index 8c4a84402639..3d183ae67ac5 100644 --- a/core/src/main/java/org/infinispan/config/Configuration.java +++ b/core/src/main/java/org/infinispan/config/Configuration.java @@ -28,6 +28,7 @@ import org.infinispan.distribution.ch.ConsistentHash; import org.infinispan.distribution.ch.DefaultConsistentHash; import org.infinispan.distribution.ch.TopologyAwareConsistentHash; +import org.infinispan.distribution.group.Grouper; import org.infinispan.eviction.EvictionStrategy; import org.infinispan.eviction.EvictionThreadPolicy; import org.infinispan.factories.ComponentRegistry; @@ -1257,6 +1258,14 @@ public int getNumOwners() { public int getNumVirtualNodes() { return clustering.hash.numVirtualNodes; } + + public boolean isGroupsEnabled() { + return clustering.hash.groupsEnabled; + } + + public List> getGroupers() { + return clustering.hash.groupers; + } public boolean isRehashEnabled() { return clustering.hash.rehashEnabled; @@ -3107,6 +3116,12 @@ public static class HashType extends AbstractFluentConfigurationBean implements @ConfigurationDocRef(bean = HashConfig.class, targetElement = "numVirtualNodes") protected Integer numVirtualNodes = 1; + + @XmlTransient + protected Boolean groupsEnabled = false; + + @XmlTransient + protected List> groupers = new ArrayList>(); public void accept(ConfigurationBeanVisitor v) { v.visitHashType(this); @@ -3256,6 +3271,25 @@ public HashConfig rehashEnabled(Boolean rehashEnabled) { setRehashEnabled(rehashEnabled); return this; } + + @Override + public HashConfig groupers(List> groupers) { + testImmutability("groupers"); + this.groupers = groupers; + return this; + } + + @Override + public HashConfig groupsEnabled(Boolean groupsEnabled) { + testImmutability("groupsEnabled"); + this.groupsEnabled = groupsEnabled; + return this; + } + + @Override + public List> getGroupers() { + return groupers; + } @Override public boolean equals(Object o) { @@ -3270,6 +3304,8 @@ public boolean equals(Object o) { return false; if (numOwners != null ? !numOwners.equals(hashType.numOwners) : hashType.numOwners != null) return false; if (numVirtualNodes != null ? !numVirtualNodes.equals(hashType.numVirtualNodes) : hashType.numVirtualNodes != null) return false; + if (groupsEnabled != null ? !groupsEnabled.equals(hashType.groupsEnabled) : hashType.groupsEnabled != null) return false; + if (groupers != null ? !groupers.equals(hashType.groupers) : hashType.groupers != null) return false; if (rehashRpcTimeout != null ? !rehashRpcTimeout.equals(hashType.rehashRpcTimeout) : hashType.rehashRpcTimeout != null) return false; if (rehashWait != null ? !rehashWait.equals(hashType.rehashWait) : hashType.rehashWait != null) return false; @@ -3284,6 +3320,8 @@ public int hashCode() { result = 31 * result + (hashFunctionClass != null ? hashFunctionClass.hashCode() : 0); result = 31 * result + (numOwners != null ? numOwners.hashCode() : 0); result = 31 * result + (numVirtualNodes != null ? numVirtualNodes.hashCode() : 0); + result = 31 * result + (groupsEnabled != null ? groupsEnabled.hashCode() : 0); + result = 31 * result + (groupers != null ? groupers.hashCode() : 0); result = 31 * result + (rehashWait != null ? rehashWait.hashCode() : 0); result = 31 * result + (rehashRpcTimeout != null ? rehashRpcTimeout.hashCode() : 0); result = 31 * result + (rehashEnabled ? 0 : 1); diff --git a/core/src/main/java/org/infinispan/config/FluentConfiguration.java b/core/src/main/java/org/infinispan/config/FluentConfiguration.java index 430b883b52d0..6525eebb8f0c 100644 --- a/core/src/main/java/org/infinispan/config/FluentConfiguration.java +++ b/core/src/main/java/org/infinispan/config/FluentConfiguration.java @@ -23,8 +23,12 @@ package org.infinispan.config; +import java.util.List; + import org.infinispan.container.DataContainer; import org.infinispan.distribution.ch.ConsistentHash; +import org.infinispan.distribution.group.Group; +import org.infinispan.distribution.group.Grouper; import org.infinispan.eviction.EvictionStrategy; import org.infinispan.eviction.EvictionThreadPolicy; import org.infinispan.interceptors.base.CommandInterceptor; @@ -641,6 +645,25 @@ public interface HashConfig extends FluentTypes { * */ HashConfig numVirtualNodes(Integer numVirtualNodes); + + /** + * Enable grouping support, such that {@link Group} annotations are honoured and any configured + * groupers will be invoked + */ + HashConfig groupsEnabled(Boolean groupsEnabled); + + /** + * Controls the groupers used in distribution + * + * @param groupers the groupers to use + * @see #getGroupers() + */ + HashConfig groupers(List> groupers); + + /** + * Get's the current groupers in use + */ + List> getGroupers(); } /** diff --git a/core/src/main/java/org/infinispan/distribution/ch/AbstractConsistentHash.java b/core/src/main/java/org/infinispan/distribution/ch/AbstractConsistentHash.java index 5f1f3d15db4c..e923c697287e 100644 --- a/core/src/main/java/org/infinispan/distribution/ch/AbstractConsistentHash.java +++ b/core/src/main/java/org/infinispan/distribution/ch/AbstractConsistentHash.java @@ -22,6 +22,7 @@ */ package org.infinispan.distribution.ch; +import org.infinispan.distribution.group.GroupManager; import org.infinispan.remoting.transport.Address; import java.util.Collection; @@ -47,6 +48,8 @@ public abstract class AbstractConsistentHash implements ConsistentHash { protected Set
caches; + + protected GroupManager groupManager; @Override public void setCaches(Set
caches) { @@ -78,10 +81,26 @@ public boolean isKeyLocalToAddress(Address a, Object key, int replCount) { return locate(key, replCount).contains(a); } + public void setGroupManager(GroupManager groupManager) { + this.groupManager = groupManager; + } + @Override public String toString() { return getClass().getSimpleName() + " {" + "caches=" + caches + '}'; } + + /** + * Get the grouping, if any, for this key. + * + * @param key the key to get the grouping for + * @return the group, or if no group is applicable, the key + */ + protected Object getGrouping(Object key) { + String group = groupManager != null ? groupManager.getGroup(key) : null; + return group != null ? group : key; + } + } diff --git a/core/src/main/java/org/infinispan/distribution/ch/ConsistentHashHelper.java b/core/src/main/java/org/infinispan/distribution/ch/ConsistentHashHelper.java index faa5d9feb6d6..e023035d15fb 100644 --- a/core/src/main/java/org/infinispan/distribution/ch/ConsistentHashHelper.java +++ b/core/src/main/java/org/infinispan/distribution/ch/ConsistentHashHelper.java @@ -22,16 +22,23 @@ */ package org.infinispan.distribution.ch; +import static java.util.Collections.emptyList; + import org.infinispan.config.Configuration; import org.infinispan.remoting.transport.Address; import org.infinispan.util.Util; import org.infinispan.util.hash.Hash; - import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; +import org.infinispan.distribution.group.GroupManager; +import org.infinispan.distribution.group.GroupManagerImpl; +import org.infinispan.distribution.group.Grouper; + /** * A helper class that handles the construction of consistent hash instances based on configuration. * @@ -69,16 +76,26 @@ private static ConsistentHash constructConsistentHashInstance(Configuration c) { wch.setHashFunction(h); wch.setNumVirtualNodes(c.getNumVirtualNodes()); } + if (ch instanceof AbstractConsistentHash) { + AbstractConsistentHash ach = (AbstractConsistentHash) ch; + if (c.isGroupsEnabled()) + ach.setGroupManager(new GroupManagerImpl(c.getGroupers())); + } return ch; } - private static ConsistentHash constructConsistentHashInstance(Class clazz, Hash hash, int numVirtualNodes) { + private static ConsistentHash constructConsistentHashInstance(Class clazz, Hash hash, int numVirtualNodes, GroupManager groupManager) { ConsistentHash ch = Util.getInstance(clazz); if (ch instanceof AbstractWheelConsistentHash) { AbstractWheelConsistentHash wch = (AbstractWheelConsistentHash) ch; wch.setHashFunction(hash); wch.setNumVirtualNodes(numVirtualNodes); } + if (ch instanceof AbstractConsistentHash) { + AbstractConsistentHash ach = (AbstractConsistentHash) ch; + if (groupManager != null) + ach.setGroupManager(groupManager); + } return ch; } @@ -151,12 +168,14 @@ public static ConsistentHash createConsistentHash(Configuration c, Collection addresses) { Hash hf = null; int numVirtualNodes = 1; + GroupManager groupManager = null; if (template instanceof AbstractWheelConsistentHash) { AbstractWheelConsistentHash wTemplate = (AbstractWheelConsistentHash) template; hf = wTemplate.hashFunction; numVirtualNodes = wTemplate.numVirtualNodes; + groupManager = wTemplate.groupManager; } - ConsistentHash ch = constructConsistentHashInstance(template.getClass(), hf, numVirtualNodes); + ConsistentHash ch = constructConsistentHashInstance(template.getClass(), hf, numVirtualNodes, groupManager); if (addresses != null && !addresses.isEmpty()) ch.setCaches(toSet(addresses)); return ch; } diff --git a/core/src/main/java/org/infinispan/distribution/ch/DefaultConsistentHash.java b/core/src/main/java/org/infinispan/distribution/ch/DefaultConsistentHash.java index c35c26877974..3eaecc6b84cb 100644 --- a/core/src/main/java/org/infinispan/distribution/ch/DefaultConsistentHash.java +++ b/core/src/main/java/org/infinispan/distribution/ch/DefaultConsistentHash.java @@ -50,7 +50,7 @@ private int getNumCopiesToFind(int replCount) { } public List
locate(Object key, int replCount) { - int hash = getNormalizedHash(key); + int hash = getNormalizedHash(getGrouping(key)); int numCopiesToFind = getNumCopiesToFind(replCount); List
owners = new ArrayList
(numCopiesToFind); @@ -84,7 +84,7 @@ public List
locate(Object key, int replCount) { @Override public boolean isKeyLocalToAddress(Address target, Object key, int replCount) { - int hash = getNormalizedHash(key); + int hash = getNormalizedHash(getGrouping(key)); int numCopiesToFind = getNumCopiesToFind(replCount); SortedMap candidates = positions.tailMap(hash); diff --git a/core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHash.java b/core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHash.java index 090817184ebc..fe357a8dd55d 100644 --- a/core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHash.java +++ b/core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHash.java @@ -112,7 +112,7 @@ public List
getStateProvidersOnJoin(Address joiner, int replCount) { */ private List
getOwners(Address address, int numOwners) { Address realAddress = getRealAddress(address); - int ownerHash = getNormalizedHash(address); + int ownerHash = getNormalizedHash(getGrouping(address)); Collection
beforeOnWheel = positions.headMap(ownerHash).values(); Collection
afterOnWheel = positions.tailMap(ownerHash).values(); ArrayList
processSequence = new ArrayList
(afterOnWheel); @@ -180,7 +180,7 @@ private boolean isSameMachine(Address a, Address b) { } private Address getOwner(Object key) { - int hash = getNormalizedHash(key); + int hash = getNormalizedHash(getGrouping(key)); SortedMap map = positions.tailMap(hash); if (map.size() == 0) { return positions.get(positions.firstKey()); diff --git a/core/src/main/java/org/infinispan/distribution/group/Group.java b/core/src/main/java/org/infinispan/distribution/group/Group.java new file mode 100644 index 000000000000..9e4b88964f5f --- /dev/null +++ b/core/src/main/java/org/infinispan/distribution/group/Group.java @@ -0,0 +1,58 @@ +package org.infinispan.distribution.group; + +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +/** + *

+ * Identifies the key for a group. + *

+ * + *

+ * @Group should be used when you have control over the key class. For example: + *

+ * + *
+ * class User {
+ * 
+ *    ...
+ *    String office;
+ *    ...
+ *    
+ *    int hashCode() {
+ *       // Defines the hash for the key, normally used to determine location
+ *       ...
+ *    }
+ *    
+ *    // Override the location by specifying a group, all keys in the same 
+ *    // group end up with the same owner
+ *    @Group
+ *    String getOffice() {
+ *       return office;
+ *    }
+ *    
+ * }
+ * 
+ * + *

+ * If you don't have control over the key class, you can specify a {@link Grouper} (in your configuration) which can be used to + * specify the group externally. + *

+ * + *

+ * You must set the groupsEnabled property to true in your configuration in order to use groups. + *

+ * + * @see Grouper + * + * @author Pete Muir + * + */ +@Target(METHOD) +@Retention(RUNTIME) +public @interface Group { + +} diff --git a/core/src/main/java/org/infinispan/distribution/group/GroupManager.java b/core/src/main/java/org/infinispan/distribution/group/GroupManager.java new file mode 100644 index 000000000000..39c2a75ab143 --- /dev/null +++ b/core/src/main/java/org/infinispan/distribution/group/GroupManager.java @@ -0,0 +1,19 @@ +package org.infinispan.distribution.group; + +/** + * Control's key grouping. + * + * @author Pete Muir + * + */ +public interface GroupManager { + + /** + * Get the group for a given key + * + * @param key the key for which to get the group + * @return the group, or null if no group is defined for the key + */ + public String getGroup(Object key); + +} diff --git a/core/src/main/java/org/infinispan/distribution/group/GroupManagerImpl.java b/core/src/main/java/org/infinispan/distribution/group/GroupManagerImpl.java new file mode 100644 index 000000000000..ddbeb90fd563 --- /dev/null +++ b/core/src/main/java/org/infinispan/distribution/group/GroupManagerImpl.java @@ -0,0 +1,121 @@ +package org.infinispan.distribution.group; + +import static org.infinispan.util.ReflectionUtil.invokeAccessibly; + +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; + +import org.infinispan.util.ReflectionUtil; +import org.infinispan.util.Util; + +public class GroupManagerImpl implements GroupManager { + + private static interface GroupMetadata { + + static final GroupMetadata NONE = new GroupMetadata() { + + public String getGroup(Object instance) { + return null; + } + + }; + + String getGroup(Object instance); + + } + + private static class GroupMetadataImpl implements GroupMetadata { + + + private static Object[] EMPTY_ARGS = new Object[0]; + + private final Method method; + + public GroupMetadataImpl(Method method) { + if (!String.class.isAssignableFrom(method.getReturnType())) + throw new IllegalArgumentException(Util.formatString("@Group method %s must return java.lang.String", method)); + if (method.getParameterTypes().length > 0) + throw new IllegalArgumentException(Util.formatString("@Group method %s must jave zero arguments", method)); + this.method = method; + } + + public String getGroup(Object instance) { + return String.class.cast(invokeAccessibly(instance, method, EMPTY_ARGS)); + } + + } + + private static GroupMetadata createGroupMetadata(Class clazz) { + Collection possibleMethods = ReflectionUtil.getAllMethods(clazz, Group.class); + if (possibleMethods.isEmpty()) + return GroupMetadata.NONE; + else if (possibleMethods.size() == 1) + return new GroupMetadataImpl(possibleMethods.iterator().next()); + else + throw new IllegalStateException(Util.formatString("Cannot define more that one @Group method for class hierarchy rooted at %s", clazz.getName())); + } + + private final ConcurrentMap, Future> groupMetadataCache; + private final List> groupers; + + public GroupManagerImpl(List> groupers) { + this.groupMetadataCache = new ConcurrentHashMap, Future>(); + if (groupers != null) + this.groupers = groupers; + else + this.groupers = Collections.emptyList(); + } + + @Override + public String getGroup(Object key) { + GroupMetadata metadata = getMetadata(key); + if (metadata != null) { + return applyGroupers(metadata.getGroup(key), key); + } else + return applyGroupers(null, key); + } + + private String applyGroupers(String group, Object key) { + for (Grouper grouper : groupers) { + if (grouper.getKeyType().isAssignableFrom(key.getClass())) + group = ((Grouper) grouper).computeGroup(key, group); + } + return group; + } + + private GroupMetadata getMetadata(Object key) { + final Class keyClass = key.getClass(); + if (!groupMetadataCache.containsKey(keyClass)) { + Callable c = new Callable() { + + @Override + public GroupMetadata call() throws Exception { + return createGroupMetadata(keyClass); + } + + }; + FutureTask ft = new FutureTask(c); + if (groupMetadataCache.putIfAbsent(keyClass, ft) == null) { + ft.run(); + } + } + try { + return groupMetadataCache.get(keyClass).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Error extracting @Group from class hierarchy", e); + } catch (ExecutionException e) { + throw new IllegalStateException("Error extracting @Group from class hierarchy", e); + } + } + +} diff --git a/core/src/main/java/org/infinispan/distribution/group/Grouper.java b/core/src/main/java/org/infinispan/distribution/group/Grouper.java new file mode 100644 index 000000000000..134322e23c57 --- /dev/null +++ b/core/src/main/java/org/infinispan/distribution/group/Grouper.java @@ -0,0 +1,64 @@ +package org.infinispan.distribution.group; + +/** + *

+ * User applications may implement this interface in order to customize the compution of groups in cases when the modifying the + * key is not possible, or when the value determined by the {@link Group} annotation needs customizing. + *

+ * + *

+ * Grouper acts as an interceptor, passing the previously computed value in. The group passed to the first + * Grouper will be that determined by @Group (if @Group is defined). + *

+ * + *

+ * For example: + *

+ * + *
+ * public class KXGrouper implements Grouper<String> {
+ * 
+ *     // A pattern that can extract from a "kX" (e.g. k1, k2) style key
+ *     private static Pattern kPattern = Pattern.compile("(ˆk)(\\d)$");
+ * 
+ *     public String computeGroup(String key, String group) {
+ *         Matcher matcher = kPattern.matcher(key);
+ *         if (matcher.matches()) {
+ *             String g = Integer.parseInt(matcher.group(2)) % 2 + "";
+ *             return g;
+ *         } else
+ *             return null;
+ *     }
+ * 
+ *     public Class<String> getKeyType() {
+ *         return String.class;
+ *     }
+ * 
+ * }
+ * 
+ * + *

+ * You must set the + * groupsEnabled property to true in your configuration in order to use groups. You can specify an order list of groupers there. + *

+ * + * @see Group + * + * @author Pete Muir + * + * @param + */ +public interface Grouper { + + /** + * Compute the group for a given key + * + * @param key the key to compute the group for + * @param group the group as currently computed, or null if no group has been determined yet + * @return the group, or null if no group is defined + */ + String computeGroup(T key, String group); + + Class getKeyType(); + +} \ No newline at end of file diff --git a/core/src/main/java/org/infinispan/distribution/group/package-info.java b/core/src/main/java/org/infinispan/distribution/group/package-info.java new file mode 100644 index 000000000000..ae28713efea5 --- /dev/null +++ b/core/src/main/java/org/infinispan/distribution/group/package-info.java @@ -0,0 +1,12 @@ +/** + *

+ * Groups allow keys with differeing hash codes to be co-located on the same node. + *

+ * + *

+ * Infinispan offers support for both instrinsic grouping ( see{@link org.infinispan.distribution.group.Group}) + * and extrinsic grouping (see {@link org.infinispan.distribution.group.Grouper}). + *

+ */ +package org.infinispan.distribution.group; + diff --git a/core/src/main/java/org/infinispan/util/ReflectionUtil.java b/core/src/main/java/org/infinispan/util/ReflectionUtil.java index 39c2d6b84474..52a8878fba1d 100644 --- a/core/src/main/java/org/infinispan/util/ReflectionUtil.java +++ b/core/src/main/java/org/infinispan/util/ReflectionUtil.java @@ -166,10 +166,10 @@ private static Field findFieldRecursively(Class c, String fieldName) { * @param method method to execute * @param parameters parameters */ - public static void invokeAccessibly(Object instance, Method method, Object[] parameters) { + public static Object invokeAccessibly(Object instance, Method method, Object[] parameters) { try { method.setAccessible(true); - method.invoke(instance, parameters); + return method.invoke(instance, parameters); } catch (Exception e) { throw new CacheException("Unable to invoke method " + method + " on object " + //instance + diff --git a/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java b/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java index 9c0d5f99fea6..38468182300f 100644 --- a/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java +++ b/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java @@ -33,6 +33,7 @@ import org.infinispan.distribution.ch.ConsistentHashHelper; import org.infinispan.distribution.ch.DefaultConsistentHash; import org.infinispan.distribution.ch.UnionConsistentHash; +import org.infinispan.distribution.group.Grouper; import org.infinispan.manager.CacheContainer; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.remoting.transport.Address; @@ -47,6 +48,7 @@ import javax.transaction.TransactionManager; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -73,6 +75,8 @@ public abstract class BaseDistFunctionalTest extends MultipleCacheManagersTest { protected int numOwners = 2; protected int lockTimeout = 45; protected int numVirtualNodes = 1; + protected boolean groupsEnabled = false; + protected List> groupers; protected void createCacheManagers() throws Throwable { cacheName = "dist"; @@ -90,6 +94,10 @@ protected void createCacheManagers() throws Throwable { configuration.setLockAcquisitionTimeout(lockTimeout, TimeUnit.SECONDS); configuration.setL1CacheEnabled(l1CacheEnabled); configuration.fluent().clustering().hash().numVirtualNodes(numVirtualNodes); + if (groupsEnabled) { + configuration.fluent().hash().groupsEnabled(true); + configuration.fluent().hash().groupers(groupers); + } if (l1CacheEnabled) configuration.setL1OnRehash(l1OnRehash); if (l1CacheEnabled) configuration.setL1InvalidationThreshold(l1Threshold); caches = createClusteredCaches(INIT_CLUSTER_SIZE, cacheName, configuration); diff --git a/core/src/test/java/org/infinispan/distribution/groups/GroupedKey.java b/core/src/test/java/org/infinispan/distribution/groups/GroupedKey.java new file mode 100644 index 000000000000..9fb15c6ce470 --- /dev/null +++ b/core/src/test/java/org/infinispan/distribution/groups/GroupedKey.java @@ -0,0 +1,33 @@ +package org.infinispan.distribution.groups; + +import org.infinispan.distribution.group.Group; + +public class GroupedKey { + + private final String group; + private final String key; + + public GroupedKey(String group, String key) { + this.group = group; + this.key = key; + } + + @Group + public String getGroup() { + return group; + } + + @Override + public int hashCode() { + return key.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof GroupedKey) + return ((GroupedKey) obj).key.equals(this.key); + else + return false; + } + +} diff --git a/core/src/test/java/org/infinispan/distribution/groups/GroupsChFunctionalTest.java b/core/src/test/java/org/infinispan/distribution/groups/GroupsChFunctionalTest.java new file mode 100644 index 000000000000..6cea5f5861f5 --- /dev/null +++ b/core/src/test/java/org/infinispan/distribution/groups/GroupsChFunctionalTest.java @@ -0,0 +1,112 @@ +/* + * JBoss, Home of Professional Open Source + * Copyright 2011 Red Hat Inc. and/or its affiliates and other + * contributors as indicated by the @author tags. All rights reserved. + * See the copyright.txt in the distribution for a full listing of + * individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.infinispan.distribution.groups; + +import java.util.Collections; + +import org.infinispan.Cache; +import org.infinispan.distribution.DistSyncFuncTest; +import org.infinispan.distribution.group.Grouper; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * @author Pete Muir + * @since 5.0 + */ +@Test (groups = "functional", testName = "distribution.GroupsChFunctionalTest") +public class GroupsChFunctionalTest extends DistSyncFuncTest { + + public GroupsChFunctionalTest() { + groupsEnabled = true; + groupers = Collections.>singletonList(new KXGrouper()); + } + + public void testGrouper() throws Throwable { + for (Cache c : caches) assert c.isEmpty(); + + // Based on the grouping fn which uses computes a group by taking the digit from kX + // and doing a modulo 2 on it we can verify the owners of keys + Assert.assertNotSame(getOwners("k1"), getOwners("k2")); + Assert.assertNotSame(getOwners("k1"), getOwners("k4")); + Assert.assertNotSame(getOwners("k3"), getOwners("k2")); + Assert.assertNotSame(getOwners("k3"), getOwners("k4")); + Assert.assertEquals(getOwners("k1"), getOwners("k3")); + Assert.assertEquals(getOwners("k2"), getOwners("k4")); + + } + + public void testIntrinsicGrouping() throws Throwable { + for (Cache c : caches) assert c.isEmpty(); + + GroupedKey k1 = new GroupedKey("groupA", "k1"); + GroupedKey k2 = new GroupedKey("groupB", "k2"); + GroupedKey k3 = new GroupedKey("groupA", "k3"); + GroupedKey k4 = new GroupedKey("groupB", "k4"); + + Assert.assertNotSame(getOwners(k1), getOwners(k2)); + Assert.assertNotSame(getOwners(k1), getOwners(k4)); + Assert.assertNotSame(getOwners(k3), getOwners(k2)); + Assert.assertNotSame(getOwners(k3), getOwners(k4)); + Assert.assertEquals(getOwners(k1), getOwners(k3)); + Assert.assertEquals(getOwners(k2), getOwners(k4)); + + GroupedKey k1A = new GroupedKey("groupA", "k1"); + GroupedKey k1B = new GroupedKey("groupB", "k1"); + + // Check that the same key in different groups is mapped to different nodes (nb this is not something you want to really do!) + Assert.assertNotSame(getOwners(k1A), getOwners(k1B)); + + } + + public void testRehash() throws Throwable { + for (Cache c : caches) assert c.isEmpty(); + + GroupedKey k1 = new GroupedKey("groupA", "k1"); + GroupedKey k2 = new GroupedKey("groupA", "k2"); + GroupedKey k3 = new GroupedKey("groupA", "k3"); + GroupedKey k4 = new GroupedKey("groupA", "k4"); + + Assert.assertEquals(getOwners(k1), getOwners(k2)); + Assert.assertEquals(getOwners(k1), getOwners(k3)); + Assert.assertEquals(getOwners(k1), getOwners(k4)); + + Cache[] owners1 = getOwners(k1); + Cache[] owners2 = getOwners(k2); + Cache[] owners3 = getOwners(k3); + Cache[] owners4 = getOwners(k4); + + removeCacheFromCluster(getOwners(k1)[0].getName()); + + Assert.assertNotSame(getOwners(k1), owners1); + Assert.assertNotSame(getOwners(k2), owners2); + Assert.assertNotSame(getOwners(k3), owners3); + Assert.assertNotSame(getOwners(k4), owners4); + + Assert.assertEquals(getOwners(k1), getOwners(k2)); + Assert.assertEquals(getOwners(k1), getOwners(k3)); + Assert.assertEquals(getOwners(k1), getOwners(k4)); + + } + +} diff --git a/core/src/test/java/org/infinispan/distribution/groups/GroupsDistAsyncFuncTest.java b/core/src/test/java/org/infinispan/distribution/groups/GroupsDistAsyncFuncTest.java new file mode 100644 index 000000000000..bed69a6f0a11 --- /dev/null +++ b/core/src/test/java/org/infinispan/distribution/groups/GroupsDistAsyncFuncTest.java @@ -0,0 +1,43 @@ +/* + * JBoss, Home of Professional Open Source + * Copyright 2011 Red Hat Inc. and/or its affiliates and other + * contributors as indicated by the @author tags. All rights reserved. + * See the copyright.txt in the distribution for a full listing of + * individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.infinispan.distribution.groups; + +import java.util.Collections; + +import org.infinispan.distribution.DistAsyncFuncTest; +import org.infinispan.distribution.group.Grouper; +import org.testng.annotations.Test; + +/** + * @author Pete Muir + * @since 5.0 + */ +@Test (groups = "functional", testName = "distribution.GroupsDistAsyncFuncTest") +public class GroupsDistAsyncFuncTest extends DistAsyncFuncTest { + + public GroupsDistAsyncFuncTest() { + groupsEnabled = true; + groupers = Collections.>singletonList(new KXGrouper()); + } + +} diff --git a/core/src/test/java/org/infinispan/distribution/groups/GroupsDistSyncUnsafeFuncTest.java b/core/src/test/java/org/infinispan/distribution/groups/GroupsDistSyncUnsafeFuncTest.java new file mode 100644 index 000000000000..b67e1369b4f0 --- /dev/null +++ b/core/src/test/java/org/infinispan/distribution/groups/GroupsDistSyncUnsafeFuncTest.java @@ -0,0 +1,44 @@ +/* + * JBoss, Home of Professional Open Source + * Copyright 2011 Red Hat Inc. and/or its affiliates and other + * contributors as indicated by the @author tags. All rights reserved. + * See the copyright.txt in the distribution for a full listing of + * individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.infinispan.distribution.groups; + +import java.util.Collections; + +import org.infinispan.distribution.DistSyncUnsafeFuncTest; +import org.infinispan.distribution.group.Grouper; +import org.testng.annotations.Test; + +/** + * @author Pete Muir + * @since 5.0 + */ +@Test(testName="distribution.GroupsDistSyncUnsafeFuncTest", groups = "functional") +public class GroupsDistSyncUnsafeFuncTest extends DistSyncUnsafeFuncTest { + + + public GroupsDistSyncUnsafeFuncTest() { + groupsEnabled = true; + groupers = Collections.>singletonList(new KXGrouper()); + } + +} diff --git a/core/src/test/java/org/infinispan/distribution/groups/KXGrouper.java b/core/src/test/java/org/infinispan/distribution/groups/KXGrouper.java new file mode 100644 index 000000000000..59b062ccbf6b --- /dev/null +++ b/core/src/test/java/org/infinispan/distribution/groups/KXGrouper.java @@ -0,0 +1,34 @@ +package org.infinispan.distribution.groups; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.infinispan.distribution.group.Grouper; + +/** + * A simple grouper which groups String based keys using a pattern for kX keys + * @author Pete Muir + * + */ +public class KXGrouper implements Grouper { + + private static Pattern kPattern = Pattern.compile("(^k)(\\d)$"); + + @Override + public String computeGroup(String key, String group) { + Matcher matcher = kPattern.matcher(key); + if (matcher.matches()) { + String g = Integer.parseInt(matcher.group(2)) % 2 + ""; + return g; + } + else + return null; + } + + @Override + public Class getKeyType() { + return String.class; + } + + +}