Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Added KeySampler and KeyVersionSampler tools as a first step towards …

…replacing "entropy" tool. Added another argument to bulk fetch operations that specifies maxRecords so that server can fetch a subset of a partition.

src/java/voldemort/utils/KeySamplerCLI.java
- Samples keys from a cluster

src/java/voldemort/utils/KeyVersionSamplerCLI.java
- Given file that lists keys per store, samples versions from each "responsible node" for that key

src/java/voldemort/client/protocol/admin/AdminClient.java
- passed maxRecords through
- TODO for future clean up of some types

src/java/voldemort/client/protocol/pb/VAdminProto.java
- auto generated!

src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java
- white space

src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java
src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java
src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java
- handle maxRecords

src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java
src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java
- handle maxRecords
- fixed usage of skipRecords

src/java/voldemort/utils/Entropy.java
- added maxRecords

src/proto/voldemort-admin.proto
- added mac_records to protobuff definition

test/unit/voldemort/client/AdminFetchTest.java
- added maxRecords field to test
  • Loading branch information...
commit 7669d86f0463dca2af3a59a1f4e97941a3ef4959 1 parent e50ad0a
@jayjwylie jayjwylie authored
View
38 src/java/voldemort/client/protocol/admin/AdminClient.java
@@ -1432,7 +1432,8 @@ private void initiateFetchRequest(DataOutputStream outputStream,
boolean fetchValues,
boolean fetchMasterEntries,
Cluster initialCluster,
- long skipRecords) throws IOException {
+ long skipRecords,
+ long maxRecords) throws IOException {
HashMap<Integer, List<Integer>> filteredReplicaToPartitionList = Maps.newHashMap();
if(fetchMasterEntries) {
if(!replicaToPartitionList.containsKey(0)) {
@@ -1447,7 +1448,8 @@ private void initiateFetchRequest(DataOutputStream outputStream,
.setFetchValues(fetchValues)
.addAllReplicaToPartition(ProtoUtils.encodePartitionTuple(filteredReplicaToPartitionList))
.setStore(storeName)
- .setSkipRecords(skipRecords);
+ .setSkipRecords(skipRecords)
+ .setMaxRecords(maxRecords);
try {
if(filter != null) {
@@ -1574,14 +1576,16 @@ private void initiateFetchRequest(DataOutputStream outputStream,
List<Integer> partitionList,
VoldemortFilter filter,
boolean fetchMasterEntries,
- long skipRecords) {
+ long skipRecords,
+ long maxRecords) {
return fetchEntries(nodeId,
storeName,
helperOps.getReplicaToPartitionMap(nodeId, storeName, partitionList),
filter,
fetchMasterEntries,
null,
- skipRecords);
+ skipRecords,
+ maxRecords);
}
/**
@@ -1603,9 +1607,13 @@ private void initiateFetchRequest(DataOutputStream outputStream,
List<Integer> partitionList,
VoldemortFilter filter,
boolean fetchMasterEntries) {
- return fetchEntries(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0);
+ return fetchEntries(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0, 0);
}
+ // TODO: " HashMap<Integer, List<Integer>> replicaToPartitionList," is a
+ // confusing/opaque argument. Can this be made a type, or even
+ // unrolled/simplified?
+
// TODO: The use of "Pair" in the return for a fundamental type is
// awkward. We should have a core KeyValue type that effectively wraps
// up a ByteArray and a Versioned<byte[]>.
@@ -1645,7 +1653,8 @@ private void initiateFetchRequest(DataOutputStream outputStream,
VoldemortFilter filter,
boolean fetchMasterEntries,
Cluster initialCluster,
- long skipRecords) {
+ long skipRecords,
+ long maxRecords) {
Node node = AdminClient.this.getAdminClientCluster().getNodeById(nodeId);
final SocketDestination destination = new SocketDestination(node.getHost(),
@@ -1663,7 +1672,8 @@ private void initiateFetchRequest(DataOutputStream outputStream,
true,
fetchMasterEntries,
initialCluster,
- skipRecords);
+ skipRecords,
+ maxRecords);
} catch(IOException e) {
helperOps.close(sands.getSocket());
socketPool.checkin(destination, sands);
@@ -1790,14 +1800,16 @@ public ByteArray computeNext() {
List<Integer> partitionList,
VoldemortFilter filter,
boolean fetchMasterEntries,
- long skipRecords) {
+ long skipRecords,
+ long maxRecords) {
return fetchKeys(nodeId,
storeName,
helperOps.getReplicaToPartitionMap(nodeId, storeName, partitionList),
filter,
fetchMasterEntries,
null,
- skipRecords);
+ skipRecords,
+ maxRecords);
}
/**
@@ -1819,7 +1831,7 @@ public ByteArray computeNext() {
List<Integer> partitionList,
VoldemortFilter filter,
boolean fetchMasterEntries) {
- return fetchKeys(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0);
+ return fetchKeys(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0, 0);
}
/**
@@ -1843,7 +1855,8 @@ public ByteArray computeNext() {
VoldemortFilter filter,
boolean fetchMasterEntries,
Cluster initialCluster,
- long skipRecords) {
+ long skipRecords,
+ long maxRecords) {
Node node = AdminClient.this.getAdminClientCluster().getNodeById(nodeId);
final SocketDestination destination = new SocketDestination(node.getHost(),
node.getAdminPort(),
@@ -1860,7 +1873,8 @@ public ByteArray computeNext() {
false,
fetchMasterEntries,
initialCluster,
- skipRecords);
+ skipRecords,
+ maxRecords);
} catch(IOException e) {
helperOps.close(sands.getSocket());
socketPool.checkin(destination, sands);
View
368 src/java/voldemort/client/protocol/pb/VAdminProto.java
@@ -4507,6 +4507,13 @@ public FetchPartitionEntriesRequest getDefaultInstanceForType() {
public boolean hasFetchOrphaned() { return hasFetchOrphaned; }
public boolean getFetchOrphaned() { return fetchOrphaned_; }
+ // optional int64 max_records = 8;
+ public static final int MAX_RECORDS_FIELD_NUMBER = 8;
+ private boolean hasMaxRecords;
+ private long maxRecords_ = 0L;
+ public boolean hasMaxRecords() { return hasMaxRecords; }
+ public long getMaxRecords() { return maxRecords_; }
+
private void initFields() {
filter_ = voldemort.client.protocol.pb.VAdminProto.VoldemortFilter.getDefaultInstance();
}
@@ -4545,6 +4552,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output)
if (hasFetchOrphaned()) {
output.writeBool(7, getFetchOrphaned());
}
+ if (hasMaxRecords()) {
+ output.writeInt64(8, getMaxRecords());
+ }
getUnknownFields().writeTo(output);
}
@@ -4582,6 +4592,10 @@ public int getSerializedSize() {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(7, getFetchOrphaned());
}
+ if (hasMaxRecords()) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(8, getMaxRecords());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -4768,6 +4782,9 @@ public Builder mergeFrom(voldemort.client.protocol.pb.VAdminProto.FetchPartition
if (other.hasFetchOrphaned()) {
setFetchOrphaned(other.getFetchOrphaned());
}
+ if (other.hasMaxRecords()) {
+ setMaxRecords(other.getMaxRecords());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -4828,6 +4845,10 @@ public Builder mergeFrom(
setFetchOrphaned(input.readBool());
break;
}
+ case 64: {
+ setMaxRecords(input.readInt64());
+ break;
+ }
}
}
}
@@ -5017,6 +5038,24 @@ public Builder clearFetchOrphaned() {
return this;
}
+ // optional int64 max_records = 8;
+ public boolean hasMaxRecords() {
+ return result.hasMaxRecords();
+ }
+ public long getMaxRecords() {
+ return result.getMaxRecords();
+ }
+ public Builder setMaxRecords(long value) {
+ result.hasMaxRecords = true;
+ result.maxRecords_ = value;
+ return this;
+ }
+ public Builder clearMaxRecords() {
+ result.hasMaxRecords = false;
+ result.maxRecords_ = 0L;
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:voldemort.FetchPartitionEntriesRequest)
}
@@ -23124,174 +23163,175 @@ public Builder clearReserveMemory() {
"e\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\"d\n\032Fe" +
"tchPartitionFilesRequest\022\r\n\005store\030\001 \002(\t\022" +
"7\n\024replica_to_partition\030\002 \003(\0132\031.voldemor" +
- "t.PartitionTuple\"\357\001\n\034FetchPartitionEntri" +
+ "t.PartitionTuple\"\204\002\n\034FetchPartitionEntri" +
"esRequest\0227\n\024replica_to_partition\030\001 \003(\0132" +
"\031.voldemort.PartitionTuple\022\r\n\005store\030\002 \002(" +
"\t\022*\n\006filter\030\003 \001(\0132\032.voldemort.VoldemortF",
"ilter\022\024\n\014fetch_values\030\004 \001(\010\022\024\n\014skip_reco" +
"rds\030\005 \001(\003\022\027\n\017initial_cluster\030\006 \001(\t\022\026\n\016fe" +
- "tch_orphaned\030\007 \001(\010\"\201\001\n\035FetchPartitionEnt" +
- "riesResponse\0222\n\017partition_entry\030\001 \001(\0132\031." +
- "voldemort.PartitionEntry\022\013\n\003key\030\002 \001(\014\022\037\n" +
- "\005error\030\003 \001(\0132\020.voldemort.Error\"\254\001\n\035Delet" +
- "ePartitionEntriesRequest\022\r\n\005store\030\001 \002(\t\022" +
- "7\n\024replica_to_partition\030\002 \003(\0132\031.voldemor" +
- "t.PartitionTuple\022*\n\006filter\030\003 \001(\0132\032.volde" +
- "mort.VoldemortFilter\022\027\n\017initial_cluster\030",
- "\004 \001(\t\"P\n\036DeletePartitionEntriesResponse\022" +
- "\r\n\005count\030\001 \001(\003\022\037\n\005error\030\002 \001(\0132\020.voldemor" +
- "t.Error\"\317\001\n\035InitiateFetchAndUpdateReques" +
- "t\022\017\n\007node_id\030\001 \002(\005\022\r\n\005store\030\002 \002(\t\022*\n\006fil" +
- "ter\030\003 \001(\0132\032.voldemort.VoldemortFilter\0227\n" +
- "\024replica_to_partition\030\004 \003(\0132\031.voldemort." +
- "PartitionTuple\022\027\n\017initial_cluster\030\005 \001(\t\022" +
- "\020\n\010optimize\030\006 \001(\010\"1\n\033AsyncOperationStatu" +
- "sRequest\022\022\n\nrequest_id\030\001 \002(\005\"/\n\031AsyncOpe" +
- "rationStopRequest\022\022\n\nrequest_id\030\001 \002(\005\"=\n",
- "\032AsyncOperationStopResponse\022\037\n\005error\030\001 \001" +
- "(\0132\020.voldemort.Error\"2\n\031AsyncOperationLi" +
- "stRequest\022\025\n\rshow_complete\030\002 \002(\010\"R\n\032Asyn" +
- "cOperationListResponse\022\023\n\013request_ids\030\001 " +
- "\003(\005\022\037\n\005error\030\002 \001(\0132\020.voldemort.Error\":\n\016" +
- "PartitionTuple\022\024\n\014replica_type\030\001 \002(\005\022\022\n\n" +
- "partitions\030\002 \003(\005\"e\n\026PerStorePartitionTup" +
- "le\022\022\n\nstore_name\030\001 \002(\t\0227\n\024replica_to_par" +
- "tition\030\002 \003(\0132\031.voldemort.PartitionTuple\"" +
- "\370\001\n\031RebalancePartitionInfoMap\022\022\n\nstealer",
- "_id\030\001 \002(\005\022\020\n\010donor_id\030\002 \002(\005\022\017\n\007attempt\030\003" +
- " \002(\005\022C\n\030replica_to_add_partition\030\004 \003(\0132!" +
- ".voldemort.PerStorePartitionTuple\022F\n\033rep" +
- "lica_to_delete_partition\030\005 \003(\0132!.voldemo" +
- "rt.PerStorePartitionTuple\022\027\n\017initial_clu" +
- "ster\030\006 \002(\t\"f\n\034InitiateRebalanceNodeReque" +
- "st\022F\n\030rebalance_partition_info\030\001 \002(\0132$.v" +
- "oldemort.RebalancePartitionInfoMap\"m\n#In" +
- "itiateRebalanceNodeOnDonorRequest\022F\n\030reb" +
- "alance_partition_info\030\001 \003(\0132$.voldemort.",
- "RebalancePartitionInfoMap\"\212\001\n\034AsyncOpera" +
- "tionStatusResponse\022\022\n\nrequest_id\030\001 \001(\005\022\023" +
- "\n\013description\030\002 \001(\t\022\016\n\006status\030\003 \001(\t\022\020\n\010c" +
- "omplete\030\004 \001(\010\022\037\n\005error\030\005 \001(\0132\020.voldemort" +
- ".Error\"\'\n\026TruncateEntriesRequest\022\r\n\005stor" +
- "e\030\001 \002(\t\":\n\027TruncateEntriesResponse\022\037\n\005er" +
- "ror\030\001 \001(\0132\020.voldemort.Error\"*\n\017AddStoreR" +
- "equest\022\027\n\017storeDefinition\030\001 \002(\t\"3\n\020AddSt" +
- "oreResponse\022\037\n\005error\030\001 \001(\0132\020.voldemort.E" +
- "rror\"\'\n\022DeleteStoreRequest\022\021\n\tstoreName\030",
- "\001 \002(\t\"6\n\023DeleteStoreResponse\022\037\n\005error\030\001 " +
- "\001(\0132\020.voldemort.Error\"P\n\021FetchStoreReque" +
- "st\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_dir\030\002 \002(" +
- "\t\022\024\n\014push_version\030\003 \001(\003\"9\n\020SwapStoreRequ" +
- "est\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_dir\030\002 \002" +
- "(\t\"P\n\021SwapStoreResponse\022\037\n\005error\030\001 \001(\0132\020" +
- ".voldemort.Error\022\032\n\022previous_store_dir\030\002" +
- " \001(\t\"@\n\024RollbackStoreRequest\022\022\n\nstore_na" +
- "me\030\001 \002(\t\022\024\n\014push_version\030\002 \002(\003\"8\n\025Rollba" +
- "ckStoreResponse\022\037\n\005error\030\001 \001(\0132\020.voldemo",
- "rt.Error\"&\n\020RepairJobRequest\022\022\n\nstore_na" +
- "me\030\001 \001(\t\"4\n\021RepairJobResponse\022\037\n\005error\030\001" +
- " \001(\0132\020.voldemort.Error\"=\n\024ROStoreVersion" +
- "DirMap\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_dir\030" +
- "\002 \002(\t\"/\n\031GetROMaxVersionDirRequest\022\022\n\nst" +
- "ore_name\030\001 \003(\t\"y\n\032GetROMaxVersionDirResp" +
- "onse\022:\n\021ro_store_versions\030\001 \003(\0132\037.voldem" +
- "ort.ROStoreVersionDirMap\022\037\n\005error\030\002 \001(\0132" +
- "\020.voldemort.Error\"3\n\035GetROCurrentVersion" +
- "DirRequest\022\022\n\nstore_name\030\001 \003(\t\"}\n\036GetROC",
- "urrentVersionDirResponse\022:\n\021ro_store_ver" +
- "sions\030\001 \003(\0132\037.voldemort.ROStoreVersionDi" +
- "rMap\022\037\n\005error\030\002 \001(\0132\020.voldemort.Error\"/\n" +
- "\031GetROStorageFormatRequest\022\022\n\nstore_name" +
- "\030\001 \003(\t\"y\n\032GetROStorageFormatResponse\022:\n\021" +
- "ro_store_versions\030\001 \003(\0132\037.voldemort.ROSt" +
- "oreVersionDirMap\022\037\n\005error\030\002 \001(\0132\020.voldem" +
- "ort.Error\"@\n\027FailedFetchStoreRequest\022\022\n\n" +
- "store_name\030\001 \002(\t\022\021\n\tstore_dir\030\002 \002(\t\";\n\030F" +
- "ailedFetchStoreResponse\022\037\n\005error\030\001 \001(\0132\020",
- ".voldemort.Error\"\346\001\n\033RebalanceStateChang" +
- "eRequest\022K\n\035rebalance_partition_info_lis" +
- "t\030\001 \003(\0132$.voldemort.RebalancePartitionIn" +
- "foMap\022\026\n\016cluster_string\030\002 \002(\t\022\017\n\007swap_ro" +
- "\030\003 \002(\010\022\037\n\027change_cluster_metadata\030\004 \002(\010\022" +
- "\036\n\026change_rebalance_state\030\005 \002(\010\022\020\n\010rollb" +
- "ack\030\006 \002(\010\"?\n\034RebalanceStateChangeRespons" +
- "e\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\"G\n De" +
- "leteStoreRebalanceStateRequest\022\022\n\nstore_" +
- "name\030\001 \002(\t\022\017\n\007node_id\030\002 \002(\005\"D\n!DeleteSto",
- "reRebalanceStateResponse\022\037\n\005error\030\001 \001(\0132" +
- "\020.voldemort.Error\"h\n\023NativeBackupRequest" +
- "\022\022\n\nstore_name\030\001 \002(\t\022\022\n\nbackup_dir\030\002 \002(\t" +
- "\022\024\n\014verify_files\030\003 \002(\010\022\023\n\013incremental\030\004 " +
- "\002(\010\">\n\024ReserveMemoryRequest\022\022\n\nstore_nam" +
- "e\030\001 \002(\t\022\022\n\nsize_in_mb\030\002 \002(\003\"8\n\025ReserveMe" +
- "moryResponse\022\037\n\005error\030\001 \001(\0132\020.voldemort." +
- "Error\"\360\016\n\025VoldemortAdminRequest\022)\n\004type\030" +
- "\001 \002(\0162\033.voldemort.AdminRequestType\0223\n\014ge" +
- "t_metadata\030\002 \001(\0132\035.voldemort.GetMetadata",
- "Request\0229\n\017update_metadata\030\003 \001(\0132 .volde" +
- "mort.UpdateMetadataRequest\022J\n\030update_par" +
- "tition_entries\030\004 \001(\0132(.voldemort.UpdateP" +
- "artitionEntriesRequest\022H\n\027fetch_partitio" +
- "n_entries\030\005 \001(\0132\'.voldemort.FetchPartiti" +
- "onEntriesRequest\022J\n\030delete_partition_ent" +
- "ries\030\006 \001(\0132(.voldemort.DeletePartitionEn" +
- "triesRequest\022K\n\031initiate_fetch_and_updat" +
- "e\030\007 \001(\0132(.voldemort.InitiateFetchAndUpda" +
- "teRequest\022F\n\026async_operation_status\030\010 \001(",
- "\0132&.voldemort.AsyncOperationStatusReques" +
- "t\022H\n\027initiate_rebalance_node\030\t \001(\0132\'.vol" +
- "demort.InitiateRebalanceNodeRequest\022B\n\024a" +
- "sync_operation_stop\030\n \001(\0132$.voldemort.As" +
- "yncOperationStopRequest\022B\n\024async_operati" +
- "on_list\030\013 \001(\0132$.voldemort.AsyncOperation" +
- "ListRequest\022;\n\020truncate_entries\030\014 \001(\0132!." +
- "voldemort.TruncateEntriesRequest\022-\n\tadd_" +
- "store\030\r \001(\0132\032.voldemort.AddStoreRequest\022" +
- "3\n\014delete_store\030\016 \001(\0132\035.voldemort.Delete",
- "StoreRequest\0221\n\013fetch_store\030\017 \001(\0132\034.vold" +
- "emort.FetchStoreRequest\022/\n\nswap_store\030\020 " +
- "\001(\0132\033.voldemort.SwapStoreRequest\0227\n\016roll" +
- "back_store\030\021 \001(\0132\037.voldemort.RollbackSto" +
- "reRequest\022D\n\026get_ro_max_version_dir\030\022 \001(" +
- "\0132$.voldemort.GetROMaxVersionDirRequest\022" +
- "L\n\032get_ro_current_version_dir\030\023 \001(\0132(.vo" +
- "ldemort.GetROCurrentVersionDirRequest\022D\n" +
- "\025fetch_partition_files\030\024 \001(\0132%.voldemort" +
- ".FetchPartitionFilesRequest\022@\n\023update_sl",
- "op_entries\030\026 \001(\0132#.voldemort.UpdateSlopE" +
- "ntriesRequest\022>\n\022failed_fetch_store\030\030 \001(" +
- "\0132\".voldemort.FailedFetchStoreRequest\022C\n" +
- "\025get_ro_storage_format\030\031 \001(\0132$.voldemort" +
- ".GetROStorageFormatRequest\022F\n\026rebalance_" +
- "state_change\030\032 \001(\0132&.voldemort.Rebalance" +
- "StateChangeRequest\022/\n\nrepair_job\030\033 \001(\0132\033" +
- ".voldemort.RepairJobRequest\022X\n initiate_" +
- "rebalance_node_on_donor\030\034 \001(\0132..voldemor" +
- "t.InitiateRebalanceNodeOnDonorRequest\022Q\n",
- "\034delete_store_rebalance_state\030\035 \001(\0132+.vo" +
- "ldemort.DeleteStoreRebalanceStateRequest" +
- "\0225\n\rnative_backup\030\036 \001(\0132\036.voldemort.Nati" +
- "veBackupRequest\0227\n\016reserve_memory\030\037 \001(\0132" +
- "\037.voldemort.ReserveMemoryRequest*\310\005\n\020Adm" +
- "inRequestType\022\020\n\014GET_METADATA\020\000\022\023\n\017UPDAT" +
- "E_METADATA\020\001\022\034\n\030UPDATE_PARTITION_ENTRIES" +
- "\020\002\022\033\n\027FETCH_PARTITION_ENTRIES\020\003\022\034\n\030DELET" +
- "E_PARTITION_ENTRIES\020\004\022\035\n\031INITIATE_FETCH_" +
- "AND_UPDATE\020\005\022\032\n\026ASYNC_OPERATION_STATUS\020\006",
- "\022\033\n\027INITIATE_REBALANCE_NODE\020\007\022\030\n\024ASYNC_O" +
- "PERATION_STOP\020\010\022\030\n\024ASYNC_OPERATION_LIST\020" +
- "\t\022\024\n\020TRUNCATE_ENTRIES\020\n\022\r\n\tADD_STORE\020\013\022\020" +
- "\n\014DELETE_STORE\020\014\022\017\n\013FETCH_STORE\020\r\022\016\n\nSWA" +
- "P_STORE\020\016\022\022\n\016ROLLBACK_STORE\020\017\022\032\n\026GET_RO_" +
- "MAX_VERSION_DIR\020\020\022\036\n\032GET_RO_CURRENT_VERS" +
- "ION_DIR\020\021\022\031\n\025FETCH_PARTITION_FILES\020\022\022\027\n\023" +
- "UPDATE_SLOP_ENTRIES\020\024\022\026\n\022FAILED_FETCH_ST" +
- "ORE\020\026\022\031\n\025GET_RO_STORAGE_FORMAT\020\027\022\032\n\026REBA" +
- "LANCE_STATE_CHANGE\020\030\022\016\n\nREPAIR_JOB\020\031\022$\n ",
- "INITIATE_REBALANCE_NODE_ON_DONOR\020\032\022 \n\034DE" +
- "LETE_STORE_REBALANCE_STATE\020\033\022\021\n\rNATIVE_B" +
- "ACKUP\020\034\022\022\n\016RESERVE_MEMORY\020\035B-\n\034voldemort" +
- ".client.protocol.pbB\013VAdminProtoH\001"
+ "tch_orphaned\030\007 \001(\010\022\023\n\013max_records\030\010 \001(\003\"" +
+ "\201\001\n\035FetchPartitionEntriesResponse\0222\n\017par" +
+ "tition_entry\030\001 \001(\0132\031.voldemort.Partition" +
+ "Entry\022\013\n\003key\030\002 \001(\014\022\037\n\005error\030\003 \001(\0132\020.vold" +
+ "emort.Error\"\254\001\n\035DeletePartitionEntriesRe" +
+ "quest\022\r\n\005store\030\001 \002(\t\0227\n\024replica_to_parti" +
+ "tion\030\002 \003(\0132\031.voldemort.PartitionTuple\022*\n" +
+ "\006filter\030\003 \001(\0132\032.voldemort.VoldemortFilte",
+ "r\022\027\n\017initial_cluster\030\004 \001(\t\"P\n\036DeletePart" +
+ "itionEntriesResponse\022\r\n\005count\030\001 \001(\003\022\037\n\005e" +
+ "rror\030\002 \001(\0132\020.voldemort.Error\"\317\001\n\035Initiat" +
+ "eFetchAndUpdateRequest\022\017\n\007node_id\030\001 \002(\005\022" +
+ "\r\n\005store\030\002 \002(\t\022*\n\006filter\030\003 \001(\0132\032.voldemo" +
+ "rt.VoldemortFilter\0227\n\024replica_to_partiti" +
+ "on\030\004 \003(\0132\031.voldemort.PartitionTuple\022\027\n\017i" +
+ "nitial_cluster\030\005 \001(\t\022\020\n\010optimize\030\006 \001(\010\"1" +
+ "\n\033AsyncOperationStatusRequest\022\022\n\nrequest" +
+ "_id\030\001 \002(\005\"/\n\031AsyncOperationStopRequest\022\022",
+ "\n\nrequest_id\030\001 \002(\005\"=\n\032AsyncOperationStop" +
+ "Response\022\037\n\005error\030\001 \001(\0132\020.voldemort.Erro" +
+ "r\"2\n\031AsyncOperationListRequest\022\025\n\rshow_c" +
+ "omplete\030\002 \002(\010\"R\n\032AsyncOperationListRespo" +
+ "nse\022\023\n\013request_ids\030\001 \003(\005\022\037\n\005error\030\002 \001(\0132" +
+ "\020.voldemort.Error\":\n\016PartitionTuple\022\024\n\014r" +
+ "eplica_type\030\001 \002(\005\022\022\n\npartitions\030\002 \003(\005\"e\n" +
+ "\026PerStorePartitionTuple\022\022\n\nstore_name\030\001 " +
+ "\002(\t\0227\n\024replica_to_partition\030\002 \003(\0132\031.vold" +
+ "emort.PartitionTuple\"\370\001\n\031RebalancePartit",
+ "ionInfoMap\022\022\n\nstealer_id\030\001 \002(\005\022\020\n\010donor_" +
+ "id\030\002 \002(\005\022\017\n\007attempt\030\003 \002(\005\022C\n\030replica_to_" +
+ "add_partition\030\004 \003(\0132!.voldemort.PerStore" +
+ "PartitionTuple\022F\n\033replica_to_delete_part" +
+ "ition\030\005 \003(\0132!.voldemort.PerStorePartitio" +
+ "nTuple\022\027\n\017initial_cluster\030\006 \002(\t\"f\n\034Initi" +
+ "ateRebalanceNodeRequest\022F\n\030rebalance_par" +
+ "tition_info\030\001 \002(\0132$.voldemort.RebalanceP" +
+ "artitionInfoMap\"m\n#InitiateRebalanceNode" +
+ "OnDonorRequest\022F\n\030rebalance_partition_in",
+ "fo\030\001 \003(\0132$.voldemort.RebalancePartitionI" +
+ "nfoMap\"\212\001\n\034AsyncOperationStatusResponse\022" +
+ "\022\n\nrequest_id\030\001 \001(\005\022\023\n\013description\030\002 \001(\t" +
+ "\022\016\n\006status\030\003 \001(\t\022\020\n\010complete\030\004 \001(\010\022\037\n\005er" +
+ "ror\030\005 \001(\0132\020.voldemort.Error\"\'\n\026TruncateE" +
+ "ntriesRequest\022\r\n\005store\030\001 \002(\t\":\n\027Truncate" +
+ "EntriesResponse\022\037\n\005error\030\001 \001(\0132\020.voldemo" +
+ "rt.Error\"*\n\017AddStoreRequest\022\027\n\017storeDefi" +
+ "nition\030\001 \002(\t\"3\n\020AddStoreResponse\022\037\n\005erro" +
+ "r\030\001 \001(\0132\020.voldemort.Error\"\'\n\022DeleteStore",
+ "Request\022\021\n\tstoreName\030\001 \002(\t\"6\n\023DeleteStor" +
+ "eResponse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Err" +
+ "or\"P\n\021FetchStoreRequest\022\022\n\nstore_name\030\001 " +
+ "\002(\t\022\021\n\tstore_dir\030\002 \002(\t\022\024\n\014push_version\030\003" +
+ " \001(\003\"9\n\020SwapStoreRequest\022\022\n\nstore_name\030\001" +
+ " \002(\t\022\021\n\tstore_dir\030\002 \002(\t\"P\n\021SwapStoreResp" +
+ "onse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\022\032\n" +
+ "\022previous_store_dir\030\002 \001(\t\"@\n\024RollbackSto" +
+ "reRequest\022\022\n\nstore_name\030\001 \002(\t\022\024\n\014push_ve" +
+ "rsion\030\002 \002(\003\"8\n\025RollbackStoreResponse\022\037\n\005",
+ "error\030\001 \001(\0132\020.voldemort.Error\"&\n\020RepairJ" +
+ "obRequest\022\022\n\nstore_name\030\001 \001(\t\"4\n\021RepairJ" +
+ "obResponse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Er" +
+ "ror\"=\n\024ROStoreVersionDirMap\022\022\n\nstore_nam" +
+ "e\030\001 \002(\t\022\021\n\tstore_dir\030\002 \002(\t\"/\n\031GetROMaxVe" +
+ "rsionDirRequest\022\022\n\nstore_name\030\001 \003(\t\"y\n\032G" +
+ "etROMaxVersionDirResponse\022:\n\021ro_store_ve" +
+ "rsions\030\001 \003(\0132\037.voldemort.ROStoreVersionD" +
+ "irMap\022\037\n\005error\030\002 \001(\0132\020.voldemort.Error\"3" +
+ "\n\035GetROCurrentVersionDirRequest\022\022\n\nstore",
+ "_name\030\001 \003(\t\"}\n\036GetROCurrentVersionDirRes" +
+ "ponse\022:\n\021ro_store_versions\030\001 \003(\0132\037.volde" +
+ "mort.ROStoreVersionDirMap\022\037\n\005error\030\002 \001(\013" +
+ "2\020.voldemort.Error\"/\n\031GetROStorageFormat" +
+ "Request\022\022\n\nstore_name\030\001 \003(\t\"y\n\032GetROStor" +
+ "ageFormatResponse\022:\n\021ro_store_versions\030\001" +
+ " \003(\0132\037.voldemort.ROStoreVersionDirMap\022\037\n" +
+ "\005error\030\002 \001(\0132\020.voldemort.Error\"@\n\027Failed" +
+ "FetchStoreRequest\022\022\n\nstore_name\030\001 \002(\t\022\021\n" +
+ "\tstore_dir\030\002 \002(\t\";\n\030FailedFetchStoreResp",
+ "onse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\"\346\001" +
+ "\n\033RebalanceStateChangeRequest\022K\n\035rebalan" +
+ "ce_partition_info_list\030\001 \003(\0132$.voldemort" +
+ ".RebalancePartitionInfoMap\022\026\n\016cluster_st" +
+ "ring\030\002 \002(\t\022\017\n\007swap_ro\030\003 \002(\010\022\037\n\027change_cl" +
+ "uster_metadata\030\004 \002(\010\022\036\n\026change_rebalance" +
+ "_state\030\005 \002(\010\022\020\n\010rollback\030\006 \002(\010\"?\n\034Rebala" +
+ "nceStateChangeResponse\022\037\n\005error\030\001 \001(\0132\020." +
+ "voldemort.Error\"G\n DeleteStoreRebalanceS" +
+ "tateRequest\022\022\n\nstore_name\030\001 \002(\t\022\017\n\007node_",
+ "id\030\002 \002(\005\"D\n!DeleteStoreRebalanceStateRes" +
+ "ponse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\"h" +
+ "\n\023NativeBackupRequest\022\022\n\nstore_name\030\001 \002(" +
+ "\t\022\022\n\nbackup_dir\030\002 \002(\t\022\024\n\014verify_files\030\003 " +
+ "\002(\010\022\023\n\013incremental\030\004 \002(\010\">\n\024ReserveMemor" +
+ "yRequest\022\022\n\nstore_name\030\001 \002(\t\022\022\n\nsize_in_" +
+ "mb\030\002 \002(\003\"8\n\025ReserveMemoryResponse\022\037\n\005err" +
+ "or\030\001 \001(\0132\020.voldemort.Error\"\360\016\n\025Voldemort" +
+ "AdminRequest\022)\n\004type\030\001 \002(\0162\033.voldemort.A" +
+ "dminRequestType\0223\n\014get_metadata\030\002 \001(\0132\035.",
+ "voldemort.GetMetadataRequest\0229\n\017update_m" +
+ "etadata\030\003 \001(\0132 .voldemort.UpdateMetadata" +
+ "Request\022J\n\030update_partition_entries\030\004 \001(" +
+ "\0132(.voldemort.UpdatePartitionEntriesRequ" +
+ "est\022H\n\027fetch_partition_entries\030\005 \001(\0132\'.v" +
+ "oldemort.FetchPartitionEntriesRequest\022J\n" +
+ "\030delete_partition_entries\030\006 \001(\0132(.voldem" +
+ "ort.DeletePartitionEntriesRequest\022K\n\031ini" +
+ "tiate_fetch_and_update\030\007 \001(\0132(.voldemort" +
+ ".InitiateFetchAndUpdateRequest\022F\n\026async_",
+ "operation_status\030\010 \001(\0132&.voldemort.Async" +
+ "OperationStatusRequest\022H\n\027initiate_rebal" +
+ "ance_node\030\t \001(\0132\'.voldemort.InitiateReba" +
+ "lanceNodeRequest\022B\n\024async_operation_stop" +
+ "\030\n \001(\0132$.voldemort.AsyncOperationStopReq" +
+ "uest\022B\n\024async_operation_list\030\013 \001(\0132$.vol" +
+ "demort.AsyncOperationListRequest\022;\n\020trun" +
+ "cate_entries\030\014 \001(\0132!.voldemort.TruncateE" +
+ "ntriesRequest\022-\n\tadd_store\030\r \001(\0132\032.volde" +
+ "mort.AddStoreRequest\0223\n\014delete_store\030\016 \001",
+ "(\0132\035.voldemort.DeleteStoreRequest\0221\n\013fet" +
+ "ch_store\030\017 \001(\0132\034.voldemort.FetchStoreReq" +
+ "uest\022/\n\nswap_store\030\020 \001(\0132\033.voldemort.Swa" +
+ "pStoreRequest\0227\n\016rollback_store\030\021 \001(\0132\037." +
+ "voldemort.RollbackStoreRequest\022D\n\026get_ro" +
+ "_max_version_dir\030\022 \001(\0132$.voldemort.GetRO" +
+ "MaxVersionDirRequest\022L\n\032get_ro_current_v" +
+ "ersion_dir\030\023 \001(\0132(.voldemort.GetROCurren" +
+ "tVersionDirRequest\022D\n\025fetch_partition_fi" +
+ "les\030\024 \001(\0132%.voldemort.FetchPartitionFile",
+ "sRequest\022@\n\023update_slop_entries\030\026 \001(\0132#." +
+ "voldemort.UpdateSlopEntriesRequest\022>\n\022fa" +
+ "iled_fetch_store\030\030 \001(\0132\".voldemort.Faile" +
+ "dFetchStoreRequest\022C\n\025get_ro_storage_for" +
+ "mat\030\031 \001(\0132$.voldemort.GetROStorageFormat" +
+ "Request\022F\n\026rebalance_state_change\030\032 \001(\0132" +
+ "&.voldemort.RebalanceStateChangeRequest\022" +
+ "/\n\nrepair_job\030\033 \001(\0132\033.voldemort.RepairJo" +
+ "bRequest\022X\n initiate_rebalance_node_on_d" +
+ "onor\030\034 \001(\0132..voldemort.InitiateRebalance",
+ "NodeOnDonorRequest\022Q\n\034delete_store_rebal" +
+ "ance_state\030\035 \001(\0132+.voldemort.DeleteStore" +
+ "RebalanceStateRequest\0225\n\rnative_backup\030\036" +
+ " \001(\0132\036.voldemort.NativeBackupRequest\0227\n\016" +
+ "reserve_memory\030\037 \001(\0132\037.voldemort.Reserve" +
+ "MemoryRequest*\310\005\n\020AdminRequestType\022\020\n\014GE" +
+ "T_METADATA\020\000\022\023\n\017UPDATE_METADATA\020\001\022\034\n\030UPD" +
+ "ATE_PARTITION_ENTRIES\020\002\022\033\n\027FETCH_PARTITI" +
+ "ON_ENTRIES\020\003\022\034\n\030DELETE_PARTITION_ENTRIES" +
+ "\020\004\022\035\n\031INITIATE_FETCH_AND_UPDATE\020\005\022\032\n\026ASY",
+ "NC_OPERATION_STATUS\020\006\022\033\n\027INITIATE_REBALA" +
+ "NCE_NODE\020\007\022\030\n\024ASYNC_OPERATION_STOP\020\010\022\030\n\024" +
+ "ASYNC_OPERATION_LIST\020\t\022\024\n\020TRUNCATE_ENTRI" +
+ "ES\020\n\022\r\n\tADD_STORE\020\013\022\020\n\014DELETE_STORE\020\014\022\017\n" +
+ "\013FETCH_STORE\020\r\022\016\n\nSWAP_STORE\020\016\022\022\n\016ROLLBA" +
+ "CK_STORE\020\017\022\032\n\026GET_RO_MAX_VERSION_DIR\020\020\022\036" +
+ "\n\032GET_RO_CURRENT_VERSION_DIR\020\021\022\031\n\025FETCH_" +
+ "PARTITION_FILES\020\022\022\027\n\023UPDATE_SLOP_ENTRIES" +
+ "\020\024\022\026\n\022FAILED_FETCH_STORE\020\026\022\031\n\025GET_RO_STO" +
+ "RAGE_FORMAT\020\027\022\032\n\026REBALANCE_STATE_CHANGE\020",
+ "\030\022\016\n\nREPAIR_JOB\020\031\022$\n INITIATE_REBALANCE_" +
+ "NODE_ON_DONOR\020\032\022 \n\034DELETE_STORE_REBALANC" +
+ "E_STATE\020\033\022\021\n\rNATIVE_BACKUP\020\034\022\022\n\016RESERVE_" +
+ "MEMORY\020\035B-\n\034voldemort.client.protocol.pb" +
+ "B\013VAdminProtoH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -23399,7 +23439,7 @@ public Builder clearReserveMemory() {
internal_static_voldemort_FetchPartitionEntriesRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_voldemort_FetchPartitionEntriesRequest_descriptor,
- new java.lang.String[] { "ReplicaToPartition", "Store", "Filter", "FetchValues", "SkipRecords", "InitialCluster", "FetchOrphaned", },
+ new java.lang.String[] { "ReplicaToPartition", "Store", "Filter", "FetchValues", "SkipRecords", "InitialCluster", "FetchOrphaned", "MaxRecords", },
voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest.class,
voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest.Builder.class);
internal_static_voldemort_FetchPartitionEntriesResponse_descriptor =
View
13 src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2008-2009 LinkedIn, Inc
+ * Copyright 2008-2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -974,6 +974,7 @@ public void operate() {
filter,
false,
initialCluster,
+ 0,
0);
long numTuples = 0;
long startTime = System.currentTimeMillis();
@@ -1089,11 +1090,11 @@ public void operate() {
Versioned<byte[]> value = entry.getSecond();
throttler.maybeThrottle(key.length() + valueSize(value));
if(StoreInstance.checkKeyBelongsToPartition(metadataStore.getNodeId(),
- key.get(),
- replicaToPartitionList,
- request.hasInitialCluster() ? new ClusterMapper().readCluster(new StringReader(request.getInitialCluster()))
- : metadataStore.getCluster(),
- metadataStore.getStoreDef(storeName))
+ key.get(),
+ replicaToPartitionList,
+ request.hasInitialCluster() ? new ClusterMapper().readCluster(new StringReader(request.getInitialCluster()))
+ : metadataStore.getCluster(),
+ metadataStore.getStoreDef(storeName))
&& filter.accept(key, value)) {
if(storageEngine.delete(key, value.getVersion())) {
deleteSuccess++;
View
26 src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2013 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package voldemort.server.protocol.admin;
import java.io.DataInputStream;
@@ -68,10 +84,10 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
if(!fetchOrphaned) {
// normal code path
if(StoreInstance.checkKeyBelongsToPartition(nodeId,
- key.get(),
- replicaToPartitionList,
- initialCluster,
- storeDef) && counter % skipRecords == 0) {
+ key.get(),
+ replicaToPartitionList,
+ initialCluster,
+ storeDef) && counter % skipRecords == 0) {
entryAccepted = true;
}
} else {
@@ -126,7 +142,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
+ " s");
}
- if(keyIterator.hasNext())
+ if(keyIterator.hasNext() && counter < maxRecords * skipRecords)
return StreamRequestHandlerState.WRITING;
else {
return StreamRequestHandlerState.COMPLETE;
View
26 src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2013 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package voldemort.server.protocol.admin;
import java.io.DataInputStream;
@@ -59,10 +75,10 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
if(!fetchOrphaned) {
// normal code path
if(StoreInstance.checkKeyBelongsToPartition(nodeId,
- key.get(),
- replicaToPartitionList,
- initialCluster,
- storeDef)
+ key.get(),
+ replicaToPartitionList,
+ initialCluster,
+ storeDef)
&& filter.accept(key, null)
&& counter % skipRecords == 0) {
keyAccepted = true;
@@ -100,7 +116,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
+ " s");
}
- if(keyIterator.hasNext())
+ if(keyIterator.hasNext() && counter < maxRecords * skipRecords)
return StreamRequestHandlerState.WRITING;
else {
return StreamRequestHandlerState.COMPLETE;
View
19 src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2008-2012 LinkedIn, Inc
+ * Copyright 2008-2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -92,8 +92,9 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
// process the next partition
if(entriesPartitionIterator == null) {
- // we are finally done
- if(currentIndex == partitionList.size()) {
+ if(currentIndex == partitionList.size() || counter >= maxRecords * skipRecords) {
+ logger.info("Done fetching store " + storageEngine.getName() + " : " + counter
+ + " records processed.");
return StreamRequestHandlerState.COMPLETE;
}
@@ -107,10 +108,10 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
// requested replicatype
if(!fetchedPartitions.contains(currentPartition)
&& StoreInstance.checkPartitionBelongsToNode(currentPartition,
- currentReplicaType,
- nodeId,
- initialCluster,
- storeDef)) {
+ currentReplicaType,
+ nodeId,
+ initialCluster,
+ storeDef)) {
fetchedPartitions.add(currentPartition);
found = true;
logger.info("Fetching [partition: " + currentPartition + ", replica type:"
@@ -124,11 +125,11 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
// do a check before reading in case partition has 0 elements
if(entriesPartitionIterator.hasNext()) {
counter++;
+ Pair<ByteArray, Versioned<byte[]>> entry = entriesPartitionIterator.next();
// honor skipRecords
if(counter % skipRecords == 0) {
// do the filtering
- Pair<ByteArray, Versioned<byte[]>> entry = entriesPartitionIterator.next();
if(streamStats != null) {
streamStats.reportStorageTime(operation, System.nanoTime() - startNs);
streamStats.reportStreamingScan(operation);
@@ -173,7 +174,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
}
// reset the iterator if done with this partition
- if(!entriesPartitionIterator.hasNext()) {
+ if(!entriesPartitionIterator.hasNext() || counter >= maxRecords * skipRecords) {
entriesPartitionIterator.close();
entriesPartitionIterator = null;
}
View
22 src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2008-2012 LinkedIn, Inc
+ * Copyright 2008-2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -90,8 +90,9 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
// process the next partition
if(keysPartitionIterator == null) {
- if(currentIndex == partitionList.size()) {
- // we are finally done
+ if(currentIndex == partitionList.size() || counter >= maxRecords * skipRecords) {
+ logger.info("Done fetching store " + storageEngine.getName() + " : " + counter
+ + " records processed.");
return StreamRequestHandlerState.COMPLETE;
}
@@ -105,10 +106,10 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
// requested replicatype
if(!fetchedPartitions.contains(currentPartition)
&& StoreInstance.checkPartitionBelongsToNode(currentPartition,
- currentReplicaType,
- nodeId,
- initialCluster,
- storeDef)) {
+ currentReplicaType,
+ nodeId,
+ initialCluster,
+ storeDef)) {
fetchedPartitions.add(currentPartition);
found = true;
logger.info("Fetching [partition: " + currentPartition + ", replica type:"
@@ -122,11 +123,11 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
// do a check before reading in case partition has 0 elements
if(keysPartitionIterator.hasNext()) {
counter++;
+ ByteArray key = keysPartitionIterator.next();
// honor skipRecords
if(counter % skipRecords == 0) {
// do the filtering
- ByteArray key = keysPartitionIterator.next();
if(streamStats != null) {
streamStats.reportStorageTime(operation, System.nanoTime() - startNs);
streamStats.reportStreamingScan(operation);
@@ -163,8 +164,9 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
}
}
- // reset the iterator if done with this partition
- if(!keysPartitionIterator.hasNext()) {
+ // reset the iterator if done with this partition or fetched enough
+ // records
+ if(!keysPartitionIterator.hasNext() || counter >= maxRecords * skipRecords) {
keysPartitionIterator.close();
keysPartitionIterator = null;
}
View
22 src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2013 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package voldemort.server.protocol.admin;
import java.io.DataOutputStream;
@@ -54,6 +70,8 @@
protected long skipRecords;
+ protected long maxRecords;
+
protected int fetched;
protected final long startTime;
@@ -108,6 +126,10 @@ protected FetchStreamRequestHandler(VAdminProto.FetchPartitionEntriesRequest req
if(request.hasSkipRecords() && request.getSkipRecords() >= 0) {
this.skipRecords = request.getSkipRecords() + 1;
}
+ this.maxRecords = Long.MAX_VALUE;
+ if(request.hasMaxRecords() && request.getMaxRecords() > 0) {
+ this.maxRecords = request.getMaxRecords();
+ }
this.fetchOrphaned = request.hasFetchOrphaned() && request.getFetchOrphaned();
}
View
35 src/java/voldemort/utils/Entropy.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2013 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package voldemort.utils;
import java.io.File;
@@ -222,13 +238,18 @@ public void generateEntropy(Cluster cluster,
int numKeysPerNode = (int) Math.floor(numKeys
/ cluster.getNumberOfNodes());
+ int numKeysStored = 0;
for(Node node: cluster.getNodes()) {
+ System.out.println("Fetching " + numKeysPerNode
+ + " keys from node " + node.getHost());
keys = adminClient.bulkFetchOps.fetchKeys(node.getId(),
storeDef.getName(),
cluster.getNodeById(node.getId())
.getPartitionIds(),
null,
- false);
+ false,
+ 0,
+ numKeysPerNode);
for(long keyId = 0; keyId < numKeysPerNode && keys.hasNext(); keyId++) {
ByteArray key = keys.next();
// entropy returns distinct keys from each
@@ -239,9 +260,11 @@ public void generateEntropy(Cluster cluster,
.contains(node.getId())) {
writer.write(key.length());
writer.write(key.get());
+ numKeysStored++;
}
}
}
+ System.out.println("Fetched a total of " + numKeysStored + " keys.");
} else {
List<Integer> partitions = cluster.getNodeById(nodeId)
.getPartitionIds();
@@ -257,7 +280,9 @@ public void generateEntropy(Cluster cluster,
storeDef.getName(),
partitions,
null,
- false);
+ false,
+ 0,
+ numKeysPerPartition);
while(keys.hasNext() && numKeysStored < numKeys) {
ByteArray key = keys.next();
// entropy returns distinct keys from each
@@ -321,6 +346,7 @@ public void generateEntropy(Cluster cluster,
long deletedKeys = 0L;
long foundKeys = 0L;
long totalKeys = 0L;
+ long keysRead = 0L;
try {
reader = new FileInputStream(storesKeyFile);
@@ -328,12 +354,14 @@ public void generateEntropy(Cluster cluster,
int size = reader.read();
if(size <= 0) {
+ System.out.println("End of file reached.");
break;
}
// Read the key
byte[] key = new byte[size];
reader.read(key);
+ keysRead++;
List<Node> responsibleNodes = strategy.routeRequest(key);
@@ -378,7 +406,8 @@ public void generateEntropy(Cluster cluster,
}
if(!negativeTest) {
- System.out.println("Found = " + foundKeys + " Total = " + totalKeys);
+ System.out.println("Found = " + foundKeys + ", Total = " + totalKeys
+ + ", Keys read = " + keysRead);
if(foundKeys > 0 && totalKeys > 0) {
System.out.println("%age found - " + 100.0 * (double) foundKeys
/ totalKeys);
View
1  src/proto/voldemort-admin.proto
@@ -74,6 +74,7 @@ message FetchPartitionEntriesRequest {
optional int64 skip_records = 5;
optional string initial_cluster = 6;
optional bool fetch_orphaned = 7;
+ optional int64 max_records = 8;
}
message FetchPartitionEntriesResponse {
View
20 test/unit/voldemort/client/AdminFetchTest.java
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2013 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package voldemort.client;
import static junit.framework.Assert.assertEquals;
@@ -138,6 +154,7 @@ public void testFetchPartitionPrimaryEntries() {
null,
false,
cluster,
+ 0,
0);
// gather all the keys obtained
Set<String> fetchedKeys = getEntries(entriesItr);
@@ -161,6 +178,7 @@ public void testFetchPartitionSecondaryEntries() {
null,
false,
cluster,
+ 0,
0);
// gather all the keys obtained
Set<String> fetchedKeys = getEntries(entriesItr);
@@ -184,6 +202,7 @@ public void testFetchNonExistentEntriesPrimary() {
null,
false,
cluster,
+ 0,
0);
// gather all the keys obtained
Set<String> fetchedKeys = getEntries(entriesItr);
@@ -202,6 +221,7 @@ public void testFetchNonExistentEntriesSecondary() {
null,
false,
cluster,
+ 0,
0);
// gather all the keys obtained
Set<String> fetchedKeys = getEntries(entriesItr);
Please sign in to comment.
Something went wrong with that request. Please try again.