ISPN-2918 TopologyAwareConsistentHashFactory doesn't distribute data to nodes evenly

Improve the topology-aware CH algorithm to spread backup copies of segments
to all the nodes.
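
As a rough sketch of the balance target (values are illustrative, not from the patch): with numOwners copies of each of numSegments segments spread over numNodes nodes, every node should own roughly the even share, and the new TopologyInfo.computeMaxSegments() refines that share per site, rack and machine.

    // Illustrative only: the even share the rebalanced CH aims for.
    int numOwners = 2, numSegments = 60, numNodes = 9;
    int evenShare = (int) Math.ceil((double) numOwners * numSegments / numNodes); // ceil(13.33) = 14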
commit 65cbbf1811b6165923af04a6eed1d2ddcb996cc4 1 parent da5c3f0
danberindei authored
2  core/src/main/java/org/infinispan/distribution/ch/OwnershipStatistics.java
@@ -33,7 +33,7 @@
* @author Dan Berindei
* @since 5.2
*/
-class OwnershipStatistics {
+public class OwnershipStatistics {
private final Map<Address, Integer> nodes;
private final int[] primaryOwned;
private final int[] owned;
8 core/src/main/java/org/infinispan/distribution/ch/SyncConsistentHashFactory.java
@@ -138,7 +138,7 @@ protected void populateOwnersManySegments(Builder builder, SortedMap<Integer, Ad
// based on numSegments. This is not perfect because we may end up with too many virtual nodes,
// but the only downside in that is a little more shuffling when a node joins/leaves.
int numSegments = builder.getNumSegments();
- int numVirtualNodes = (int) (Math.log(numSegments + 1) / Math.log(2));
+ int numVirtualNodes = (int) (Math.log(builder.getNumOwners() * numSegments + 1) / Math.log(2)) + 1;
int numNodes = builder.getSortedMembers().size();
Map<Integer, Address> primarySegments = new HashMap<Integer, Address>(numNodes * numVirtualNodes);
@@ -216,6 +216,7 @@ public DefaultConsistentHash union(DefaultConsistentHash ch1, DefaultConsistentH
protected static class Builder {
private final Hash hashFunction;
+ private final int numOwners;
private final int actualNumOwners;
private final int numSegments;
private final List<Address> sortedMembers;
@@ -225,6 +226,7 @@ public DefaultConsistentHash union(DefaultConsistentHash ch1, DefaultConsistentH
private Builder(Hash hashFunction, int numOwners, int numSegments, List<Address> members) {
this.hashFunction = hashFunction;
this.numSegments = numSegments;
+ this.numOwners = numOwners;
this.actualNumOwners = Math.min(numOwners, members.size());
this.sortedMembers = sort(members);
this.segmentSize = (int)Math.ceil((double)Integer.MAX_VALUE / numSegments);
@@ -238,6 +240,10 @@ public Hash getHashFunction() {
return hashFunction;
}
+ public int getNumOwners() {
+ return numOwners;
+ }
+
public int getActualNumOwners() {
return actualNumOwners;
}
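
A worked example of the numVirtualNodes change above (example values, not from the patch); per the comment in the hunk, the only cost of the extra virtual nodes is a little more shuffling when a node joins or leaves:

    // numOwners = 2, numSegments = 256
    int oldVirtualNodes = (int) (Math.log(256 + 1) / Math.log(2));           // log2(257)      ->  8
    int newVirtualNodes = (int) (Math.log(2 * 256 + 1) / Math.log(2)) + 1;   // log2(513) + 1  -> 10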
164 core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHashFactory.java
@@ -27,6 +27,8 @@
import java.util.List;
import java.util.Set;
+import org.infinispan.distribution.topologyaware.TopologyInfo;
+import org.infinispan.distribution.topologyaware.TopologyLevel;
import org.infinispan.marshall.AbstractExternalizer;
import org.infinispan.marshall.Ids;
import org.infinispan.remoting.transport.Address;
@@ -39,10 +41,10 @@
* @since 5.2
*/
public class TopologyAwareConsistentHashFactory extends DefaultConsistentHashFactory {
- private enum Level { SITE, RACK, MACHINE, NONE }
@Override
protected void addBackupOwners(Builder builder) {
+ TopologyInfo topologyInfo = new TopologyInfo(builder.getMembers());
int minSegments = builder.getActualNumOwners() * builder.getNumSegments() / builder.getNumNodes();
// 1. Remove extra owners (could be leftovers from addPrimaryOwners).
@@ -52,108 +54,145 @@ protected void addBackupOwners(Builder builder) {
// 2. If owners(segment) < numOwners, add new owners.
// Unlike the parent class, we allow many more segments for one node just in order to get
// as many different sites, racks and machines in the same owner list.
- addBackupOwnersForLevel(builder, minSegments, Level.SITE);
- addBackupOwnersForLevel(builder, minSegments, Level.RACK);
- addBackupOwnersForLevel(builder, minSegments, Level.MACHINE);
+ addBackupOwnersForLevel(builder, topologyInfo, TopologyLevel.SITE);
+ addBackupOwnersForLevel(builder, topologyInfo, TopologyLevel.RACK);
+ addBackupOwnersForLevel(builder, topologyInfo, TopologyLevel.MACHINE);
- addBackupOwnersForLevel(builder, minSegments, Level.NONE);
+ addBackupOwnersForLevel(builder, topologyInfo, TopologyLevel.NODE);
// 3. Now owners(segment) == numOwners for every segment because of steps 1 and 2.
- replaceBackupOwnersForLevel(builder, Level.SITE);
- replaceBackupOwnersForLevel(builder, Level.RACK);
- replaceBackupOwnersForLevel(builder, Level.MACHINE);
+ replaceBackupOwnersForLevel(builder, topologyInfo, TopologyLevel.SITE);
+ replaceBackupOwnersForLevel(builder, topologyInfo, TopologyLevel.RACK);
+ replaceBackupOwnersForLevel(builder, topologyInfo, TopologyLevel.MACHINE);
// Replace owners that have too many segments with owners that have too few.
- replaceBackupOwnerForMachineLevel(builder, minSegments);
+ replaceBackupOwnerNoLevel(builder, topologyInfo);
}
- private void addBackupOwnersForLevel(Builder builder, int minSegments, Level level) {
+ private void addBackupOwnersForLevel(Builder builder, TopologyInfo topologyInfo, TopologyLevel level) {
// In the first phase, the new owners must own < minSegments segments.
// It may not be possible to fill all the segments with numOwners owners this way,
// so we repeat this in a loop, each iteration with a higher limit of owned segments
- int currentMax = minSegments;
- while (doAddBackupOwnersForLevel(builder, currentMax, level)) {
- currentMax++;
+ int extraSegments = 0;
+ while (doAddBackupOwnersForLevel(builder, topologyInfo, level, extraSegments)) {
+ extraSegments++;
}
}
- private boolean doAddBackupOwnersForLevel(Builder builder, int maxSegments, Level level) {
- // Mostly copied from DefaultConsistentHashFactory.doAddBackupOwners, but with an extra location check
+ private boolean doAddBackupOwnersForLevel(Builder builder, TopologyInfo topologyInfo, TopologyLevel level, int extraSegments) {
boolean sufficientOwners = true;
- boolean modified = false;
for (int segment = 0; segment < builder.getNumSegments(); segment++) {
List<Address> owners = builder.getOwners(segment);
- for (Address candidate : builder.getMembers()) {
- if (owners.size() >= builder.getActualNumOwners())
- break;
+ if (owners.size() >= builder.getActualNumOwners())
+ continue;
+
+ int maxDistinctLocations = topologyInfo.getDistinctLocationsCount(level, builder.getActualNumOwners());
+ int distinctLocations = new TopologyInfo(owners).getDistinctLocationsCount(level, builder.getActualNumOwners());
+ if (distinctLocations == maxDistinctLocations)
+ continue;
+ for (Address candidate : builder.getMembers()) {
+ int maxSegments = topologyInfo.computeMaxSegments(builder.getNumSegments(),
+ builder.getActualNumOwners(), candidate) + extraSegments;
if (builder.getOwned(candidate) < maxSegments) {
- if (!owners.contains(candidate) && !locationIsDuplicate(candidate, owners, level)) {
+ if (!owners.contains(candidate) && !locationIsDuplicate(owners, candidate, level)) {
builder.addOwner(segment, candidate);
- modified = true;
+ distinctLocations++;
+ // The owners list is live, no need to query it again
+ if (owners.size() >= builder.getActualNumOwners())
+ break;
}
}
}
- sufficientOwners &= owners.size() >= builder.getActualNumOwners();
+
+ if (distinctLocations < maxDistinctLocations && owners.size() < builder.getActualNumOwners()) {
+ sufficientOwners = false;
+ }
}
- // If we didn't add any owners this time, we won't add any owners with a higher maxSegments either
- return !sufficientOwners && modified;
+ return !sufficientOwners;
}
- protected void replaceBackupOwnersForLevel(Builder builder, Level level) {
+ private void replaceBackupOwnersForLevel(Builder builder, TopologyInfo topologyInfo, TopologyLevel level) {
+ int extraSegments = 0;
+ while (doReplaceBackupOwnersForLevel(builder, topologyInfo, level, extraSegments)) {
+ extraSegments++;
+ }
+ }
+
+ private boolean doReplaceBackupOwnersForLevel(Builder builder, TopologyInfo topologyInfo,
+ TopologyLevel level, int extraSegments) {
+ boolean sufficientLocations = true;
// At this point each segment already has actualNumOwners owners.
for (int segment = 0; segment < builder.getNumSegments(); segment++) {
List<Address> owners = builder.getOwners(segment);
- List<Address> backupOwners = builder.getBackupOwners(segment);
- for (int i = backupOwners.size() - 1; i >= 0; i--) {
- Address owner = backupOwners.get(i);
- if (locationIsDuplicate(owner, owners, level)) {
+ int maxDistinctLocations = topologyInfo.getDistinctLocationsCount(level, builder.getActualNumOwners());
+ int distinctLocations = new TopologyInfo(owners).getDistinctLocationsCount(level, builder.getActualNumOwners());
+ if (distinctLocations == maxDistinctLocations)
+ continue;
+
+ for (int i = owners.size() - 1; i >= 1; i--) {
+ Address owner = owners.get(i);
+ if (locationIsDuplicate(owners, owner, level)) {
// Got a duplicate site/rack/machine, we might have an alternative for it.
for (Address candidate : builder.getMembers()) {
- if (!owners.contains(candidate) && !locationIsDuplicate(candidate, owners, level)) {
- builder.addOwner(segment, candidate);
- builder.removeOwner(segment, owner);
- // Update the owners list, needed for the locationIsDuplicate check.
- owners = builder.getOwners(segment);
- backupOwners = builder.getBackupOwners(segment);
- break;
+ int maxSegments = topologyInfo.computeMaxSegments(builder.getNumSegments(),
+ builder.getActualNumOwners(), candidate);
+ if (builder.getOwned(candidate) < maxSegments + extraSegments) {
+ if (!owners.contains(candidate) && !locationIsDuplicate(owners, candidate, level)) {
+ builder.addOwner(segment, candidate);
+ builder.removeOwner(segment, owner);
+ distinctLocations++;
+ // The owners list is live, no need to query it again
+ break;
+ }
}
}
}
}
+
+ if (distinctLocations < maxDistinctLocations) {
+ sufficientLocations = false;
+ }
}
+ return !sufficientLocations;
}
- protected void replaceBackupOwnerForMachineLevel(Builder builder, int minSegments) {
- // 3.1. If there is an owner with owned(owner) > minSegments + 1, find another node
- // with owned(node) < minSegments and replace that owner with it.
- doReplaceBackupOwnersSameMachine(builder, minSegments, minSegments + 1);
- // 3.2. Same as step 3.1, but also replace owners that own minSegments + 1 segments.
+ private void replaceBackupOwnerNoLevel(Builder builder, TopologyInfo topologyInfo) {
+ // 3.1. If there is an owner with owned(owner) > maxSegments, find another node
+ // with owned(node) < maxSegments and replace that owner with it.
+ doReplaceBackupOwnersNoLevel(builder, topologyInfo, -1, 0);
+ // 3.2. Same as step 3.1, but also replace owners that own maxSegments segments.
// Doing this in a separate iteration minimizes the number of moves from nodes with
- // owned(node) == minSegments + 1, when numOwners*numSegments doesn't divide evenly with numNodes.
- doReplaceBackupOwnersSameMachine(builder, minSegments, minSegments);
- // 3.3. Same as step 3.1, but allow replacing with nodes that already have owned(node) = minSegments.
+ // owned(node) == maxSegments, when numOwners*numSegments doesn't divide evenly with numNodes.
+ doReplaceBackupOwnersNoLevel(builder, topologyInfo, -1, -1);
+ // 3.3. Same as step 3.1, but allow replacing with nodes that already have owned(node) = maxSegments - 1.
// Necessary when numOwners*numSegments doesn't divide evenly with numNodes,
- // because all nodes could own minSegments segments and yet one node could own
- // minSegments + (numOwners*numSegments % numNodes) segments.
- doReplaceBackupOwnersSameMachine(builder, minSegments + 1, minSegments + 1);
+ // because all nodes could own maxSegments - 1 segments and yet one node could own
+ // maxSegments + (numOwners*numSegments % numNodes) segments.
+ doReplaceBackupOwnersNoLevel(builder, topologyInfo, 0, 0);
}
- private void doReplaceBackupOwnersSameMachine(Builder builder, int minSegments, int maxSegments) {
+ private void doReplaceBackupOwnersNoLevel(Builder builder, TopologyInfo topologyInfo,
+ int minSegmentsDiff, int maxSegmentsDiff) {
// Iterate over the owners in the outer loop so that we minimize the number of owner changes
// for the same segment. At this point each segment already has actualNumOwners owners.
for (int ownerIdx = builder.getActualNumOwners() - 1; ownerIdx >= 1; ownerIdx--) {
for (int segment = 0; segment < builder.getNumSegments(); segment++) {
List<Address> owners = builder.getOwners(segment);
Address owner = owners.get(ownerIdx);
+ int maxSegments = topologyInfo.computeMaxSegments(builder.getNumSegments(),
+ builder.getActualNumOwners(), owner) + maxSegmentsDiff;
if (builder.getOwned(owner) > maxSegments) {
// Owner has too many segments. Find another node to replace it with.
for (Address candidate : builder.getMembers()) {
+ int minSegments = topologyInfo.computeMaxSegments(builder.getNumSegments(),
+ builder.getActualNumOwners(), candidate) + minSegmentsDiff;
if (builder.getOwned(candidate) < minSegments) {
- if (!owners.contains(candidate) && maintainsMachines(owners, candidate, owner)) {
+ if (!owners.contains(candidate) && maintainsDiversity(owners, candidate, owner)) {
builder.addOwner(segment, candidate);
builder.removeOwner(segment, owner);
+ // The owners list is live, no need to query it again
break;
}
}
@@ -163,7 +202,7 @@ private void doReplaceBackupOwnersSameMachine(Builder builder, int minSegments,
}
}
- private Object getLocationId(Address address, Level level) {
+ private Object getLocationId(Address address, TopologyLevel level) {
TopologyAwareAddress taa = (TopologyAwareAddress) address;
Object locationId;
switch (level) {
@@ -176,7 +215,7 @@ private Object getLocationId(Address address, Level level) {
case MACHINE:
locationId = taa.getSiteId() + "|" + taa.getRackId() + "|" + taa.getMachineId();
break;
- case NONE:
+ case NODE:
locationId = address;
break;
default:
@@ -185,27 +224,36 @@ private Object getLocationId(Address address, Level level) {
return locationId;
}
- private boolean locationIsDuplicate(Address target, List<Address> addresses, Level level) {
+ private boolean locationIsDuplicate(List<Address> addresses, Address target, TopologyLevel level) {
+ Object targetLocationId = getLocationId(target, level);
for (Address address : addresses) {
- if (address != target && getLocationId(address, level).equals(getLocationId(target, level)))
+ if (address != target && getLocationId(address, level).equals(targetLocationId))
return true;
}
return false;
}
- private boolean maintainsMachines(List<Address> owners, Address candidate, Address replaced) {
+ private boolean maintainsDiversity(List<Address> owners, Address candidate, Address replaced) {
+ return maintainsDiversity(owners, candidate, replaced, TopologyLevel.SITE)
+ && maintainsDiversity(owners, candidate, replaced, TopologyLevel.RACK)
+ && maintainsDiversity(owners, candidate, replaced, TopologyLevel.MACHINE);
+ }
+
+ private boolean maintainsDiversity(List<Address> owners, Address candidate, Address replaced, TopologyLevel machine) {
+ Set<Object> oldMachines = new HashSet<Object>(owners.size());
Set<Object> newMachines = new HashSet<Object>(owners.size());
- newMachines.add(getLocationId(candidate, Level.MACHINE));
+ newMachines.add(getLocationId(candidate, machine));
for (Address node : owners) {
+ oldMachines.add(getLocationId(node, machine));
if (!node.equals(replaced)) {
- newMachines.add(getLocationId(node, Level.MACHINE));
+ newMachines.add(getLocationId(node, machine));
}
}
- return newMachines.contains(getLocationId(replaced, Level.MACHINE));
+ return newMachines.size() >= oldMachines.size();
}
-
+
public static class Externalizer extends AbstractExternalizer<TopologyAwareConsistentHashFactory> {
@Override
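
To make the per-level passes concrete: for the topology used in testComplexScenario2 further down (1 site, 3 racks, 9 machines, 9 nodes) and numOwners = 2, topologyInfo.getDistinctLocationsCount(level, 2) gives the targets below (illustrative trace, not part of the patch). addBackupOwnersForLevel keeps relaxing the per-node cap until every segment either reaches these counts or already has all its owners.

    // SITE:    min(2, 1 site)     = 1  -> a single site is unavoidable
    // RACK:    min(2, 3 racks)    = 2  -> the two owners must sit on different racks
    // MACHINE: min(2, 9 machines) = 2  -> and on different machines
    // NODE:    min(2, 9 nodes)    = 2  -> trivially, two distinct nodes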
28 .../src/main/java/org/infinispan/distribution/ch/TopologyAwareSyncConsistentHashFactory.java
@@ -28,6 +28,7 @@
import java.util.Set;
import java.util.SortedMap;
+import org.infinispan.distribution.topologyaware.TopologyLevel;
import org.infinispan.marshall.AbstractExternalizer;
import org.infinispan.marshall.Ids;
import org.infinispan.remoting.transport.Address;
@@ -49,9 +50,6 @@
* @since 5.2
*/
public class TopologyAwareSyncConsistentHashFactory extends SyncConsistentHashFactory {
-
- private enum Level { SITE, RACK, MACHINE, NONE }
-
@Override
protected void populateOwnersFewSegments(Builder builder, SortedMap<Integer, Address> primarySegments) {
// Too few segments for each member to have one "primary segment",
@@ -67,13 +65,13 @@ protected void populateOwnersFewSegments(Builder builder, SortedMap<Integer, Add
// Continue with the backup owners. Assign each member as owner to one segment,
// then repeat until each segment has numOwners owners.
- populateBackupOwners(builder, Level.SITE);
- populateBackupOwners(builder, Level.RACK);
- populateBackupOwners(builder, Level.MACHINE);
- populateBackupOwners(builder, Level.NONE);
+ populateBackupOwners(builder, TopologyLevel.SITE);
+ populateBackupOwners(builder, TopologyLevel.RACK);
+ populateBackupOwners(builder, TopologyLevel.MACHINE);
+ populateBackupOwners(builder, TopologyLevel.NODE);
}
- private boolean populateBackupOwners(Builder builder, Level level) {
+ private boolean populateBackupOwners(Builder builder, TopologyLevel level) {
boolean modified = false;
// Try to add each node as an owner to one segment
for (Address member : builder.getSortedMembers()) {
@@ -98,15 +96,15 @@ protected void populateOwnersManySegments(Builder builder, SortedMap<Integer, Ad
// to populate the owner lists. For each segment assign the owners of the next numOwners
// "primary segments" as owners.
for (int segment = 0; segment < builder.getNumSegments(); segment++) {
- populateSegmentOwners(builder, primarySegments, segment, Level.SITE);
- populateSegmentOwners(builder, primarySegments, segment, Level.RACK);
- populateSegmentOwners(builder, primarySegments, segment, Level.MACHINE);
- populateSegmentOwners(builder, primarySegments, segment, Level.NONE);
+ populateSegmentOwners(builder, primarySegments, segment, TopologyLevel.SITE);
+ populateSegmentOwners(builder, primarySegments, segment, TopologyLevel.RACK);
+ populateSegmentOwners(builder, primarySegments, segment, TopologyLevel.MACHINE);
+ populateSegmentOwners(builder, primarySegments, segment, TopologyLevel.NODE);
}
}
private void populateSegmentOwners(Builder builder, SortedMap<Integer, Address> primarySegments,
- int segment, Level level) {
+ int segment, TopologyLevel level) {
List<Address> owners = builder.getOwners(segment);
if (owners.size() >= builder.getActualNumOwners())
return;
@@ -127,7 +125,7 @@ private void populateSegmentOwners(Builder builder, SortedMap<Integer, Address>
}
}
- private boolean locationAlreadyAdded(Address candidate, List<Address> owners, Level level) {
+ private boolean locationAlreadyAdded(Address candidate, List<Address> owners, TopologyLevel level) {
TopologyAwareAddress topologyAwareCandidate = (TopologyAwareAddress) candidate;
boolean locationAlreadyAdded = false;
for (Address owner : owners) {
@@ -142,7 +140,7 @@ private boolean locationAlreadyAdded(Address candidate, List<Address> owners, Le
case MACHINE:
locationAlreadyAdded = topologyAwareCandidate.isSameMachine(topologyAwareOwner);
break;
- case NONE:
+ case NODE:
locationAlreadyAdded = owner.equals(candidate);
}
if (locationAlreadyAdded)
302 core/src/main/java/org/infinispan/distribution/topologyaware/TopologyInfo.java
@@ -0,0 +1,302 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2013 Red Hat Inc. and/or its affiliates and other
+ * contributors as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.distribution.topologyaware;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.remoting.transport.TopologyAwareAddress;
+
+
+/**
+ * This class holds the topology hierarchy of a cache's members.
+ *
+ * @author Dan Berindei
+ * @since 5.2
+ */
+public class TopologyInfo {
+ private final Map<String, Site> allSites = new HashMap<String, Site>();
+ private List<Rack> allRacks = new ArrayList<Rack>();
+ private List<Machine> allMachines = new ArrayList<Machine>();
+ int allNodesCount;
+
+ public TopologyInfo(Collection<Address> members) {
+ for (Address node : members) {
+ addTopology(node);
+ }
+ }
+
+ private void addTopology(Address node) {
+ TopologyAwareAddress taNode = (TopologyAwareAddress) node;
+ String siteId = taNode.getSiteId();
+ String rackId = taNode.getRackId();
+ String machineId = taNode.getMachineId();
+
+ Site site = allSites.get(siteId);
+ if (site == null) {
+ site = new Site(siteId);
+ allSites.put(siteId, site);
+ }
+ Rack rack = site.racks.get(rackId);
+ if (rack == null) {
+ rack = new Rack(siteId, rackId);
+ site.racks.put(rackId, rack);
+ allRacks.add(rack);
+ }
+ Machine machine = rack.machines.get(machineId);
+ if (machine == null) {
+ machine = new Machine(siteId, rackId, machineId);
+ rack.machines.put(machineId, machine);
+ allMachines.add(machine);
+ }
+ machine.nodes.add(node);
+ rack.nodes.add(node);
+ site.nodes.add(node);
+ allNodesCount++;
+ }
+
+ public Collection<Address> getSiteNodes(String site) {
+ return allSites.get(site).nodes;
+ }
+
+ public Collection<Address> getRackNodes(String site, String rack) {
+ return allSites.get(site).racks.get(rack).nodes;
+ }
+
+ public Collection<Address> getMachineNodes(String site, String rack, String machine) {
+ return allSites.get(site).racks.get(rack).machines.get(machine).nodes;
+ }
+
+ public Set<String> getAllSites() {
+ return allSites.keySet();
+ }
+
+ public Set<String> getSiteRacks(String site) {
+ return allSites.get(site).racks.keySet();
+ }
+
+ public Set<String> getRackMachines(String site, String rack) {
+ return allSites.get(site).racks.get(rack).machines.keySet();
+ }
+
+ public int getAllSitesCount() {
+ return allSites.size();
+ }
+
+ public int getAllRacksCount() {
+ return allRacks.size();
+ }
+
+ public int getAllMachinesCount() {
+ return allMachines.size();
+ }
+
+ public int getAllNodesCount() {
+ return allNodesCount;
+ }
+
+ public int getDistinctLocationsCount(TopologyLevel level, int numOwners) {
+ switch (level) {
+ case NODE:
+ return Math.min(numOwners, getAllNodesCount());
+ case MACHINE:
+ return Math.min(numOwners, getAllMachinesCount());
+ case RACK:
+ return Math.min(numOwners, getAllRacksCount());
+ case SITE:
+ return Math.min(numOwners, getAllSitesCount());
+ default:
+ throw new IllegalArgumentException("Unexpected topology level: " + level);
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("TopologyInfo{\n");
+ for (Map.Entry<String, Site> site : allSites.entrySet()) {
+ String siteId = site.getKey();
+ sb.append(String.format("%s: {", siteId));
+ for (Map.Entry<String, Rack> rack : site.getValue().racks.entrySet()) {
+ String rackId = rack.getKey();
+ sb.append(String.format("%s: {", rackId));
+ for (Map.Entry<String, Machine> machine : rack.getValue().machines.entrySet()) {
+ String machineId = machine.getKey();
+ sb.append(String.format("%s: {", machineId));
+ for (Address node : machine.getValue().nodes) {
+ sb.append(node);
+ sb.append(", ");
+ }
+ sb.setLength(sb.length() - 2);
+ sb.append("}, ");
+ }
+ sb.setLength(sb.length() - 3);
+ sb.append("}, ");
+ }
+ sb.setLength(sb.length() - 3);
+ sb.append("}, ");
+ }
+ sb.setLength(sb.length() - 3);
+ sb.append('}');
+ return sb.toString();
+ }
+
+ public double computeMaxSegmentsForNode(int numSegments, double numCopies, int nodesCount) {
+ if (nodesCount < numCopies) {
+ return numSegments;
+ } else {
+ // The number of segment copies on each node should be the same
+ return numCopies * numSegments / nodesCount;
+ }
+ }
+
+ public double computeMaxSegmentsForMachine(int numSegments, double numCopies, Collection<Machine> machines,
+ Machine machine) {
+ // The number of segment copies on each machine should be the same, except where not possible
+ double copiesPerMachine = numCopies / machines.size();
+ if (machine.nodes.size() <= copiesPerMachine) {
+ copiesPerMachine = 1;
+ } else {
+ int fullMachines = 0;
+ for (Machine m : machines) {
+ if (m.nodes.size() <= copiesPerMachine) {
+ fullMachines++;
+ }
+ }
+ copiesPerMachine = (numCopies - fullMachines) / (machines.size() - fullMachines);
+ }
+ return computeMaxSegmentsForNode(numSegments, copiesPerMachine, machine.nodes.size());
+ }
+
+ public double computeMaxSegmentsForRack(int numSegments, double numCopies, Collection<Rack> racks, Rack rack,
+ Machine machine) {
+ // Not enough racks to have an owner in each rack.
+ // The number of segment copies on each Rack should be the same, except where not possible
+ double copiesPerRack = numCopies / racks.size();
+ if (rack.machines.size() <= copiesPerRack) {
+ copiesPerRack = 1;
+ } else {
+ int fullRacks = 0;
+ for (Rack m : racks) {
+ if (m.machines.size() <= copiesPerRack) {
+ fullRacks++;
+ }
+ }
+ copiesPerRack = (numCopies - fullRacks) / (racks.size() - fullRacks);
+ }
+ if (copiesPerRack <= 1) {
+ return computeMaxSegmentsForNode(numSegments, copiesPerRack, rack.nodes.size());
+ } else {
+ return computeMaxSegmentsForMachine(numSegments, copiesPerRack, rack.machines.values(), machine);
+ }
+ }
+
+ public double computeMaxSegmentsForSite(int numSegments, double numCopies, Collection<Site> sites,
+ Site site, Rack rack, Machine machine) {
+ // Not enough sites to have an owner in each site.
+ // The number of segment copies on each Site should be the same, except where not possible
+ double copiesPerSite = numCopies / sites.size();
+ if (site.racks.size() <= copiesPerSite) {
+ copiesPerSite = 1;
+ } else {
+ int fullSites = 0;
+ for (Site s : sites) {
+ if (s.racks.size() <= copiesPerSite) {
+ fullSites++;
+ }
+ }
+ // need to compute for racks if there are enough racks in total
+ copiesPerSite = (numCopies - fullSites) / (sites.size() - fullSites);
+ }
+ if (copiesPerSite <= 1) {
+ return computeMaxSegmentsForNode(numSegments, copiesPerSite, site.nodes.size());
+ } else {
+ return computeMaxSegmentsForRack(numSegments, copiesPerSite, site.racks.values(), rack, machine);
+ }
+ }
+
+
+ public int computeMaxSegments(int numSegments, int numOwners, Address node) {
+ TopologyAwareAddress taa = (TopologyAwareAddress) node;
+ String siteId = taa.getSiteId();
+ String rackId = taa.getRackId();
+ String machineId = taa.getMachineId();
+
+ Site site = allSites.get(siteId);
+ Rack rack = site.racks.get(rackId);
+ Machine machine = rack.machines.get(machineId);
+
+ double maxSegments;
+ if (numOwners == 1) {
+ maxSegments = computeMaxSegmentsForNode(numSegments, numOwners, allNodesCount);
+ } else if (getAllNodesCount() <= numOwners) {
+ maxSegments = numSegments;
+ } else if (getAllMachinesCount() <= numOwners) {
+ maxSegments = computeMaxSegmentsForMachine(numSegments, numOwners, allMachines, machine);
+ } else if (getAllRacksCount() <= numOwners) {
+ maxSegments = computeMaxSegmentsForRack(numSegments, numOwners, allRacks, rack, machine);
+ } else {
+ maxSegments = computeMaxSegmentsForSite(numSegments, numOwners, allSites.values(), site, rack, machine);
+ }
+ return (int) Math.ceil(maxSegments);
+ }
+
+ private static class Site {
+ String site;
+ Map<String, Rack> racks = new HashMap<String, Rack>();
+ List<Address> nodes = new ArrayList<Address>();
+
+ private Site(String site) {
+ this.site = site;
+ }
+ }
+
+ private static class Rack {
+ String site;
+ String rack;
+ Map<String, Machine> machines = new HashMap<String, Machine>();
+ List<Address> nodes = new ArrayList<Address>();
+
+ private Rack(String site, String rack) {
+ this.site = site;
+ this.rack = rack;
+ }
+ }
+
+ private static class Machine {
+ String site;
+ String rack;
+ String machine;
+ List<Address> nodes = new ArrayList<Address>();
+
+ private Machine(String site, String rack, String machine) {
+ this.site = site;
+ this.rack = rack;
+ this.machine = machine;
+ }
+ }
+}
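
A trace of computeMaxSegments() over the same testComplexScenario2 topology (1 site, 3 racks of 3 single-node machines, numSegments = 100, numOwners = 2); the arithmetic just follows the methods above and is only an illustration:

    // 9 nodes > 2 owners, 9 machines > 2, 3 racks > 2  -> computeMaxSegmentsForSite(100, 2, {s0}, ...)
    // copiesPerSite = 2 / 1 site = 2; s0 has 3 racks (> 2), so copiesPerSite stays 2
    //                                                   -> computeMaxSegmentsForRack(100, 2, {r0, r1, r2}, ...)
    // copiesPerRack = 2 / 3 racks ≈ 0.67 (<= 1)         -> computeMaxSegmentsForNode(100, 0.67, 3 nodes)
    //                                                      = 0.67 * 100 / 3 ≈ 22.2
    // computeMaxSegments(...) = ceil(22.2) = 23 segments per node
    // (200 segment copies over 9 nodes ≈ 22.2, so the cap is simply the even share, rounded up)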
34 core/src/main/java/org/infinispan/distribution/topologyaware/TopologyLevel.java
@@ -0,0 +1,34 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2013 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+
+package org.infinispan.distribution.topologyaware;
+
+/**
+ * The cluster topology is a tree with five levels: the entire cluster, sites, racks, machines, and
+ * individual nodes.
+ *
+ * @author Dan Berindei
+ * @since 5.2
+ */
+public enum TopologyLevel {
+ NODE,
+ MACHINE,
+ RACK,
+ SITE,
+}
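
For example, the topology built in testComplexScenario2 (in the test changes below) maps onto these levels as:

    // cluster
    //  └─ SITE "s0"
    //      ├─ RACK "r0" ── MACHINEs "m0", "m1", "m2" ── one NODE each
    //      ├─ RACK "r1" ── MACHINEs "m3", "m4", "m5" ── one NODE each
    //      └─ RACK "r2" ── MACHINEs "m6", "m7", "m8" ── one NODE each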
12 core/src/test/java/org/infinispan/distribution/ch/DefaultConsistentHashFactoryTest.java
@@ -204,11 +204,11 @@ protected int maxPrimaryOwned(int numSegments, int numNodes) {
}
protected int minOwned(int numSegments, int numNodes, int actualNumOwners) {
- return numSegments * actualNumOwners / numNodes;
+ return Math.min(numSegments, numSegments * actualNumOwners / numNodes);
}
protected int maxOwned(int numSegments, int numNodes, int actualNumOwners) {
- return (int) Math.ceil((double)numSegments * actualNumOwners / numNodes);
+ return Math.min(numSegments, (int) Math.ceil((double)numSegments * actualNumOwners / numNodes));
}
protected int allowedMoves(int numSegments, int numOwners, Collection<Address> oldMembers,
@@ -269,18 +269,18 @@ public void test1() {
TestAddress D = new TestAddress(3, "D");
DefaultConsistentHash ch1 = chf.create(new MurmurHash3(), 2, 60, Arrays.<Address>asList(A));
- System.out.println(ch1);
+ //System.out.println(ch1);
DefaultConsistentHash ch2 = chf.updateMembers(ch1, Arrays.<Address>asList(A, B));
ch2 = chf.rebalance(ch2);
- System.out.println(ch2);
+ //System.out.println(ch2);
DefaultConsistentHash ch3 = chf.updateMembers(ch2, Arrays.<Address>asList(A, B, C));
ch3 = chf.rebalance(ch3);
- System.out.println(ch3);
+ //System.out.println(ch3);
DefaultConsistentHash ch4 = chf.updateMembers(ch3, Arrays.<Address>asList(A, B, C, D));
ch4 = chf.rebalance(ch4);
- System.out.println(ch4);
+ //System.out.println(ch4);
}
}
225 ...est/java/org/infinispan/distribution/ch/SyncConsistentHashFactoryKeyDistributionTest.java
@@ -0,0 +1,225 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2011 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+
+package org.infinispan.distribution.ch;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.infinispan.commons.hash.MurmurHash3;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
+import org.infinispan.test.AbstractInfinispanTest;
+import org.testng.annotations.Test;
+
+import static java.lang.Math.sqrt;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Tests the uniformity of the SyncConsistentHashFactory algorithm, which is very similar to the 5.1
+ * default consistent hash algorithm with virtual nodes.
+ *
+ * <p>This test assumes that key hashes are random and follow a uniform distribution so a key has the same chance
+ * to land on each one of the 2^31 positions on the hash wheel.
+ *
+ * <p>The output should stay pretty much the same between runs, so I added an example output here: vnodes_key_dist.txt.
+ *
+ * <p>Notes about the test output:
+ * <ul>
+ * <li>{@code P(p)} is the probability of proposition {@code p} being true
+ * <li>In the "Primary" rows {@code mean == total_keys / num_nodes} (each key has only one primary owner),
+ * but in the "Any owner" rows {@code mean == total_keys / num_nodes * num_owners} (each key is stored on
+ * {@code num_owner} nodes).
+ * </ul>
+ * @author Dan Berindei
+ * @since 5.2
+ */
+@Test(testName = "distribution.ch.SyncConsistentHashFactoryKeyDistributionTest", groups = "manual", enabled = false, description = "See the results in vnodes_key_dist.txt")
+public class SyncConsistentHashFactoryKeyDistributionTest extends AbstractInfinispanTest {
+
+ // numbers of nodes to test
+ public static final int[] NUM_NODES = {2, 4, 8, 16, 32, 48, 64, 128, 256};
+ // numbers of segments to test
+ public static final int[] NUM_SEGMENTS = {64, 256, 1024, 4096, 163841};
+ // number of key owners
+ public static final int NUM_OWNERS = 2;
+
+ // controls precision + duration of test
+ public static final int LOOPS = 2000;
+ // confidence intervals to print for any owner
+ public static final double[] INTERVALS = { 1.25 };
+ // confidence intervals to print for primary owner
+ public static final double[] INTERVALS_PRIMARY = { 1.5 };
+ // percentiles to print
+ public static final double[] PERCENTILES = { .999 };
+
+ private DefaultConsistentHash createConsistentHash(int numSegments, int numOwners, int numNodes) {
+ MurmurHash3 hash = new MurmurHash3();
+ SyncConsistentHashFactory chf = new SyncConsistentHashFactory();
+ DefaultConsistentHash ch = chf.create(hash, numOwners, numSegments, createAddresses(numNodes));
+ return ch;
+ }
+
+ private List<Address> createAddresses(int numNodes) {
+ ArrayList<Address> addresses = new ArrayList<Address>(numNodes);
+ for (int i = 0; i < numNodes; i++) {
+ addresses.add(new IndexedJGroupsAddress(org.jgroups.util.UUID.randomUUID(), i));
+ }
+ return addresses;
+ }
+
+ public void testDistribution() {
+ for (int nn : NUM_NODES) {
+ Map<String, Map<Integer, String>> metrics = new TreeMap<String, Map<Integer, String>>();
+ for (int ns : NUM_SEGMENTS) {
+ for (Map.Entry<String, String> entry : computeMetrics(ns, NUM_OWNERS, nn).entrySet()) {
+ String metricName = entry.getKey();
+ String metricValue = entry.getValue();
+ Map<Integer, String> metric = metrics.get(metricName);
+ if (metric == null) {
+ metric = new HashMap<Integer, String>();
+ metrics.put(metricName, metric);
+ }
+ metric.put(ns, metricValue);
+ };
+ }
+
+ printMetrics(nn, metrics);
+ }
+ }
+
+ private void printMetrics(int nn, Map<String, Map<Integer, String>> metrics) {
+ // print the header
+ System.out.printf("Distribution for %3d nodes\n===\n", nn);
+ System.out.printf("%54s = ", "Segments");
+ for (int i = 0; i < NUM_SEGMENTS.length; i++) {
+ System.out.printf("%7d", NUM_SEGMENTS[i]);
+ }
+ System.out.println();
+
+ // print each metric for each vnodes setting
+ for (Map.Entry<String, Map<Integer, String>> entry : metrics.entrySet()) {
+ String metricName = entry.getKey();
+ Map<Integer, String> metricValues = entry.getValue();
+
+ System.out.printf("%54s = ", metricName);
+ for (int i = 0; i < NUM_SEGMENTS.length; i++) {
+ System.out.print(metricValues.get(NUM_SEGMENTS[i]));
+ }
+ System.out.println();
+ }
+ System.out.println();
+ }
+
+ private Map<String, String> computeMetrics(int numSegments, int numOwners, int numNodes) {
+ Map<String, String> metrics = new HashMap<String, String>();
+ long[] distribution = new long[LOOPS * numNodes];
+ long[] distributionPrimary = new long[LOOPS * numNodes];
+ int distIndex = 0;
+ for (int i = 0; i < LOOPS; i++) {
+ DefaultConsistentHash ch = createConsistentHash(numSegments, numOwners, numNodes);
+ OwnershipStatistics stats = new OwnershipStatistics(ch, ch.getMembers());
+ for (Address node : ch.getMembers()) {
+ distribution[distIndex] = stats.getOwned(node);
+ distributionPrimary[distIndex] = stats.getPrimaryOwned(node);
+ distIndex++;
+ }
+ }
+ Arrays.sort(distribution);
+ Arrays.sort(distributionPrimary);
+
+ addMetrics(metrics, "Any owner:", numSegments, numOwners, numNodes, distribution, INTERVALS);
+ addMetrics(metrics, "Primary:", numSegments, 1, numNodes, distributionPrimary, INTERVALS_PRIMARY);
+ return metrics;
+ }
+
+ private void addMetrics(Map<String, String> metrics, String prefix, int numSegments, int numOwners,
+ int numNodes, long[] distribution, double[] intervals) {
+ double mean = 0;
+ long sum = 0;
+ for (long x : distribution) sum += x;
+ assertEquals(sum, (long) LOOPS * numOwners * numSegments);
+ mean = (double) sum / numNodes / LOOPS;
+
+ double variance = 0;
+ for (long x : distribution) variance += (x - mean) * (x - mean);
+
+ double stdDev = sqrt(variance);
+ // metrics.put(prefix + " relative standard deviation", stdDev / mean);
+
+ long max = distribution[distribution.length - 1];
+ // metrics.put(prefix + " min", (double) min / mean);
+ addDoubleMetric(metrics, prefix + " max(num_keys(node)/mean)", (double) max / mean);
+
+ double[] intervalConfidence = new double[intervals.length];
+ int intervalIndex = 0;
+ for (int i = 0; i < distribution.length; i++) {
+ long x = distribution[i];
+ if (x > intervals[intervalIndex] * mean) {
+ intervalConfidence[intervalIndex] = (double) i / distribution.length;
+ intervalIndex++;
+ if (intervalIndex >= intervals.length)
+ break;
+ }
+ }
+ for (int i = intervalIndex; i < intervals.length; i++) {
+ intervalConfidence[i] = 1.;
+ }
+
+ for (int i = 0; i < intervals.length; i++) {
+ if (intervals[i] < 1) {
+ addPercentageMetric(metrics, String.format("%s P(num_keys(node) < %3.2f * mean)", prefix, intervals[i]), intervalConfidence[i]);
+ } else {
+ addPercentageMetric(metrics, String.format("%s P(num_keys(node) > %3.2f * mean)", prefix, intervals[i]), 1 - intervalConfidence[i]);
+ }
+ }
+
+ double[] percentiles = new double[PERCENTILES.length];
+ for (int i = 0; i < PERCENTILES.length; i++) {
+ percentiles[i] = (double)distribution[(int) Math.ceil(PERCENTILES[i] * (LOOPS * numNodes + 1))] / mean;
+ }
+ for (int i = 0; i < PERCENTILES.length; i++) {
+ addDoubleMetric(metrics, String.format("%s P(num_keys(node) <= x * mean) = %5.2f%% => x", prefix, PERCENTILES[i] * 100), percentiles[i]);
+ }
+ }
+
+ private void addDoubleMetric(Map<String, String> metrics, String name, double value) {
+ metrics.put(name, String.format("%7.3f", value));
+ }
+
+ private void addPercentageMetric(Map<String, String> metrics, String name, double value) {
+ metrics.put(name, String.format("%6.2f%%", value * 100));
+ }
+}
+
+/**
+ * We extend JGroupsAddress to make mapping an address to a node easier.
+ */
+class IndexedJGroupsAddress extends JGroupsAddress {
+ final int nodeIndex;
+
+ IndexedJGroupsAddress(org.jgroups.Address address, int nodeIndex) {
+ super(address);
+ this.nodeIndex = nodeIndex;
+ }
+}
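
A quick sanity check on the javadoc's definition of mean (illustrative numbers, consistent with the assertion sum == LOOPS * numOwners * numSegments):

    // numSegments = 256, NUM_OWNERS = 2, numNodes = 8
    // "Any owner" mean = numOwners * numSegments / numNodes = 512 / 8 = 64 owned segments per node
    // "Primary"   mean = numSegments / numNodes             = 256 / 8 = 32 primary segments per node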
8 core/src/test/java/org/infinispan/distribution/ch/SyncConsistentHashFactoryTest.java
@@ -42,22 +42,22 @@ protected ConsistentHashFactory createConsistentHashFactory() {
// in order to guarantee a better distribution, but I haven't done anything in that area yet.
@Override
protected int minPrimaryOwned(int numSegments, int numNodes) {
- return (int) (0.25 * super.minPrimaryOwned(numSegments, numNodes));
+ return (int) (0.4 * super.minPrimaryOwned(numSegments, numNodes));
}
@Override
protected int maxPrimaryOwned(int numSegments, int numNodes) {
- return (int) Math.ceil(3 * super.maxPrimaryOwned(numSegments, numNodes));
+ return (int) Math.ceil(2.5 * super.maxPrimaryOwned(numSegments, numNodes));
}
@Override
protected int minOwned(int numSegments, int numNodes, int actualNumOwners) {
- return (int) (0.25 * super.minOwned(numSegments, numNodes, actualNumOwners));
+ return (int) (0.4 * super.minOwned(numSegments, numNodes, actualNumOwners));
}
@Override
protected int maxOwned(int numSegments, int numNodes, int actualNumOwners) {
- return (int) Math.ceil(3 * super.maxOwned(numSegments, numNodes, actualNumOwners));
+ return (int) Math.ceil(2.5 * super.maxOwned(numSegments, numNodes, actualNumOwners));
}
@Override
174 ...ava/org/infinispan/distribution/topologyaware/TopologyAwareConsistentHashFactoryTest.java
@@ -26,6 +26,7 @@
import org.infinispan.distribution.TestTopologyAwareAddress;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.distribution.ch.DefaultConsistentHash;
+import org.infinispan.distribution.ch.OwnershipStatistics;
import org.infinispan.distribution.ch.TopologyAwareConsistentHashFactory;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.TopologyAwareAddress;
@@ -42,6 +43,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
/**
* @author Mircea.Markus@jboss.com
@@ -50,7 +52,6 @@
*/
@Test(groups = "unit", testName = "topologyaware.TopologyAwareConsistentHashFactoryTest")
public class TopologyAwareConsistentHashFactoryTest extends AbstractInfinispanTest {
-
private static final Log log = LogFactory.getLog(TopologyAwareConsistentHashFactoryTest.class);
private static final int CLUSTER_SIZE = 10;
public int numSegments = 100;
@@ -58,7 +59,7 @@
private TestTopologyAwareAddress[] testAddresses;
private List<Address> chMembers;
private ConsistentHashFactory<DefaultConsistentHash> chf;
- private DefaultConsistentHash ch;
+ protected DefaultConsistentHash ch;
@BeforeMethod()
public void setUp() {
@@ -156,9 +157,9 @@ public void testDifferentMachines2() {
}
public void testDifferentMachines3() {
- addNode(testAddresses[0], "primary", "primary", "primary");
- addNode(testAddresses[1], "primary", "primary", "primary");
- addNode(testAddresses[2], "secondary", "primary", "primary");
+ addNode(testAddresses[0], "m0", "r1", "s1");
+ addNode(testAddresses[1], "m1", "r1", "s1");
+ addNode(testAddresses[2], "m2", "r1", "s1");
assertAllLocationsWithRebalance(1);
assertAllLocationsWithRebalance(2);
@@ -293,21 +294,52 @@ public void testComplexScenario() {
assertAllLocationsWithRebalance(4);
}
+ public void testComplexScenario2() {
+ // {s0: {r0: {m0, m1, m2}, r1: {m3, m4, m5}, r2: {m6, m7, m8}}}
+ addNode(testAddresses[0], "m0", "r0", "s0");
+ addNode(testAddresses[1], "m1", "r0", "s0");
+ addNode(testAddresses[2], "m2", "r0", "s0");
+ addNode(testAddresses[3], "m3", "r1", "s0");
+ addNode(testAddresses[4], "m4", "r1", "s0");
+ addNode(testAddresses[5], "m5", "r1", "s0");
+ addNode(testAddresses[6], "m6", "r2", "s0");
+ addNode(testAddresses[7], "m7", "r2", "s0");
+ addNode(testAddresses[8], "m8", "r2", "s0");
+
+ assertAllLocationsWithRebalance(1);
+ assertAllLocationsWithRebalance(2);
+ }
+
private void assertAllLocationsWithRebalance(int numOwners) {
+ ch = chf.create(new MurmurHash3(), numOwners, numSegments, chMembers);
+ assertAllLocations(numOwners, chMembers);
+ assertDistribution(numOwners, chMembers);
+
ch = chf.create(new MurmurHash3(), numOwners, numSegments, chMembers.subList(0, 1));
- assertAllLocations(1, 1, 1, 1);
+ assertAllLocations(numOwners, chMembers.subList(0, 1));
- for (int i = 2; i < chMembers.size(); i++) {
+ for (int i = 2; i <= chMembers.size(); i++) {
List<Address> currentMembers = chMembers.subList(0, i);
- int expectedOwners = Math.min(numOwners, i);
- int expectedMachines = Math.min(expectedOwners, countMachines(currentMembers));
- int expectedRacks = Math.min(expectedOwners, countRacks(currentMembers));
- int expectedSites = Math.min(expectedOwners, countSites(currentMembers));
+ log.debugf("Created CH with numOwners %d, members %s", numOwners, currentMembers);
ch = chf.updateMembers(ch, currentMembers);
ch = chf.rebalance(ch);
- assertAllLocations(expectedOwners, expectedMachines, expectedRacks, expectedSites);
+
+ assertAllLocations(numOwners, currentMembers);
+ assertDistribution(numOwners, currentMembers);
+ }
+ }
+
+ protected void assertDistribution(int numOwners, List<Address> currentMembers) {
+ TopologyAwareOwnershipStatistics stats = new TopologyAwareOwnershipStatistics(ch);
+ log.tracef("Ownership stats: %s", stats);
+ int maxPrimarySegments = numSegments / currentMembers.size() + 1;
+ for (Address node : currentMembers) {
+ int maxSegments = stats.computeMaxSegments(numSegments, numOwners, node);
+ assertTrue(maxPrimarySegments - 1 <= stats.getPrimaryOwned(node), "Too few primary segments for node " + node);
+ assertTrue(stats.getPrimaryOwned(node) <= maxPrimarySegments, "Too many primary segments for node " + node);
+ assertTrue(maxSegments * 0.7 <= stats.getOwned(node), "Too few segments for node " + node);
+ assertTrue(stats.getOwned(node) <= maxSegments * 1.2, "Too many segments for node " + node);
}
- log.debugf("Created CH with members %s", chMembers);
}
private int countMachines(List<Address> addresses) {
@@ -337,7 +369,12 @@ private int countSites(List<Address> addresses) {
return sites.size();
}
- private void assertAllLocations(int expectedOwners, int expectedMachines, int expectedRacks, int expectedSites) {
+ private void assertAllLocations(int numOwners, List<Address> currentMembers) {
+ int expectedOwners = Math.min(numOwners, currentMembers.size());
+ int expectedMachines = Math.min(expectedOwners, countMachines(currentMembers));
+ int expectedRacks = Math.min(expectedOwners, countRacks(currentMembers));
+ int expectedSites = Math.min(expectedOwners, countSites(currentMembers));
+
for (int segment = 0; segment < numSegments; segment++) {
assertSegmentLocation(segment, expectedOwners, expectedMachines, expectedRacks, expectedSites);
}
@@ -355,9 +392,12 @@ public void testConsistencyWhenNodeLeaves() {
addNode(testAddresses[8], "m0", "r0", "s2");
addNode(testAddresses[9], "m0", "r0", "s0");
- updateConsistentHash(3);
+ int numOwners = 3;
+ updateConsistentHash(numOwners);
+ assertAllLocations(numOwners, chMembers);
+ assertDistribution(numOwners, chMembers);
- for (Address addr: chMembers) {
+ for (Address addr : chMembers) {
log.debugf("Removing node %s" + addr);
List<Address> addressCopy = new ArrayList<Address>(chMembers);
addressCopy.remove(addr);
@@ -368,9 +408,9 @@ public void testConsistencyWhenNodeLeaves() {
// generates extra moves trying to balance the CH.
AtomicInteger movedSegmentsCount = new AtomicInteger(0);
for (int segment = 0; segment < numSegments; segment++) {
- checkConsistency(segment, 3, ch.locateOwnersForSegment(segment), addr, newCH, movedSegmentsCount);
+ checkConsistency(segment, numOwners, ch.locateOwnersForSegment(segment), addr, newCH, movedSegmentsCount);
}
- assert movedSegmentsCount.get() < 5 :
+ assert movedSegmentsCount.get() <= numOwners * numSegments * 0.1 :
String.format("Too many moved segments after leave: %d. CH after leave is: %s\nPrevious: %s",
movedSegmentsCount.get(), newCH, ch);
}
@@ -432,6 +472,102 @@ private void addNode(TestTopologyAwareAddress address,
protected void updateConsistentHash(int numOwners) {
ch = chf.create(new MurmurHash3(), numOwners, numSegments, chMembers);
- log.debugf("Created CH with members %s", chMembers);
+ log.debugf("Created CH with numOwners %d, members %s", numOwners, chMembers);
+ }
+}
+
+
+class TopologyAwareOwnershipStatistics {
+ private final DefaultConsistentHash ch;
+ TopologyInfo topologyInfo;
+ OwnershipStatistics stats;
+
+ public TopologyAwareOwnershipStatistics(DefaultConsistentHash ch) {
+ this.ch = ch;
+ topologyInfo = new TopologyInfo(ch.getMembers());
+ stats = new OwnershipStatistics(ch, ch.getMembers());
+ }
+
+ public int getSiteOwned(String site) {
+ int count = 0;
+ for (Address node : topologyInfo.getSiteNodes(site)) {
+ count += stats.getOwned(node);
+ }
+ return count;
+ }
+
+ public int getSitePrimaryOwned(String site) {
+ int count = 0;
+ for (Address node : topologyInfo.getSiteNodes(site)) {
+ count += stats.getPrimaryOwned(node);
+ }
+ return count;
+ }
+
+ public int getRackOwned(String site, String rack) {
+ int count = 0;
+ for (Address node : topologyInfo.getRackNodes(site, rack)) {
+ count += stats.getOwned(node);
+ }
+ return count;
+ }
+
+ public int getRackPrimaryOwned(String site, String rack) {
+ int count = 0;
+ for (Address node : topologyInfo.getRackNodes(site, rack)) {
+ count += stats.getPrimaryOwned(node);
+ }
+ return count;
+ }
+
+ public int getMachineOwned(String site, String rack, String machine) {
+ int count = 0;
+ for (Address node : topologyInfo.getMachineNodes(site, rack, machine)) {
+ count += stats.getOwned(node);
+ }
+ return count;
+ }
+
+ public int getMachinePrimaryOwned(String site, String rack, String machine) {
+ int count = 0;
+ for (Address node : topologyInfo.getMachineNodes(site, rack, machine)) {
+ count += stats.getPrimaryOwned(node);
+ }
+ return count;
+ }
+
+ public int getOwned(Address node) {
+ return stats.getOwned(node);
+ }
+
+ public int getPrimaryOwned(Address node) {
+ return stats.getPrimaryOwned(node);
+ }
+
+ public int computeMaxSegments(int numSegments, int numOwners, Address node) {
+ return topologyInfo.computeMaxSegments(numSegments, numOwners, node);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("TopologyAwareOwnershipStatistics{\n");
+ for (String site : topologyInfo.getAllSites()) {
+ sb.append(String.format(" %s: %d/%d\n", site, getSitePrimaryOwned(site), getSiteOwned(site)));
+ for (String rack : topologyInfo.getSiteRacks(site)) {
+ sb.append(String.format(" %s: %d/%d\n", rack, getRackPrimaryOwned(site, rack),
+ getRackOwned(site, rack)));
+ for (String machine : topologyInfo.getRackMachines(site, rack)) {
+ sb.append(String.format(" %s: %d/%d\n", machine,
+ getMachinePrimaryOwned(site, rack, machine),
+ getMachineOwned(site, rack, machine)));
+ for (Address node : topologyInfo.getMachineNodes(site, rack, machine)) {
+ sb.append(String.format(" %s: %d/%d (%d)\n", node, stats.getPrimaryOwned(node),
+ stats.getOwned(node), topologyInfo.computeMaxSegments(ch.getNumSegments(), ch.getNumOwners(), node)));
+ }
+ }
+ }
+ }
+ sb.append('}');
+ return sb.toString();
}
}
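
Plugging in the computeMaxSegments trace from TopologyInfo above (23 segments per node for the testComplexScenario2 topology, numSegments = 100, numOwners = 2, 9 members), assertDistribution allows the following window (illustrative arithmetic only):

    // maxPrimarySegments = 100 / 9 + 1 = 12   -> primary-owned must be 11 or 12
    // 0.7 * 23 = 16.1 and 1.2 * 23 = 27.6     -> owned must be between 17 and 27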
25 ...org/infinispan/distribution/topologyaware/TopologyAwareSyncConsistentHashFactoryTest.java
@@ -22,11 +22,18 @@
*/
package org.infinispan.distribution.topologyaware;
+import java.util.List;
+
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.distribution.ch.DefaultConsistentHash;
import org.infinispan.distribution.ch.TopologyAwareSyncConsistentHashFactory;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertTrue;
+
/**
* @author Mircea.Markus@jboss.com
* @author Dan Berindei
@@ -35,6 +42,8 @@
@Test(groups = "unit", testName = "topologyaware.TopologyAwareSyncConsistentHashFactoryTest")
public class TopologyAwareSyncConsistentHashFactoryTest extends TopologyAwareConsistentHashFactoryTest {
+ private Log log = LogFactory.getLog(TopologyAwareSyncConsistentHashFactoryTest.class);
+
public TopologyAwareSyncConsistentHashFactoryTest() {
// Increase the number of segments to eliminate collisions (which would cause extra segment movements,
// causing testConsistencyAfterLeave to fail.)
@@ -45,4 +54,20 @@ public TopologyAwareSyncConsistentHashFactoryTest() {
protected ConsistentHashFactory<DefaultConsistentHash> createConsistentHashFactory() {
return new TopologyAwareSyncConsistentHashFactory();
}
+
+ @Override
+ protected void assertDistribution(int numOwners, List<Address> currentMembers) {
+ TopologyAwareOwnershipStatistics stats = new TopologyAwareOwnershipStatistics(ch);
+ log.tracef("Ownership stats: " + stats);
+ int maxPrimarySegments = numSegments / currentMembers.size() + 1;
+ for (Address node : currentMembers) {
+ int maxSegments = stats.computeMaxSegments(numSegments, numOwners, node);
+ log.tracef("Primary segments ratio: %f, total segments ratio: %f",
+ (double) stats.getPrimaryOwned(node) / maxPrimarySegments, (double) stats.getOwned(node) / maxSegments);
+ assertTrue(maxPrimarySegments * 0.4 <= stats.getPrimaryOwned(node));
+ assertTrue(stats.getPrimaryOwned(node) <= maxPrimarySegments * 2);
+ assertTrue(maxSegments * 0.4 <= stats.getOwned(node));
+ assertTrue(stats.getOwned(node) <= maxSegments * 2);
+ }
+ }
}