Skip to content

Commit

Permalink
Correctness fixes and significant refactoring of Fetch*StreamRequestH…
Browse files Browse the repository at this point in the history
…andlers. Expanded AdminFetchTest.

Added more common helper methods to common base class of all fetchers FetchStreamRequestHandler.

Added abstract base classes for partition-based fetching and non-partition-based fetching:
- FetchPartitionStreamRequestHandler (partition-based)
- FetchItemsStreamRequestHandler (non-partition-based)

Refactored some code up to abstract base classes and made implementations as similar as possible (without heroic efforts) across all fetchers:
- FetchEntriesStreamRequestHandler
- FetchKeysStreamRequestHandler
- FetchPartitionEntriesStreamRequestHandler
- FetchPartitionKeysStreamRequestHandler

Significantly better test coverage in AdminFetchTest
- tests fetching keys as well as fetching entries
- tests partition-aware and non-partition-aware servers
- tests per-partition limits on entries/keys fetched

All of this cleanup and additional testing led to minor correctness fixes.

Other minor cleanups of comments and override annotations, plus fixes for KeySamplerCLI.
  • Loading branch information
jayjwylie committed Mar 20, 2013
1 parent 5aca5c0 commit 96d1513
Show file tree
Hide file tree
Showing 13 changed files with 641 additions and 248 deletions.
5 changes: 3 additions & 2 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -1614,9 +1614,10 @@ public Iterator<Pair<ByteArray, Versioned<byte[]>>> fetchEntries(int nodeId,
return fetchEntries(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0);
}

// TODO: " HashMap<Integer, List<Integer>> replicaToPartitionList," is a
// TODO: "HashMap<Integer, List<Integer>> replicaToPartitionList" is a
// confusing/opaque argument. Can this be made a type, or even
// unrolled/simplified?
// unrolled/simplified? The replicaType is pretty much meaningless
// anyhow.

// TODO: The use of "Pair" in the return for a fundamental type is
// awkward. We should have a core KeyValue type that effectively wraps
Expand Down
6 changes: 6 additions & 0 deletions src/java/voldemort/client/protocol/pb/ProtoUtils.java
Expand Up @@ -147,6 +147,12 @@ public static List<PartitionTuple> encodePartitionTuple(HashMap<Integer, List<In
return tuples;
}

