Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

remove skipRecords from fetching API and protobuf

AFAIK skipRecords was never used. By inspection, the code that would have been exercised if it had been used has never been correct. Removing skipRecords from the code base.

Also:
- Added a number of TODOs to the code from the reviews
- Changed some variable names
  • Loading branch information...
commit a2d9ebb39e15fd3e7e85c44a9b128eb467535332 1 parent d486f2a
@jayjwylie jayjwylie authored
View
18 src/java/voldemort/client/protocol/admin/AdminClient.java
@@ -1444,7 +1444,6 @@ private void initiateFetchRequest(DataOutputStream outputStream,
boolean fetchValues,
boolean fetchMasterEntries,
Cluster initialCluster,
- long skipRecords,
long maxRecords) throws IOException {
HashMap<Integer, List<Integer>> filteredReplicaToPartitionList = Maps.newHashMap();
if(fetchMasterEntries) {
@@ -1460,7 +1459,6 @@ private void initiateFetchRequest(DataOutputStream outputStream,
.setFetchValues(fetchValues)
.addAllReplicaToPartition(ProtoUtils.encodePartitionTuple(filteredReplicaToPartitionList))
.setStore(storeName)
- .setSkipRecords(skipRecords)
.setMaxRecords(maxRecords);
try {
@@ -1576,7 +1574,6 @@ private void initiateFetchRequest(DataOutputStream outputStream,
* @param filter Custom filter implementation to filter out entries
* which should not be fetched.
* @param fetchMasterEntries Fetch an entry only if master replica
- * @param skipRecords Number of records to skip
* @return An iterator which allows entries to be streamed as they're
* being iterated over.
*/
@@ -1585,7 +1582,6 @@ private void initiateFetchRequest(DataOutputStream outputStream,
List<Integer> partitionList,
VoldemortFilter filter,
boolean fetchMasterEntries,
- long skipRecords,
long maxRecords) {
return fetchEntries(nodeId,
storeName,
@@ -1593,7 +1589,6 @@ private void initiateFetchRequest(DataOutputStream outputStream,
filter,
fetchMasterEntries,
null,
- skipRecords,
maxRecords);
}
@@ -1616,7 +1611,7 @@ private void initiateFetchRequest(DataOutputStream outputStream,
List<Integer> partitionList,
VoldemortFilter filter,
boolean fetchMasterEntries) {
- return fetchEntries(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0, 0);
+ return fetchEntries(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0);
}
// TODO: " HashMap<Integer, List<Integer>> replicaToPartitionList," is a
@@ -1652,7 +1647,6 @@ private void initiateFetchRequest(DataOutputStream outputStream,
* decision to fetch entries. This is important during
* rebalancing where-in we want to fetch keys using an older
* metadata compared to the new one.
- * @param skipRecords Number of records to skip
* @return An iterator which allows entries to be streamed as they're
* being iterated over.
*/
@@ -1662,7 +1656,6 @@ private void initiateFetchRequest(DataOutputStream outputStream,
VoldemortFilter filter,
boolean fetchMasterEntries,
Cluster initialCluster,
- long skipRecords,
long maxRecords) {
Node node = AdminClient.this.getAdminClientCluster().getNodeById(nodeId);
@@ -1681,7 +1674,6 @@ private void initiateFetchRequest(DataOutputStream outputStream,
true,
fetchMasterEntries,
initialCluster,
- skipRecords,
maxRecords);
} catch(IOException e) {
helperOps.close(sands.getSocket());
@@ -1800,7 +1792,6 @@ public ByteArray computeNext() {
* @param filter Custom filter implementation to filter out entries
* which should not be fetched.
* @param fetchMasterEntries Fetch a key only if master replica
- * @param skipRecords Number of keys to skip
* @return An iterator which allows keys to be streamed as they're being
* iterated over.
*/
@@ -1809,7 +1800,6 @@ public ByteArray computeNext() {
List<Integer> partitionList,
VoldemortFilter filter,
boolean fetchMasterEntries,
- long skipRecords,
long maxRecords) {
return fetchKeys(nodeId,
storeName,
@@ -1817,7 +1807,6 @@ public ByteArray computeNext() {
filter,
fetchMasterEntries,
null,
- skipRecords,
maxRecords);
}
@@ -1840,7 +1829,7 @@ public ByteArray computeNext() {
List<Integer> partitionList,
VoldemortFilter filter,
boolean fetchMasterEntries) {
- return fetchKeys(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0, 0);
+ return fetchKeys(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0);
}
/**
@@ -1855,7 +1844,6 @@ public ByteArray computeNext() {
* @param filter Custom filter
* @param initialCluster Cluster to use for selecting a key. If null,
* use the default metadata from the metadata store
- * @param skipRecords Number of records to skip [ Used for sampling ]
* @return Returns an iterator of the keys
*/
public Iterator<ByteArray> fetchKeys(int nodeId,
@@ -1864,7 +1852,6 @@ public ByteArray computeNext() {
VoldemortFilter filter,
boolean fetchMasterEntries,
Cluster initialCluster,
- long skipRecords,
long maxRecords) {
Node node = AdminClient.this.getAdminClientCluster().getNodeById(nodeId);
final SocketDestination destination = new SocketDestination(node.getHost(),
@@ -1882,7 +1869,6 @@ public ByteArray computeNext() {
false,
fetchMasterEntries,
initialCluster,
- skipRecords,
maxRecords);
} catch(IOException e) {
helperOps.close(sands.getSocket());
View
382 src/java/voldemort/client/protocol/pb/VAdminProto.java
@@ -4486,12 +4486,12 @@ public FetchPartitionEntriesRequest getDefaultInstanceForType() {
public boolean hasFetchValues() { return hasFetchValues; }
public boolean getFetchValues() { return fetchValues_; }
- // optional int64 skip_records = 5;
- public static final int SKIP_RECORDS_FIELD_NUMBER = 5;
- private boolean hasSkipRecords;
- private long skipRecords_ = 0L;
- public boolean hasSkipRecords() { return hasSkipRecords; }
- public long getSkipRecords() { return skipRecords_; }
+ // optional int64 OBSOLETE__DO_NOT_USE__skip_records = 5;
+ public static final int OBSOLETE__DO_NOT_USE__SKIP_RECORDS_FIELD_NUMBER = 5;
+ private boolean hasOBSOLETEDONOTUSESkipRecords;
+ private long oBSOLETEDONOTUSESkipRecords_ = 0L;
+ public boolean hasOBSOLETEDONOTUSESkipRecords() { return hasOBSOLETEDONOTUSESkipRecords; }
+ public long getOBSOLETEDONOTUSESkipRecords() { return oBSOLETEDONOTUSESkipRecords_; }
// optional string initial_cluster = 6;
public static final int INITIAL_CLUSTER_FIELD_NUMBER = 6;
@@ -4543,8 +4543,8 @@ public void writeTo(com.google.protobuf.CodedOutputStream output)
if (hasFetchValues()) {
output.writeBool(4, getFetchValues());
}
- if (hasSkipRecords()) {
- output.writeInt64(5, getSkipRecords());
+ if (hasOBSOLETEDONOTUSESkipRecords()) {
+ output.writeInt64(5, getOBSOLETEDONOTUSESkipRecords());
}
if (hasInitialCluster()) {
output.writeString(6, getInitialCluster());
@@ -4580,9 +4580,9 @@ public int getSerializedSize() {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(4, getFetchValues());
}
- if (hasSkipRecords()) {
+ if (hasOBSOLETEDONOTUSESkipRecords()) {
size += com.google.protobuf.CodedOutputStream
- .computeInt64Size(5, getSkipRecords());
+ .computeInt64Size(5, getOBSOLETEDONOTUSESkipRecords());
}
if (hasInitialCluster()) {
size += com.google.protobuf.CodedOutputStream
@@ -4773,8 +4773,8 @@ public Builder mergeFrom(voldemort.client.protocol.pb.VAdminProto.FetchPartition
if (other.hasFetchValues()) {
setFetchValues(other.getFetchValues());
}
- if (other.hasSkipRecords()) {
- setSkipRecords(other.getSkipRecords());
+ if (other.hasOBSOLETEDONOTUSESkipRecords()) {
+ setOBSOLETEDONOTUSESkipRecords(other.getOBSOLETEDONOTUSESkipRecords());
}
if (other.hasInitialCluster()) {
setInitialCluster(other.getInitialCluster());
@@ -4834,7 +4834,7 @@ public Builder mergeFrom(
break;
}
case 40: {
- setSkipRecords(input.readInt64());
+ setOBSOLETEDONOTUSESkipRecords(input.readInt64());
break;
}
case 50: {
@@ -4981,21 +4981,21 @@ public Builder clearFetchValues() {
return this;
}
- // optional int64 skip_records = 5;
- public boolean hasSkipRecords() {
- return result.hasSkipRecords();
+ // optional int64 OBSOLETE__DO_NOT_USE__skip_records = 5;
+ public boolean hasOBSOLETEDONOTUSESkipRecords() {
+ return result.hasOBSOLETEDONOTUSESkipRecords();
}
- public long getSkipRecords() {
- return result.getSkipRecords();
+ public long getOBSOLETEDONOTUSESkipRecords() {
+ return result.getOBSOLETEDONOTUSESkipRecords();
}
- public Builder setSkipRecords(long value) {
- result.hasSkipRecords = true;
- result.skipRecords_ = value;
+ public Builder setOBSOLETEDONOTUSESkipRecords(long value) {
+ result.hasOBSOLETEDONOTUSESkipRecords = true;
+ result.oBSOLETEDONOTUSESkipRecords_ = value;
return this;
}
- public Builder clearSkipRecords() {
- result.hasSkipRecords = false;
- result.skipRecords_ = 0L;
+ public Builder clearOBSOLETEDONOTUSESkipRecords() {
+ result.hasOBSOLETEDONOTUSESkipRecords = false;
+ result.oBSOLETEDONOTUSESkipRecords_ = 0L;
return this;
}
@@ -23163,175 +23163,175 @@ public Builder clearReserveMemory() {
"e\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\"d\n\032Fe" +
"tchPartitionFilesRequest\022\r\n\005store\030\001 \002(\t\022" +
"7\n\024replica_to_partition\030\002 \003(\0132\031.voldemor" +
- "t.PartitionTuple\"\204\002\n\034FetchPartitionEntri" +
+ "t.PartitionTuple\"\232\002\n\034FetchPartitionEntri" +
"esRequest\0227\n\024replica_to_partition\030\001 \003(\0132" +
"\031.voldemort.PartitionTuple\022\r\n\005store\030\002 \002(" +
"\t\022*\n\006filter\030\003 \001(\0132\032.voldemort.VoldemortF",
- "ilter\022\024\n\014fetch_values\030\004 \001(\010\022\024\n\014skip_reco" +
- "rds\030\005 \001(\003\022\027\n\017initial_cluster\030\006 \001(\t\022\026\n\016fe" +
- "tch_orphaned\030\007 \001(\010\022\023\n\013max_records\030\010 \001(\003\"" +
- "\201\001\n\035FetchPartitionEntriesResponse\0222\n\017par" +
- "tition_entry\030\001 \001(\0132\031.voldemort.Partition" +
- "Entry\022\013\n\003key\030\002 \001(\014\022\037\n\005error\030\003 \001(\0132\020.vold" +
- "emort.Error\"\254\001\n\035DeletePartitionEntriesRe" +
- "quest\022\r\n\005store\030\001 \002(\t\0227\n\024replica_to_parti" +
- "tion\030\002 \003(\0132\031.voldemort.PartitionTuple\022*\n" +
- "\006filter\030\003 \001(\0132\032.voldemort.VoldemortFilte",
- "r\022\027\n\017initial_cluster\030\004 \001(\t\"P\n\036DeletePart" +
- "itionEntriesResponse\022\r\n\005count\030\001 \001(\003\022\037\n\005e" +
- "rror\030\002 \001(\0132\020.voldemort.Error\"\317\001\n\035Initiat" +
- "eFetchAndUpdateRequest\022\017\n\007node_id\030\001 \002(\005\022" +
- "\r\n\005store\030\002 \002(\t\022*\n\006filter\030\003 \001(\0132\032.voldemo" +
- "rt.VoldemortFilter\0227\n\024replica_to_partiti" +
- "on\030\004 \003(\0132\031.voldemort.PartitionTuple\022\027\n\017i" +
- "nitial_cluster\030\005 \001(\t\022\020\n\010optimize\030\006 \001(\010\"1" +
- "\n\033AsyncOperationStatusRequest\022\022\n\nrequest" +
- "_id\030\001 \002(\005\"/\n\031AsyncOperationStopRequest\022\022",
- "\n\nrequest_id\030\001 \002(\005\"=\n\032AsyncOperationStop" +
- "Response\022\037\n\005error\030\001 \001(\0132\020.voldemort.Erro" +
- "r\"2\n\031AsyncOperationListRequest\022\025\n\rshow_c" +
- "omplete\030\002 \002(\010\"R\n\032AsyncOperationListRespo" +
- "nse\022\023\n\013request_ids\030\001 \003(\005\022\037\n\005error\030\002 \001(\0132" +
- "\020.voldemort.Error\":\n\016PartitionTuple\022\024\n\014r" +
- "eplica_type\030\001 \002(\005\022\022\n\npartitions\030\002 \003(\005\"e\n" +
- "\026PerStorePartitionTuple\022\022\n\nstore_name\030\001 " +
- "\002(\t\0227\n\024replica_to_partition\030\002 \003(\0132\031.vold" +
- "emort.PartitionTuple\"\370\001\n\031RebalancePartit",
- "ionInfoMap\022\022\n\nstealer_id\030\001 \002(\005\022\020\n\010donor_" +
- "id\030\002 \002(\005\022\017\n\007attempt\030\003 \002(\005\022C\n\030replica_to_" +
- "add_partition\030\004 \003(\0132!.voldemort.PerStore" +
- "PartitionTuple\022F\n\033replica_to_delete_part" +
- "ition\030\005 \003(\0132!.voldemort.PerStorePartitio" +
- "nTuple\022\027\n\017initial_cluster\030\006 \002(\t\"f\n\034Initi" +
- "ateRebalanceNodeRequest\022F\n\030rebalance_par" +
- "tition_info\030\001 \002(\0132$.voldemort.RebalanceP" +
- "artitionInfoMap\"m\n#InitiateRebalanceNode" +
- "OnDonorRequest\022F\n\030rebalance_partition_in",
- "fo\030\001 \003(\0132$.voldemort.RebalancePartitionI" +
- "nfoMap\"\212\001\n\034AsyncOperationStatusResponse\022" +
- "\022\n\nrequest_id\030\001 \001(\005\022\023\n\013description\030\002 \001(\t" +
- "\022\016\n\006status\030\003 \001(\t\022\020\n\010complete\030\004 \001(\010\022\037\n\005er" +
- "ror\030\005 \001(\0132\020.voldemort.Error\"\'\n\026TruncateE" +
- "ntriesRequest\022\r\n\005store\030\001 \002(\t\":\n\027Truncate" +
- "EntriesResponse\022\037\n\005error\030\001 \001(\0132\020.voldemo" +
- "rt.Error\"*\n\017AddStoreRequest\022\027\n\017storeDefi" +
- "nition\030\001 \002(\t\"3\n\020AddStoreResponse\022\037\n\005erro" +
- "r\030\001 \001(\0132\020.voldemort.Error\"\'\n\022DeleteStore",
- "Request\022\021\n\tstoreName\030\001 \002(\t\"6\n\023DeleteStor" +
- "eResponse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Err" +
- "or\"P\n\021FetchStoreRequest\022\022\n\nstore_name\030\001 " +
- "\002(\t\022\021\n\tstore_dir\030\002 \002(\t\022\024\n\014push_version\030\003" +
- " \001(\003\"9\n\020SwapStoreRequest\022\022\n\nstore_name\030\001" +
- " \002(\t\022\021\n\tstore_dir\030\002 \002(\t\"P\n\021SwapStoreResp" +
- "onse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\022\032\n" +
- "\022previous_store_dir\030\002 \001(\t\"@\n\024RollbackSto" +
- "reRequest\022\022\n\nstore_name\030\001 \002(\t\022\024\n\014push_ve" +
- "rsion\030\002 \002(\003\"8\n\025RollbackStoreResponse\022\037\n\005",
- "error\030\001 \001(\0132\020.voldemort.Error\"&\n\020RepairJ" +
- "obRequest\022\022\n\nstore_name\030\001 \001(\t\"4\n\021RepairJ" +
- "obResponse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Er" +
- "ror\"=\n\024ROStoreVersionDirMap\022\022\n\nstore_nam" +
- "e\030\001 \002(\t\022\021\n\tstore_dir\030\002 \002(\t\"/\n\031GetROMaxVe" +
- "rsionDirRequest\022\022\n\nstore_name\030\001 \003(\t\"y\n\032G" +
- "etROMaxVersionDirResponse\022:\n\021ro_store_ve" +
- "rsions\030\001 \003(\0132\037.voldemort.ROStoreVersionD" +
- "irMap\022\037\n\005error\030\002 \001(\0132\020.voldemort.Error\"3" +
- "\n\035GetROCurrentVersionDirRequest\022\022\n\nstore",
- "_name\030\001 \003(\t\"}\n\036GetROCurrentVersionDirRes" +
- "ponse\022:\n\021ro_store_versions\030\001 \003(\0132\037.volde" +
- "mort.ROStoreVersionDirMap\022\037\n\005error\030\002 \001(\013" +
- "2\020.voldemort.Error\"/\n\031GetROStorageFormat" +
- "Request\022\022\n\nstore_name\030\001 \003(\t\"y\n\032GetROStor" +
- "ageFormatResponse\022:\n\021ro_store_versions\030\001" +
- " \003(\0132\037.voldemort.ROStoreVersionDirMap\022\037\n" +
- "\005error\030\002 \001(\0132\020.voldemort.Error\"@\n\027Failed" +
- "FetchStoreRequest\022\022\n\nstore_name\030\001 \002(\t\022\021\n" +
- "\tstore_dir\030\002 \002(\t\";\n\030FailedFetchStoreResp",
- "onse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\"\346\001" +
- "\n\033RebalanceStateChangeRequest\022K\n\035rebalan" +
- "ce_partition_info_list\030\001 \003(\0132$.voldemort" +
- ".RebalancePartitionInfoMap\022\026\n\016cluster_st" +
- "ring\030\002 \002(\t\022\017\n\007swap_ro\030\003 \002(\010\022\037\n\027change_cl" +
- "uster_metadata\030\004 \002(\010\022\036\n\026change_rebalance" +
- "_state\030\005 \002(\010\022\020\n\010rollback\030\006 \002(\010\"?\n\034Rebala" +
- "nceStateChangeResponse\022\037\n\005error\030\001 \001(\0132\020." +
- "voldemort.Error\"G\n DeleteStoreRebalanceS" +
- "tateRequest\022\022\n\nstore_name\030\001 \002(\t\022\017\n\007node_",
- "id\030\002 \002(\005\"D\n!DeleteStoreRebalanceStateRes" +
- "ponse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\"h" +
- "\n\023NativeBackupRequest\022\022\n\nstore_name\030\001 \002(" +
- "\t\022\022\n\nbackup_dir\030\002 \002(\t\022\024\n\014verify_files\030\003 " +
- "\002(\010\022\023\n\013incremental\030\004 \002(\010\">\n\024ReserveMemor" +
- "yRequest\022\022\n\nstore_name\030\001 \002(\t\022\022\n\nsize_in_" +
- "mb\030\002 \002(\003\"8\n\025ReserveMemoryResponse\022\037\n\005err" +
- "or\030\001 \001(\0132\020.voldemort.Error\"\360\016\n\025Voldemort" +
- "AdminRequest\022)\n\004type\030\001 \002(\0162\033.voldemort.A" +
- "dminRequestType\0223\n\014get_metadata\030\002 \001(\0132\035.",
- "voldemort.GetMetadataRequest\0229\n\017update_m" +
- "etadata\030\003 \001(\0132 .voldemort.UpdateMetadata" +
- "Request\022J\n\030update_partition_entries\030\004 \001(" +
- "\0132(.voldemort.UpdatePartitionEntriesRequ" +
- "est\022H\n\027fetch_partition_entries\030\005 \001(\0132\'.v" +
- "oldemort.FetchPartitionEntriesRequest\022J\n" +
- "\030delete_partition_entries\030\006 \001(\0132(.voldem" +
- "ort.DeletePartitionEntriesRequest\022K\n\031ini" +
- "tiate_fetch_and_update\030\007 \001(\0132(.voldemort" +
- ".InitiateFetchAndUpdateRequest\022F\n\026async_",
- "operation_status\030\010 \001(\0132&.voldemort.Async" +
- "OperationStatusRequest\022H\n\027initiate_rebal" +
- "ance_node\030\t \001(\0132\'.voldemort.InitiateReba" +
- "lanceNodeRequest\022B\n\024async_operation_stop" +
- "\030\n \001(\0132$.voldemort.AsyncOperationStopReq" +
- "uest\022B\n\024async_operation_list\030\013 \001(\0132$.vol" +
- "demort.AsyncOperationListRequest\022;\n\020trun" +
- "cate_entries\030\014 \001(\0132!.voldemort.TruncateE" +
- "ntriesRequest\022-\n\tadd_store\030\r \001(\0132\032.volde" +
- "mort.AddStoreRequest\0223\n\014delete_store\030\016 \001",
- "(\0132\035.voldemort.DeleteStoreRequest\0221\n\013fet" +
- "ch_store\030\017 \001(\0132\034.voldemort.FetchStoreReq" +
- "uest\022/\n\nswap_store\030\020 \001(\0132\033.voldemort.Swa" +
- "pStoreRequest\0227\n\016rollback_store\030\021 \001(\0132\037." +
- "voldemort.RollbackStoreRequest\022D\n\026get_ro" +
- "_max_version_dir\030\022 \001(\0132$.voldemort.GetRO" +
- "MaxVersionDirRequest\022L\n\032get_ro_current_v" +
- "ersion_dir\030\023 \001(\0132(.voldemort.GetROCurren" +
- "tVersionDirRequest\022D\n\025fetch_partition_fi" +
- "les\030\024 \001(\0132%.voldemort.FetchPartitionFile",
- "sRequest\022@\n\023update_slop_entries\030\026 \001(\0132#." +
- "voldemort.UpdateSlopEntriesRequest\022>\n\022fa" +
- "iled_fetch_store\030\030 \001(\0132\".voldemort.Faile" +
- "dFetchStoreRequest\022C\n\025get_ro_storage_for" +
- "mat\030\031 \001(\0132$.voldemort.GetROStorageFormat" +
- "Request\022F\n\026rebalance_state_change\030\032 \001(\0132" +
- "&.voldemort.RebalanceStateChangeRequest\022" +
- "/\n\nrepair_job\030\033 \001(\0132\033.voldemort.RepairJo" +
- "bRequest\022X\n initiate_rebalance_node_on_d" +
- "onor\030\034 \001(\0132..voldemort.InitiateRebalance",
- "NodeOnDonorRequest\022Q\n\034delete_store_rebal" +
- "ance_state\030\035 \001(\0132+.voldemort.DeleteStore" +
- "RebalanceStateRequest\0225\n\rnative_backup\030\036" +
- " \001(\0132\036.voldemort.NativeBackupRequest\0227\n\016" +
- "reserve_memory\030\037 \001(\0132\037.voldemort.Reserve" +
- "MemoryRequest*\310\005\n\020AdminRequestType\022\020\n\014GE" +
- "T_METADATA\020\000\022\023\n\017UPDATE_METADATA\020\001\022\034\n\030UPD" +
- "ATE_PARTITION_ENTRIES\020\002\022\033\n\027FETCH_PARTITI" +
- "ON_ENTRIES\020\003\022\034\n\030DELETE_PARTITION_ENTRIES" +
- "\020\004\022\035\n\031INITIATE_FETCH_AND_UPDATE\020\005\022\032\n\026ASY",
- "NC_OPERATION_STATUS\020\006\022\033\n\027INITIATE_REBALA" +
- "NCE_NODE\020\007\022\030\n\024ASYNC_OPERATION_STOP\020\010\022\030\n\024" +
- "ASYNC_OPERATION_LIST\020\t\022\024\n\020TRUNCATE_ENTRI" +
- "ES\020\n\022\r\n\tADD_STORE\020\013\022\020\n\014DELETE_STORE\020\014\022\017\n" +
- "\013FETCH_STORE\020\r\022\016\n\nSWAP_STORE\020\016\022\022\n\016ROLLBA" +
- "CK_STORE\020\017\022\032\n\026GET_RO_MAX_VERSION_DIR\020\020\022\036" +
- "\n\032GET_RO_CURRENT_VERSION_DIR\020\021\022\031\n\025FETCH_" +
- "PARTITION_FILES\020\022\022\027\n\023UPDATE_SLOP_ENTRIES" +
- "\020\024\022\026\n\022FAILED_FETCH_STORE\020\026\022\031\n\025GET_RO_STO" +
- "RAGE_FORMAT\020\027\022\032\n\026REBALANCE_STATE_CHANGE\020",
- "\030\022\016\n\nREPAIR_JOB\020\031\022$\n INITIATE_REBALANCE_" +
- "NODE_ON_DONOR\020\032\022 \n\034DELETE_STORE_REBALANC" +
- "E_STATE\020\033\022\021\n\rNATIVE_BACKUP\020\034\022\022\n\016RESERVE_" +
- "MEMORY\020\035B-\n\034voldemort.client.protocol.pb" +
- "B\013VAdminProtoH\001"
+ "ilter\022\024\n\014fetch_values\030\004 \001(\010\022*\n\"OBSOLETE_" +
+ "_DO_NOT_USE__skip_records\030\005 \001(\003\022\027\n\017initi" +
+ "al_cluster\030\006 \001(\t\022\026\n\016fetch_orphaned\030\007 \001(\010" +
+ "\022\023\n\013max_records\030\010 \001(\003\"\201\001\n\035FetchPartition" +
+ "EntriesResponse\0222\n\017partition_entry\030\001 \001(\013" +
+ "2\031.voldemort.PartitionEntry\022\013\n\003key\030\002 \001(\014" +
+ "\022\037\n\005error\030\003 \001(\0132\020.voldemort.Error\"\254\001\n\035De" +
+ "letePartitionEntriesRequest\022\r\n\005store\030\001 \002" +
+ "(\t\0227\n\024replica_to_partition\030\002 \003(\0132\031.volde" +
+ "mort.PartitionTuple\022*\n\006filter\030\003 \001(\0132\032.vo",
+ "ldemort.VoldemortFilter\022\027\n\017initial_clust" +
+ "er\030\004 \001(\t\"P\n\036DeletePartitionEntriesRespon" +
+ "se\022\r\n\005count\030\001 \001(\003\022\037\n\005error\030\002 \001(\0132\020.volde" +
+ "mort.Error\"\317\001\n\035InitiateFetchAndUpdateReq" +
+ "uest\022\017\n\007node_id\030\001 \002(\005\022\r\n\005store\030\002 \002(\t\022*\n\006" +
+ "filter\030\003 \001(\0132\032.voldemort.VoldemortFilter" +
+ "\0227\n\024replica_to_partition\030\004 \003(\0132\031.voldemo" +
+ "rt.PartitionTuple\022\027\n\017initial_cluster\030\005 \001" +
+ "(\t\022\020\n\010optimize\030\006 \001(\010\"1\n\033AsyncOperationSt" +
+ "atusRequest\022\022\n\nrequest_id\030\001 \002(\005\"/\n\031Async",
+ "OperationStopRequest\022\022\n\nrequest_id\030\001 \002(\005" +
+ "\"=\n\032AsyncOperationStopResponse\022\037\n\005error\030" +
+ "\001 \001(\0132\020.voldemort.Error\"2\n\031AsyncOperatio" +
+ "nListRequest\022\025\n\rshow_complete\030\002 \002(\010\"R\n\032A" +
+ "syncOperationListResponse\022\023\n\013request_ids" +
+ "\030\001 \003(\005\022\037\n\005error\030\002 \001(\0132\020.voldemort.Error\"" +
+ ":\n\016PartitionTuple\022\024\n\014replica_type\030\001 \002(\005\022" +
+ "\022\n\npartitions\030\002 \003(\005\"e\n\026PerStorePartition" +
+ "Tuple\022\022\n\nstore_name\030\001 \002(\t\0227\n\024replica_to_" +
+ "partition\030\002 \003(\0132\031.voldemort.PartitionTup",
+ "le\"\370\001\n\031RebalancePartitionInfoMap\022\022\n\nstea" +
+ "ler_id\030\001 \002(\005\022\020\n\010donor_id\030\002 \002(\005\022\017\n\007attemp" +
+ "t\030\003 \002(\005\022C\n\030replica_to_add_partition\030\004 \003(" +
+ "\0132!.voldemort.PerStorePartitionTuple\022F\n\033" +
+ "replica_to_delete_partition\030\005 \003(\0132!.vold" +
+ "emort.PerStorePartitionTuple\022\027\n\017initial_" +
+ "cluster\030\006 \002(\t\"f\n\034InitiateRebalanceNodeRe" +
+ "quest\022F\n\030rebalance_partition_info\030\001 \002(\0132" +
+ "$.voldemort.RebalancePartitionInfoMap\"m\n" +
+ "#InitiateRebalanceNodeOnDonorRequest\022F\n\030",
+ "rebalance_partition_info\030\001 \003(\0132$.voldemo" +
+ "rt.RebalancePartitionInfoMap\"\212\001\n\034AsyncOp" +
+ "erationStatusResponse\022\022\n\nrequest_id\030\001 \001(" +
+ "\005\022\023\n\013description\030\002 \001(\t\022\016\n\006status\030\003 \001(\t\022\020" +
+ "\n\010complete\030\004 \001(\010\022\037\n\005error\030\005 \001(\0132\020.voldem" +
+ "ort.Error\"\'\n\026TruncateEntriesRequest\022\r\n\005s" +
+ "tore\030\001 \002(\t\":\n\027TruncateEntriesResponse\022\037\n" +
+ "\005error\030\001 \001(\0132\020.voldemort.Error\"*\n\017AddSto" +
+ "reRequest\022\027\n\017storeDefinition\030\001 \002(\t\"3\n\020Ad" +
+ "dStoreResponse\022\037\n\005error\030\001 \001(\0132\020.voldemor",
+ "t.Error\"\'\n\022DeleteStoreRequest\022\021\n\tstoreNa" +
+ "me\030\001 \002(\t\"6\n\023DeleteStoreResponse\022\037\n\005error" +
+ "\030\001 \001(\0132\020.voldemort.Error\"P\n\021FetchStoreRe" +
+ "quest\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_dir\030\002" +
+ " \002(\t\022\024\n\014push_version\030\003 \001(\003\"9\n\020SwapStoreR" +
+ "equest\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_dir\030" +
+ "\002 \002(\t\"P\n\021SwapStoreResponse\022\037\n\005error\030\001 \001(" +
+ "\0132\020.voldemort.Error\022\032\n\022previous_store_di" +
+ "r\030\002 \001(\t\"@\n\024RollbackStoreRequest\022\022\n\nstore" +
+ "_name\030\001 \002(\t\022\024\n\014push_version\030\002 \002(\003\"8\n\025Rol",
+ "lbackStoreResponse\022\037\n\005error\030\001 \001(\0132\020.vold" +
+ "emort.Error\"&\n\020RepairJobRequest\022\022\n\nstore" +
+ "_name\030\001 \001(\t\"4\n\021RepairJobResponse\022\037\n\005erro" +
+ "r\030\001 \001(\0132\020.voldemort.Error\"=\n\024ROStoreVers" +
+ "ionDirMap\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_d" +
+ "ir\030\002 \002(\t\"/\n\031GetROMaxVersionDirRequest\022\022\n" +
+ "\nstore_name\030\001 \003(\t\"y\n\032GetROMaxVersionDirR" +
+ "esponse\022:\n\021ro_store_versions\030\001 \003(\0132\037.vol" +
+ "demort.ROStoreVersionDirMap\022\037\n\005error\030\002 \001" +
+ "(\0132\020.voldemort.Error\"3\n\035GetROCurrentVers",
+ "ionDirRequest\022\022\n\nstore_name\030\001 \003(\t\"}\n\036Get" +
+ "ROCurrentVersionDirResponse\022:\n\021ro_store_" +
+ "versions\030\001 \003(\0132\037.voldemort.ROStoreVersio" +
+ "nDirMap\022\037\n\005error\030\002 \001(\0132\020.voldemort.Error" +
+ "\"/\n\031GetROStorageFormatRequest\022\022\n\nstore_n" +
+ "ame\030\001 \003(\t\"y\n\032GetROStorageFormatResponse\022" +
+ ":\n\021ro_store_versions\030\001 \003(\0132\037.voldemort.R" +
+ "OStoreVersionDirMap\022\037\n\005error\030\002 \001(\0132\020.vol" +
+ "demort.Error\"@\n\027FailedFetchStoreRequest\022" +
+ "\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_dir\030\002 \002(\t\";",
+ "\n\030FailedFetchStoreResponse\022\037\n\005error\030\001 \001(" +
+ "\0132\020.voldemort.Error\"\346\001\n\033RebalanceStateCh" +
+ "angeRequest\022K\n\035rebalance_partition_info_" +
+ "list\030\001 \003(\0132$.voldemort.RebalancePartitio" +
+ "nInfoMap\022\026\n\016cluster_string\030\002 \002(\t\022\017\n\007swap" +
+ "_ro\030\003 \002(\010\022\037\n\027change_cluster_metadata\030\004 \002" +
+ "(\010\022\036\n\026change_rebalance_state\030\005 \002(\010\022\020\n\010ro" +
+ "llback\030\006 \002(\010\"?\n\034RebalanceStateChangeResp" +
+ "onse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\"G\n" +
+ " DeleteStoreRebalanceStateRequest\022\022\n\nsto",
+ "re_name\030\001 \002(\t\022\017\n\007node_id\030\002 \002(\005\"D\n!Delete" +
+ "StoreRebalanceStateResponse\022\037\n\005error\030\001 \001" +
+ "(\0132\020.voldemort.Error\"h\n\023NativeBackupRequ" +
+ "est\022\022\n\nstore_name\030\001 \002(\t\022\022\n\nbackup_dir\030\002 " +
+ "\002(\t\022\024\n\014verify_files\030\003 \002(\010\022\023\n\013incremental" +
+ "\030\004 \002(\010\">\n\024ReserveMemoryRequest\022\022\n\nstore_" +
+ "name\030\001 \002(\t\022\022\n\nsize_in_mb\030\002 \002(\003\"8\n\025Reserv" +
+ "eMemoryResponse\022\037\n\005error\030\001 \001(\0132\020.voldemo" +
+ "rt.Error\"\360\016\n\025VoldemortAdminRequest\022)\n\004ty" +
+ "pe\030\001 \002(\0162\033.voldemort.AdminRequestType\0223\n",
+ "\014get_metadata\030\002 \001(\0132\035.voldemort.GetMetad" +
+ "ataRequest\0229\n\017update_metadata\030\003 \001(\0132 .vo" +
+ "ldemort.UpdateMetadataRequest\022J\n\030update_" +
+ "partition_entries\030\004 \001(\0132(.voldemort.Upda" +
+ "tePartitionEntriesRequest\022H\n\027fetch_parti" +
+ "tion_entries\030\005 \001(\0132\'.voldemort.FetchPart" +
+ "itionEntriesRequest\022J\n\030delete_partition_" +
+ "entries\030\006 \001(\0132(.voldemort.DeletePartitio" +
+ "nEntriesRequest\022K\n\031initiate_fetch_and_up" +
+ "date\030\007 \001(\0132(.voldemort.InitiateFetchAndU",
+ "pdateRequest\022F\n\026async_operation_status\030\010" +
+ " \001(\0132&.voldemort.AsyncOperationStatusReq" +
+ "uest\022H\n\027initiate_rebalance_node\030\t \001(\0132\'." +
+ "voldemort.InitiateRebalanceNodeRequest\022B" +
+ "\n\024async_operation_stop\030\n \001(\0132$.voldemort" +
+ ".AsyncOperationStopRequest\022B\n\024async_oper" +
+ "ation_list\030\013 \001(\0132$.voldemort.AsyncOperat" +
+ "ionListRequest\022;\n\020truncate_entries\030\014 \001(\013" +
+ "2!.voldemort.TruncateEntriesRequest\022-\n\ta" +
+ "dd_store\030\r \001(\0132\032.voldemort.AddStoreReque",
+ "st\0223\n\014delete_store\030\016 \001(\0132\035.voldemort.Del" +
+ "eteStoreRequest\0221\n\013fetch_store\030\017 \001(\0132\034.v" +
+ "oldemort.FetchStoreRequest\022/\n\nswap_store" +
+ "\030\020 \001(\0132\033.voldemort.SwapStoreRequest\0227\n\016r" +
+ "ollback_store\030\021 \001(\0132\037.voldemort.Rollback" +
+ "StoreRequest\022D\n\026get_ro_max_version_dir\030\022" +
+ " \001(\0132$.voldemort.GetROMaxVersionDirReque" +
+ "st\022L\n\032get_ro_current_version_dir\030\023 \001(\0132(" +
+ ".voldemort.GetROCurrentVersionDirRequest" +
+ "\022D\n\025fetch_partition_files\030\024 \001(\0132%.voldem",
+ "ort.FetchPartitionFilesRequest\022@\n\023update" +
+ "_slop_entries\030\026 \001(\0132#.voldemort.UpdateSl" +
+ "opEntriesRequest\022>\n\022failed_fetch_store\030\030" +
+ " \001(\0132\".voldemort.FailedFetchStoreRequest" +
+ "\022C\n\025get_ro_storage_format\030\031 \001(\0132$.voldem" +
+ "ort.GetROStorageFormatRequest\022F\n\026rebalan" +
+ "ce_state_change\030\032 \001(\0132&.voldemort.Rebala" +
+ "nceStateChangeRequest\022/\n\nrepair_job\030\033 \001(" +
+ "\0132\033.voldemort.RepairJobRequest\022X\n initia" +
+ "te_rebalance_node_on_donor\030\034 \001(\0132..volde",
+ "mort.InitiateRebalanceNodeOnDonorRequest" +
+ "\022Q\n\034delete_store_rebalance_state\030\035 \001(\0132+" +
+ ".voldemort.DeleteStoreRebalanceStateRequ" +
+ "est\0225\n\rnative_backup\030\036 \001(\0132\036.voldemort.N" +
+ "ativeBackupRequest\0227\n\016reserve_memory\030\037 \001" +
+ "(\0132\037.voldemort.ReserveMemoryRequest*\310\005\n\020" +
+ "AdminRequestType\022\020\n\014GET_METADATA\020\000\022\023\n\017UP" +
+ "DATE_METADATA\020\001\022\034\n\030UPDATE_PARTITION_ENTR" +
+ "IES\020\002\022\033\n\027FETCH_PARTITION_ENTRIES\020\003\022\034\n\030DE" +
+ "LETE_PARTITION_ENTRIES\020\004\022\035\n\031INITIATE_FET",
+ "CH_AND_UPDATE\020\005\022\032\n\026ASYNC_OPERATION_STATU" +
+ "S\020\006\022\033\n\027INITIATE_REBALANCE_NODE\020\007\022\030\n\024ASYN" +
+ "C_OPERATION_STOP\020\010\022\030\n\024ASYNC_OPERATION_LI" +
+ "ST\020\t\022\024\n\020TRUNCATE_ENTRIES\020\n\022\r\n\tADD_STORE\020" +
+ "\013\022\020\n\014DELETE_STORE\020\014\022\017\n\013FETCH_STORE\020\r\022\016\n\n" +
+ "SWAP_STORE\020\016\022\022\n\016ROLLBACK_STORE\020\017\022\032\n\026GET_" +
+ "RO_MAX_VERSION_DIR\020\020\022\036\n\032GET_RO_CURRENT_V" +
+ "ERSION_DIR\020\021\022\031\n\025FETCH_PARTITION_FILES\020\022\022" +
+ "\027\n\023UPDATE_SLOP_ENTRIES\020\024\022\026\n\022FAILED_FETCH" +
+ "_STORE\020\026\022\031\n\025GET_RO_STORAGE_FORMAT\020\027\022\032\n\026R",
+ "EBALANCE_STATE_CHANGE\020\030\022\016\n\nREPAIR_JOB\020\031\022" +
+ "$\n INITIATE_REBALANCE_NODE_ON_DONOR\020\032\022 \n" +
+ "\034DELETE_STORE_REBALANCE_STATE\020\033\022\021\n\rNATIV" +
+ "E_BACKUP\020\034\022\022\n\016RESERVE_MEMORY\020\035B-\n\034voldem" +
+ "ort.client.protocol.pbB\013VAdminProtoH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -23439,7 +23439,7 @@ public Builder clearReserveMemory() {
internal_static_voldemort_FetchPartitionEntriesRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_voldemort_FetchPartitionEntriesRequest_descriptor,
- new java.lang.String[] { "ReplicaToPartition", "Store", "Filter", "FetchValues", "SkipRecords", "InitialCluster", "FetchOrphaned", "MaxRecords", },
+ new java.lang.String[] { "ReplicaToPartition", "Store", "Filter", "FetchValues", "OBSOLETEDONOTUSESkipRecords", "InitialCluster", "FetchOrphaned", "MaxRecords", },
voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest.class,
voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest.Builder.class);
internal_static_voldemort_FetchPartitionEntriesResponse_descriptor =
View
1  src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java
@@ -974,7 +974,6 @@ public void operate() {
filter,
false,
initialCluster,
- 0,
0);
long numTuples = 0;
long startTime = System.currentTimeMillis();
View
4 src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java
@@ -87,7 +87,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
key.get(),
replicaToPartitionList,
initialCluster,
- storeDef) && counter % skipRecords == 0) {
+ storeDef)) {
entryAccepted = true;
}
} else {
@@ -142,7 +142,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
+ " s");
}
- if(keyIterator.hasNext() && counter < maxRecords * skipRecords)
+ if(keyIterator.hasNext() && counter < maxRecords)
return StreamRequestHandlerState.WRITING;
else {
return StreamRequestHandlerState.COMPLETE;
View
25 src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java
@@ -57,6 +57,7 @@ public FetchKeysStreamRequestHandler(FetchPartitionEntriesRequest request,
+ "' with replica to partition mapping " + replicaToPartitionList);
}
+ @Override
public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
DataOutputStream outputStream)
throws IOException {
@@ -73,14 +74,11 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
throttler.maybeThrottle(key.length());
boolean keyAccepted = false;
if(!fetchOrphaned) {
- // normal code path
if(StoreInstance.checkKeyBelongsToPartition(nodeId,
key.get(),
replicaToPartitionList,
initialCluster,
- storeDef)
- && filter.accept(key, null)
- && counter % skipRecords == 0) {
+ storeDef) && filter.accept(key, null)) {
keyAccepted = true;
}
@@ -100,8 +98,13 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
startNs = System.nanoTime();
ProtoUtils.writeMessage(outputStream, message);
- if(streamStats != null)
+ if(streamStats != null) {
+ // TODO: The accounting for streaming reads should also
+ // move along with the next() call since we are indeed
+ // fetching from disk.. ---VChandar
+
streamStats.reportNetworkTime(operation, System.nanoTime() - startNs);
+ }
}
// log progress
@@ -116,7 +119,17 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
+ " s");
}
- if(keyIterator.hasNext() && counter < maxRecords * skipRecords)
+ // TODO: make usage clearer. Rename maxRecords to recordsPerPartition.
+ // And, make recordsPerPartition <=0 mean 'get them all'.
+
+ // TODO: Remove skipRecords from message and from code.
+
+ // TODO: Make sure the distinction between FetchKeysStream and
+ // FetchPartitionKeysStream is clear.
+
+ // TODO: Add logic to FetchKeys and FetchEntries to account for keys per
+ // partition.
+ if(keyIterator.hasNext() && (counter < maxRecords))
return StreamRequestHandlerState.WRITING;
else {
return StreamRequestHandlerState.COMPLETE;
View
63 src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java
@@ -92,7 +92,8 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
// process the next partition
if(entriesPartitionIterator == null) {
- if(currentIndex == partitionList.size() || counter >= maxRecords * skipRecords) {
+ if(currentIndex == partitionList.size() || counter >= maxRecords) {
+ // TODO: Make .info consistent
logger.info("Done fetching store " + storageEngine.getName() + " : " + counter
+ " records processed.");
return StreamRequestHandlerState.COMPLETE;
@@ -127,39 +128,33 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
counter++;
Pair<ByteArray, Versioned<byte[]>> entry = entriesPartitionIterator.next();
- // honor skipRecords
- if(counter % skipRecords == 0) {
- // do the filtering
- if(streamStats != null) {
- streamStats.reportStorageTime(operation, System.nanoTime() - startNs);
- streamStats.reportStreamingScan(operation);
- }
- ByteArray key = entry.getFirst();
- Versioned<byte[]> value = entry.getSecond();
-
- throttler.maybeThrottle(key.length());
- if(filter.accept(key, value)) {
- fetched++;
- if(streamStats != null)
- streamStats.reportStreamingFetch(operation);
- VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();
-
- VAdminProto.PartitionEntry partitionEntry = VAdminProto.PartitionEntry.newBuilder()
- .setKey(ProtoUtils.encodeBytes(key))
- .setVersioned(ProtoUtils.encodeVersioned(value))
- .build();
- response.setPartitionEntry(partitionEntry);
- Message message = response.build();
-
- startNs = System.nanoTime();
- ProtoUtils.writeMessage(outputStream, message);
- if(streamStats != null)
- streamStats.reportNetworkTime(operation, System.nanoTime() - startNs);
- throttler.maybeThrottle(AdminServiceRequestHandler.valueSize(value));
- }
- } else {
+ // do the filtering
+ if(streamStats != null) {
+ streamStats.reportStorageTime(operation, System.nanoTime() - startNs);
+ streamStats.reportStreamingScan(operation);
+ }
+ ByteArray key = entry.getFirst();
+ Versioned<byte[]> value = entry.getSecond();
+
+ throttler.maybeThrottle(key.length());
+ if(filter.accept(key, value)) {
+ fetched++;
+ if(streamStats != null)
+ streamStats.reportStreamingFetch(operation);
+ VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();
+
+ VAdminProto.PartitionEntry partitionEntry = VAdminProto.PartitionEntry.newBuilder()
+ .setKey(ProtoUtils.encodeBytes(key))
+ .setVersioned(ProtoUtils.encodeVersioned(value))
+ .build();
+ response.setPartitionEntry(partitionEntry);
+ Message message = response.build();
+
+ startNs = System.nanoTime();
+ ProtoUtils.writeMessage(outputStream, message);
if(streamStats != null)
- streamStats.reportStorageTime(operation, System.nanoTime() - startNs);
+ streamStats.reportNetworkTime(operation, System.nanoTime() - startNs);
+ throttler.maybeThrottle(AdminServiceRequestHandler.valueSize(value));
}
// log progress
@@ -174,7 +169,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
}
// reset the iterator if done with this partition
- if(!entriesPartitionIterator.hasNext() || counter >= maxRecords * skipRecords) {
+ if(!entriesPartitionIterator.hasNext() || counter >= maxRecords) {
entriesPartitionIterator.close();
entriesPartitionIterator = null;
}
View
54 src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java
@@ -84,13 +84,16 @@ public FetchPartitionKeysStreamRequestHandler(FetchPartitionEntriesRequest reque
}
}
+ @Override
public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
DataOutputStream outputStream)
throws IOException {
// process the next partition
if(keysPartitionIterator == null) {
- if(currentIndex == partitionList.size() || counter >= maxRecords * skipRecords) {
+ if(currentIndex == partitionList.size() || counter >= maxRecords) {
+ // TODO: Make all .info messages consistent. "Records fetched"
+ // instead of "Done fetching".
logger.info("Done fetching store " + storageEngine.getName() + " : " + counter
+ " records processed.");
return StreamRequestHandlerState.COMPLETE;
@@ -125,32 +128,29 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
counter++;
ByteArray key = keysPartitionIterator.next();
- // honor skipRecords
- if(counter % skipRecords == 0) {
- // do the filtering
- if(streamStats != null) {
- streamStats.reportStorageTime(operation, System.nanoTime() - startNs);
- streamStats.reportStreamingScan(operation);
- }
- throttler.maybeThrottle(key.length());
- if(filter.accept(key, null)) {
-
- VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();
- response.setKey(ProtoUtils.encodeBytes(key));
-
- fetched++;
- if(streamStats != null)
- streamStats.reportStreamingFetch(operation);
- Message message = response.build();
-
- startNs = System.nanoTime();
- ProtoUtils.writeMessage(outputStream, message);
- if(streamStats != null)
- streamStats.reportNetworkTime(operation, System.nanoTime() - startNs);
- }
- } else {
+ // do the filtering
+ if(streamStats != null) {
+ // TODO: The accounting for streaming reads should also
+ // move along with the next() call since we are indeed
+ // fetching from disk.. ---VChandar
+ streamStats.reportStorageTime(operation, System.nanoTime() - startNs);
+ streamStats.reportStreamingScan(operation);
+ }
+ throttler.maybeThrottle(key.length());
+ if(filter.accept(key, null)) {
+
+ VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder();
+ response.setKey(ProtoUtils.encodeBytes(key));
+
+ fetched++;
+ if(streamStats != null)
+ streamStats.reportStreamingFetch(operation);
+ Message message = response.build();
+
+ startNs = System.nanoTime();
+ ProtoUtils.writeMessage(outputStream, message);
if(streamStats != null)
- streamStats.reportStorageTime(operation, System.nanoTime() - startNs);
+ streamStats.reportNetworkTime(operation, System.nanoTime() - startNs);
}
// log progress
@@ -166,7 +166,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
// reset the iterator if done with this partition or fetched enough
// records
- if(!keysPartitionIterator.hasNext() || counter >= maxRecords * skipRecords) {
+ if(!keysPartitionIterator.hasNext() || (counter >= maxRecords)) {
keysPartitionIterator.close();
keysPartitionIterator = null;
}
View
8 src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java
@@ -68,8 +68,6 @@
protected long counter;
- protected long skipRecords;
-
protected long maxRecords;
protected int fetched;
@@ -122,10 +120,8 @@ protected FetchStreamRequestHandler(VAdminProto.FetchPartitionEntriesRequest req
this.startTime = System.currentTimeMillis();
this.counter = 0;
- this.skipRecords = 1;
- if(request.hasSkipRecords() && request.getSkipRecords() >= 0) {
- this.skipRecords = request.getSkipRecords() + 1;
- }
+ // TODO: maxRecords should default to 0 for clarity imho. (And change to
+ // recordsPerPartition_
this.maxRecords = Long.MAX_VALUE;
if(request.hasMaxRecords() && request.getMaxRecords() > 0) {
this.maxRecords = request.getMaxRecords();
View
2  src/java/voldemort/utils/Entropy.java
@@ -250,7 +250,6 @@ public void generateEntropy(Cluster cluster,
.getPartitionIds(),
null,
false,
- 0,
numKeysPerNode);
for(long keyId = 0; keyId < numKeysPerNode && keys.hasNext(); keyId++) {
ByteArray key = keys.next();
@@ -283,7 +282,6 @@ public void generateEntropy(Cluster cluster,
partitions,
null,
false,
- 0,
numKeysPerPartition);
while(keys.hasNext() && numKeysStored < numKeys) {
ByteArray key = keys.next();
View
73 src/java/voldemort/utils/KeySamplerCLI.java
@@ -49,16 +49,14 @@
* cluster. A distinct file of sampled keys is generated for each store.
*
* By default, the "first" key of each partition is sampled. Optional arguments
- * control sampling more keys per partition, and skipping some keys on the
- * server while sampling.
+ * control sampling more keys per partition.
*/
public class KeySamplerCLI {
- private static Logger logger = Logger.getLogger(ConsistencyCheck.class);
+ private static Logger logger = Logger.getLogger(KeySamplerCLI.class);
- private final static int NODE_PARALLELISM = 8;
- private final static int MAX_RECORDS = 1;
- private final static int SKIP_RECORDS = 0;
+ private final static int DEFAULT_NODE_PARALLELISM = 8;
+ private final static int DEFAULT_MAX_RECORDS = 1;
private final AdminClient adminClient;
private final Cluster cluster;
@@ -69,13 +67,8 @@
private final ExecutorService nodeSamplerService;
private final int maxRecords;
- private final int skipRecords;
- public KeySamplerCLI(String url,
- String outDir,
- int nodeParallelism,
- int maxRecords,
- int skipRecords) {
+ public KeySamplerCLI(String url, String outDir, int nodeParallelism, int maxRecords) {
if(logger.isInfoEnabled()) {
logger.info("Connecting to bootstrap server: " + url);
}
@@ -92,7 +85,6 @@ public KeySamplerCLI(String url,
this.nodeSamplerService = Executors.newFixedThreadPool(nodeParallelism);
this.maxRecords = maxRecords;
- this.skipRecords = skipRecords;
}
public boolean sampleStores() {
@@ -112,6 +104,7 @@ public boolean sampleStores() {
NodeSampleResult(boolean success, String keyString) {
this.success = success;
+ // TODO: keysString versus keyString
this.keyString = keyString;
}
}
@@ -136,6 +129,9 @@ public NodeSampleResult call() throws Exception {
for(int partitionId: node.getPartitionIds()) {
success = false;
+ // TODO: real per-server throttling and/or make '100' a command
+ // line argument.
+
// Simple, lame throttling since thread is going at same node
// repeatedly
try {
@@ -158,13 +154,11 @@ public NodeSampleResult call() throws Exception {
while(attempts < 5 && !success) {
try {
Iterator<ByteArray> fetchIterator;
- // TODO: should fetchMasterEntries be true?
fetchIterator = adminClient.bulkFetchOps.fetchKeys(node.getId(),
storeName,
singlePartition,
null,
- false,
- skipRecords,
+ true,
maxRecords);
int keyCount = 0;
while(fetchIterator.hasNext()) {
@@ -176,6 +170,9 @@ public NodeSampleResult call() throws Exception {
if(keyCount < maxRecords) {
logger.warn("Fewer keys (" + keyCount + ") than requested ("
+ maxRecords + ") returned --- " + infoTag);
+ } else if(keyCount < maxRecords) {
+ logger.warn("More keys (" + keyCount + ") than requested ("
+ + maxRecords + ") returned --- " + infoTag);
}
success = true;
} catch(VoldemortException ve) {
@@ -289,19 +286,14 @@ private static OptionParser getParser() {
.describedAs("outputDirectory")
.ofType(String.class);
parser.accepts("parallelism",
- "Number of nodes to sample in parallel. [Default: " + NODE_PARALLELISM
- + " ]")
+ "Number of nodes to sample in parallel. [Default: "
+ + DEFAULT_NODE_PARALLELISM + " ]")
.withRequiredArg()
.describedAs("storeParallelism")
.ofType(Integer.class);
parser.accepts("max-records",
- "Number of keys sampled per partitoin. [Default: " + MAX_RECORDS + " ]")
- .withRequiredArg()
- .describedAs("maxRecords")
- .ofType(Integer.class);
- parser.accepts("skip-records",
- "Number of keys to skip between samples (per partition). [Default: "
- + SKIP_RECORDS + " ]")
+ "Number of keys sampled per partitoin. [Default: " + DEFAULT_MAX_RECORDS
+ + " ]")
.withRequiredArg()
.describedAs("maxRecords")
.ofType(Integer.class);
@@ -322,7 +314,6 @@ private static void printUsage() {
help.append(" Optional:\n");
help.append(" --parallelism <nodeParallelism>\n");
help.append(" --max-records <maxRecords>\n");
- help.append(" --skip-records <skipRecords>\n");
help.append(" --help\n");
System.out.print(help.toString());
}
@@ -332,11 +323,11 @@ private static void printUsageAndDie(String errMessage) {
Utils.croak("\n" + errMessage);
}
- // TODO: Add a "stores" option so that a subset of stores can be done
- // instead of all stores one-by-one.
+ // TODO: (if needed) Add a "stores" option so that a subset of stores can be
+ // done instead of all stores one-by-one.
- // TODO: Add a "partitions" option so that a subset of partitions can be
- // done instead of all partitions.
+ // TODO: (if needed) Add a "partitions" option so that a subset of
+ // partitions can be done instead of all partitions.
public static void main(String[] args) throws Exception {
OptionSet options = null;
@@ -362,36 +353,24 @@ public static void main(String[] args) throws Exception {
String outDir = (String) options.valueOf("out-dir");
Utils.mkdirs(new File(outDir));
- Integer nodeParallelism = NODE_PARALLELISM;
+ Integer nodeParallelism = DEFAULT_NODE_PARALLELISM;
if(options.hasArgument("parallelism")) {
nodeParallelism = (Integer) options.valueOf("parallelism");
}
- Integer maxRecords = MAX_RECORDS;
+ Integer maxRecords = DEFAULT_MAX_RECORDS;
if(options.hasArgument("max-records")) {
maxRecords = (Integer) options.valueOf("max-records");
}
- Integer skipRecords = SKIP_RECORDS;
- if(options.hasArgument("skip-records")) {
- skipRecords = (Integer) options.valueOf("skip-records");
- }
-
- // TODO: Add a '--pid-server' and a '--unordered-server' option and
- // require exactly one of them to be set. This forces the person
- // invoking the command to determine if the servers can do per-partition
- // sampling directly, or if many keys must be explicitly sampled so that
- // determination of partition coverage is done client-side.
+ // TODO: Assuming "right thing" happens server-side, then do not need
+ // the below warning...
logger.warn("This tool is hard-coded to take advantage of servers that "
+ "use PID style layout of data in BDB. "
+ "Use fo this tool against other types of servers is undefined.");
try {
- KeySamplerCLI sampler = new KeySamplerCLI(url,
- outDir,
- nodeParallelism,
- maxRecords,
- skipRecords);
+ KeySamplerCLI sampler = new KeySamplerCLI(url, outDir, nodeParallelism, maxRecords);
try {
if(!sampler.sampleStores()) {
logger.error("Some stores were not successfully sampled.");
View
5 src/java/voldemort/utils/KeyVersionSamplerCLI.java
@@ -53,9 +53,10 @@
* key-versions is generated.
*
*/
+// TODO: Rename KeyValueFetcher
public class KeyVersionSamplerCLI {
- private static Logger logger = Logger.getLogger(ConsistencyCheck.class);
+ private static Logger logger = Logger.getLogger(KeyVersionSamplerCLI.class);
private final static int KEY_PARALLELISM = 4;
@@ -270,6 +271,8 @@ private static void printUsageAndDie(String errMessage) {
Utils.croak("\n" + errMessage);
}
+ // TODO: Add option to fetch value in addition to version
+ // TODO: Add option to print human readable versions versus byte.hexstrings
public static void main(String[] args) throws Exception {
OptionSet options = null;
try {
View
3  src/proto/voldemort-admin.proto
@@ -76,10 +76,11 @@ message FetchPartitionEntriesRequest {
required string store = 2;
optional VoldemortFilter filter = 3;
optional bool fetch_values = 4;
- optional int64 skip_records = 5;
+ optional int64 OBSOLETE__DO_NOT_USE__skip_records = 5;
optional string initial_cluster = 6;
optional bool fetch_orphaned = 7;
optional int64 max_records = 8;
+ // optional int64 records_per_partition = 8;
}
message FetchPartitionEntriesResponse {
View
4 test/unit/voldemort/client/AdminFetchTest.java
@@ -153,7 +153,6 @@ public void testFetchPartitionPrimaryEntries() {
null,
false,
cluster,
- 0,
0);
// gather all the keys obtained
Set<String> fetchedKeys = getEntries(entriesItr);
@@ -177,7 +176,6 @@ public void testFetchPartitionSecondaryEntries() {
null,
false,
cluster,
- 0,
0);
// gather all the keys obtained
Set<String> fetchedKeys = getEntries(entriesItr);
@@ -201,7 +199,6 @@ public void testFetchNonExistentEntriesPrimary() {
null,
false,
cluster,
- 0,
0);
// gather all the keys obtained
Set<String> fetchedKeys = getEntries(entriesItr);
@@ -220,7 +217,6 @@ public void testFetchNonExistentEntriesSecondary() {
null,
false,
cluster,
- 0,
0);
// gather all the keys obtained
Set<String> fetchedKeys = getEntries(entriesItr);
Please sign in to comment.
Something went wrong with that request. Please try again.