Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Addressed all code review comments for KeySampler and KeyVersionFetch…

…er. Renamed many classes and methods related to FetchStreamRequestHandler.

- All sub-classes of FetchStreamRequestHandler have been renamed to have a more consistent nomenclature.
- Did some further refactoring in the FullScan* classes to move more work from leaf classes to FullScanFetchRequestHandler.java
- moved scan accounting to overall bae class
- Added getNodesPartitionIdForKey method to StoreInstance to help with some fetch logic
  • Loading branch information...
commit 1a0b8f7cac3c9a35cd178ece25448f75433cc1dd 1 parent 96862f2
@jayjwylie jayjwylie authored
View
8 src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java
@@ -547,14 +547,14 @@ public StreamRequestHandler handleFetchPartitionEntries(VAdminProto.FetchPartiti
if(fetchValues) {
if(storageEngine.isPartitionScanSupported() && !fetchOrphaned)
- return new FetchPartitionEntriesStreamRequestHandler(request,
+ return new PartitionScanFetchEntriesRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader);
else
- return new FetchEntriesStreamRequestHandler(request,
+ return new FullScanFetchEntriesRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
@@ -562,14 +562,14 @@ public StreamRequestHandler handleFetchPartitionEntries(VAdminProto.FetchPartiti
networkClassLoader);
} else {
if(storageEngine.isPartitionScanSupported() && !fetchOrphaned)
- return new FetchPartitionKeysStreamRequestHandler(request,
+ return new PartitionScanFetchKeysRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader);
else
- return new FetchKeysStreamRequestHandler(request,
+ return new FullScanFetchKeysRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
View
20 src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java
@@ -42,6 +42,7 @@
import voldemort.utils.ByteArray;
import voldemort.utils.EventThrottler;
import voldemort.utils.NetworkClassLoader;
+import voldemort.utils.StoreInstance;
import voldemort.utils.Time;
import voldemort.xml.ClusterMapper;
@@ -85,10 +86,12 @@
protected int nodeId;
- protected StoreDefinition storeDef;
+ protected final StoreDefinition storeDef;
protected boolean fetchOrphaned;
+ protected final StoreInstance storeInstance;
+
protected FetchStreamRequestHandler(VAdminProto.FetchPartitionEntriesRequest request,
MetadataStore metadataStore,
ErrorCodeMapper errorCodeMapper,
@@ -116,6 +119,8 @@ protected FetchStreamRequestHandler(VAdminProto.FetchPartitionEntriesRequest req
} else {
this.initialCluster = metadataStore.getCluster();
}
+ this.storeInstance = new StoreInstance(this.initialCluster, this.storeDef);
+
this.throttler = new EventThrottler(voldemortConfig.getStreamMaxReadBytesPerSec());
if(request.hasFilter()) {
this.filter = AdminServiceRequestHandler.getFilterFromRequest(request.getFilter(),
@@ -189,6 +194,19 @@ protected void progressInfoMessage(final String tag) {
}
/**
+ * Account for item being scanned.
+ *
+ * @param itemTag mad libs style string to insert into progress message.
+ *
+ */
+ protected void accountForScanProgress(String itemTag) {
+ scanned++;
+ if(0 == scanned % STAT_RECORDS_INTERVAL) {
+ progressInfoMessage("Fetch " + itemTag + " (progress)");
+ }
+ }
+
+ /**
* Helper method to send message on outputStream and account for network
* time stats.
*
View
54 src/java/voldemort/server/protocol/admin/FullScanFetchEntriesRequestHandler.java
@@ -31,7 +31,6 @@
import voldemort.store.stats.StreamingStats.Operation;
import voldemort.utils.ByteArray;
import voldemort.utils.NetworkClassLoader;
-import voldemort.utils.StoreInstance;
import voldemort.versioning.Versioned;
import com.google.protobuf.Message;
@@ -43,14 +42,14 @@
* unwanted keys and then call storageEngine.get() for valid keys.
* <p>
*/
-public class FetchEntriesStreamRequestHandler extends FetchItemsStreamRequestHandler {
-
- public FetchEntriesStreamRequestHandler(FetchPartitionEntriesRequest request,
- MetadataStore metadataStore,
- ErrorCodeMapper errorCodeMapper,
- VoldemortConfig voldemortConfig,
- StoreRepository storeRepository,
- NetworkClassLoader networkClassLoader) {
+public class FullScanFetchEntriesRequestHandler extends FullScanFetchStreamRequestHandler {
+
+ public FullScanFetchEntriesRequestHandler(FetchPartitionEntriesRequest request,
+ MetadataStore metadataStore,
+ ErrorCodeMapper errorCodeMapper,
+ VoldemortConfig voldemortConfig,
+ StoreRepository storeRepository,
+ NetworkClassLoader networkClassLoader) {
super(request,
metadataStore,
errorCodeMapper,
@@ -82,25 +81,14 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
// Cannot invoke 'throttler.maybeThrottle(key.length());' here since
// that would affect timing measurements of storage operations.
- boolean entryAccepted = false;
- if(!fetchOrphaned) {
- if(keyIsNeeded(key.get())) {
- entryAccepted = true;
- }
- } else {
- if(!StoreInstance.checkKeyBelongsToNode(key.get(), nodeId, initialCluster, storeDef)) {
- entryAccepted = true;
- }
- }
-
- if(entryAccepted) {
+ if(isItemAccepted(key.get())) {
List<Versioned<byte[]>> values = storageEngine.get(key, null);
reportStorageOpTime(startNs);
throttler.maybeThrottle(key.length());
for(Versioned<byte[]> value: values) {
if(filter.accept(key, value)) {
- keyFetched(key.get());
+ accountForFetchedKey(key.get());
VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();
VAdminProto.PartitionEntry partitionEntry = VAdminProto.PartitionEntry.newBuilder()
@@ -120,27 +108,9 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
throttler.maybeThrottle(key.length());
}
- // log progress
- scanned++;
- if(0 == scanned % STAT_RECORDS_INTERVAL) {
- progressInfoMessage("Fetch entries (progress)");
- }
-
- if(keyIterator.hasNext() && !fetchedEnough()) {
- return StreamRequestHandlerState.WRITING;
- } else {
- logger.info("Finished fetch entries for store '" + storageEngine.getName()
- + "' with replica to partition mapping " + replicaToPartitionList);
- progressInfoMessage("Fetch entries (end of scan)");
+ accountForScanProgress("entries");
- return StreamRequestHandlerState.COMPLETE;
- }
+ return determineRequestHandlerState("entries");
}
- @Override
- public final void close(DataOutputStream outputStream) throws IOException {
- if(null != keyIterator)
- keyIterator.close();
- super.close(outputStream);
- }
}
View
66 src/java/voldemort/server/protocol/admin/FullScanFetchKeysRequestHandler.java
@@ -30,7 +30,6 @@
import voldemort.store.stats.StreamingStats.Operation;
import voldemort.utils.ByteArray;
import voldemort.utils.NetworkClassLoader;
-import voldemort.utils.StoreInstance;
import com.google.protobuf.Message;
@@ -38,14 +37,14 @@
* Fetches keys by scanning entire storage engine in storage-order.
*
*/
-public class FetchKeysStreamRequestHandler extends FetchItemsStreamRequestHandler {
-
- public FetchKeysStreamRequestHandler(FetchPartitionEntriesRequest request,
- MetadataStore metadataStore,
- ErrorCodeMapper errorCodeMapper,
- VoldemortConfig voldemortConfig,
- StoreRepository storeRepository,
- NetworkClassLoader networkClassLoader) {
+public class FullScanFetchKeysRequestHandler extends FullScanFetchStreamRequestHandler {
+
+ public FullScanFetchKeysRequestHandler(FetchPartitionEntriesRequest request,
+ MetadataStore metadataStore,
+ ErrorCodeMapper errorCodeMapper,
+ VoldemortConfig voldemortConfig,
+ StoreRepository storeRepository,
+ NetworkClassLoader networkClassLoader) {
super(request,
metadataStore,
errorCodeMapper,
@@ -73,49 +72,20 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
throttler.maybeThrottle(key.length());
- boolean keyAccepted = false;
- if(!fetchOrphaned) {
- if(keyIsNeeded(key.get()) && filter.accept(key, null)) {
- keyAccepted = true;
- }
-
- } else {
- if(!StoreInstance.checkKeyBelongsToNode(key.get(), nodeId, initialCluster, storeDef)) {
- keyAccepted = true;
- }
- }
-
- if(keyAccepted) {
- keyFetched(key.get());
-
- VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();
- response.setKey(ProtoUtils.encodeBytes(key));
- Message message = response.build();
+ if(isItemAccepted(key.get())) {
+ if(filter.accept(key, null)) {
+ accountForFetchedKey(key.get());
- sendMessage(outputStream, message);
- }
+ VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();
+ response.setKey(ProtoUtils.encodeBytes(key));
+ Message message = response.build();
- // log progress
- scanned++;
- if(0 == scanned % STAT_RECORDS_INTERVAL) {
- progressInfoMessage("Fetch keys (progress)");
+ sendMessage(outputStream, message);
+ }
}
- if(keyIterator.hasNext() && !fetchedEnough()) {
- return StreamRequestHandlerState.WRITING;
- } else {
- logger.info("Finished fetch keys for store '" + storageEngine.getName()
- + "' with replica to partition mapping " + replicaToPartitionList);
- progressInfoMessage("Fetch keys (end of scan)");
+ accountForScanProgress("keys");
- return StreamRequestHandlerState.COMPLETE;
- }
- }
-
- @Override
- public final void close(DataOutputStream outputStream) throws IOException {
- if(null != keyIterator)
- keyIterator.close();
- super.close(outputStream);
+ return determineRequestHandlerState("keys");
}
}
View
134 src/java/voldemort/server/protocol/admin/FullScanFetchStreamRequestHandler.java
@@ -1,8 +1,11 @@
package voldemort.server.protocol.admin;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest;
import voldemort.server.StoreRepository;
@@ -17,24 +20,26 @@
import voldemort.utils.Utils;
/**
- * Base class for key/entry stream fetching handlers that do not rely on PID
- * layout.
+ * Base class for key/entry stream fetching handlers that do an unordered full
+ * scan to fetch items.
*
*/
-public abstract class FetchItemsStreamRequestHandler extends FetchStreamRequestHandler {
+public abstract class FullScanFetchStreamRequestHandler extends FetchStreamRequestHandler {
protected final ClosableIterator<ByteArray> keyIterator;
// PartitionId to count of fetches on that partition.
protected Map<Integer, Long> partitionFetches;
-
- public FetchItemsStreamRequestHandler(FetchPartitionEntriesRequest request,
- MetadataStore metadataStore,
- ErrorCodeMapper errorCodeMapper,
- VoldemortConfig voldemortConfig,
- StoreRepository storeRepository,
- NetworkClassLoader networkClassLoader,
- StreamingStats.Operation operation) {
+ // PartitionIds of partitions that still need more fetched...
+ protected Set<Integer> partitionsToFetch;
+
+ public FullScanFetchStreamRequestHandler(FetchPartitionEntriesRequest request,
+ MetadataStore metadataStore,
+ ErrorCodeMapper errorCodeMapper,
+ VoldemortConfig voldemortConfig,
+ StoreRepository storeRepository,
+ NetworkClassLoader networkClassLoader,
+ StreamingStats.Operation operation) {
super(request,
metadataStore,
errorCodeMapper,
@@ -53,26 +58,18 @@ public FetchItemsStreamRequestHandler(FetchPartitionEntriesRequest request,
}
}
}
+ this.partitionsToFetch = new HashSet<Integer>(partitionFetches.keySet());
}
/**
* Given the key, figures out which partition on the local node hosts the
- * key based on contents of the "replica to partition list" data structure.
+ * key.
*
* @param key
* @return
*/
private Integer getKeyPartitionId(byte[] key) {
- StoreInstance storeInstance = new StoreInstance(initialCluster, storeDef);
- Integer keyPartitionId = null;
- for(Integer partitionId: storeInstance.getReplicationPartitionList(key)) {
- for(Map.Entry<Integer, List<Integer>> rtps: replicaToPartitionList.entrySet()) {
- if(rtps.getValue().contains(partitionId)) {
- keyPartitionId = partitionId;
- break;
- }
- }
- }
+ Integer keyPartitionId = storeInstance.getNodesPartitionIdForKey(nodeId, key);
Utils.notNull(keyPartitionId);
return keyPartitionId;
}
@@ -89,7 +86,7 @@ private Integer getKeyPartitionId(byte[] key) {
* @param storeDef
* @return true iff key is needed.
*/
- protected boolean keyIsNeeded(byte[] key) {
+ protected boolean isKeyNeeded(byte[] key) {
if(!StoreInstance.checkKeyBelongsToPartition(nodeId,
key,
replicaToPartitionList,
@@ -101,16 +98,32 @@ protected boolean keyIsNeeded(byte[] key) {
if(recordsPerPartition <= 0) {
return true;
}
-
- Integer keyPartitionId = getKeyPartitionId(key);
- Long partitionFetch = partitionFetches.get(keyPartitionId);
- Utils.notNull(partitionFetch);
-
- if(partitionFetch >= recordsPerPartition) {
- return false;
+ if(partitionsToFetch.contains(getKeyPartitionId(key))) {
+ return true;
}
+ return false;
+ }
- return true;
+ /**
+ * Determines if entry is accepted. For normal usage, this means confirming
+ * that the key is needed. For orphan usage, this simply means confirming
+ * the key belongs to the node.
+ *
+ * @param key
+ * @return
+ */
+ protected boolean isItemAccepted(byte[] key) {
+ boolean entryAccepted = false;
+ if(!fetchOrphaned) {
+ if(isKeyNeeded(key)) {
+ entryAccepted = true;
+ }
+ } else {
+ if(!StoreInstance.checkKeyBelongsToNode(key, nodeId, initialCluster, storeDef)) {
+ entryAccepted = true;
+ }
+ }
+ return entryAccepted;
}
/**
@@ -118,7 +131,7 @@ protected boolean keyIsNeeded(byte[] key) {
*
* @param key
*/
- protected void keyFetched(byte[] key) {
+ protected void accountForFetchedKey(byte[] key) {
fetched++;
if(streamStats != null) {
streamStats.reportStreamingFetch(operation);
@@ -131,27 +144,62 @@ protected void keyFetched(byte[] key) {
Integer keyPartitionId = getKeyPartitionId(key);
Long partitionFetch = partitionFetches.get(keyPartitionId);
Utils.notNull(partitionFetch);
-
- partitionFetches.put(keyPartitionId, partitionFetch + 1);
+ partitionFetch++;
+
+ partitionFetches.put(keyPartitionId, partitionFetch);
+ if(partitionFetch == recordsPerPartition) {
+ if(partitionsToFetch.contains(keyPartitionId)) {
+ partitionsToFetch.remove(keyPartitionId);
+ } else {
+ logger.warn("Partitions to fetch did not contain expected partition ID: "
+ + keyPartitionId);
+ }
+ } else if(partitionFetch > recordsPerPartition) {
+ logger.warn("Partition fetch count larger than expected for partition ID "
+ + keyPartitionId + " : " + partitionFetch);
+ }
}
/**
- * True iff enough partitions have been fetched relative to
- * recordsPerPartition value.
+ * True iff enough items have been fetched for all partitions, where
+ * 'enough' is relative to recordsPerPartition value.
*
- * @param partitionFetched Records fetched for current partition
* @return
*/
- protected boolean fetchedEnough() {
+ protected boolean fetchedEnoughForAllPartitions() {
if(recordsPerPartition <= 0) {
return false;
}
- for(Map.Entry<Integer, Long> partitionFetch: partitionFetches.entrySet()) {
- if(partitionFetch.getValue() < recordsPerPartition) {
- return false;
- }
+ if(partitionsToFetch.size() > 0) {
+ return false;
}
return true;
}
+
+ /**
+ * Determines if still WRITING or COMPLETE.
+ *
+ * @param itemTag mad libs style string to insert into progress message.
+ * @return
+ */
+ protected StreamRequestHandlerState determineRequestHandlerState(String itemTag) {
+
+ if(keyIterator.hasNext() && !fetchedEnoughForAllPartitions()) {
+ return StreamRequestHandlerState.WRITING;
+ } else {
+ logger.info("Finished fetch " + itemTag + " for store '" + storageEngine.getName()
+ + "' with replica to partition mapping " + replicaToPartitionList);
+ progressInfoMessage("Fetch " + itemTag + " (end of scan)");
+
+ return StreamRequestHandlerState.COMPLETE;
+ }
+ }
+
+ @Override
+ public final void close(DataOutputStream outputStream) throws IOException {
+ if(null != keyIterator)
+ keyIterator.close();
+ super.close(outputStream);
+ }
}
View
24 src/java/voldemort/server/protocol/admin/PartitionScanFetchEntriesRequestHandler.java
@@ -42,16 +42,16 @@
* isPartitionScanSupported() is true for the storage engine to be scanned..
*
*/
-public class FetchPartitionEntriesStreamRequestHandler extends FetchPartitionStreamRequestHandler {
+public class PartitionScanFetchEntriesRequestHandler extends PartitionScanFetchStreamRequestHandler {
protected ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> entriesPartitionIterator;
- public FetchPartitionEntriesStreamRequestHandler(FetchPartitionEntriesRequest request,
- MetadataStore metadataStore,
- ErrorCodeMapper errorCodeMapper,
- VoldemortConfig voldemortConfig,
- StoreRepository storeRepository,
- NetworkClassLoader networkClassLoader) {
+ public PartitionScanFetchEntriesRequestHandler(FetchPartitionEntriesRequest request,
+ MetadataStore metadataStore,
+ ErrorCodeMapper errorCodeMapper,
+ VoldemortConfig voldemortConfig,
+ StoreRepository storeRepository,
+ NetworkClassLoader networkClassLoader) {
super(request,
metadataStore,
errorCodeMapper,
@@ -92,8 +92,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
initialCluster,
storeDef)) {
found = true;
- fetchedPartitions.add(currentPartition);
- partitionFetched = 0;
+ completedFetchingCurrentPartition();
entriesPartitionIterator = storageEngine.entries(currentPartition);
statusInfoMessage("Starting fetch entries");
}
@@ -126,13 +125,10 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
throttler.maybeThrottle(AdminServiceRequestHandler.valueSize(value));
}
- scanned++;
- if(0 == scanned % STAT_RECORDS_INTERVAL) {
- progressInfoMessage("Fetch entries (progress)");
- }
+ accountForScanProgress("entries");
}
- if(!entriesPartitionIterator.hasNext() || fetchedEnough(partitionFetched)) {
+ if(!entriesPartitionIterator.hasNext() || fetchedEnoughForCurrentPartition()) {
// Finished current partition. Reset iterator. Info status.
entriesPartitionIterator.close();
entriesPartitionIterator = null;
View
24 src/java/voldemort/server/protocol/admin/PartitionScanFetchKeysRequestHandler.java
@@ -40,16 +40,16 @@
* isPartitionScanSupported() is true for the storage engine to be scanned..
*
*/
-public class FetchPartitionKeysStreamRequestHandler extends FetchPartitionStreamRequestHandler {
+public class PartitionScanFetchKeysRequestHandler extends PartitionScanFetchStreamRequestHandler {
protected ClosableIterator<ByteArray> keysPartitionIterator;
- public FetchPartitionKeysStreamRequestHandler(FetchPartitionEntriesRequest request,
- MetadataStore metadataStore,
- ErrorCodeMapper errorCodeMapper,
- VoldemortConfig voldemortConfig,
- StoreRepository storeRepository,
- NetworkClassLoader networkClassLoader) {
+ public PartitionScanFetchKeysRequestHandler(FetchPartitionEntriesRequest request,
+ MetadataStore metadataStore,
+ ErrorCodeMapper errorCodeMapper,
+ VoldemortConfig voldemortConfig,
+ StoreRepository storeRepository,
+ NetworkClassLoader networkClassLoader) {
super(request,
metadataStore,
errorCodeMapper,
@@ -90,8 +90,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
initialCluster,
storeDef)) {
found = true;
- fetchedPartitions.add(currentPartition);
- partitionFetched = 0;
+ completedFetchingCurrentPartition();
keysPartitionIterator = storageEngine.keys(currentPartition);
statusInfoMessage("Starting fetch keys");
}
@@ -116,13 +115,10 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
sendMessage(outputStream, message);
}
- scanned++;
- if(0 == scanned % STAT_RECORDS_INTERVAL) {
- progressInfoMessage("Fetch keys (progress)");
- }
+ accountForScanProgress("keys");
}
- if(!keysPartitionIterator.hasNext() || fetchedEnough(partitionFetched)) {
+ if(!keysPartitionIterator.hasNext() || fetchedEnoughForCurrentPartition()) {
// Finished current partition. Reset iterator. Info status.
keysPartitionIterator.close();
keysPartitionIterator = null;
View
42 src/java/voldemort/server/protocol/admin/PartitionScanFetchStreamRequestHandler.java
@@ -34,7 +34,7 @@
* isPartitionScanSupported() is true for the storage engine to be scanned..
*
*/
-public abstract class FetchPartitionStreamRequestHandler extends FetchStreamRequestHandler {
+public abstract class PartitionScanFetchStreamRequestHandler extends FetchStreamRequestHandler {
protected Set<Integer> fetchedPartitions;
protected List<Integer> replicaTypeList;
@@ -43,15 +43,15 @@
protected Integer currentIndex;
protected Integer currentPartition;
protected Integer currentReplicaType;
- protected long partitionFetched;
+ protected long currentPartitionFetched;
- public FetchPartitionStreamRequestHandler(FetchPartitionEntriesRequest request,
- MetadataStore metadataStore,
- ErrorCodeMapper errorCodeMapper,
- VoldemortConfig voldemortConfig,
- StoreRepository storeRepository,
- NetworkClassLoader networkClassLoader,
- StreamingStats.Operation operation) {
+ public PartitionScanFetchStreamRequestHandler(FetchPartitionEntriesRequest request,
+ MetadataStore metadataStore,
+ ErrorCodeMapper errorCodeMapper,
+ VoldemortConfig voldemortConfig,
+ StoreRepository storeRepository,
+ NetworkClassLoader networkClassLoader,
+ StreamingStats.Operation operation) {
super(request,
metadataStore,
errorCodeMapper,
@@ -63,7 +63,6 @@ public FetchPartitionStreamRequestHandler(FetchPartitionEntriesRequest request,
fetchedPartitions = new HashSet<Integer>();
replicaTypeList = new ArrayList<Integer>();
partitionList = new ArrayList<Integer>();
- currentIndex = 0;
// flatten the replicatype to partition map
for(Integer replicaType: replicaToPartitionList.keySet()) {
@@ -75,9 +74,10 @@ public FetchPartitionStreamRequestHandler(FetchPartitionEntriesRequest request,
}
}
+ currentIndex = 0;
currentPartition = null;
currentReplicaType = null;
- partitionFetched = 0;
+ currentPartitionFetched = 0;
}
/**
@@ -89,23 +89,21 @@ public FetchPartitionStreamRequestHandler(FetchPartitionEntriesRequest request,
protected void statusInfoMessage(final String tag) {
if(logger.isInfoEnabled()) {
logger.info(tag + " : [partition: " + currentPartition + ", replica type:"
- + currentReplicaType + ", partitionFetched: " + partitionFetched
+ + currentReplicaType + ", partitionFetched: " + currentPartitionFetched
+ "] for store " + storageEngine.getName());
}
}
/**
- * True iff enough partitions have been fetched relative to
- * recordsPerPartition value.
+ * True iff enough items have been fetched for current partition
*
- * @param partitionFetched Records fetched for current partition
* @return
*/
- protected boolean fetchedEnough(long partitionFetched) {
+ protected boolean fetchedEnoughForCurrentPartition() {
if(recordsPerPartition <= 0) {
return false;
}
- return (partitionFetched >= recordsPerPartition);
+ return (currentPartitionFetched >= recordsPerPartition);
}
/**
@@ -115,9 +113,17 @@ protected boolean fetchedEnough(long partitionFetched) {
*/
protected void recordFetched() {
fetched++;
- partitionFetched++;
+ currentPartitionFetched++;
if(streamStats != null) {
streamStats.reportStreamingFetch(operation);
}
}
+
+ /**
+ * Called when current partition has been completely fetched.
+ */
+ protected void completedFetchingCurrentPartition() {
+ fetchedPartitions.add(currentPartition);
+ currentPartitionFetched = 0;
+ }
}
View
47 src/java/voldemort/utils/StoreInstance.java
@@ -35,6 +35,7 @@
* effectively helper or util style methods for analyzing partitions and so on
* which are a function of both Cluster and StoreDefinition.
*/
+// TODO: Add StoreInstanceTest unit test for these helper methods.
public class StoreInstance {
private final Cluster cluster;
@@ -58,35 +59,65 @@ public StoreDefinition getStoreDefinition() {
}
/**
+ * Determines list of partition IDs that replicate the master partition ID.
*
- * @param partitionId
- * @return List of partition IDs that replicate the partition ID.
+ * @param masterPartitionId
+ * @return List of partition IDs that replicate the master partition ID.
*/
- public List<Integer> getReplicationPartitionList(int partitionId) {
+ public List<Integer> getReplicationPartitionList(int masterPartitionId) {
return new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition, cluster)
- .getReplicatingPartitionList(partitionId);
+ .getReplicatingPartitionList(masterPartitionId);
}
/**
+ * Determines list of partition IDs that replicate the key.
*
* @param key
* @return List of partition IDs that replicate the partition ID.
*/
public List<Integer> getReplicationPartitionList(final byte[] key) {
- return new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition, cluster)
- .getReplicatingPartitionList(getMasterPartitionId(key));
+ return getReplicationPartitionList(getMasterPartitionId(key));
}
+ /**
+ * Determines master partition ID for the key.
+ *
+ * @param key
+ * @return
+ */
public int getMasterPartitionId(final byte[] key) {
return new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition, cluster)
.getMasterPartition(key);
}
+ /**
+ * Determines node ID that hosts the specified partition ID.
+ *
+ * @param partitionId
+ * @return
+ */
public int getNodeIdForPartitionId(int partitionId) {
return partitionIdToNodeIdMap.get(partitionId);
}
/**
+ * Determines the partition ID that replicates the key on the given node.
+ *
+ * @param nodeId of the node
+ * @param key to look up.
+ * @return partitionId if found, otherwise null.
+ */
+ public Integer getNodesPartitionIdForKey(int nodeId, final byte[] key) {
+ List<Integer> partitionIds = getReplicationPartitionList(key);
+ for(Integer partitionId: partitionIds) {
+ if(getNodeIdForPartitionId(partitionId) == nodeId) {
+ return partitionId;
+ }
+ }
+ return null;
+ }
+
+ /**
* Converts from partitionId to nodeId. The list of partition IDs,
* partitionIds, is expected to be a "replicating partition list", i.e., the
* mapping from partition ID to node ID should be one to one.
@@ -187,7 +218,7 @@ public static boolean checkKeyBelongsToPartition(int nodeId,
*
* @return true if the partition belongs to the node with given replicatype
*/
- public static boolean checkPartitionBelongsToNode(int partition,
+ public static boolean checkPartitionBelongsToNode(int partitionId,
int replicaType,
int nodeId,
Cluster cluster,
@@ -196,7 +227,7 @@ public static boolean checkPartitionBelongsToNode(int partition,
List<Integer> nodePartitions = cluster.getNodeById(nodeId).getPartitionIds();
List<Integer> replicatingPartitions = new RoutingStrategyFactory().updateRoutingStrategy(storeDef,
cluster)
- .getReplicatingPartitionList(partition);
+ .getReplicatingPartitionList(partitionId);
// validate replicaType
if(replicaType < replicatingPartitions.size()) {
// check if the replicaType'th partition in the replicating list,
Please sign in to comment.
Something went wrong with that request. Please try again.