From a2d9ebb39e15fd3e7e85c44a9b128eb467535332 Mon Sep 17 00:00:00 2001 From: Jay J Wylie Date: Wed, 13 Mar 2013 09:53:11 -0700 Subject: [PATCH] remove skipRecords from fetching API and protobuf AFAIK skipRecords was never used. By inspection, the code that would have been exercised if it had been used has never been correct. Removing skipRecords from the code base. Also: - Added a number of TODOs to the code from the reviews - Changed some variable names --- .../client/protocol/admin/AdminClient.java | 18 +- .../client/protocol/pb/VAdminProto.java | 382 +++++++++--------- .../admin/AdminServiceRequestHandler.java | 1 - .../FetchEntriesStreamRequestHandler.java | 4 +- .../admin/FetchKeysStreamRequestHandler.java | 25 +- ...hPartitionEntriesStreamRequestHandler.java | 63 ++- ...etchPartitionKeysStreamRequestHandler.java | 54 +-- .../admin/FetchStreamRequestHandler.java | 8 +- src/java/voldemort/utils/Entropy.java | 2 - src/java/voldemort/utils/KeySamplerCLI.java | 73 ++-- .../voldemort/utils/KeyVersionSamplerCLI.java | 5 +- src/proto/voldemort-admin.proto | 3 +- .../unit/voldemort/client/AdminFetchTest.java | 4 - 13 files changed, 304 insertions(+), 338 deletions(-) diff --git a/src/java/voldemort/client/protocol/admin/AdminClient.java b/src/java/voldemort/client/protocol/admin/AdminClient.java index 28b933d46c..69fe275b33 100644 --- a/src/java/voldemort/client/protocol/admin/AdminClient.java +++ b/src/java/voldemort/client/protocol/admin/AdminClient.java @@ -1444,7 +1444,6 @@ private void initiateFetchRequest(DataOutputStream outputStream, boolean fetchValues, boolean fetchMasterEntries, Cluster initialCluster, - long skipRecords, long maxRecords) throws IOException { HashMap> filteredReplicaToPartitionList = Maps.newHashMap(); if(fetchMasterEntries) { @@ -1460,7 +1459,6 @@ private void initiateFetchRequest(DataOutputStream outputStream, .setFetchValues(fetchValues) .addAllReplicaToPartition(ProtoUtils.encodePartitionTuple(filteredReplicaToPartitionList)) 
.setStore(storeName) - .setSkipRecords(skipRecords) .setMaxRecords(maxRecords); try { @@ -1576,7 +1574,6 @@ public Pair> computeNext() { * @param filter Custom filter implementation to filter out entries * which should not be fetched. * @param fetchMasterEntries Fetch an entry only if master replica - * @param skipRecords Number of records to skip * @return An iterator which allows entries to be streamed as they're * being iterated over. */ @@ -1585,7 +1582,6 @@ public Iterator>> fetchEntries(int nodeId, List partitionList, VoldemortFilter filter, boolean fetchMasterEntries, - long skipRecords, long maxRecords) { return fetchEntries(nodeId, storeName, @@ -1593,7 +1589,6 @@ public Iterator>> fetchEntries(int nodeId, filter, fetchMasterEntries, null, - skipRecords, maxRecords); } @@ -1616,7 +1611,7 @@ public Iterator>> fetchEntries(int nodeId, List partitionList, VoldemortFilter filter, boolean fetchMasterEntries) { - return fetchEntries(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0, 0); + return fetchEntries(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0); } // TODO: " HashMap> replicaToPartitionList," is a @@ -1652,7 +1647,6 @@ public Iterator>> fetchEntries(int nodeId, * decision to fetch entries. This is important during * rebalancing where-in we want to fetch keys using an older * metadata compared to the new one. - * @param skipRecords Number of records to skip * @return An iterator which allows entries to be streamed as they're * being iterated over. 
*/ @@ -1662,7 +1656,6 @@ public Iterator>> fetchEntries(int nodeId, VoldemortFilter filter, boolean fetchMasterEntries, Cluster initialCluster, - long skipRecords, long maxRecords) { Node node = AdminClient.this.getAdminClientCluster().getNodeById(nodeId); @@ -1681,7 +1674,6 @@ public Iterator>> fetchEntries(int nodeId, true, fetchMasterEntries, initialCluster, - skipRecords, maxRecords); } catch(IOException e) { helperOps.close(sands.getSocket()); @@ -1800,7 +1792,6 @@ public ByteArray computeNext() { * @param filter Custom filter implementation to filter out entries * which should not be fetched. * @param fetchMasterEntries Fetch a key only if master replica - * @param skipRecords Number of keys to skip * @return An iterator which allows keys to be streamed as they're being * iterated over. */ @@ -1809,7 +1800,6 @@ public Iterator fetchKeys(int nodeId, List partitionList, VoldemortFilter filter, boolean fetchMasterEntries, - long skipRecords, long maxRecords) { return fetchKeys(nodeId, storeName, @@ -1817,7 +1807,6 @@ public Iterator fetchKeys(int nodeId, filter, fetchMasterEntries, null, - skipRecords, maxRecords); } @@ -1840,7 +1829,7 @@ public Iterator fetchKeys(int nodeId, List partitionList, VoldemortFilter filter, boolean fetchMasterEntries) { - return fetchKeys(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0, 0); + return fetchKeys(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0); } /** @@ -1855,7 +1844,6 @@ public Iterator fetchKeys(int nodeId, * @param filter Custom filter * @param initialCluster Cluster to use for selecting a key. 
If null, * use the default metadata from the metadata store - * @param skipRecords Number of records to skip [ Used for sampling ] * @return Returns an iterator of the keys */ public Iterator fetchKeys(int nodeId, @@ -1864,7 +1852,6 @@ public Iterator fetchKeys(int nodeId, VoldemortFilter filter, boolean fetchMasterEntries, Cluster initialCluster, - long skipRecords, long maxRecords) { Node node = AdminClient.this.getAdminClientCluster().getNodeById(nodeId); final SocketDestination destination = new SocketDestination(node.getHost(), @@ -1882,7 +1869,6 @@ public Iterator fetchKeys(int nodeId, false, fetchMasterEntries, initialCluster, - skipRecords, maxRecords); } catch(IOException e) { helperOps.close(sands.getSocket()); diff --git a/src/java/voldemort/client/protocol/pb/VAdminProto.java b/src/java/voldemort/client/protocol/pb/VAdminProto.java index a67c5b387e..31b1e4b5e4 100644 --- a/src/java/voldemort/client/protocol/pb/VAdminProto.java +++ b/src/java/voldemort/client/protocol/pb/VAdminProto.java @@ -4486,12 +4486,12 @@ public voldemort.client.protocol.pb.VAdminProto.PartitionTuple getReplicaToParti public boolean hasFetchValues() { return hasFetchValues; } public boolean getFetchValues() { return fetchValues_; } - // optional int64 skip_records = 5; - public static final int SKIP_RECORDS_FIELD_NUMBER = 5; - private boolean hasSkipRecords; - private long skipRecords_ = 0L; - public boolean hasSkipRecords() { return hasSkipRecords; } - public long getSkipRecords() { return skipRecords_; } + // optional int64 OBSOLETE__DO_NOT_USE__skip_records = 5; + public static final int OBSOLETE__DO_NOT_USE__SKIP_RECORDS_FIELD_NUMBER = 5; + private boolean hasOBSOLETEDONOTUSESkipRecords; + private long oBSOLETEDONOTUSESkipRecords_ = 0L; + public boolean hasOBSOLETEDONOTUSESkipRecords() { return hasOBSOLETEDONOTUSESkipRecords; } + public long getOBSOLETEDONOTUSESkipRecords() { return oBSOLETEDONOTUSESkipRecords_; } // optional string initial_cluster = 6; public static final int 
INITIAL_CLUSTER_FIELD_NUMBER = 6; @@ -4543,8 +4543,8 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) if (hasFetchValues()) { output.writeBool(4, getFetchValues()); } - if (hasSkipRecords()) { - output.writeInt64(5, getSkipRecords()); + if (hasOBSOLETEDONOTUSESkipRecords()) { + output.writeInt64(5, getOBSOLETEDONOTUSESkipRecords()); } if (hasInitialCluster()) { output.writeString(6, getInitialCluster()); @@ -4580,9 +4580,9 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeBoolSize(4, getFetchValues()); } - if (hasSkipRecords()) { + if (hasOBSOLETEDONOTUSESkipRecords()) { size += com.google.protobuf.CodedOutputStream - .computeInt64Size(5, getSkipRecords()); + .computeInt64Size(5, getOBSOLETEDONOTUSESkipRecords()); } if (hasInitialCluster()) { size += com.google.protobuf.CodedOutputStream @@ -4773,8 +4773,8 @@ public Builder mergeFrom(voldemort.client.protocol.pb.VAdminProto.FetchPartition if (other.hasFetchValues()) { setFetchValues(other.getFetchValues()); } - if (other.hasSkipRecords()) { - setSkipRecords(other.getSkipRecords()); + if (other.hasOBSOLETEDONOTUSESkipRecords()) { + setOBSOLETEDONOTUSESkipRecords(other.getOBSOLETEDONOTUSESkipRecords()); } if (other.hasInitialCluster()) { setInitialCluster(other.getInitialCluster()); @@ -4834,7 +4834,7 @@ public Builder mergeFrom( break; } case 40: { - setSkipRecords(input.readInt64()); + setOBSOLETEDONOTUSESkipRecords(input.readInt64()); break; } case 50: { @@ -4981,21 +4981,21 @@ public Builder clearFetchValues() { return this; } - // optional int64 skip_records = 5; - public boolean hasSkipRecords() { - return result.hasSkipRecords(); + // optional int64 OBSOLETE__DO_NOT_USE__skip_records = 5; + public boolean hasOBSOLETEDONOTUSESkipRecords() { + return result.hasOBSOLETEDONOTUSESkipRecords(); } - public long getSkipRecords() { - return result.getSkipRecords(); + public long getOBSOLETEDONOTUSESkipRecords() { + return 
result.getOBSOLETEDONOTUSESkipRecords(); } - public Builder setSkipRecords(long value) { - result.hasSkipRecords = true; - result.skipRecords_ = value; + public Builder setOBSOLETEDONOTUSESkipRecords(long value) { + result.hasOBSOLETEDONOTUSESkipRecords = true; + result.oBSOLETEDONOTUSESkipRecords_ = value; return this; } - public Builder clearSkipRecords() { - result.hasSkipRecords = false; - result.skipRecords_ = 0L; + public Builder clearOBSOLETEDONOTUSESkipRecords() { + result.hasOBSOLETEDONOTUSESkipRecords = false; + result.oBSOLETEDONOTUSESkipRecords_ = 0L; return this; } @@ -23163,175 +23163,175 @@ public Builder clearReserveMemory() { "e\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\"d\n\032Fe" + "tchPartitionFilesRequest\022\r\n\005store\030\001 \002(\t\022" + "7\n\024replica_to_partition\030\002 \003(\0132\031.voldemor" + - "t.PartitionTuple\"\204\002\n\034FetchPartitionEntri" + + "t.PartitionTuple\"\232\002\n\034FetchPartitionEntri" + "esRequest\0227\n\024replica_to_partition\030\001 \003(\0132" + "\031.voldemort.PartitionTuple\022\r\n\005store\030\002 \002(" + "\t\022*\n\006filter\030\003 \001(\0132\032.voldemort.VoldemortF", - "ilter\022\024\n\014fetch_values\030\004 \001(\010\022\024\n\014skip_reco" + - "rds\030\005 \001(\003\022\027\n\017initial_cluster\030\006 \001(\t\022\026\n\016fe" + - "tch_orphaned\030\007 \001(\010\022\023\n\013max_records\030\010 \001(\003\"" + - "\201\001\n\035FetchPartitionEntriesResponse\0222\n\017par" + - "tition_entry\030\001 \001(\0132\031.voldemort.Partition" + - "Entry\022\013\n\003key\030\002 \001(\014\022\037\n\005error\030\003 \001(\0132\020.vold" + - "emort.Error\"\254\001\n\035DeletePartitionEntriesRe" + - "quest\022\r\n\005store\030\001 \002(\t\0227\n\024replica_to_parti" + - "tion\030\002 \003(\0132\031.voldemort.PartitionTuple\022*\n" + - "\006filter\030\003 \001(\0132\032.voldemort.VoldemortFilte", - "r\022\027\n\017initial_cluster\030\004 \001(\t\"P\n\036DeletePart" + - 
"itionEntriesResponse\022\r\n\005count\030\001 \001(\003\022\037\n\005e" + - "rror\030\002 \001(\0132\020.voldemort.Error\"\317\001\n\035Initiat" + - "eFetchAndUpdateRequest\022\017\n\007node_id\030\001 \002(\005\022" + - "\r\n\005store\030\002 \002(\t\022*\n\006filter\030\003 \001(\0132\032.voldemo" + - "rt.VoldemortFilter\0227\n\024replica_to_partiti" + - "on\030\004 \003(\0132\031.voldemort.PartitionTuple\022\027\n\017i" + - "nitial_cluster\030\005 \001(\t\022\020\n\010optimize\030\006 \001(\010\"1" + - "\n\033AsyncOperationStatusRequest\022\022\n\nrequest" + - "_id\030\001 \002(\005\"/\n\031AsyncOperationStopRequest\022\022", - "\n\nrequest_id\030\001 \002(\005\"=\n\032AsyncOperationStop" + - "Response\022\037\n\005error\030\001 \001(\0132\020.voldemort.Erro" + - "r\"2\n\031AsyncOperationListRequest\022\025\n\rshow_c" + - "omplete\030\002 \002(\010\"R\n\032AsyncOperationListRespo" + - "nse\022\023\n\013request_ids\030\001 \003(\005\022\037\n\005error\030\002 \001(\0132" + - "\020.voldemort.Error\":\n\016PartitionTuple\022\024\n\014r" + - "eplica_type\030\001 \002(\005\022\022\n\npartitions\030\002 \003(\005\"e\n" + - "\026PerStorePartitionTuple\022\022\n\nstore_name\030\001 " + - "\002(\t\0227\n\024replica_to_partition\030\002 \003(\0132\031.vold" + - "emort.PartitionTuple\"\370\001\n\031RebalancePartit", - "ionInfoMap\022\022\n\nstealer_id\030\001 \002(\005\022\020\n\010donor_" + - "id\030\002 \002(\005\022\017\n\007attempt\030\003 \002(\005\022C\n\030replica_to_" + - "add_partition\030\004 \003(\0132!.voldemort.PerStore" + - "PartitionTuple\022F\n\033replica_to_delete_part" + - "ition\030\005 \003(\0132!.voldemort.PerStorePartitio" + - "nTuple\022\027\n\017initial_cluster\030\006 \002(\t\"f\n\034Initi" + - "ateRebalanceNodeRequest\022F\n\030rebalance_par" + - "tition_info\030\001 \002(\0132$.voldemort.RebalanceP" + - "artitionInfoMap\"m\n#InitiateRebalanceNode" + - "OnDonorRequest\022F\n\030rebalance_partition_in", - "fo\030\001 
\003(\0132$.voldemort.RebalancePartitionI" + - "nfoMap\"\212\001\n\034AsyncOperationStatusResponse\022" + - "\022\n\nrequest_id\030\001 \001(\005\022\023\n\013description\030\002 \001(\t" + - "\022\016\n\006status\030\003 \001(\t\022\020\n\010complete\030\004 \001(\010\022\037\n\005er" + - "ror\030\005 \001(\0132\020.voldemort.Error\"\'\n\026TruncateE" + - "ntriesRequest\022\r\n\005store\030\001 \002(\t\":\n\027Truncate" + - "EntriesResponse\022\037\n\005error\030\001 \001(\0132\020.voldemo" + - "rt.Error\"*\n\017AddStoreRequest\022\027\n\017storeDefi" + - "nition\030\001 \002(\t\"3\n\020AddStoreResponse\022\037\n\005erro" + - "r\030\001 \001(\0132\020.voldemort.Error\"\'\n\022DeleteStore", - "Request\022\021\n\tstoreName\030\001 \002(\t\"6\n\023DeleteStor" + - "eResponse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Err" + - "or\"P\n\021FetchStoreRequest\022\022\n\nstore_name\030\001 " + - "\002(\t\022\021\n\tstore_dir\030\002 \002(\t\022\024\n\014push_version\030\003" + - " \001(\003\"9\n\020SwapStoreRequest\022\022\n\nstore_name\030\001" + - " \002(\t\022\021\n\tstore_dir\030\002 \002(\t\"P\n\021SwapStoreResp" + - "onse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\022\032\n" + - "\022previous_store_dir\030\002 \001(\t\"@\n\024RollbackSto" + - "reRequest\022\022\n\nstore_name\030\001 \002(\t\022\024\n\014push_ve" + - "rsion\030\002 \002(\003\"8\n\025RollbackStoreResponse\022\037\n\005", - "error\030\001 \001(\0132\020.voldemort.Error\"&\n\020RepairJ" + - "obRequest\022\022\n\nstore_name\030\001 \001(\t\"4\n\021RepairJ" + - "obResponse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Er" + - "ror\"=\n\024ROStoreVersionDirMap\022\022\n\nstore_nam" + - "e\030\001 \002(\t\022\021\n\tstore_dir\030\002 \002(\t\"/\n\031GetROMaxVe" + - "rsionDirRequest\022\022\n\nstore_name\030\001 \003(\t\"y\n\032G" + - "etROMaxVersionDirResponse\022:\n\021ro_store_ve" + - "rsions\030\001 \003(\0132\037.voldemort.ROStoreVersionD" + - 
"irMap\022\037\n\005error\030\002 \001(\0132\020.voldemort.Error\"3" + - "\n\035GetROCurrentVersionDirRequest\022\022\n\nstore", - "_name\030\001 \003(\t\"}\n\036GetROCurrentVersionDirRes" + - "ponse\022:\n\021ro_store_versions\030\001 \003(\0132\037.volde" + - "mort.ROStoreVersionDirMap\022\037\n\005error\030\002 \001(\013" + - "2\020.voldemort.Error\"/\n\031GetROStorageFormat" + - "Request\022\022\n\nstore_name\030\001 \003(\t\"y\n\032GetROStor" + - "ageFormatResponse\022:\n\021ro_store_versions\030\001" + - " \003(\0132\037.voldemort.ROStoreVersionDirMap\022\037\n" + - "\005error\030\002 \001(\0132\020.voldemort.Error\"@\n\027Failed" + - "FetchStoreRequest\022\022\n\nstore_name\030\001 \002(\t\022\021\n" + - "\tstore_dir\030\002 \002(\t\";\n\030FailedFetchStoreResp", - "onse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\"\346\001" + - "\n\033RebalanceStateChangeRequest\022K\n\035rebalan" + - "ce_partition_info_list\030\001 \003(\0132$.voldemort" + - ".RebalancePartitionInfoMap\022\026\n\016cluster_st" + - "ring\030\002 \002(\t\022\017\n\007swap_ro\030\003 \002(\010\022\037\n\027change_cl" + - "uster_metadata\030\004 \002(\010\022\036\n\026change_rebalance" + - "_state\030\005 \002(\010\022\020\n\010rollback\030\006 \002(\010\"?\n\034Rebala" + - "nceStateChangeResponse\022\037\n\005error\030\001 \001(\0132\020." 
+ - "voldemort.Error\"G\n DeleteStoreRebalanceS" + - "tateRequest\022\022\n\nstore_name\030\001 \002(\t\022\017\n\007node_", - "id\030\002 \002(\005\"D\n!DeleteStoreRebalanceStateRes" + - "ponse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\"h" + - "\n\023NativeBackupRequest\022\022\n\nstore_name\030\001 \002(" + - "\t\022\022\n\nbackup_dir\030\002 \002(\t\022\024\n\014verify_files\030\003 " + - "\002(\010\022\023\n\013incremental\030\004 \002(\010\">\n\024ReserveMemor" + - "yRequest\022\022\n\nstore_name\030\001 \002(\t\022\022\n\nsize_in_" + - "mb\030\002 \002(\003\"8\n\025ReserveMemoryResponse\022\037\n\005err" + - "or\030\001 \001(\0132\020.voldemort.Error\"\360\016\n\025Voldemort" + - "AdminRequest\022)\n\004type\030\001 \002(\0162\033.voldemort.A" + - "dminRequestType\0223\n\014get_metadata\030\002 \001(\0132\035.", - "voldemort.GetMetadataRequest\0229\n\017update_m" + - "etadata\030\003 \001(\0132 .voldemort.UpdateMetadata" + - "Request\022J\n\030update_partition_entries\030\004 \001(" + - "\0132(.voldemort.UpdatePartitionEntriesRequ" + - "est\022H\n\027fetch_partition_entries\030\005 \001(\0132\'.v" + - "oldemort.FetchPartitionEntriesRequest\022J\n" + - "\030delete_partition_entries\030\006 \001(\0132(.voldem" + - "ort.DeletePartitionEntriesRequest\022K\n\031ini" + - "tiate_fetch_and_update\030\007 \001(\0132(.voldemort" + - ".InitiateFetchAndUpdateRequest\022F\n\026async_", - "operation_status\030\010 \001(\0132&.voldemort.Async" + - "OperationStatusRequest\022H\n\027initiate_rebal" + - "ance_node\030\t \001(\0132\'.voldemort.InitiateReba" + - "lanceNodeRequest\022B\n\024async_operation_stop" + - "\030\n \001(\0132$.voldemort.AsyncOperationStopReq" + - "uest\022B\n\024async_operation_list\030\013 \001(\0132$.vol" + - "demort.AsyncOperationListRequest\022;\n\020trun" + - "cate_entries\030\014 \001(\0132!.voldemort.TruncateE" + - "ntriesRequest\022-\n\tadd_store\030\r \001(\0132\032.volde" + - "mort.AddStoreRequest\0223\n\014delete_store\030\016 
\001", - "(\0132\035.voldemort.DeleteStoreRequest\0221\n\013fet" + - "ch_store\030\017 \001(\0132\034.voldemort.FetchStoreReq" + - "uest\022/\n\nswap_store\030\020 \001(\0132\033.voldemort.Swa" + - "pStoreRequest\0227\n\016rollback_store\030\021 \001(\0132\037." + - "voldemort.RollbackStoreRequest\022D\n\026get_ro" + - "_max_version_dir\030\022 \001(\0132$.voldemort.GetRO" + - "MaxVersionDirRequest\022L\n\032get_ro_current_v" + - "ersion_dir\030\023 \001(\0132(.voldemort.GetROCurren" + - "tVersionDirRequest\022D\n\025fetch_partition_fi" + - "les\030\024 \001(\0132%.voldemort.FetchPartitionFile", - "sRequest\022@\n\023update_slop_entries\030\026 \001(\0132#." + - "voldemort.UpdateSlopEntriesRequest\022>\n\022fa" + - "iled_fetch_store\030\030 \001(\0132\".voldemort.Faile" + - "dFetchStoreRequest\022C\n\025get_ro_storage_for" + - "mat\030\031 \001(\0132$.voldemort.GetROStorageFormat" + - "Request\022F\n\026rebalance_state_change\030\032 \001(\0132" + - "&.voldemort.RebalanceStateChangeRequest\022" + - "/\n\nrepair_job\030\033 \001(\0132\033.voldemort.RepairJo" + - "bRequest\022X\n initiate_rebalance_node_on_d" + - "onor\030\034 \001(\0132..voldemort.InitiateRebalance", - "NodeOnDonorRequest\022Q\n\034delete_store_rebal" + - "ance_state\030\035 \001(\0132+.voldemort.DeleteStore" + - "RebalanceStateRequest\0225\n\rnative_backup\030\036" + - " \001(\0132\036.voldemort.NativeBackupRequest\0227\n\016" + - "reserve_memory\030\037 \001(\0132\037.voldemort.Reserve" + - "MemoryRequest*\310\005\n\020AdminRequestType\022\020\n\014GE" + - "T_METADATA\020\000\022\023\n\017UPDATE_METADATA\020\001\022\034\n\030UPD" + - "ATE_PARTITION_ENTRIES\020\002\022\033\n\027FETCH_PARTITI" + - "ON_ENTRIES\020\003\022\034\n\030DELETE_PARTITION_ENTRIES" + - "\020\004\022\035\n\031INITIATE_FETCH_AND_UPDATE\020\005\022\032\n\026ASY", - "NC_OPERATION_STATUS\020\006\022\033\n\027INITIATE_REBALA" + - "NCE_NODE\020\007\022\030\n\024ASYNC_OPERATION_STOP\020\010\022\030\n\024" + - 
"ASYNC_OPERATION_LIST\020\t\022\024\n\020TRUNCATE_ENTRI" + - "ES\020\n\022\r\n\tADD_STORE\020\013\022\020\n\014DELETE_STORE\020\014\022\017\n" + - "\013FETCH_STORE\020\r\022\016\n\nSWAP_STORE\020\016\022\022\n\016ROLLBA" + - "CK_STORE\020\017\022\032\n\026GET_RO_MAX_VERSION_DIR\020\020\022\036" + - "\n\032GET_RO_CURRENT_VERSION_DIR\020\021\022\031\n\025FETCH_" + - "PARTITION_FILES\020\022\022\027\n\023UPDATE_SLOP_ENTRIES" + - "\020\024\022\026\n\022FAILED_FETCH_STORE\020\026\022\031\n\025GET_RO_STO" + - "RAGE_FORMAT\020\027\022\032\n\026REBALANCE_STATE_CHANGE\020", - "\030\022\016\n\nREPAIR_JOB\020\031\022$\n INITIATE_REBALANCE_" + - "NODE_ON_DONOR\020\032\022 \n\034DELETE_STORE_REBALANC" + - "E_STATE\020\033\022\021\n\rNATIVE_BACKUP\020\034\022\022\n\016RESERVE_" + - "MEMORY\020\035B-\n\034voldemort.client.protocol.pb" + - "B\013VAdminProtoH\001" + "ilter\022\024\n\014fetch_values\030\004 \001(\010\022*\n\"OBSOLETE_" + + "_DO_NOT_USE__skip_records\030\005 \001(\003\022\027\n\017initi" + + "al_cluster\030\006 \001(\t\022\026\n\016fetch_orphaned\030\007 \001(\010" + + "\022\023\n\013max_records\030\010 \001(\003\"\201\001\n\035FetchPartition" + + "EntriesResponse\0222\n\017partition_entry\030\001 \001(\013" + + "2\031.voldemort.PartitionEntry\022\013\n\003key\030\002 \001(\014" + + "\022\037\n\005error\030\003 \001(\0132\020.voldemort.Error\"\254\001\n\035De" + + "letePartitionEntriesRequest\022\r\n\005store\030\001 \002" + + "(\t\0227\n\024replica_to_partition\030\002 \003(\0132\031.volde" + + "mort.PartitionTuple\022*\n\006filter\030\003 \001(\0132\032.vo", + "ldemort.VoldemortFilter\022\027\n\017initial_clust" + + "er\030\004 \001(\t\"P\n\036DeletePartitionEntriesRespon" + + "se\022\r\n\005count\030\001 \001(\003\022\037\n\005error\030\002 \001(\0132\020.volde" + + "mort.Error\"\317\001\n\035InitiateFetchAndUpdateReq" + + "uest\022\017\n\007node_id\030\001 \002(\005\022\r\n\005store\030\002 \002(\t\022*\n\006" + + "filter\030\003 
\001(\0132\032.voldemort.VoldemortFilter" + + "\0227\n\024replica_to_partition\030\004 \003(\0132\031.voldemo" + + "rt.PartitionTuple\022\027\n\017initial_cluster\030\005 \001" + + "(\t\022\020\n\010optimize\030\006 \001(\010\"1\n\033AsyncOperationSt" + + "atusRequest\022\022\n\nrequest_id\030\001 \002(\005\"/\n\031Async", + "OperationStopRequest\022\022\n\nrequest_id\030\001 \002(\005" + + "\"=\n\032AsyncOperationStopResponse\022\037\n\005error\030" + + "\001 \001(\0132\020.voldemort.Error\"2\n\031AsyncOperatio" + + "nListRequest\022\025\n\rshow_complete\030\002 \002(\010\"R\n\032A" + + "syncOperationListResponse\022\023\n\013request_ids" + + "\030\001 \003(\005\022\037\n\005error\030\002 \001(\0132\020.voldemort.Error\"" + + ":\n\016PartitionTuple\022\024\n\014replica_type\030\001 \002(\005\022" + + "\022\n\npartitions\030\002 \003(\005\"e\n\026PerStorePartition" + + "Tuple\022\022\n\nstore_name\030\001 \002(\t\0227\n\024replica_to_" + + "partition\030\002 \003(\0132\031.voldemort.PartitionTup", + "le\"\370\001\n\031RebalancePartitionInfoMap\022\022\n\nstea" + + "ler_id\030\001 \002(\005\022\020\n\010donor_id\030\002 \002(\005\022\017\n\007attemp" + + "t\030\003 \002(\005\022C\n\030replica_to_add_partition\030\004 \003(" + + "\0132!.voldemort.PerStorePartitionTuple\022F\n\033" + + "replica_to_delete_partition\030\005 \003(\0132!.vold" + + "emort.PerStorePartitionTuple\022\027\n\017initial_" + + "cluster\030\006 \002(\t\"f\n\034InitiateRebalanceNodeRe" + + "quest\022F\n\030rebalance_partition_info\030\001 \002(\0132" + + "$.voldemort.RebalancePartitionInfoMap\"m\n" + + "#InitiateRebalanceNodeOnDonorRequest\022F\n\030", + "rebalance_partition_info\030\001 \003(\0132$.voldemo" + + "rt.RebalancePartitionInfoMap\"\212\001\n\034AsyncOp" + + "erationStatusResponse\022\022\n\nrequest_id\030\001 \001(" + + "\005\022\023\n\013description\030\002 \001(\t\022\016\n\006status\030\003 \001(\t\022\020" + + "\n\010complete\030\004 \001(\010\022\037\n\005error\030\005 
\001(\0132\020.voldem" + + "ort.Error\"\'\n\026TruncateEntriesRequest\022\r\n\005s" + + "tore\030\001 \002(\t\":\n\027TruncateEntriesResponse\022\037\n" + + "\005error\030\001 \001(\0132\020.voldemort.Error\"*\n\017AddSto" + + "reRequest\022\027\n\017storeDefinition\030\001 \002(\t\"3\n\020Ad" + + "dStoreResponse\022\037\n\005error\030\001 \001(\0132\020.voldemor", + "t.Error\"\'\n\022DeleteStoreRequest\022\021\n\tstoreNa" + + "me\030\001 \002(\t\"6\n\023DeleteStoreResponse\022\037\n\005error" + + "\030\001 \001(\0132\020.voldemort.Error\"P\n\021FetchStoreRe" + + "quest\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_dir\030\002" + + " \002(\t\022\024\n\014push_version\030\003 \001(\003\"9\n\020SwapStoreR" + + "equest\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_dir\030" + + "\002 \002(\t\"P\n\021SwapStoreResponse\022\037\n\005error\030\001 \001(" + + "\0132\020.voldemort.Error\022\032\n\022previous_store_di" + + "r\030\002 \001(\t\"@\n\024RollbackStoreRequest\022\022\n\nstore" + + "_name\030\001 \002(\t\022\024\n\014push_version\030\002 \002(\003\"8\n\025Rol", + "lbackStoreResponse\022\037\n\005error\030\001 \001(\0132\020.vold" + + "emort.Error\"&\n\020RepairJobRequest\022\022\n\nstore" + + "_name\030\001 \001(\t\"4\n\021RepairJobResponse\022\037\n\005erro" + + "r\030\001 \001(\0132\020.voldemort.Error\"=\n\024ROStoreVers" + + "ionDirMap\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_d" + + "ir\030\002 \002(\t\"/\n\031GetROMaxVersionDirRequest\022\022\n" + + "\nstore_name\030\001 \003(\t\"y\n\032GetROMaxVersionDirR" + + "esponse\022:\n\021ro_store_versions\030\001 \003(\0132\037.vol" + + "demort.ROStoreVersionDirMap\022\037\n\005error\030\002 \001" + + "(\0132\020.voldemort.Error\"3\n\035GetROCurrentVers", + "ionDirRequest\022\022\n\nstore_name\030\001 \003(\t\"}\n\036Get" + + "ROCurrentVersionDirResponse\022:\n\021ro_store_" + + "versions\030\001 \003(\0132\037.voldemort.ROStoreVersio" + + "nDirMap\022\037\n\005error\030\002 
\001(\0132\020.voldemort.Error" + + "\"/\n\031GetROStorageFormatRequest\022\022\n\nstore_n" + + "ame\030\001 \003(\t\"y\n\032GetROStorageFormatResponse\022" + + ":\n\021ro_store_versions\030\001 \003(\0132\037.voldemort.R" + + "OStoreVersionDirMap\022\037\n\005error\030\002 \001(\0132\020.vol" + + "demort.Error\"@\n\027FailedFetchStoreRequest\022" + + "\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_dir\030\002 \002(\t\";", + "\n\030FailedFetchStoreResponse\022\037\n\005error\030\001 \001(" + + "\0132\020.voldemort.Error\"\346\001\n\033RebalanceStateCh" + + "angeRequest\022K\n\035rebalance_partition_info_" + + "list\030\001 \003(\0132$.voldemort.RebalancePartitio" + + "nInfoMap\022\026\n\016cluster_string\030\002 \002(\t\022\017\n\007swap" + + "_ro\030\003 \002(\010\022\037\n\027change_cluster_metadata\030\004 \002" + + "(\010\022\036\n\026change_rebalance_state\030\005 \002(\010\022\020\n\010ro" + + "llback\030\006 \002(\010\"?\n\034RebalanceStateChangeResp" + + "onse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\"G\n" + + " DeleteStoreRebalanceStateRequest\022\022\n\nsto", + "re_name\030\001 \002(\t\022\017\n\007node_id\030\002 \002(\005\"D\n!Delete" + + "StoreRebalanceStateResponse\022\037\n\005error\030\001 \001" + + "(\0132\020.voldemort.Error\"h\n\023NativeBackupRequ" + + "est\022\022\n\nstore_name\030\001 \002(\t\022\022\n\nbackup_dir\030\002 " + + "\002(\t\022\024\n\014verify_files\030\003 \002(\010\022\023\n\013incremental" + + "\030\004 \002(\010\">\n\024ReserveMemoryRequest\022\022\n\nstore_" + + "name\030\001 \002(\t\022\022\n\nsize_in_mb\030\002 \002(\003\"8\n\025Reserv" + + "eMemoryResponse\022\037\n\005error\030\001 \001(\0132\020.voldemo" + + "rt.Error\"\360\016\n\025VoldemortAdminRequest\022)\n\004ty" + + "pe\030\001 \002(\0162\033.voldemort.AdminRequestType\0223\n", + "\014get_metadata\030\002 \001(\0132\035.voldemort.GetMetad" + + "ataRequest\0229\n\017update_metadata\030\003 \001(\0132 .vo" + + 
"ldemort.UpdateMetadataRequest\022J\n\030update_" + + "partition_entries\030\004 \001(\0132(.voldemort.Upda" + + "tePartitionEntriesRequest\022H\n\027fetch_parti" + + "tion_entries\030\005 \001(\0132\'.voldemort.FetchPart" + + "itionEntriesRequest\022J\n\030delete_partition_" + + "entries\030\006 \001(\0132(.voldemort.DeletePartitio" + + "nEntriesRequest\022K\n\031initiate_fetch_and_up" + + "date\030\007 \001(\0132(.voldemort.InitiateFetchAndU", + "pdateRequest\022F\n\026async_operation_status\030\010" + + " \001(\0132&.voldemort.AsyncOperationStatusReq" + + "uest\022H\n\027initiate_rebalance_node\030\t \001(\0132\'." + + "voldemort.InitiateRebalanceNodeRequest\022B" + + "\n\024async_operation_stop\030\n \001(\0132$.voldemort" + + ".AsyncOperationStopRequest\022B\n\024async_oper" + + "ation_list\030\013 \001(\0132$.voldemort.AsyncOperat" + + "ionListRequest\022;\n\020truncate_entries\030\014 \001(\013" + + "2!.voldemort.TruncateEntriesRequest\022-\n\ta" + + "dd_store\030\r \001(\0132\032.voldemort.AddStoreReque", + "st\0223\n\014delete_store\030\016 \001(\0132\035.voldemort.Del" + + "eteStoreRequest\0221\n\013fetch_store\030\017 \001(\0132\034.v" + + "oldemort.FetchStoreRequest\022/\n\nswap_store" + + "\030\020 \001(\0132\033.voldemort.SwapStoreRequest\0227\n\016r" + + "ollback_store\030\021 \001(\0132\037.voldemort.Rollback" + + "StoreRequest\022D\n\026get_ro_max_version_dir\030\022" + + " \001(\0132$.voldemort.GetROMaxVersionDirReque" + + "st\022L\n\032get_ro_current_version_dir\030\023 \001(\0132(" + + ".voldemort.GetROCurrentVersionDirRequest" + + "\022D\n\025fetch_partition_files\030\024 \001(\0132%.voldem", + "ort.FetchPartitionFilesRequest\022@\n\023update" + + "_slop_entries\030\026 \001(\0132#.voldemort.UpdateSl" + + "opEntriesRequest\022>\n\022failed_fetch_store\030\030" + + " \001(\0132\".voldemort.FailedFetchStoreRequest" + + "\022C\n\025get_ro_storage_format\030\031 \001(\0132$.voldem" + + "ort.GetROStorageFormatRequest\022F\n\026rebalan" + + 
"ce_state_change\030\032 \001(\0132&.voldemort.Rebala" + + "nceStateChangeRequest\022/\n\nrepair_job\030\033 \001(" + + "\0132\033.voldemort.RepairJobRequest\022X\n initia" + + "te_rebalance_node_on_donor\030\034 \001(\0132..volde", + "mort.InitiateRebalanceNodeOnDonorRequest" + + "\022Q\n\034delete_store_rebalance_state\030\035 \001(\0132+" + + ".voldemort.DeleteStoreRebalanceStateRequ" + + "est\0225\n\rnative_backup\030\036 \001(\0132\036.voldemort.N" + + "ativeBackupRequest\0227\n\016reserve_memory\030\037 \001" + + "(\0132\037.voldemort.ReserveMemoryRequest*\310\005\n\020" + + "AdminRequestType\022\020\n\014GET_METADATA\020\000\022\023\n\017UP" + + "DATE_METADATA\020\001\022\034\n\030UPDATE_PARTITION_ENTR" + + "IES\020\002\022\033\n\027FETCH_PARTITION_ENTRIES\020\003\022\034\n\030DE" + + "LETE_PARTITION_ENTRIES\020\004\022\035\n\031INITIATE_FET", + "CH_AND_UPDATE\020\005\022\032\n\026ASYNC_OPERATION_STATU" + + "S\020\006\022\033\n\027INITIATE_REBALANCE_NODE\020\007\022\030\n\024ASYN" + + "C_OPERATION_STOP\020\010\022\030\n\024ASYNC_OPERATION_LI" + + "ST\020\t\022\024\n\020TRUNCATE_ENTRIES\020\n\022\r\n\tADD_STORE\020" + + "\013\022\020\n\014DELETE_STORE\020\014\022\017\n\013FETCH_STORE\020\r\022\016\n\n" + + "SWAP_STORE\020\016\022\022\n\016ROLLBACK_STORE\020\017\022\032\n\026GET_" + + "RO_MAX_VERSION_DIR\020\020\022\036\n\032GET_RO_CURRENT_V" + + "ERSION_DIR\020\021\022\031\n\025FETCH_PARTITION_FILES\020\022\022" + + "\027\n\023UPDATE_SLOP_ENTRIES\020\024\022\026\n\022FAILED_FETCH" + + "_STORE\020\026\022\031\n\025GET_RO_STORAGE_FORMAT\020\027\022\032\n\026R", + "EBALANCE_STATE_CHANGE\020\030\022\016\n\nREPAIR_JOB\020\031\022" + + "$\n INITIATE_REBALANCE_NODE_ON_DONOR\020\032\022 \n" + + "\034DELETE_STORE_REBALANCE_STATE\020\033\022\021\n\rNATIV" + + "E_BACKUP\020\034\022\022\n\016RESERVE_MEMORY\020\035B-\n\034voldem" + + "ort.client.protocol.pbB\013VAdminProtoH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -23439,7 +23439,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( internal_static_voldemort_FetchPartitionEntriesRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_voldemort_FetchPartitionEntriesRequest_descriptor, - new java.lang.String[] { "ReplicaToPartition", "Store", "Filter", "FetchValues", "SkipRecords", "InitialCluster", "FetchOrphaned", "MaxRecords", }, + new java.lang.String[] { "ReplicaToPartition", "Store", "Filter", "FetchValues", "OBSOLETEDONOTUSESkipRecords", "InitialCluster", "FetchOrphaned", "MaxRecords", }, voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest.class, voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest.Builder.class); internal_static_voldemort_FetchPartitionEntriesResponse_descriptor = diff --git a/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java b/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java index 556ab55434..8a70b3c8b6 100644 --- a/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java @@ -974,7 +974,6 @@ public void operate() { filter, false, initialCluster, - 0, 0); long numTuples = 0; long startTime = System.currentTimeMillis(); diff --git a/src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java index a8e1aca050..f8637c9041 100644 --- a/src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java @@ -87,7 +87,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, key.get(), replicaToPartitionList, initialCluster, - storeDef) && counter % skipRecords == 0) { + 
storeDef)) { entryAccepted = true; } } else { @@ -142,7 +142,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, + " s"); } - if(keyIterator.hasNext() && counter < maxRecords * skipRecords) + if(keyIterator.hasNext() && counter < maxRecords) return StreamRequestHandlerState.WRITING; else { return StreamRequestHandlerState.COMPLETE; diff --git a/src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java index c43d51aba3..460f7e6190 100644 --- a/src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java @@ -57,6 +57,7 @@ public FetchKeysStreamRequestHandler(FetchPartitionEntriesRequest request, + "' with replica to partition mapping " + replicaToPartitionList); } + @Override public StreamRequestHandlerState handleRequest(DataInputStream inputStream, DataOutputStream outputStream) throws IOException { @@ -73,14 +74,11 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, throttler.maybeThrottle(key.length()); boolean keyAccepted = false; if(!fetchOrphaned) { - // normal code path if(StoreInstance.checkKeyBelongsToPartition(nodeId, key.get(), replicaToPartitionList, initialCluster, - storeDef) - && filter.accept(key, null) - && counter % skipRecords == 0) { + storeDef) && filter.accept(key, null)) { keyAccepted = true; } @@ -100,8 +98,13 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, startNs = System.nanoTime(); ProtoUtils.writeMessage(outputStream, message); - if(streamStats != null) + if(streamStats != null) { + // TODO: The accounting for streaming reads should also + // move along with the next() call since we are indeed + // fetching from disk.. 
---VChandar + streamStats.reportNetworkTime(operation, System.nanoTime() - startNs); + } } // log progress @@ -116,7 +119,17 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, + " s"); } - if(keyIterator.hasNext() && counter < maxRecords * skipRecords) + // TODO: make usage clearer. Rename maxRecords to recordsPerPartition. + // And, make recordsPerPartition <=0 mean 'get them all'. + + // TODO: Remove skipRecords from message and from code. + + // TODO: Make sure the distinction between FetchKeysStream and + // FetchPartitionKeysStream is clear. + + // TODO: Add logic to FetchKeys and FetchEntries to account for keys per + // partition. + if(keyIterator.hasNext() && (counter < maxRecords)) return StreamRequestHandlerState.WRITING; else { return StreamRequestHandlerState.COMPLETE; diff --git a/src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java index 3a4bbd1bc2..58fde9efc3 100644 --- a/src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchPartitionEntriesStreamRequestHandler.java @@ -92,7 +92,8 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, // process the next partition if(entriesPartitionIterator == null) { - if(currentIndex == partitionList.size() || counter >= maxRecords * skipRecords) { + if(currentIndex == partitionList.size() || counter >= maxRecords) { + // TODO: Make .info consistent logger.info("Done fetching store " + storageEngine.getName() + " : " + counter + " records processed."); return StreamRequestHandlerState.COMPLETE; @@ -127,39 +128,33 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, counter++; Pair> entry = entriesPartitionIterator.next(); - // honor skipRecords - if(counter % skipRecords == 0) { - // do the filtering - if(streamStats != null) { - 
streamStats.reportStorageTime(operation, System.nanoTime() - startNs); - streamStats.reportStreamingScan(operation); - } - ByteArray key = entry.getFirst(); - Versioned value = entry.getSecond(); - - throttler.maybeThrottle(key.length()); - if(filter.accept(key, value)) { - fetched++; - if(streamStats != null) - streamStats.reportStreamingFetch(operation); - VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder(); - - VAdminProto.PartitionEntry partitionEntry = VAdminProto.PartitionEntry.newBuilder() - .setKey(ProtoUtils.encodeBytes(key)) - .setVersioned(ProtoUtils.encodeVersioned(value)) - .build(); - response.setPartitionEntry(partitionEntry); - Message message = response.build(); - - startNs = System.nanoTime(); - ProtoUtils.writeMessage(outputStream, message); - if(streamStats != null) - streamStats.reportNetworkTime(operation, System.nanoTime() - startNs); - throttler.maybeThrottle(AdminServiceRequestHandler.valueSize(value)); - } - } else { + // do the filtering + if(streamStats != null) { + streamStats.reportStorageTime(operation, System.nanoTime() - startNs); + streamStats.reportStreamingScan(operation); + } + ByteArray key = entry.getFirst(); + Versioned value = entry.getSecond(); + + throttler.maybeThrottle(key.length()); + if(filter.accept(key, value)) { + fetched++; + if(streamStats != null) + streamStats.reportStreamingFetch(operation); + VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder(); + + VAdminProto.PartitionEntry partitionEntry = VAdminProto.PartitionEntry.newBuilder() + .setKey(ProtoUtils.encodeBytes(key)) + .setVersioned(ProtoUtils.encodeVersioned(value)) + .build(); + response.setPartitionEntry(partitionEntry); + Message message = response.build(); + + startNs = System.nanoTime(); + ProtoUtils.writeMessage(outputStream, message); if(streamStats != null) - streamStats.reportStorageTime(operation, System.nanoTime() - 
startNs); + streamStats.reportNetworkTime(operation, System.nanoTime() - startNs); + throttler.maybeThrottle(AdminServiceRequestHandler.valueSize(value)); } // log progress @@ -174,7 +169,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, } // reset the iterator if done with this partition - if(!entriesPartitionIterator.hasNext() || counter >= maxRecords * skipRecords) { + if(!entriesPartitionIterator.hasNext() || counter >= maxRecords) { entriesPartitionIterator.close(); entriesPartitionIterator = null; } diff --git a/src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java index 3ad3f0fbec..a3f2e2cbbe 100644 --- a/src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchPartitionKeysStreamRequestHandler.java @@ -84,13 +84,16 @@ public FetchPartitionKeysStreamRequestHandler(FetchPartitionEntriesRequest reque } } + @Override public StreamRequestHandlerState handleRequest(DataInputStream inputStream, DataOutputStream outputStream) throws IOException { // process the next partition if(keysPartitionIterator == null) { - if(currentIndex == partitionList.size() || counter >= maxRecords * skipRecords) { + if(currentIndex == partitionList.size() || counter >= maxRecords) { + // TODO: Make all .info messages consistent. "Records fetched" + // instead of "Done fetching". 
logger.info("Done fetching store " + storageEngine.getName() + " : " + counter + " records processed."); return StreamRequestHandlerState.COMPLETE; @@ -125,32 +128,29 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, counter++; ByteArray key = keysPartitionIterator.next(); - // honor skipRecords - if(counter % skipRecords == 0) { - // do the filtering - if(streamStats != null) { - streamStats.reportStorageTime(operation, System.nanoTime() - startNs); - streamStats.reportStreamingScan(operation); - } - throttler.maybeThrottle(key.length()); - if(filter.accept(key, null)) { - - VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder(); - response.setKey(ProtoUtils.encodeBytes(key)); - - fetched++; - if(streamStats != null) - streamStats.reportStreamingFetch(operation); - Message message = response.build(); - - startNs = System.nanoTime(); - ProtoUtils.writeMessage(outputStream, message); - if(streamStats != null) - streamStats.reportNetworkTime(operation, System.nanoTime() - startNs); - } - } else { + // do the filtering + if(streamStats != null) { + // TODO: The accounting for streaming reads should also + // move along with the next() call since we are indeed + // fetching from disk.. 
---VChandar + streamStats.reportStorageTime(operation, System.nanoTime() - startNs); + streamStats.reportStreamingScan(operation); + } + throttler.maybeThrottle(key.length()); + if(filter.accept(key, null)) { + + VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder(); + response.setKey(ProtoUtils.encodeBytes(key)); + + fetched++; + if(streamStats != null) + streamStats.reportStreamingFetch(operation); + Message message = response.build(); + + startNs = System.nanoTime(); + ProtoUtils.writeMessage(outputStream, message); if(streamStats != null) - streamStats.reportStorageTime(operation, System.nanoTime() - startNs); + streamStats.reportNetworkTime(operation, System.nanoTime() - startNs); } // log progress @@ -166,7 +166,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, // reset the iterator if done with this partition or fetched enough // records - if(!keysPartitionIterator.hasNext() || counter >= maxRecords * skipRecords) { + if(!keysPartitionIterator.hasNext() || (counter >= maxRecords)) { keysPartitionIterator.close(); keysPartitionIterator = null; } diff --git a/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java index ff8f0139b6..196e105675 100644 --- a/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java @@ -68,8 +68,6 @@ public abstract class FetchStreamRequestHandler implements StreamRequestHandler protected long counter; - protected long skipRecords; - protected long maxRecords; protected int fetched; @@ -122,10 +120,8 @@ protected FetchStreamRequestHandler(VAdminProto.FetchPartitionEntriesRequest req this.startTime = System.currentTimeMillis(); this.counter = 0; - this.skipRecords = 1; - if(request.hasSkipRecords() && request.getSkipRecords() >= 0) { - this.skipRecords = 
request.getSkipRecords() + 1; - } + // TODO: maxRecords should default to 0 for clarity imho. (And change to + // recordsPerPartition_ this.maxRecords = Long.MAX_VALUE; if(request.hasMaxRecords() && request.getMaxRecords() > 0) { this.maxRecords = request.getMaxRecords(); diff --git a/src/java/voldemort/utils/Entropy.java b/src/java/voldemort/utils/Entropy.java index e362fc603f..7c23194d6c 100644 --- a/src/java/voldemort/utils/Entropy.java +++ b/src/java/voldemort/utils/Entropy.java @@ -250,7 +250,6 @@ public void generateEntropy(Cluster cluster, .getPartitionIds(), null, false, - 0, numKeysPerNode); for(long keyId = 0; keyId < numKeysPerNode && keys.hasNext(); keyId++) { ByteArray key = keys.next(); @@ -283,7 +282,6 @@ public void generateEntropy(Cluster cluster, partitions, null, false, - 0, numKeysPerPartition); while(keys.hasNext() && numKeysStored < numKeys) { ByteArray key = keys.next(); diff --git a/src/java/voldemort/utils/KeySamplerCLI.java b/src/java/voldemort/utils/KeySamplerCLI.java index bbb494a262..748eb83e75 100644 --- a/src/java/voldemort/utils/KeySamplerCLI.java +++ b/src/java/voldemort/utils/KeySamplerCLI.java @@ -49,16 +49,14 @@ * cluster. A distinct file of sampled keys is generated for each store. * * By default, the "first" key of each partition is sampled. Optional arguments - * control sampling more keys per partition, and skipping some keys on the - * server while sampling. + * control sampling more keys per partition. 
*/ public class KeySamplerCLI { - private static Logger logger = Logger.getLogger(ConsistencyCheck.class); + private static Logger logger = Logger.getLogger(KeySamplerCLI.class); - private final static int NODE_PARALLELISM = 8; - private final static int MAX_RECORDS = 1; - private final static int SKIP_RECORDS = 0; + private final static int DEFAULT_NODE_PARALLELISM = 8; + private final static int DEFAULT_MAX_RECORDS = 1; private final AdminClient adminClient; private final Cluster cluster; @@ -69,13 +67,8 @@ public class KeySamplerCLI { private final ExecutorService nodeSamplerService; private final int maxRecords; - private final int skipRecords; - public KeySamplerCLI(String url, - String outDir, - int nodeParallelism, - int maxRecords, - int skipRecords) { + public KeySamplerCLI(String url, String outDir, int nodeParallelism, int maxRecords) { if(logger.isInfoEnabled()) { logger.info("Connecting to bootstrap server: " + url); } @@ -92,7 +85,6 @@ public KeySamplerCLI(String url, this.nodeSamplerService = Executors.newFixedThreadPool(nodeParallelism); this.maxRecords = maxRecords; - this.skipRecords = skipRecords; } public boolean sampleStores() { @@ -112,6 +104,7 @@ public static class NodeSampleResult { NodeSampleResult(boolean success, String keyString) { this.success = success; + // TODO: keysString versus keyString this.keyString = keyString; } } @@ -136,6 +129,9 @@ public NodeSampleResult call() throws Exception { for(int partitionId: node.getPartitionIds()) { success = false; + // TODO: real per-server throttling and/or make '100' a command + // line argument. + // Simple, lame throttling since thread is going at same node // repeatedly try { @@ -158,13 +154,11 @@ public NodeSampleResult call() throws Exception { while(attempts < 5 && !success) { try { Iterator fetchIterator; - // TODO: should fetchMasterEntries be true? 
fetchIterator = adminClient.bulkFetchOps.fetchKeys(node.getId(), storeName, singlePartition, null, - false, - skipRecords, + true, maxRecords); int keyCount = 0; while(fetchIterator.hasNext()) { @@ -176,6 +170,9 @@ public NodeSampleResult call() throws Exception { if(keyCount < maxRecords) { logger.warn("Fewer keys (" + keyCount + ") than requested (" + maxRecords + ") returned --- " + infoTag); + } else if(keyCount > maxRecords) { + logger.warn("More keys (" + keyCount + ") than requested (" + + maxRecords + ") returned --- " + infoTag); } success = true; } catch(VoldemortException ve) { @@ -289,19 +286,14 @@ private static OptionParser getParser() { .describedAs("outputDirectory") .ofType(String.class); parser.accepts("parallelism", - "Number of nodes to sample in parallel. [Default: " + NODE_PARALLELISM - + " ]") + "Number of nodes to sample in parallel. [Default: " + + DEFAULT_NODE_PARALLELISM + " ]") .withRequiredArg() .describedAs("storeParallelism") .ofType(Integer.class); parser.accepts("max-records", - "Number of keys sampled per partitoin. [Default: " + MAX_RECORDS + " ]") - .withRequiredArg() - .describedAs("maxRecords") - .ofType(Integer.class); - parser.accepts("skip-records", - "Number of keys to skip between samples (per partition). [Default: " - + SKIP_RECORDS + " ]") + "Number of keys sampled per partitoin. [Default: " + DEFAULT_MAX_RECORDS + + " ]") .withRequiredArg() .describedAs("maxRecords") .ofType(Integer.class); @@ -322,7 +314,6 @@ private static void printUsage() { help.append(" Optional:\n"); help.append(" --parallelism \n"); help.append(" --max-records \n"); - help.append(" --skip-records \n"); help.append(" --help\n"); System.out.print(help.toString()); } @@ -332,11 +323,11 @@ private static void printUsageAndDie(String errMessage) { Utils.croak("\n" + errMessage); } - // TODO: Add a "stores" option so that a subset of stores can be done - // instead of all stores one-by-one. 
+ // TODO: (if needed) Add a "stores" option so that a subset of stores can be + // done instead of all stores one-by-one. - // TODO: Add a "partitions" option so that a subset of partitions can be - // done instead of all partitions. + // TODO: (if needed) Add a "partitions" option so that a subset of + // partitions can be done instead of all partitions. public static void main(String[] args) throws Exception { OptionSet options = null; @@ -362,36 +353,24 @@ public static void main(String[] args) throws Exception { String outDir = (String) options.valueOf("out-dir"); Utils.mkdirs(new File(outDir)); - Integer nodeParallelism = NODE_PARALLELISM; + Integer nodeParallelism = DEFAULT_NODE_PARALLELISM; if(options.hasArgument("parallelism")) { nodeParallelism = (Integer) options.valueOf("parallelism"); } - Integer maxRecords = MAX_RECORDS; + Integer maxRecords = DEFAULT_MAX_RECORDS; if(options.hasArgument("max-records")) { maxRecords = (Integer) options.valueOf("max-records"); } - Integer skipRecords = SKIP_RECORDS; - if(options.hasArgument("skip-records")) { - skipRecords = (Integer) options.valueOf("skip-records"); - } - - // TODO: Add a '--pid-server' and a '--unordered-server' option and - // require exactly one of them to be set. This forces the person - // invoking the command to determine if the servers can do per-partition - // sampling directly, or if many keys must be explicitly sampled so that - // determination of partition coverage is done client-side. + // TODO: Assuming "right thing" happens server-side, then do not need + // the below warning... logger.warn("This tool is hard-coded to take advantage of servers that " + "use PID style layout of data in BDB. 
" + "Use fo this tool against other types of servers is undefined."); try { - KeySamplerCLI sampler = new KeySamplerCLI(url, - outDir, - nodeParallelism, - maxRecords, - skipRecords); + KeySamplerCLI sampler = new KeySamplerCLI(url, outDir, nodeParallelism, maxRecords); try { if(!sampler.sampleStores()) { logger.error("Some stores were not successfully sampled."); diff --git a/src/java/voldemort/utils/KeyVersionSamplerCLI.java b/src/java/voldemort/utils/KeyVersionSamplerCLI.java index 1079a0f9ef..31d449c15d 100644 --- a/src/java/voldemort/utils/KeyVersionSamplerCLI.java +++ b/src/java/voldemort/utils/KeyVersionSamplerCLI.java @@ -53,9 +53,10 @@ * key-versions is generated. * */ +// TODO: Rename KeyValueFetcher public class KeyVersionSamplerCLI { - private static Logger logger = Logger.getLogger(ConsistencyCheck.class); + private static Logger logger = Logger.getLogger(KeyVersionSamplerCLI.class); private final static int KEY_PARALLELISM = 4; @@ -270,6 +271,8 @@ private static void printUsageAndDie(String errMessage) { Utils.croak("\n" + errMessage); } + // TODO: Add option to fetch value in addition to version + // TODO: Add option to print human readable versions versus byte.hexstrings public static void main(String[] args) throws Exception { OptionSet options = null; try { diff --git a/src/proto/voldemort-admin.proto b/src/proto/voldemort-admin.proto index 0e1c016bce..4acdd5a65e 100644 --- a/src/proto/voldemort-admin.proto +++ b/src/proto/voldemort-admin.proto @@ -76,10 +76,11 @@ message FetchPartitionEntriesRequest { required string store = 2; optional VoldemortFilter filter = 3; optional bool fetch_values = 4; - optional int64 skip_records = 5; + optional int64 OBSOLETE__DO_NOT_USE__skip_records = 5; optional string initial_cluster = 6; optional bool fetch_orphaned = 7; optional int64 max_records = 8; + // optional int64 records_per_partition = 8; } message FetchPartitionEntriesResponse { diff --git a/test/unit/voldemort/client/AdminFetchTest.java 
b/test/unit/voldemort/client/AdminFetchTest.java index 11e5dc8b85..cf1b224acc 100644 --- a/test/unit/voldemort/client/AdminFetchTest.java +++ b/test/unit/voldemort/client/AdminFetchTest.java @@ -153,7 +153,6 @@ public void testFetchPartitionPrimaryEntries() { null, false, cluster, - 0, 0); // gather all the keys obtained Set fetchedKeys = getEntries(entriesItr); @@ -177,7 +176,6 @@ public void testFetchPartitionSecondaryEntries() { null, false, cluster, - 0, 0); // gather all the keys obtained Set fetchedKeys = getEntries(entriesItr); @@ -201,7 +199,6 @@ public void testFetchNonExistentEntriesPrimary() { null, false, cluster, - 0, 0); // gather all the keys obtained Set fetchedKeys = getEntries(entriesItr); @@ -220,7 +217,6 @@ public void testFetchNonExistentEntriesSecondary() { null, false, cluster, - 0, 0); // gather all the keys obtained Set fetchedKeys = getEntries(entriesItr);