Skip to content

Commit

Permalink
ISPN-312: Grouping support
Browse files Browse the repository at this point in the history
* Add support for @group for keys which the
  application owns
* Add support for groupers which can group keys
  outside the application
* Group support needs to be explicitly enabled via
  Configuration (due to refl overhead)
* XML config is missing for now
* Javadoc and package Javadoc

Groups currently have one side effect, which is to
override location of keys. If grouping is enabled
and a group is specified this will be used to
determine key owner, rather than key hash.
  • Loading branch information
pmuir authored and maniksurtani committed May 20, 2011
1 parent 11ec110 commit c2d7783
Show file tree
Hide file tree
Showing 18 changed files with 656 additions and 9 deletions.
38 changes: 38 additions & 0 deletions core/src/main/java/org/infinispan/config/Configuration.java
Expand Up @@ -28,6 +28,7 @@
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.DefaultConsistentHash;
import org.infinispan.distribution.ch.TopologyAwareConsistentHash;
import org.infinispan.distribution.group.Grouper;
import org.infinispan.eviction.EvictionStrategy;
import org.infinispan.eviction.EvictionThreadPolicy;
import org.infinispan.factories.ComponentRegistry;
Expand Down Expand Up @@ -1257,6 +1258,14 @@ public int getNumOwners() {
public int getNumVirtualNodes() {
return clustering.hash.numVirtualNodes;
}

public boolean isGroupsEnabled() {
return clustering.hash.groupsEnabled;
}

public List<Grouper<?>> getGroupers() {
return clustering.hash.groupers;
}

public boolean isRehashEnabled() {
return clustering.hash.rehashEnabled;
Expand Down Expand Up @@ -3107,6 +3116,12 @@ public static class HashType extends AbstractFluentConfigurationBean implements

@ConfigurationDocRef(bean = HashConfig.class, targetElement = "numVirtualNodes")
protected Integer numVirtualNodes = 1;

@XmlTransient
protected Boolean groupsEnabled = false;

@XmlTransient
protected List<Grouper<?>> groupers = new ArrayList<Grouper<?>>();

public void accept(ConfigurationBeanVisitor v) {
v.visitHashType(this);
Expand Down Expand Up @@ -3256,6 +3271,25 @@ public HashConfig rehashEnabled(Boolean rehashEnabled) {
setRehashEnabled(rehashEnabled);
return this;
}

@Override
public HashConfig groupers(List<Grouper<?>> groupers) {
testImmutability("groupers");
this.groupers = groupers;
return this;
}

@Override
public HashConfig groupsEnabled(Boolean groupsEnabled) {
testImmutability("groupsEnabled");
this.groupsEnabled = groupsEnabled;
return this;
}

@Override
public List<Grouper<?>> getGroupers() {
return groupers;
}

@Override
public boolean equals(Object o) {
Expand All @@ -3270,6 +3304,8 @@ public boolean equals(Object o) {
return false;
if (numOwners != null ? !numOwners.equals(hashType.numOwners) : hashType.numOwners != null) return false;
if (numVirtualNodes != null ? !numVirtualNodes.equals(hashType.numVirtualNodes) : hashType.numVirtualNodes != null) return false;
if (groupsEnabled != null ? !groupsEnabled.equals(hashType.groupsEnabled) : hashType.groupsEnabled != null) return false;
if (groupers != null ? !groupers.equals(hashType.groupers) : hashType.groupers != null) return false;
if (rehashRpcTimeout != null ? !rehashRpcTimeout.equals(hashType.rehashRpcTimeout) : hashType.rehashRpcTimeout != null)
return false;
if (rehashWait != null ? !rehashWait.equals(hashType.rehashWait) : hashType.rehashWait != null) return false;
Expand All @@ -3284,6 +3320,8 @@ public int hashCode() {
result = 31 * result + (hashFunctionClass != null ? hashFunctionClass.hashCode() : 0);
result = 31 * result + (numOwners != null ? numOwners.hashCode() : 0);
result = 31 * result + (numVirtualNodes != null ? numVirtualNodes.hashCode() : 0);
result = 31 * result + (groupsEnabled != null ? groupsEnabled.hashCode() : 0);
result = 31 * result + (groupers != null ? groupers.hashCode() : 0);
result = 31 * result + (rehashWait != null ? rehashWait.hashCode() : 0);
result = 31 * result + (rehashRpcTimeout != null ? rehashRpcTimeout.hashCode() : 0);
result = 31 * result + (rehashEnabled ? 0 : 1);
Expand Down
23 changes: 23 additions & 0 deletions core/src/main/java/org/infinispan/config/FluentConfiguration.java
Expand Up @@ -23,8 +23,12 @@

package org.infinispan.config;

import java.util.List;

import org.infinispan.container.DataContainer;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.group.Group;
import org.infinispan.distribution.group.Grouper;
import org.infinispan.eviction.EvictionStrategy;
import org.infinispan.eviction.EvictionThreadPolicy;
import org.infinispan.interceptors.base.CommandInterceptor;
Expand Down Expand Up @@ -641,6 +645,25 @@ public interface HashConfig extends FluentTypes {
*
*/
HashConfig numVirtualNodes(Integer numVirtualNodes);

/**
* Enable grouping support, such that {@link Group} annotations are honoured and any configured
* groupers will be invoked
*/
HashConfig groupsEnabled(Boolean groupsEnabled);

/**
* Controls the groupers used in distribution
*
* @param groupers the groupers to use
* @see #getGroupers()
*/
HashConfig groupers(List<Grouper<?>> groupers);

/**
* Get's the current groupers in use
*/
List<Grouper<?>> getGroupers();
}

/**
Expand Down
Expand Up @@ -22,6 +22,7 @@
*/
package org.infinispan.distribution.ch;

import org.infinispan.distribution.group.GroupManager;
import org.infinispan.remoting.transport.Address;

import java.util.Collection;
Expand All @@ -47,6 +48,8 @@
public abstract class AbstractConsistentHash implements ConsistentHash {

protected Set<Address> caches;

protected GroupManager groupManager;

@Override
public void setCaches(Set<Address> caches) {
Expand Down Expand Up @@ -78,10 +81,26 @@ public boolean isKeyLocalToAddress(Address a, Object key, int replCount) {
return locate(key, replCount).contains(a);
}

public void setGroupManager(GroupManager groupManager) {
this.groupManager = groupManager;
}

@Override
public String toString() {
return getClass().getSimpleName() + " {" +
"caches=" + caches +
'}';
}

/**
* Get the grouping, if any, for this key.
*
* @param key the key to get the grouping for
* @return the group, or if no group is applicable, the key
*/
protected Object getGrouping(Object key) {
String group = groupManager != null ? groupManager.getGroup(key) : null;
return group != null ? group : key;
}

}
Expand Up @@ -22,16 +22,23 @@
*/
package org.infinispan.distribution.ch;

import static java.util.Collections.emptyList;

import org.infinispan.config.Configuration;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.Util;
import org.infinispan.util.hash.Hash;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.infinispan.distribution.group.GroupManager;
import org.infinispan.distribution.group.GroupManagerImpl;
import org.infinispan.distribution.group.Grouper;

/**
* A helper class that handles the construction of consistent hash instances based on configuration.
*
Expand Down Expand Up @@ -69,16 +76,26 @@ private static ConsistentHash constructConsistentHashInstance(Configuration c) {
wch.setHashFunction(h);
wch.setNumVirtualNodes(c.getNumVirtualNodes());
}
if (ch instanceof AbstractConsistentHash) {
AbstractConsistentHash ach = (AbstractConsistentHash) ch;
if (c.isGroupsEnabled())
ach.setGroupManager(new GroupManagerImpl(c.getGroupers()));
}
return ch;
}

private static ConsistentHash constructConsistentHashInstance(Class<? extends ConsistentHash> clazz, Hash hash, int numVirtualNodes) {
private static ConsistentHash constructConsistentHashInstance(Class<? extends ConsistentHash> clazz, Hash hash, int numVirtualNodes, GroupManager groupManager) {
ConsistentHash ch = Util.getInstance(clazz);
if (ch instanceof AbstractWheelConsistentHash) {
AbstractWheelConsistentHash wch = (AbstractWheelConsistentHash) ch;
wch.setHashFunction(hash);
wch.setNumVirtualNodes(numVirtualNodes);
}
if (ch instanceof AbstractConsistentHash) {
AbstractConsistentHash ach = (AbstractConsistentHash) ch;
if (groupManager != null)
ach.setGroupManager(groupManager);
}
return ch;
}

Expand Down Expand Up @@ -151,12 +168,14 @@ public static ConsistentHash createConsistentHash(Configuration c, Collection<Ad
public static ConsistentHash createConsistentHash(ConsistentHash template, Collection<Address> addresses) {
Hash hf = null;
int numVirtualNodes = 1;
GroupManager groupManager = null;
if (template instanceof AbstractWheelConsistentHash) {
AbstractWheelConsistentHash wTemplate = (AbstractWheelConsistentHash) template;
hf = wTemplate.hashFunction;
numVirtualNodes = wTemplate.numVirtualNodes;
groupManager = wTemplate.groupManager;
}
ConsistentHash ch = constructConsistentHashInstance(template.getClass(), hf, numVirtualNodes);
ConsistentHash ch = constructConsistentHashInstance(template.getClass(), hf, numVirtualNodes, groupManager);
if (addresses != null && !addresses.isEmpty()) ch.setCaches(toSet(addresses));
return ch;
}
Expand Down
Expand Up @@ -50,7 +50,7 @@ private int getNumCopiesToFind(int replCount) {
}

public List<Address> locate(Object key, int replCount) {
int hash = getNormalizedHash(key);
int hash = getNormalizedHash(getGrouping(key));
int numCopiesToFind = getNumCopiesToFind(replCount);

List<Address> owners = new ArrayList<Address>(numCopiesToFind);
Expand Down Expand Up @@ -84,7 +84,7 @@ public List<Address> locate(Object key, int replCount) {

@Override
public boolean isKeyLocalToAddress(Address target, Object key, int replCount) {
int hash = getNormalizedHash(key);
int hash = getNormalizedHash(getGrouping(key));
int numCopiesToFind = getNumCopiesToFind(replCount);

SortedMap<Integer, Address> candidates = positions.tailMap(hash);
Expand Down
Expand Up @@ -112,7 +112,7 @@ public List<Address> getStateProvidersOnJoin(Address joiner, int replCount) {
*/
private List<Address> getOwners(Address address, int numOwners) {
Address realAddress = getRealAddress(address);
int ownerHash = getNormalizedHash(address);
int ownerHash = getNormalizedHash(getGrouping(address));
Collection<Address> beforeOnWheel = positions.headMap(ownerHash).values();
Collection<Address> afterOnWheel = positions.tailMap(ownerHash).values();
ArrayList<Address> processSequence = new ArrayList<Address>(afterOnWheel);
Expand Down Expand Up @@ -180,7 +180,7 @@ private boolean isSameMachine(Address a, Address b) {
}

private Address getOwner(Object key) {
int hash = getNormalizedHash(key);
int hash = getNormalizedHash(getGrouping(key));
SortedMap<Integer, Address> map = positions.tailMap(hash);
if (map.size() == 0) {
return positions.get(positions.firstKey());
Expand Down
58 changes: 58 additions & 0 deletions core/src/main/java/org/infinispan/distribution/group/Group.java
@@ -0,0 +1,58 @@
package org.infinispan.distribution.group;

import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

/**
* <p>
* Identifies the key for a group.
* </p>
*
* <p>
* <code>@Group</code> should be used when you have control over the key class. For example:
* </p>
*
* <pre>
* class User {
*
* ...
* String office;
* ...
*
* int hashCode() {
* // Defines the hash for the key, normally used to determine location
* ...
* }
*
* // Override the location by specifying a group, all keys in the same
* // group end up with the same owner
* @Group
* String getOffice() {
* return office;
* }
*
* }
* </pre>
*
* <p>
* If you don't have control over the key class, you can specify a {@link Grouper} (in your configuration) which can be used to
* specify the group externally.
* </p>
*
* <p>
* You must set the <code>groupsEnabled<code> property to true in your configuration in order to use groups.
* </p>
*
* @see Grouper
*
* @author Pete Muir
*
*/
@Target(METHOD)
@Retention(RUNTIME)
public @interface Group {

}
@@ -0,0 +1,19 @@
package org.infinispan.distribution.group;

/**
* Control's key grouping.
*
* @author Pete Muir
*
*/
public interface GroupManager {

/**
* Get the group for a given key
*
* @param key the key for which to get the group
* @return the group, or null if no group is defined for the key
*/
public String getGroup(Object key);

}

0 comments on commit c2d7783

Please sign in to comment.