Skip to content

Commit

Permalink
Zone N ary helpers + StoreRoutingPlan tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed Apr 24, 2013
1 parent cc89784 commit 626ea52
Show file tree
Hide file tree
Showing 22 changed files with 386 additions and 80 deletions.
2 changes: 1 addition & 1 deletion src/java/voldemort/cluster/Zone.java
Expand Up @@ -18,7 +18,7 @@ public Zone(int zoneId, LinkedList<Integer> proximityList) {
}

public Zone() {
this.zoneId = 0;
this.zoneId = DEFAULT_ZONE_ID;
this.proximityList = new LinkedList<Integer>();
}

Expand Down
5 changes: 5 additions & 0 deletions src/java/voldemort/routing/RoutingStrategy.java
Expand Up @@ -49,6 +49,11 @@ public interface RoutingStrategy {
/**
* Get the partition list for the given key.
*
* TODO: The naming of this method is confusing.. it is simply a wrapper
* around {@link RoutingStrategy#getReplicatingPartitionList(int)} that
* takes a key. So, would be good to rename this also as
* getReplicatingPartitionList
*
* @param key The key the operation is operating on
* @return The partition list for the given key
*/
Expand Down
Expand Up @@ -14,7 +14,7 @@
* the License.
*/

package voldemort.utils;
package voldemort.routing;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -24,35 +24,37 @@

import voldemort.VoldemortException;
import voldemort.cluster.Cluster;
import voldemort.routing.RoutingStrategyFactory;
import voldemort.routing.RoutingStrategyType;
import voldemort.cluster.Node;
import voldemort.store.StoreDefinition;
import voldemort.utils.ByteUtils;
import voldemort.utils.ClusterUtils;
import voldemort.utils.NodeUtils;
import voldemort.utils.Pair;
import voldemort.utils.Utils;

import com.google.common.collect.Lists;

// TODO: Add StoreInstanceTest unit test for these helper methods.

/**
* This class wraps up a Cluster object and a StoreDefinition. The methods are
* effectively helper or util style methods for analyzing partitions and so on
* which are a function of both Cluster and StoreDefinition.
* effectively helper or util style methods for querying the routing plan that
* will be generated for a given routing strategy upon store and cluster
* topology information.
*/
public class StoreInstance {

// TODO: (refactor) Improve upon the name "StoreInstance". Object-oriented
// meaning of 'instance' is too easily confused with system notion of an
// "instance of a cluster" (the intended usage in this class name).
public class StoreRoutingPlan {

private final Cluster cluster;
private final StoreDefinition storeDefinition;

private final Map<Integer, Integer> partitionIdToNodeIdMap;
private final RoutingStrategy routingStrategy;

public StoreInstance(Cluster cluster, StoreDefinition storeDefinition) {
public StoreRoutingPlan(Cluster cluster, StoreDefinition storeDefinition) {
this.cluster = cluster;
this.storeDefinition = storeDefinition;

partitionIdToNodeIdMap = ClusterUtils.getCurrentPartitionMapping(cluster);
this.partitionIdToNodeIdMap = ClusterUtils.getCurrentPartitionMapping(cluster);
this.routingStrategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition,
cluster);
}

public Cluster getCluster() {
Expand All @@ -69,29 +71,28 @@ public StoreDefinition getStoreDefinition() {
* @param masterPartitionId
* @return List of partition IDs that replicate the master partition ID.
*/
public List<Integer> getReplicationPartitionList(int masterPartitionId) {
return new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition, cluster)
.getReplicatingPartitionList(masterPartitionId);
public List<Integer> getReplicatingPartitionList(int masterPartitionId) {
return this.routingStrategy.getReplicatingPartitionList(masterPartitionId);
}

/**
* Determines list of partition IDs that replicate the key.
*
* @param key
* @return List of partition IDs that replicate the partition ID.
* @return List of partition IDs that replicate the given key
*/
public List<Integer> getReplicationPartitionList(final byte[] key) {
return getReplicationPartitionList(getMasterPartitionId(key));
public List<Integer> getReplicatingPartitionList(final byte[] key) {
return this.routingStrategy.getPartitionList(key);
}

/**
* Determines the list of nodes that the key replicates to
*
* @param key
* @return
* @return list of nodes that key replicates to
*/
public List<Integer> getReplicationNodeList(final byte[] key) {
return getNodeIdListForPartitionIdList(getReplicationPartitionList(key));
return NodeUtils.getNodeIds(this.routingStrategy.routeRequest(key));
}

/**
Expand All @@ -101,8 +102,7 @@ public List<Integer> getReplicationNodeList(final byte[] key) {
* @return
*/
public int getMasterPartitionId(final byte[] key) {
return new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition, cluster)
.getMasterPartition(key);
return this.routingStrategy.getMasterPartition(key);
}

/**
Expand All @@ -123,8 +123,11 @@ public int getNodeIdForPartitionId(int partitionId) {
* @return partitionId if found, otherwise null.
*/
public Integer getNodesPartitionIdForKey(int nodeId, final byte[] key) {
List<Integer> partitionIds = getReplicationPartitionList(key);
// this is all the partitions the key replicates to.
List<Integer> partitionIds = getReplicatingPartitionList(key);
for(Integer partitionId: partitionIds) {
// check which of the replicating partitions belongs to the node in
// question
if(getNodeIdForPartitionId(partitionId) == nodeId) {
return partitionId;
}
Expand Down Expand Up @@ -157,8 +160,88 @@ private List<Integer> getNodeIdListForPartitionIdList(List<Integer> partitionIds
return nodeIds;
}

/**
* Returns the list of node ids this partition replicates to.
*
* TODO ideally the {@link RoutingStrategy} should house a routeRequest(int
* partition) method
*
* @param partitionId
* @return
* @throws VoldemortException
*/
public List<Integer> getReplicationNodeList(int partitionId) throws VoldemortException {
return getNodeIdListForPartitionIdList(getReplicationPartitionList(partitionId));
return getNodeIdListForPartitionIdList(getReplicatingPartitionList(partitionId));
}

/**
* Given a key that belong to a given node, returns a number n (< zone
* replication factor), such that the given node holds the key as the nth
* replica of the given zone
*
* eg: if the method returns 1, then given node hosts the key as the zone
* secondary in the given zone
*
* @param zoneId
* @param nodeId
* @param key
* @return
*/
public int getZoneReplicaType(int zoneId, int nodeId, byte[] key) {
List<Node> replicatingNodes = this.routingStrategy.routeRequest(key);
int zoneReplicaType = -1;
for(Node node: replicatingNodes) {
// bump up the replica number once you encounter a node in the given
// zone
if(node.getZoneId() == zoneId) {
zoneReplicaType++;
}
// we are done when we find the given node
if(node.getId() == nodeId) {
return zoneReplicaType;
}
}
if(zoneReplicaType > 0) {
throw new VoldemortException("Node " + nodeId + " not a replica for the key "
+ ByteUtils.toHexString(key) + " in given zone " + zoneId);
} else {
throw new VoldemortException("Could not find any replicas for the key "
+ ByteUtils.toHexString(key) + " in given zone " + zoneId);
}
}

/**
* Given a key and a replica type n (< zone replication factor), figure out
* the node that contains the key as the nth replica in the given zone.
*
* @param zoneId
* @param zoneReplicaType
* @param key
* @return
*/
public int getZoneReplicaNode(int zoneId, int zoneReplicaType, byte[] key) {
List<Node> replicatingNodes = this.routingStrategy.routeRequest(key);
int zoneReplicaTypeCounter = -1;
for(Node node: replicatingNodes) {
// bump up the counter if we encounter a replica in the given zone
if(node.getZoneId() == zoneId) {
zoneReplicaTypeCounter++;
}
// when the counter matches up with the replicaNumber we need, we
// are done.
if(zoneReplicaTypeCounter == zoneReplicaType) {
return node.getId();
}
}
if(zoneReplicaTypeCounter == 0) {
throw new VoldemortException("Could not find any replicas for the key "
+ ByteUtils.toHexString(key) + " in given zone " + zoneId);
} else {
throw new VoldemortException("Could not find " + zoneReplicaType
+ " replicas for the key " + ByteUtils.toHexString(key)
+ " in given zone " + zoneId + ". Only found "
+ zoneReplicaTypeCounter);
}
}

// TODO: (refactor) Move from static methods to non-static methods that use
Expand Down Expand Up @@ -222,9 +305,9 @@ public static boolean checkKeyBelongsToPartition(int nodeId,
cluster)
.getPartitionList(key);
List<Integer> nodePartitions = cluster.getNodeById(nodeId).getPartitionIds();
checkResult = StoreInstance.checkKeyBelongsToPartition(keyPartitions,
nodePartitions,
replicaToPartitionList);
checkResult = StoreRoutingPlan.checkKeyBelongsToPartition(keyPartitions,
nodePartitions,
replicaToPartitionList);
}
return checkResult;
}
Expand Down Expand Up @@ -276,9 +359,9 @@ public static List<Integer> checkKeyBelongsToPartition(byte[] key,
for(Pair<Integer, HashMap<Integer, List<Integer>>> stealNodeToMap: stealerNodeToMappingTuples) {
List<Integer> nodePartitions = cluster.getNodeById(stealNodeToMap.getFirst())
.getPartitionIds();
if(StoreInstance.checkKeyBelongsToPartition(keyPartitions,
nodePartitions,
stealNodeToMap.getSecond())) {
if(StoreRoutingPlan.checkKeyBelongsToPartition(keyPartitions,
nodePartitions,
stealNodeToMap.getSecond())) {
nodesToPush.add(stealNodeToMap.getFirst());
}
}
Expand Down
Expand Up @@ -41,6 +41,7 @@
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.cluster.Cluster;
import voldemort.common.nio.ByteBufferBackedInputStream;
import voldemort.routing.StoreRoutingPlan;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.server.protocol.RequestHandler;
Expand Down Expand Up @@ -68,7 +69,6 @@
import voldemort.utils.Pair;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.ReflectUtils;
import voldemort.utils.StoreInstance;
import voldemort.utils.Utils;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.VectorClock;
Expand Down Expand Up @@ -1088,7 +1088,7 @@ public VAdminProto.DeletePartitionEntriesResponse handleDeletePartitionEntries(V
ByteArray key = entry.getFirst();
Versioned<byte[]> value = entry.getSecond();
throttler.maybeThrottle(key.length() + valueSize(value));
if(StoreInstance.checkKeyBelongsToPartition(metadataStore.getNodeId(),
if(StoreRoutingPlan.checkKeyBelongsToPartition(metadataStore.getNodeId(),
key.get(),
replicaToPartitionList,
request.hasInitialCluster() ? new ClusterMapper().readCluster(new StringReader(request.getInitialCluster()))
Expand Down
Expand Up @@ -30,6 +30,7 @@
import voldemort.client.protocol.pb.ProtoUtils;
import voldemort.client.protocol.pb.VAdminProto;
import voldemort.cluster.Cluster;
import voldemort.routing.StoreRoutingPlan;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.server.protocol.StreamRequestHandler;
Expand All @@ -42,7 +43,6 @@
import voldemort.utils.ByteArray;
import voldemort.utils.EventThrottler;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.StoreInstance;
import voldemort.utils.Time;
import voldemort.xml.ClusterMapper;

Expand Down Expand Up @@ -90,7 +90,7 @@ public abstract class FetchStreamRequestHandler implements StreamRequestHandler

protected boolean fetchOrphaned;

protected final StoreInstance storeInstance;
protected final StoreRoutingPlan storeInstance;

protected FetchStreamRequestHandler(VAdminProto.FetchPartitionEntriesRequest request,
MetadataStore metadataStore,
Expand Down Expand Up @@ -119,7 +119,7 @@ protected FetchStreamRequestHandler(VAdminProto.FetchPartitionEntriesRequest req
} else {
this.initialCluster = metadataStore.getCluster();
}
this.storeInstance = new StoreInstance(this.initialCluster, this.storeDef);
this.storeInstance = new StoreRoutingPlan(this.initialCluster, this.storeDef);

this.throttler = new EventThrottler(voldemortConfig.getStreamMaxReadBytesPerSec());
if(request.hasFilter()) {
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;

import voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest;
import voldemort.routing.StoreRoutingPlan;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.store.ErrorCodeMapper;
Expand All @@ -31,7 +32,6 @@
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.StoreInstance;
import voldemort.utils.Utils;

/**
Expand Down Expand Up @@ -102,7 +102,7 @@ private Integer getKeyPartitionId(byte[] key) {
* @return true iff key is needed.
*/
protected boolean isKeyNeeded(byte[] key) {
if(!StoreInstance.checkKeyBelongsToPartition(nodeId,
if(!StoreRoutingPlan.checkKeyBelongsToPartition(nodeId,
key,
replicaToPartitionList,
initialCluster,
Expand Down Expand Up @@ -134,7 +134,7 @@ protected boolean isItemAccepted(byte[] key) {
entryAccepted = true;
}
} else {
if(!StoreInstance.checkKeyBelongsToNode(key, nodeId, initialCluster, storeDef)) {
if(!StoreRoutingPlan.checkKeyBelongsToNode(key, nodeId, initialCluster, storeDef)) {
entryAccepted = true;
}
}
Expand Down
Expand Up @@ -23,6 +23,7 @@
import voldemort.client.protocol.pb.ProtoUtils;
import voldemort.client.protocol.pb.VAdminProto;
import voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest;
import voldemort.routing.StoreRoutingPlan;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.store.ErrorCodeMapper;
Expand All @@ -32,7 +33,6 @@
import voldemort.utils.ClosableIterator;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.Pair;
import voldemort.utils.StoreInstance;
import voldemort.versioning.Versioned;

import com.google.protobuf.Message;
Expand Down Expand Up @@ -86,7 +86,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
// Check the current node contains the partition as the
// requested replicatype
if(!fetchedPartitions.contains(currentPartition)
&& StoreInstance.checkPartitionBelongsToNode(currentPartition,
&& StoreRoutingPlan.checkPartitionBelongsToNode(currentPartition,
currentReplicaType,
nodeId,
initialCluster,
Expand Down
Expand Up @@ -23,6 +23,7 @@
import voldemort.client.protocol.pb.ProtoUtils;
import voldemort.client.protocol.pb.VAdminProto;
import voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest;
import voldemort.routing.StoreRoutingPlan;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.store.ErrorCodeMapper;
Expand All @@ -31,7 +32,6 @@
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.StoreInstance;

import com.google.protobuf.Message;

Expand Down Expand Up @@ -84,7 +84,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
// Check the current node contains the partition as the
// requested replicatype
if(!fetchedPartitions.contains(currentPartition)
&& StoreInstance.checkPartitionBelongsToNode(currentPartition,
&& StoreRoutingPlan.checkPartitionBelongsToNode(currentPartition,
currentReplicaType,
nodeId,
initialCluster,
Expand Down

0 comments on commit 626ea52

Please sign in to comment.