diff --git a/src/java/voldemort/cluster/Zone.java b/src/java/voldemort/cluster/Zone.java index 047aa1f35a..874db6e876 100644 --- a/src/java/voldemort/cluster/Zone.java +++ b/src/java/voldemort/cluster/Zone.java @@ -18,7 +18,7 @@ public Zone(int zoneId, LinkedList proximityList) { } public Zone() { - this.zoneId = 0; + this.zoneId = DEFAULT_ZONE_ID; this.proximityList = new LinkedList(); } diff --git a/src/java/voldemort/routing/RoutingStrategy.java b/src/java/voldemort/routing/RoutingStrategy.java index 827d861657..07c7b9050e 100644 --- a/src/java/voldemort/routing/RoutingStrategy.java +++ b/src/java/voldemort/routing/RoutingStrategy.java @@ -49,6 +49,11 @@ public interface RoutingStrategy { /** * Get the partition list for the given key. * + * TODO: The naming of this method is confusing.. it is simply a wrapper + * around {@link RoutingStrategy#getReplicatingPartitionList(int)} that + * takes a key. So, would be good to rename this also as + * getReplicatingPartitionList + * * @param key The key the operation is operating on * @return The partition list for the given key */ diff --git a/src/java/voldemort/utils/StoreInstance.java b/src/java/voldemort/routing/StoreRoutingPlan.java similarity index 70% rename from src/java/voldemort/utils/StoreInstance.java rename to src/java/voldemort/routing/StoreRoutingPlan.java index 3207b58860..dc798c86db 100644 --- a/src/java/voldemort/utils/StoreInstance.java +++ b/src/java/voldemort/routing/StoreRoutingPlan.java @@ -14,7 +14,7 @@ * the License. 
*/ -package voldemort.utils; +package voldemort.routing; import java.util.ArrayList; import java.util.HashMap; @@ -24,9 +24,13 @@ import voldemort.VoldemortException; import voldemort.cluster.Cluster; -import voldemort.routing.RoutingStrategyFactory; -import voldemort.routing.RoutingStrategyType; +import voldemort.cluster.Node; import voldemort.store.StoreDefinition; +import voldemort.utils.ByteUtils; +import voldemort.utils.ClusterUtils; +import voldemort.utils.NodeUtils; +import voldemort.utils.Pair; +import voldemort.utils.Utils; import com.google.common.collect.Lists; @@ -34,25 +38,23 @@ /** * This class wraps up a Cluster object and a StoreDefinition. The methods are - * effectively helper or util style methods for analyzing partitions and so on - * which are a function of both Cluster and StoreDefinition. + * effectively helper or util style methods for querying the routing plan that + * will be generated for a given routing strategy upon store and cluster + * topology information. */ -public class StoreInstance { - - // TODO: (refactor) Improve upon the name "StoreInstance". Object-oriented - // meaning of 'instance' is too easily confused with system notion of an - // "instance of a cluster" (the intended usage in this class name). 
+public class StoreRoutingPlan { private final Cluster cluster; private final StoreDefinition storeDefinition; - private final Map partitionIdToNodeIdMap; + private final RoutingStrategy routingStrategy; - public StoreInstance(Cluster cluster, StoreDefinition storeDefinition) { + public StoreRoutingPlan(Cluster cluster, StoreDefinition storeDefinition) { this.cluster = cluster; this.storeDefinition = storeDefinition; - - partitionIdToNodeIdMap = ClusterUtils.getCurrentPartitionMapping(cluster); + this.partitionIdToNodeIdMap = ClusterUtils.getCurrentPartitionMapping(cluster); + this.routingStrategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition, + cluster); } public Cluster getCluster() { @@ -69,29 +71,28 @@ public StoreDefinition getStoreDefinition() { * @param masterPartitionId * @return List of partition IDs that replicate the master partition ID. */ - public List getReplicationPartitionList(int masterPartitionId) { - return new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition, cluster) - .getReplicatingPartitionList(masterPartitionId); + public List getReplicatingPartitionList(int masterPartitionId) { + return this.routingStrategy.getReplicatingPartitionList(masterPartitionId); } /** * Determines list of partition IDs that replicate the key. * * @param key - * @return List of partition IDs that replicate the partition ID. 
+ * @return List of partition IDs that replicate the given key */ - public List getReplicationPartitionList(final byte[] key) { - return getReplicationPartitionList(getMasterPartitionId(key)); + public List getReplicatingPartitionList(final byte[] key) { + return this.routingStrategy.getPartitionList(key); } /** * Determines the list of nodes that the key replicates to * * @param key - * @return + * @return list of nodes that key replicates to */ public List getReplicationNodeList(final byte[] key) { - return getNodeIdListForPartitionIdList(getReplicationPartitionList(key)); + return NodeUtils.getNodeIds(this.routingStrategy.routeRequest(key)); } /** @@ -101,8 +102,7 @@ public List getReplicationNodeList(final byte[] key) { * @return */ public int getMasterPartitionId(final byte[] key) { - return new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition, cluster) - .getMasterPartition(key); + return this.routingStrategy.getMasterPartition(key); } /** @@ -123,8 +123,11 @@ public int getNodeIdForPartitionId(int partitionId) { * @return partitionId if found, otherwise null. */ public Integer getNodesPartitionIdForKey(int nodeId, final byte[] key) { - List partitionIds = getReplicationPartitionList(key); + // this is all the partitions the key replicates to. + List partitionIds = getReplicatingPartitionList(key); for(Integer partitionId: partitionIds) { + // check which of the replicating partitions belongs to the node in + // question if(getNodeIdForPartitionId(partitionId) == nodeId) { return partitionId; } @@ -157,8 +160,88 @@ private List getNodeIdListForPartitionIdList(List partitionIds return nodeIds; } + /** + * Returns the list of node ids this partition replicates to. 
+ * + * TODO ideally the {@link RoutingStrategy} should house a routeRequest(int + * partition) method + * + * @param partitionId + * @return + * @throws VoldemortException + */ public List getReplicationNodeList(int partitionId) throws VoldemortException { - return getNodeIdListForPartitionIdList(getReplicationPartitionList(partitionId)); + return getNodeIdListForPartitionIdList(getReplicatingPartitionList(partitionId)); + } + + /** + * Given a key that belongs to a given node, returns a number n (< zone + * replication factor), such that the given node holds the key as the nth + * replica of the given zone + * + * eg: if the method returns 1, then given node hosts the key as the zone + * secondary in the given zone + * + * @param zoneId id of the zone whose replicas are counted + * @param nodeId id of the node to locate among the nodes the key routes to + * @param key key whose routing list is examined + * @return zero-based replica number of the node within the zone + */ + public int getZoneReplicaType(int zoneId, int nodeId, byte[] key) { + List replicatingNodes = this.routingStrategy.routeRequest(key); + int zoneReplicaType = -1; + for(Node node: replicatingNodes) { + // bump up the replica number once you encounter a node in the given + // zone + if(node.getZoneId() == zoneId) { + zoneReplicaType++; + } + // we are done when we find the given node + if(node.getId() == nodeId) { + return zoneReplicaType; + } + } + if(zoneReplicaType > -1) { // counter starts at -1: > -1 means the zone holds replicas, just not on nodeId + throw new VoldemortException("Node " + nodeId + " not a replica for the key " + + ByteUtils.toHexString(key) + " in given zone " + zoneId); + } else { + throw new VoldemortException("Could not find any replicas for the key " + + ByteUtils.toHexString(key) + " in given zone " + zoneId); + } + } + + /** + * Given a key and a replica type n (< zone replication factor), figure out + * the node that contains the key as the nth replica in the given zone. 
+ * + * @param zoneId id of the zone to search + * @param zoneReplicaType zero-based replica number wanted within the zone + * @param key key whose routing list is examined + * @return id of the node holding that replica of the key in the zone + */ + public int getZoneReplicaNode(int zoneId, int zoneReplicaType, byte[] key) { + List replicatingNodes = this.routingStrategy.routeRequest(key); + int zoneReplicaTypeCounter = -1; + for(Node node: replicatingNodes) { + // bump up the counter if we encounter a replica in the given zone + if(node.getZoneId() == zoneId) { + zoneReplicaTypeCounter++; + } + // when the counter matches up with the replicaNumber we need, we + // are done. + if(zoneReplicaTypeCounter == zoneReplicaType) { + return node.getId(); + } + } + if(zoneReplicaTypeCounter == -1) { // counter never moved off -1: no routed node lies in the zone + throw new VoldemortException("Could not find any replicas for the key " + + ByteUtils.toHexString(key) + " in given zone " + zoneId); + } else { + throw new VoldemortException("Could not find " + (zoneReplicaType + 1) + + " replicas for the key " + ByteUtils.toHexString(key) + + " in given zone " + zoneId + ". Only found " + + (zoneReplicaTypeCounter + 1)); + } + } // TODO: (refactor) Move from static methods to non-static methods that use @@ -222,9 +305,9 @@ public static boolean checkKeyBelongsToPartition(int nodeId, cluster) .getPartitionList(key); List nodePartitions = cluster.getNodeById(nodeId).getPartitionIds(); - checkResult = StoreInstance.checkKeyBelongsToPartition(keyPartitions, - nodePartitions, - replicaToPartitionList); + checkResult = StoreRoutingPlan.checkKeyBelongsToPartition(keyPartitions, + nodePartitions, + replicaToPartitionList); } return checkResult; } @@ -276,9 +359,9 @@ public static List checkKeyBelongsToPartition(byte[] key, for(Pair>> stealNodeToMap: stealerNodeToMappingTuples) { List nodePartitions = cluster.getNodeById(stealNodeToMap.getFirst()) .getPartitionIds(); - if(StoreInstance.checkKeyBelongsToPartition(keyPartitions, - nodePartitions, - stealNodeToMap.getSecond())) { + if(StoreRoutingPlan.checkKeyBelongsToPartition(keyPartitions, + nodePartitions, + stealNodeToMap.getSecond())) { 
nodesToPush.add(stealNodeToMap.getFirst()); } } diff --git a/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java b/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java index 109afc6276..f7e6e4f0eb 100644 --- a/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java @@ -41,6 +41,7 @@ import voldemort.client.rebalance.RebalancePartitionsInfo; import voldemort.cluster.Cluster; import voldemort.common.nio.ByteBufferBackedInputStream; +import voldemort.routing.StoreRoutingPlan; import voldemort.server.StoreRepository; import voldemort.server.VoldemortConfig; import voldemort.server.protocol.RequestHandler; @@ -68,7 +69,6 @@ import voldemort.utils.Pair; import voldemort.utils.RebalanceUtils; import voldemort.utils.ReflectUtils; -import voldemort.utils.StoreInstance; import voldemort.utils.Utils; import voldemort.versioning.ObsoleteVersionException; import voldemort.versioning.VectorClock; @@ -1088,7 +1088,7 @@ public VAdminProto.DeletePartitionEntriesResponse handleDeletePartitionEntries(V ByteArray key = entry.getFirst(); Versioned value = entry.getSecond(); throttler.maybeThrottle(key.length() + valueSize(value)); - if(StoreInstance.checkKeyBelongsToPartition(metadataStore.getNodeId(), + if(StoreRoutingPlan.checkKeyBelongsToPartition(metadataStore.getNodeId(), key.get(), replicaToPartitionList, request.hasInitialCluster() ? 
new ClusterMapper().readCluster(new StringReader(request.getInitialCluster())) diff --git a/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java index 4bb19ce66b..41e6b8651d 100644 --- a/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java @@ -30,6 +30,7 @@ import voldemort.client.protocol.pb.ProtoUtils; import voldemort.client.protocol.pb.VAdminProto; import voldemort.cluster.Cluster; +import voldemort.routing.StoreRoutingPlan; import voldemort.server.StoreRepository; import voldemort.server.VoldemortConfig; import voldemort.server.protocol.StreamRequestHandler; @@ -42,7 +43,6 @@ import voldemort.utils.ByteArray; import voldemort.utils.EventThrottler; import voldemort.utils.NetworkClassLoader; -import voldemort.utils.StoreInstance; import voldemort.utils.Time; import voldemort.xml.ClusterMapper; @@ -90,7 +90,7 @@ public abstract class FetchStreamRequestHandler implements StreamRequestHandler protected boolean fetchOrphaned; - protected final StoreInstance storeInstance; + protected final StoreRoutingPlan storeInstance; protected FetchStreamRequestHandler(VAdminProto.FetchPartitionEntriesRequest request, MetadataStore metadataStore, @@ -119,7 +119,7 @@ protected FetchStreamRequestHandler(VAdminProto.FetchPartitionEntriesRequest req } else { this.initialCluster = metadataStore.getCluster(); } - this.storeInstance = new StoreInstance(this.initialCluster, this.storeDef); + this.storeInstance = new StoreRoutingPlan(this.initialCluster, this.storeDef); this.throttler = new EventThrottler(voldemortConfig.getStreamMaxReadBytesPerSec()); if(request.hasFilter()) { diff --git a/src/java/voldemort/server/protocol/admin/FullScanFetchStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FullScanFetchStreamRequestHandler.java index 7d63e1baab..0fbbbf77d2 100644 --- 
a/src/java/voldemort/server/protocol/admin/FullScanFetchStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FullScanFetchStreamRequestHandler.java @@ -23,6 +23,7 @@ import java.util.Set; import voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest; +import voldemort.routing.StoreRoutingPlan; import voldemort.server.StoreRepository; import voldemort.server.VoldemortConfig; import voldemort.store.ErrorCodeMapper; @@ -31,7 +32,6 @@ import voldemort.utils.ByteArray; import voldemort.utils.ClosableIterator; import voldemort.utils.NetworkClassLoader; -import voldemort.utils.StoreInstance; import voldemort.utils.Utils; /** @@ -102,7 +102,7 @@ private Integer getKeyPartitionId(byte[] key) { * @return true iff key is needed. */ protected boolean isKeyNeeded(byte[] key) { - if(!StoreInstance.checkKeyBelongsToPartition(nodeId, + if(!StoreRoutingPlan.checkKeyBelongsToPartition(nodeId, key, replicaToPartitionList, initialCluster, @@ -134,7 +134,7 @@ protected boolean isItemAccepted(byte[] key) { entryAccepted = true; } } else { - if(!StoreInstance.checkKeyBelongsToNode(key, nodeId, initialCluster, storeDef)) { + if(!StoreRoutingPlan.checkKeyBelongsToNode(key, nodeId, initialCluster, storeDef)) { entryAccepted = true; } } diff --git a/src/java/voldemort/server/protocol/admin/PartitionScanFetchEntriesRequestHandler.java b/src/java/voldemort/server/protocol/admin/PartitionScanFetchEntriesRequestHandler.java index 0a1ff69d7b..b496f51574 100644 --- a/src/java/voldemort/server/protocol/admin/PartitionScanFetchEntriesRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/PartitionScanFetchEntriesRequestHandler.java @@ -23,6 +23,7 @@ import voldemort.client.protocol.pb.ProtoUtils; import voldemort.client.protocol.pb.VAdminProto; import voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest; +import voldemort.routing.StoreRoutingPlan; import voldemort.server.StoreRepository; import voldemort.server.VoldemortConfig; import 
voldemort.store.ErrorCodeMapper; @@ -32,7 +33,6 @@ import voldemort.utils.ClosableIterator; import voldemort.utils.NetworkClassLoader; import voldemort.utils.Pair; -import voldemort.utils.StoreInstance; import voldemort.versioning.Versioned; import com.google.protobuf.Message; @@ -86,7 +86,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, // Check the current node contains the partition as the // requested replicatype if(!fetchedPartitions.contains(currentPartition) - && StoreInstance.checkPartitionBelongsToNode(currentPartition, + && StoreRoutingPlan.checkPartitionBelongsToNode(currentPartition, currentReplicaType, nodeId, initialCluster, diff --git a/src/java/voldemort/server/protocol/admin/PartitionScanFetchKeysRequestHandler.java b/src/java/voldemort/server/protocol/admin/PartitionScanFetchKeysRequestHandler.java index 351335b4fc..afdd7f789f 100644 --- a/src/java/voldemort/server/protocol/admin/PartitionScanFetchKeysRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/PartitionScanFetchKeysRequestHandler.java @@ -23,6 +23,7 @@ import voldemort.client.protocol.pb.ProtoUtils; import voldemort.client.protocol.pb.VAdminProto; import voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest; +import voldemort.routing.StoreRoutingPlan; import voldemort.server.StoreRepository; import voldemort.server.VoldemortConfig; import voldemort.store.ErrorCodeMapper; @@ -31,7 +32,6 @@ import voldemort.utils.ByteArray; import voldemort.utils.ClosableIterator; import voldemort.utils.NetworkClassLoader; -import voldemort.utils.StoreInstance; import com.google.protobuf.Message; @@ -84,7 +84,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, // Check the current node contains the partition as the // requested replicatype if(!fetchedPartitions.contains(currentPartition) - && StoreInstance.checkPartitionBelongsToNode(currentPartition, + && 
StoreRoutingPlan.checkPartitionBelongsToNode(currentPartition, currentReplicaType, nodeId, initialCluster, diff --git a/src/java/voldemort/server/rebalance/RebalancerState.java b/src/java/voldemort/server/rebalance/RebalancerState.java index 6eb62d2ea7..5aa5588eb1 100644 --- a/src/java/voldemort/server/rebalance/RebalancerState.java +++ b/src/java/voldemort/server/rebalance/RebalancerState.java @@ -23,10 +23,10 @@ import java.util.Map; import voldemort.client.rebalance.RebalancePartitionsInfo; +import voldemort.routing.StoreRoutingPlan; import voldemort.serialization.json.JsonReader; import voldemort.serialization.json.JsonWriter; import voldemort.store.metadata.MetadataStore; -import voldemort.utils.StoreInstance; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -103,7 +103,7 @@ public RebalancePartitionsInfo find(String storeName, // If yes, check if the key belongs to one of the partitions // being moved - if(StoreInstance.checkKeyBelongsToPartition(keyPartitions, + if(StoreRoutingPlan.checkKeyBelongsToPartition(keyPartitions, nodePartitions, info.getReplicaToAddPartitionList(storeName))) { return info; diff --git a/src/java/voldemort/server/rebalance/async/DonorBasedRebalanceAsyncOperation.java b/src/java/voldemort/server/rebalance/async/DonorBasedRebalanceAsyncOperation.java index 3d94150ce7..c29be1396a 100644 --- a/src/java/voldemort/server/rebalance/async/DonorBasedRebalanceAsyncOperation.java +++ b/src/java/voldemort/server/rebalance/async/DonorBasedRebalanceAsyncOperation.java @@ -38,6 +38,7 @@ import voldemort.client.protocol.admin.AdminClient; import voldemort.client.rebalance.RebalancePartitionsInfo; import voldemort.cluster.Cluster; +import voldemort.routing.StoreRoutingPlan; import voldemort.server.StoreRepository; import voldemort.server.VoldemortConfig; import voldemort.server.rebalance.Rebalancer; @@ -51,7 +52,6 @@ import voldemort.utils.ClosableIterator; import voldemort.utils.Pair; import 
voldemort.utils.RebalanceUtils; -import voldemort.utils.StoreInstance; import voldemort.versioning.Versioned; import com.google.common.collect.HashMultimap; @@ -330,7 +330,7 @@ private void fetchEntriesForStealers(StorageEngine st while(running.get() && keys.hasNext()) { ByteArray key = keys.next(); scanned++; - List nodeIds = StoreInstance.checkKeyBelongsToPartition(key.get(), + List nodeIds = StoreRoutingPlan.checkKeyBelongsToPartition(key.get(), optimizedStealerNodeToMappingTuples, targetCluster, storeDef); @@ -378,7 +378,7 @@ private void fetchEntriesForStealersPartitionScan(StorageEngine value = entry.getSecond(); scanned++; - List nodeIds = StoreInstance.checkKeyBelongsToPartition(key.get(), + List nodeIds = StoreRoutingPlan.checkKeyBelongsToPartition(key.get(), optimizedStealerNodeToMappingTuples, targetCluster, storeDef); diff --git a/src/java/voldemort/utils/AbstractConsistencyFixer.java b/src/java/voldemort/utils/AbstractConsistencyFixer.java index c13dc1529a..d0cc8a7c3f 100644 --- a/src/java/voldemort/utils/AbstractConsistencyFixer.java +++ b/src/java/voldemort/utils/AbstractConsistencyFixer.java @@ -26,6 +26,7 @@ import voldemort.VoldemortException; import voldemort.client.protocol.admin.AdminClient; import voldemort.client.protocol.admin.QueryKeyResult; +import voldemort.routing.StoreRoutingPlan; import voldemort.store.routed.NodeValue; import voldemort.store.routed.ReadRepairer; import voldemort.utils.ConsistencyFix.BadKey; @@ -51,7 +52,7 @@ abstract class AbstractConsistencyFixer { private static final int fakeNodeID = Integer.MIN_VALUE; protected final BadKey badKey; - protected final StoreInstance storeInstance; + protected final StoreRoutingPlan storeInstance; protected final AdminClient adminClient; protected final QueryKeyResult orphanedValues; @@ -62,7 +63,7 @@ abstract class AbstractConsistencyFixer { * @param consistencyFix * @param badKeyQOut */ - AbstractConsistencyFixer(BadKey badKey, StoreInstance storeInstance, AdminClient adminClient) { 
+ AbstractConsistencyFixer(BadKey badKey, StoreRoutingPlan storeInstance, AdminClient adminClient) { this(badKey, storeInstance, adminClient, null); } @@ -77,7 +78,7 @@ abstract class AbstractConsistencyFixer { * @param orphanedValues Set to null if no orphaned values to be included. */ AbstractConsistencyFixer(BadKey badKey, - StoreInstance storeInstance, + StoreRoutingPlan storeInstance, AdminClient adminClient, QueryKeyResult orphanedValues) { this.badKey = badKey; diff --git a/src/java/voldemort/utils/ClusterForkLiftTool.java b/src/java/voldemort/utils/ClusterForkLiftTool.java index 8f1d584085..cce77a1203 100644 --- a/src/java/voldemort/utils/ClusterForkLiftTool.java +++ b/src/java/voldemort/utils/ClusterForkLiftTool.java @@ -28,6 +28,7 @@ import voldemort.client.protocol.admin.StreamingClientConfig; import voldemort.cluster.Cluster; import voldemort.cluster.Node; +import voldemort.routing.StoreRoutingPlan; import voldemort.store.StoreDefinition; import voldemort.store.StoreUtils; import voldemort.versioning.ChainedResolver; @@ -223,10 +224,10 @@ abstract class SinglePartitionForkLiftTask { protected int partitionId; protected CountDownLatch latch; - protected StoreInstance storeInstance; + protected StoreRoutingPlan storeInstance; protected String workName; - SinglePartitionForkLiftTask(StoreInstance storeInstance, + SinglePartitionForkLiftTask(StoreRoutingPlan storeInstance, int partitionId, CountDownLatch latch) { this.partitionId = partitionId; @@ -249,7 +250,7 @@ abstract class SinglePartitionForkLiftTask { class SinglePartitionGloballyResolvingForkLiftTask extends SinglePartitionForkLiftTask implements Runnable { - SinglePartitionGloballyResolvingForkLiftTask(StoreInstance storeInstance, + SinglePartitionGloballyResolvingForkLiftTask(StoreRoutingPlan storeInstance, int partitionId, CountDownLatch latch) { super(storeInstance, partitionId, latch); @@ -350,7 +351,7 @@ private Map doReads(final List nodeIdList, class 
SinglePartitionPrimaryResolvingForkLiftTask extends SinglePartitionForkLiftTask implements Runnable { - SinglePartitionPrimaryResolvingForkLiftTask(StoreInstance storeInstance, + SinglePartitionPrimaryResolvingForkLiftTask(StoreRoutingPlan storeInstance, int partitionId, CountDownLatch latch) { super(storeInstance, partitionId, latch); @@ -434,7 +435,7 @@ public void run() { class SinglePartitionNoResolutionForkLiftTask extends SinglePartitionForkLiftTask implements Runnable { - SinglePartitionNoResolutionForkLiftTask(StoreInstance storeInstance, + SinglePartitionNoResolutionForkLiftTask(StoreRoutingPlan storeInstance, int partitionId, CountDownLatch latch) { super(storeInstance, partitionId, latch); @@ -501,8 +502,8 @@ public Object call() throws Exception { }, true); final CountDownLatch latch = new CountDownLatch(srcCluster.getNumberOfPartitions()); - StoreInstance storeInstance = new StoreInstance(srcCluster, - srcStoreDefMap.get(store)); + StoreRoutingPlan storeInstance = new StoreRoutingPlan(srcCluster, + srcStoreDefMap.get(store)); // submit work on every partition that is to be forklifted for(Integer partitionId: partitionList) { diff --git a/src/java/voldemort/utils/ClusterInstance.java b/src/java/voldemort/utils/ClusterInstance.java index c2808250e8..b9500d922c 100644 --- a/src/java/voldemort/utils/ClusterInstance.java +++ b/src/java/voldemort/utils/ClusterInstance.java @@ -23,6 +23,7 @@ import java.util.Set; import voldemort.cluster.Cluster; +import voldemort.routing.StoreRoutingPlan; import voldemort.store.StoreDefinition; import com.google.common.collect.Maps; @@ -146,7 +147,7 @@ public Pair analyzeBalanceVerbose() { } for(StoreDefinition storeDefinition: uniqueStores.keySet()) { - StoreInstance storeInstance = new StoreInstance(cluster, storeDefinition); + StoreRoutingPlan storeInstance = new StoreRoutingPlan(cluster, storeDefinition); builder.append("\n"); builder.append("Store exemplar: " + storeDefinition.getName() + "\n"); diff --git 
a/src/java/voldemort/utils/ConsistencyFix.java b/src/java/voldemort/utils/ConsistencyFix.java index 9615b0cdbc..ba8bdebcf5 100644 --- a/src/java/voldemort/utils/ConsistencyFix.java +++ b/src/java/voldemort/utils/ConsistencyFix.java @@ -44,6 +44,7 @@ import voldemort.client.protocol.admin.AdminClientConfig; import voldemort.client.protocol.admin.QueryKeyResult; import voldemort.cluster.Cluster; +import voldemort.routing.StoreRoutingPlan; import voldemort.store.StoreDefinition; import voldemort.versioning.ClockEntry; import voldemort.versioning.VectorClock; @@ -58,7 +59,7 @@ public class ConsistencyFix { private final String storeName; private final AdminClient adminClient; - private final StoreInstance storeInstance; + private final StoreRoutingPlan storeInstance; private final Stats stats; private final long perServerQPSLimit; private final ConcurrentMap putThrottlers; @@ -83,7 +84,7 @@ public class ConsistencyFix { storeName); logger.info("Store definition for store " + storeName + " has been determined."); - storeInstance = new StoreInstance(cluster, storeDefinition); + storeInstance = new StoreRoutingPlan(cluster, storeDefinition); stats = new Stats(progressBar); @@ -97,7 +98,7 @@ public String getStoreName() { return storeName; } - public StoreInstance getStoreInstance() { + public StoreRoutingPlan getStoreInstance() { return storeInstance; } diff --git a/src/java/voldemort/utils/KeyVersionFetcherCLI.java b/src/java/voldemort/utils/KeyVersionFetcherCLI.java index a9b608a4e7..5e587fd610 100644 --- a/src/java/voldemort/utils/KeyVersionFetcherCLI.java +++ b/src/java/voldemort/utils/KeyVersionFetcherCLI.java @@ -45,6 +45,7 @@ import voldemort.client.protocol.admin.AdminClient; import voldemort.client.protocol.admin.AdminClientConfig; import voldemort.cluster.Cluster; +import voldemort.routing.StoreRoutingPlan; import voldemort.store.StoreDefinition; import voldemort.versioning.Versioned; @@ -149,10 +150,10 @@ public void updateFetchProgress() { public class 
KeyVersionFetcher implements Callable { - private final StoreInstance storeInstance; + private final StoreRoutingPlan storeInstance; private final byte[] key; - KeyVersionFetcher(StoreInstance storeInstance, byte[] key) { + KeyVersionFetcher(StoreRoutingPlan storeInstance, byte[] key) { this.storeInstance = storeInstance; this.key = key; } @@ -199,7 +200,7 @@ public boolean sampleStore(StoreDefinition storeDefinition) { return true; } - StoreInstance storeInstance = new StoreInstance(cluster, storeDefinition); + StoreRoutingPlan storeInstance = new StoreRoutingPlan(cluster, storeDefinition); BufferedReader keyReader = null; BufferedWriter kvWriter = null; try { diff --git a/test/common/voldemort/ServerTestUtils.java b/test/common/voldemort/ServerTestUtils.java index dc9330b441..134109d5e3 100644 --- a/test/common/voldemort/ServerTestUtils.java +++ b/test/common/voldemort/ServerTestUtils.java @@ -302,6 +302,17 @@ public static Cluster getLocalCluster(int numberOfNodes, int[] ports, int[][] pa return new Cluster("test-cluster", nodes); } + public static Cluster getLocalZonedCluster(int numberOfNodes, + int numberOfZones, + int[] nodeToZoneMapping, + int[][] partitionMapping) { + return getLocalZonedCluster(numberOfNodes, + numberOfZones, + nodeToZoneMapping, + partitionMapping, + findFreePorts(3 * numberOfNodes)); + } + /** * Returns a cluster with numberOfNodes nodes in numberOfZones * zones. 
It is important that numberOfNodes be divisible by @@ -315,7 +326,8 @@ public static Cluster getLocalCluster(int numberOfNodes, int[] ports, int[][] pa public static Cluster getLocalZonedCluster(int numberOfNodes, int numberOfZones, int[] nodeToZoneMapping, - int[][] partitionMapping) { + int[][] partitionMapping, + int[] ports) { if(numberOfZones > 0 && numberOfNodes > 0 && numberOfNodes % numberOfZones != 0) { throw new VoldemortException("The number of nodes (" + numberOfNodes @@ -323,8 +335,6 @@ public static Cluster getLocalZonedCluster(int numberOfNodes, + numberOfZones + ")"); } - int[] ports = findFreePorts(3 * numberOfNodes); - List nodes = new ArrayList(); for(int i = 0; i < numberOfNodes; i++) { diff --git a/test/common/voldemort/TestUtils.java b/test/common/voldemort/TestUtils.java index 997d9611ac..f6148c15e0 100644 --- a/test/common/voldemort/TestUtils.java +++ b/test/common/voldemort/TestUtils.java @@ -25,8 +25,11 @@ import java.util.Calendar; import java.util.Collections; import java.util.GregorianCalendar; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -35,6 +38,7 @@ import voldemort.cluster.Node; import voldemort.routing.RoutingStrategy; import voldemort.routing.RoutingStrategyFactory; +import voldemort.routing.StoreRoutingPlan; import voldemort.store.Store; import voldemort.store.StoreDefinition; import voldemort.utils.ByteArray; @@ -310,6 +314,39 @@ public static int getMissingPartitionsSize(Cluster orig, Cluster updated) { return diffPartition; } + /** + * Given a StoreRoutingPlan generates up to numKeysPerPartition keys per + * partition, assigning each key to its master partition under that plan + * + * @param numKeysPerPartition maximum keys to collect per partition; with 0 no keys are generated + * @return a hashmap of partition to list of keys generated + */ + public static HashMap> createPartitionsKeys(StoreRoutingPlan routingPlan, + int numKeysPerPartition) { + HashMap> partitionToKeyList = new HashMap>(); + Set partitionsPending = 
new HashSet(routingPlan.getCluster() + .getNumberOfPartitions()); + for(int partition = 0; partition < routingPlan.getCluster().getNumberOfPartitions(); partition++) { + partitionsPending.add(partition); + partitionToKeyList.put(partition, new ArrayList(numKeysPerPartition)); + } + + for(int key = 0; numKeysPerPartition > 0 && !partitionsPending.isEmpty(); key++) { + byte[] keyBytes = ("key" + key).getBytes(); + int partition = routingPlan.getMasterPartitionId(keyBytes); + if(partitionToKeyList.get(partition).size() < numKeysPerPartition) { + partitionToKeyList.get(partition).add(keyBytes); + if(partitionToKeyList.get(partition).size() == numKeysPerPartition) { + partitionsPending.remove(partition); + } + } + if(partitionsPending.size() == 0) { + break; + } + } + return partitionToKeyList; + } + /** * Always uses UTF-8. */ diff --git a/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java index beef7000ad..3665173147 100644 --- a/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java @@ -56,6 +56,7 @@ import voldemort.routing.RoutingStrategy; import voldemort.routing.RoutingStrategyFactory; import voldemort.routing.RoutingStrategyType; +import voldemort.routing.StoreRoutingPlan; import voldemort.serialization.SerializerDefinition; import voldemort.serialization.json.JsonReader; import voldemort.server.VoldemortServer; @@ -75,7 +76,6 @@ import voldemort.utils.ByteArray; import voldemort.utils.ByteUtils; import voldemort.utils.RebalanceUtils; -import voldemort.utils.StoreInstance; import voldemort.versioning.ClockEntry; import voldemort.versioning.ObsoleteVersionException; import voldemort.versioning.VectorClock; @@ -1280,7 +1280,7 @@ protected void populateData(Cluster cluster, } - StoreInstance storeInstance = new StoreInstance(cluster, storeDef); + StoreRoutingPlan storeInstance = new StoreRoutingPlan(cluster, storeDef); for(Entry 
entry: testEntries.entrySet()) { ByteArray keyBytes = new ByteArray(ByteUtils.getBytes(entry.getKey(), "UTF-8")); List preferenceNodes = storeInstance.getReplicationNodeList(keyBytes.get()); diff --git a/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java index ddad6a08f7..d2763de104 100644 --- a/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java @@ -37,6 +37,7 @@ import voldemort.cluster.Node; import voldemort.routing.RoutingStrategy; import voldemort.routing.RoutingStrategyFactory; +import voldemort.routing.StoreRoutingPlan; import voldemort.server.VoldemortConfig; import voldemort.server.VoldemortServer; import voldemort.store.Store; @@ -49,7 +50,6 @@ import voldemort.utils.ByteUtils; import voldemort.utils.Pair; import voldemort.utils.RebalanceUtils; -import voldemort.utils.StoreInstance; import voldemort.utils.Utils; import voldemort.versioning.VectorClock; import voldemort.versioning.Versioned; @@ -240,7 +240,7 @@ protected void checkGetEntries(Node node, List partitions = routing.getPartitionList(keyBytes.get()); - if(StoreInstance.checkKeyBelongsToPartition(partitions, + if(StoreRoutingPlan.checkKeyBelongsToPartition(partitions, node.getPartitionIds(), flattenedPresentTuples)) { List> values = store.get(keyBytes, null); diff --git a/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java index dc030af10d..eb7664912c 100644 --- a/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java @@ -55,6 +55,7 @@ import voldemort.routing.RoutingStrategy; import voldemort.routing.RoutingStrategyFactory; import voldemort.routing.RoutingStrategyType; +import voldemort.routing.StoreRoutingPlan; import voldemort.serialization.SerializerDefinition; 
import voldemort.server.VoldemortServer; import voldemort.store.InvalidMetadataException; @@ -68,7 +69,6 @@ import voldemort.utils.ByteArray; import voldemort.utils.ByteUtils; import voldemort.utils.RebalanceUtils; -import voldemort.utils.StoreInstance; import voldemort.versioning.ClockEntry; import voldemort.versioning.ObsoleteVersionException; import voldemort.versioning.VectorClock; @@ -807,7 +807,7 @@ protected void populateData(Cluster cluster, StoreDefinition storeDef) throws Ex RoutingStrategy routing = new RoutingStrategyFactory().updateRoutingStrategy(storeDef, cluster); - StoreInstance storeInstance = new StoreInstance(cluster, storeDef); + StoreRoutingPlan storeInstance = new StoreRoutingPlan(cluster, storeDef); for(Entry entry: testEntries.entrySet()) { ByteArray keyBytes = new ByteArray(ByteUtils.getBytes(entry.getKey(), "UTF-8")); List preferenceNodes = storeInstance.getReplicationNodeList(keyBytes.get()); diff --git a/test/unit/voldemort/routing/StoreRoutingPlanTest.java b/test/unit/voldemort/routing/StoreRoutingPlanTest.java new file mode 100644 index 0000000000..2d35d99bc4 --- /dev/null +++ b/test/unit/voldemort/routing/StoreRoutingPlanTest.java @@ -0,0 +1,163 @@ +package voldemort.routing; + +import static org.junit.Assert.assertEquals; + +import java.util.HashMap; +import java.util.List; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import voldemort.ServerTestUtils; +import voldemort.TestUtils; +import voldemort.client.RoutingTier; +import voldemort.cluster.Cluster; +import voldemort.cluster.Zone; +import voldemort.serialization.SerializerDefinition; +import voldemort.store.StoreDefinition; +import voldemort.store.StoreDefinitionBuilder; +import voldemort.store.bdb.BdbStorageConfiguration; +import voldemort.store.slop.strategy.HintedHandoffStrategyType; + +import com.google.common.collect.Lists; + +public class StoreRoutingPlanTest { + + StoreRoutingPlan zonedRoutingPlan; + StoreRoutingPlan nonZonedRoutingPlan; + 
+ public StoreRoutingPlanTest() {} + + @Before + public void setup() { + Cluster nonZonedCluster = ServerTestUtils.getLocalCluster(3, new int[] { 1000, 2000, 3000, + 1000, 2000, 3000, 1000, 2000, 3000 }, new int[][] { { 0 }, { 1, 3 }, { 2 } }); + StoreDefinition nonZoned211StoreDef = new StoreDefinitionBuilder().setName("non-zoned") + .setType(BdbStorageConfiguration.TYPE_NAME) + .setKeySerializer(new SerializerDefinition("string")) + .setValueSerializer(new SerializerDefinition("string")) + .setRoutingPolicy(RoutingTier.CLIENT) + .setRoutingStrategyType(RoutingStrategyType.CONSISTENT_STRATEGY) + .setReplicationFactor(2) + .setPreferredReads(1) + .setRequiredReads(1) + .setPreferredWrites(1) + .setRequiredWrites(1) + .build(); + nonZonedRoutingPlan = new StoreRoutingPlan(nonZonedCluster, nonZoned211StoreDef); + + int[] dummyZonedPorts = new int[] { 1000, 2000, 3000, 1000, 2000, 3000, 1000, 2000, 3000, + 1000, 2000, 3000, 1000, 2000, 3000, 1000, 2000, 3000 }; + Cluster zonedCluster = ServerTestUtils.getLocalZonedCluster(6, + 2, + new int[] { 0, 0, 0, 1, 1, 1 }, + new int[][] { { 0 }, { 1, 6 }, + { 2 }, { 3 }, { 4, 7 }, + { 5 } }, + dummyZonedPorts); + HashMap zrfRWStoreWithReplication = new HashMap(); + zrfRWStoreWithReplication.put(0, 2); + zrfRWStoreWithReplication.put(1, 2); + StoreDefinition zoned211StoreDef = new StoreDefinitionBuilder().setName("zoned") + .setType(BdbStorageConfiguration.TYPE_NAME) + .setKeySerializer(new SerializerDefinition("string")) + .setValueSerializer(new SerializerDefinition("string")) + .setRoutingPolicy(RoutingTier.CLIENT) + .setRoutingStrategyType(RoutingStrategyType.ZONE_STRATEGY) + .setReplicationFactor(4) + .setPreferredReads(1) + .setRequiredReads(1) + .setPreferredWrites(1) + .setRequiredWrites(1) + .setZoneCountReads(0) + .setZoneCountWrites(0) + .setZoneReplicationFactor(zrfRWStoreWithReplication) + .setHintedHandoffStrategy(HintedHandoffStrategyType.PROXIMITY_STRATEGY) + .build(); + zonedRoutingPlan = new 
StoreRoutingPlan(zonedCluster, zoned211StoreDef); + } + + @Test + public void testZonedStoreRoutingPlan() { + HashMap> samplePartitionKeysMap = TestUtils.createPartitionsKeys(zonedRoutingPlan, + 1); + assertEquals("Node 1 does not contain p5?", + (Integer) 6, + zonedRoutingPlan.getNodesPartitionIdForKey(1, samplePartitionKeysMap.get(5) + .get(0))); + assertEquals("Node 4 does not contain p5?", + (Integer) 7, + zonedRoutingPlan.getNodesPartitionIdForKey(4, samplePartitionKeysMap.get(5) + .get(0))); + assertEquals("Replication list does not match up", + Lists.newArrayList(0, 1, 3, 4), + zonedRoutingPlan.getReplicationNodeList(0)); + + assertEquals("Zone replica type should be 1", + 1, + zonedRoutingPlan.getZoneReplicaType(0, 0, samplePartitionKeysMap.get(6).get(0))); + assertEquals("Zone replica type should be 0", + 0, + zonedRoutingPlan.getZoneReplicaType(0, 1, samplePartitionKeysMap.get(6).get(0))); + assertEquals("Zone replica type should be 1", + 1, + zonedRoutingPlan.getZoneReplicaType(1, 3, samplePartitionKeysMap.get(7).get(0))); + assertEquals("Zone replica type should be 0", + 0, + zonedRoutingPlan.getZoneReplicaType(1, 4, samplePartitionKeysMap.get(7).get(0))); + + assertEquals("Replica owner should be 1", + 1, + zonedRoutingPlan.getZoneReplicaNode(0, 1, samplePartitionKeysMap.get(2).get(0))); + assertEquals("Replica owner should be 1", + 1, + zonedRoutingPlan.getZoneReplicaNode(0, 0, samplePartitionKeysMap.get(3).get(0))); + assertEquals("Replica owner should be 4", + 4, + zonedRoutingPlan.getZoneReplicaNode(1, 1, samplePartitionKeysMap.get(1).get(0))); + assertEquals("Replica owner should be 3", + 3, + zonedRoutingPlan.getZoneReplicaNode(1, 0, samplePartitionKeysMap.get(2).get(0))); + } + + @Test + public void testNonZonedStoreRoutingPlan() { + HashMap> samplePartitionKeysMap = TestUtils.createPartitionsKeys(nonZonedRoutingPlan, + 1); + + assertEquals("Node 1 does not contain p2 as secondary?", + (Integer) 3, + 
nonZonedRoutingPlan.getNodesPartitionIdForKey(1, samplePartitionKeysMap.get(2) + .get(0))); + assertEquals("Replication list does not match up", + Lists.newArrayList(1, 2), + nonZonedRoutingPlan.getReplicationNodeList(1)); + + assertEquals("Zone replica type should be 1", + 1, + nonZonedRoutingPlan.getZoneReplicaType(Zone.DEFAULT_ZONE_ID, + 2, + samplePartitionKeysMap.get(1).get(0))); + assertEquals("Zone replica type should be 0", + 0, + nonZonedRoutingPlan.getZoneReplicaType(Zone.DEFAULT_ZONE_ID, + 1, + samplePartitionKeysMap.get(3).get(0))); + assertEquals("Replica owner should be 2", + 2, + nonZonedRoutingPlan.getZoneReplicaNode(Zone.DEFAULT_ZONE_ID, + 1, + samplePartitionKeysMap.get(1).get(0))); + assertEquals("Replica owner should be 1", + 1, + nonZonedRoutingPlan.getZoneReplicaNode(Zone.DEFAULT_ZONE_ID, + 0, + samplePartitionKeysMap.get(3).get(0))); + } + + @After + public void teardown() { + + } +} diff --git a/test/unit/voldemort/utils/ClusterForkLiftToolTest.java b/test/unit/voldemort/utils/ClusterForkLiftToolTest.java index a67e2112b2..71822cdccd 100644 --- a/test/unit/voldemort/utils/ClusterForkLiftToolTest.java +++ b/test/unit/voldemort/utils/ClusterForkLiftToolTest.java @@ -24,6 +24,7 @@ import voldemort.client.protocol.admin.AdminClientConfig; import voldemort.cluster.Cluster; import voldemort.cluster.Node; +import voldemort.routing.StoreRoutingPlan; import voldemort.server.VoldemortServer; import voldemort.store.StoreDefinition; import voldemort.store.StoreUtils; @@ -162,7 +163,8 @@ public void setUpClusters() { @Test public void testPrimaryResolvingForkLift() throws Exception { - StoreInstance srcStoreInstance = new StoreInstance(srcCluster, primaryResolvingStoreDef); + StoreRoutingPlan srcStoreInstance = new StoreRoutingPlan(srcCluster, + primaryResolvingStoreDef); // populate data on the source cluster.. 
for(Map.Entry entry: kvPairs.entrySet()) { @@ -228,7 +230,8 @@ public void testPrimaryResolvingForkLift() throws Exception { @Test public void testGloballyResolvingForkLift() throws Exception { - StoreInstance srcStoreInstance = new StoreInstance(srcCluster, globallyResolvingStoreDef); + StoreRoutingPlan srcStoreInstance = new StoreRoutingPlan(srcCluster, + globallyResolvingStoreDef); // populate data on the source cluster.. for(Map.Entry entry: kvPairs.entrySet()) { @@ -296,7 +299,7 @@ public void testNoresolutionForkLift() throws Exception { int versions = 0; - StoreInstance srcStoreInstance = new StoreInstance(srcCluster, nonResolvingStoreDef); + StoreRoutingPlan srcStoreInstance = new StoreRoutingPlan(srcCluster, nonResolvingStoreDef); // generate a conflict on the master partition int masterNode = srcStoreInstance.getNodeIdForPartitionId(srcStoreInstance.getMasterPartitionId(conflictKey.getBytes("UTF-8")));