/**
*
* @param partitionTuples
* @return HashMap of replica type (Integer) to list of partition IDs
* (Integer)
*/
public static HashMap<Integer, List<Integer>> decodePartitionTuple(List<PartitionTuple> partitionTuples) {
HashMap<Integer, List<Integer>> replicaToPartitionList = Maps.newHashMap();
for(PartitionTuple tuple: partitionTuples) {
Expand Down
Expand Up @@ -1344,6 +1344,7 @@ public VAdminProto.AddStoreResponse handleAddStore(VAdminProto.AddStoreRequest r
* returns
* @return True if the buffer holds a complete request, false otherwise
*/
@Override
public boolean isCompleteRequest(ByteBuffer buffer) {
DataInputStream inputStream = new DataInputStream(new ByteBufferBackedInputStream(buffer));

Expand Down
Expand Up @@ -30,25 +30,20 @@
import voldemort.store.metadata.MetadataStore;
import voldemort.store.stats.StreamingStats.Operation;
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.StoreInstance;
import voldemort.utils.Time;
import voldemort.versioning.Versioned;

import com.google.protobuf.Message;

/**
* FetchEntries fetches and return key/value entry.
* Fetches entries by scanning entire storage engine in storage-order.
* <p>
* For performance reason use storageEngine.keys() iterator to filter out
* unwanted keys and then call storageEngine.get() for valid keys.
* <p>
*/

public class FetchEntriesStreamRequestHandler extends FetchStreamRequestHandler {

protected final ClosableIterator<ByteArray> keyIterator;
public class FetchEntriesStreamRequestHandler extends FetchItemsStreamRequestHandler {

public FetchEntriesStreamRequestHandler(FetchPartitionEntriesRequest request,
MetadataStore metadataStore,
Expand All @@ -63,7 +58,6 @@ public FetchEntriesStreamRequestHandler(FetchPartitionEntriesRequest request,
storeRepository,
networkClassLoader,
Operation.FETCH_ENTRIES);
this.keyIterator = storageEngine.keys();
logger.info("Starting fetch entries for store '" + storageEngine.getName()
+ "' with replica to partition mapping " + replicaToPartitionList);
}
Expand All @@ -72,83 +66,73 @@ public FetchEntriesStreamRequestHandler(FetchPartitionEntriesRequest request,
public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
DataOutputStream outputStream)
throws IOException {
if(!keyIterator.hasNext())
if(!keyIterator.hasNext()) {
return StreamRequestHandlerState.COMPLETE;
}

// NOTE: Storage time is accounted for somewhat incorrectly because
// .hasNext() is invoked at end of method for the common case.

// Since key reading (keyIterator.next()) is done separately from entry
// fetching (storageEngine.get()), must be careful about when to invoke
// reportStorageOpTime and when to invoke maybeThrottle().
long startNs = System.nanoTime();
ByteArray key = keyIterator.next();
if(streamStats != null)
streamStats.reportStreamingScan(operation);

boolean entryAccepted = false;
// Cannot invoke 'throttler.maybeThrottle(key.length());' here since
// that would affect timing measurements of storage operations.

boolean entryAccepted = false;
if(!fetchOrphaned) {
// normal code path
if(StoreInstance.checkKeyBelongsToPartition(nodeId,
key.get(),
replicaToPartitionList,
initialCluster,
storeDef)) {
if(keyIsNeeded(key.get())) {
entryAccepted = true;
}
} else {
// orphaned fetches
if(!StoreInstance.checkKeyBelongsToNode(key.get(), nodeId, initialCluster, storeDef)) {
entryAccepted = true;
}
}

if(entryAccepted) {
List<Versioned<byte[]>> values = storageEngine.get(key, null);
if(streamStats != null)
streamStats.reportStorageTime(operation, System.nanoTime() - startNs);
reportStorageOpTime(startNs);
throttler.maybeThrottle(key.length());
for(Versioned<byte[]> value: values) {
throttler.maybeThrottle(key.length());

if(filter.accept(key, value)) {
fetched++;
if(streamStats != null)
streamStats.reportStreamingFetch(operation);
VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();
keyFetched(key.get());

VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();
VAdminProto.PartitionEntry partitionEntry = VAdminProto.PartitionEntry.newBuilder()
.setKey(ProtoUtils.encodeBytes(key))
.setVersioned(ProtoUtils.encodeVersioned(value))
.build();
response.setPartitionEntry(partitionEntry);

Message message = response.build();

startNs = System.nanoTime();
ProtoUtils.writeMessage(outputStream, message);
if(streamStats != null)
streamStats.reportNetworkTime(operation, System.nanoTime() - startNs);
sendMessage(outputStream, message);

throttler.maybeThrottle(AdminServiceRequestHandler.valueSize(value));
}
}
} else {
if(streamStats != null)
streamStats.reportStorageTime(operation, System.nanoTime() - startNs);
reportStorageOpTime(startNs);
throttler.maybeThrottle(key.length());
}

// log progress
counter++;

if(0 == counter % STAT_RECORDS_INTERVAL) {
long totalTime = (System.currentTimeMillis() - startTime) / Time.MS_PER_SECOND;

logger.info("Fetch entries scanned " + counter + " entries, fetched " + fetched
+ " entries for store '" + storageEngine.getName()
+ "' replicaToPartitionList:" + replicaToPartitionList + " in " + totalTime
+ " s");
scanned++;
if(0 == scanned % STAT_RECORDS_INTERVAL) {
progressInfoMessage("Fetch entries (progress)");
}

// TODO: Add logic to FetchKeys and FetchEntries to count keys per
// partition correctly.

if(keyIterator.hasNext() && counter < recordsPerPartition)
if(keyIterator.hasNext() && !fetchedEnough()) {
return StreamRequestHandlerState.WRITING;
else {
} else {
logger.info("Finished fetch entries for store '" + storageEngine.getName()
+ "' with replica to partition mapping " + replicaToPartitionList);
progressInfoMessage("Fetch entries (end of scan)");

return StreamRequestHandlerState.COMPLETE;
}
}
Expand Down
@@ -0,0 +1,157 @@
package voldemort.server.protocol.admin;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.store.ErrorCodeMapper;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.stats.StreamingStats;
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.StoreInstance;
import voldemort.utils.Utils;

/**
 * Base class for key/entry stream fetching handlers that do not rely on PID
 * layout.
 * <p>
 * Holds the key iterator obtained from {@code storageEngine.keys()} and a
 * per-partition count of fetched records, which is used to enforce the
 * {@code recordsPerPartition} limit when that limit is positive.
 */
public abstract class FetchItemsStreamRequestHandler extends FetchStreamRequestHandler {

    protected final ClosableIterator<ByteArray> keyIterator;

    // PartitionId to count of fetches on that partition.
    protected Map<Integer, Long> partitionFetches;

    /**
     * Opens the key iterator on the storage engine and initializes a zeroed
     * fetch counter for every partition ID present in the "replica to
     * partition list". All arguments are forwarded unchanged to the
     * superclass constructor.
     */
    public FetchItemsStreamRequestHandler(FetchPartitionEntriesRequest request,
                                          MetadataStore metadataStore,
                                          ErrorCodeMapper errorCodeMapper,
                                          VoldemortConfig voldemortConfig,
                                          StoreRepository storeRepository,
                                          NetworkClassLoader networkClassLoader,
                                          StreamingStats.Operation operation) {
        super(request,
              metadataStore,
              errorCodeMapper,
              voldemortConfig,
              storeRepository,
              networkClassLoader,
              operation);

        this.keyIterator = storageEngine.keys();

        this.partitionFetches = new HashMap<Integer, Long>();
        for(List<Integer> partitionIds: replicaToPartitionList.values()) {
            // A replica type may be mapped to a null list; skip it.
            if(partitionIds != null) {
                for(Integer partitionId: partitionIds) {
                    this.partitionFetches.put(partitionId, Long.valueOf(0L));
                }
            }
        }
    }

    /**
     * True iff the given partition ID appears in any of the partition lists
     * of the "replica to partition list" data structure. Null lists are
     * ignored, mirroring the constructor.
     *
     * @param partitionId partition ID to look up
     * @return true iff the partition ID was requested
     */
    private boolean isRequestedPartition(Integer partitionId) {
        for(List<Integer> requestedPartitionIds: replicaToPartitionList.values()) {
            if(requestedPartitionIds != null && requestedPartitionIds.contains(partitionId)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Given the key, figures out which requested partition the key should be
     * accounted against, based on contents of the "replica to partition list"
     * data structure.
     * <p>
     * The key's replicating partitions are examined in the order returned by
     * {@code StoreInstance.getReplicationPartitionList} and the first one
     * that was requested wins. The search stops at that first match so that
     * a later replicating partition cannot overwrite it.
     *
     * @param key raw key bytes
     * @return ID of the first replicating partition of the key that is
     *         present in the "replica to partition list"; the result is
     *         passed through {@code Utils.notNull} before being returned
     */
    private Integer getKeyPartitionId(byte[] key) {
        // NOTE(review): a StoreInstance is built on every call; it looks
        // loop-invariant and could probably be cached -- confirm that
        // initialCluster/storeDef never change for the life of the handler.
        StoreInstance storeInstance = new StoreInstance(initialCluster, storeDef);
        Integer keyPartitionId = null;
        for(Integer partitionId: storeInstance.getReplicationPartitionList(key)) {
            if(isRequestedPartition(partitionId)) {
                keyPartitionId = partitionId;
                break;
            }
        }
        Utils.notNull(keyPartitionId);
        return keyPartitionId;
    }

    /**
     * Determines if the key is needed. To be 'needed', a key must (i) belong to
     * a partition being requested and (ii) be necessary to meet
     * recordsPerPartition constraint, if any. A non-positive
     * recordsPerPartition means there is no constraint.
     *
     * @param key raw key bytes
     * @return true iff key is needed.
     */
    protected boolean keyIsNeeded(byte[] key) {
        if(!StoreInstance.checkKeyBelongsToPartition(nodeId,
                                                     key,
                                                     replicaToPartitionList,
                                                     initialCluster,
                                                     storeDef)) {
            return false;
        }

        if(recordsPerPartition <= 0) {
            return true;
        }

        Integer keyPartitionId = getKeyPartitionId(key);
        Long partitionFetch = partitionFetches.get(keyPartitionId);
        Utils.notNull(partitionFetch);

        // Still needed only while this partition is below its quota.
        return partitionFetch < recordsPerPartition;
    }

    /**
     * Account for key being fetched: bumps the overall fetched count, reports
     * the fetch to the streaming stats (if any), and, when a per-partition
     * limit is in force, increments the counter of the key's partition.
     *
     * @param key raw key bytes of the record that was fetched
     */
    protected void keyFetched(byte[] key) {
        fetched++;
        if(streamStats != null) {
            streamStats.reportStreamingFetch(operation);
        }

        // Per-partition accounting is only required when a limit is set.
        if(recordsPerPartition <= 0) {
            return;
        }

        Integer keyPartitionId = getKeyPartitionId(key);
        Long partitionFetch = partitionFetches.get(keyPartitionId);
        Utils.notNull(partitionFetch);

        partitionFetches.put(keyPartitionId, partitionFetch + 1);
    }

    /**
     * True iff enough records have been fetched from every tracked partition
     * relative to recordsPerPartition value. Always false when
     * recordsPerPartition is not positive, i.e., when there is no
     * per-partition limit.
     *
     * @return true iff every tracked partition has reached
     *         recordsPerPartition fetched records
     */
    protected boolean fetchedEnough() {
        if(recordsPerPartition <= 0) {
            return false;
        }

        for(Long partitionFetch: partitionFetches.values()) {
            if(partitionFetch < recordsPerPartition) {
                return false;
            }
        }
        return true;
    }
}

0 comments on commit 96d1513

Please sign in to comment.