diff --git a/clients/python/voldemort_admin_pb2.py b/clients/python/voldemort_admin_pb2.py index 5d5bcd890c..c2091ecb4e 100644 --- a/clients/python/voldemort_admin_pb2.py +++ b/clients/python/voldemort_admin_pb2.py @@ -436,6 +436,13 @@ message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None), + descriptor.FieldDescriptor( + name='skip_records', full_name='voldemort.FetchPartitionEntriesRequest.skip_records', index=5, + number=6, type=3, cpp_type=2, label=1, + default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), ], extensions=[ ], diff --git a/src/java/voldemort/client/protocol/admin/AdminClient.java b/src/java/voldemort/client/protocol/admin/AdminClient.java index 4149e915eb..087c765612 100644 --- a/src/java/voldemort/client/protocol/admin/AdminClient.java +++ b/src/java/voldemort/client/protocol/admin/AdminClient.java @@ -285,12 +285,14 @@ private void initiateFetchRequest(DataOutputStream outputStream, List partitionList, VoldemortFilter filter, boolean fetchValues, - boolean fetchMasterEntries) throws IOException { + boolean fetchMasterEntries, + long skipRecords) throws IOException { VAdminProto.FetchPartitionEntriesRequest.Builder fetchRequest = VAdminProto.FetchPartitionEntriesRequest.newBuilder() .addAllPartitions(partitionList) .setFetchValues(fetchValues) .setFetchMasterEntries(fetchMasterEntries) - .setStore(storeName); + .setStore(storeName) + .setSkipRecords(skipRecords); if(filter != null) { fetchRequest.setFilter(encodeFilter(filter)); @@ -334,7 +336,8 @@ private VAdminProto.FetchPartitionEntriesResponse responseFromStream(DataInputSt * @param partitionList List of the partitions * @param filter Custom filter implementation to filter out entries which * should not be fetched. - * @param fetch only entries which belong to Master + * @param fetchMasterEntries Fetch an entry only if master replica + * @param skipRecords Number of records to skip * @return An iterator which allows entries to be streamed as they're being * iterated over. * @throws VoldemortException @@ -343,7 +346,9 @@ public Iterator>> fetchEntries(int nodeId, String storeName, List partitionList, VoldemortFilter filter, - boolean fetchMasterEntries) { + boolean fetchMasterEntries, + long skipRecords) { + Node node = this.getAdminClientCluster().getNodeById(nodeId); final SocketDestination destination = new SocketDestination(node.getHost(), node.getAdminPort(), @@ -358,7 +363,8 @@ public Iterator>> fetchEntries(int nodeId, partitionList, filter, true, - fetchMasterEntries); + fetchMasterEntries, + skipRecords); } catch(IOException e) { close(sands.getSocket()); pool.checkin(destination, sands); @@ -398,6 +404,19 @@ public Pair> computeNext() { } + /** + * See documentation for + * {@link AdminClient#fetchEntries(int, String, List, VoldemortFilter, boolean, long)} + * . Kept for backwards compatibility + */ + public Iterator>> fetchEntries(int nodeId, + String storeName, + List partitionList, + VoldemortFilter filter, + boolean fetchMasterEntries) { + return fetchEntries(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0); + } + /** * Fetch All keys belonging to partitionList from requested node. Identical * to {@link AdminClient#fetchEntries} but will only fetch the keys @@ -407,13 +426,16 @@ public Pair> computeNext() { * @param partitionList See documentation for * {@link AdminClient#fetchEntries} * @param filter See documentation for {@link AdminClient#fetchEntries} + * @param skipRecords See documentation for + * {@link AdminClient#fetchEntries(int, String, List, VoldemortFilter, boolean, long)} * @return See documentation for {@link AdminClient#fetchEntries} */ public Iterator fetchKeys(int nodeId, String storeName, List partitionList, VoldemortFilter filter, - boolean fetchMasterEntries) { + boolean fetchMasterEntries, + long skipRecords) { Node node = this.getAdminClientCluster().getNodeById(nodeId); final SocketDestination destination = new SocketDestination(node.getHost(), node.getAdminPort(), @@ -428,7 +450,8 @@ public Iterator fetchKeys(int nodeId, partitionList, filter, false, - fetchMasterEntries); + fetchMasterEntries, + skipRecords); } catch(IOException e) { close(sands.getSocket()); pool.checkin(destination, sands); @@ -465,6 +488,19 @@ public ByteArray computeNext() { }; } + /** + * See documentation for + * {@link AdminClient#fetchKeys(int, String, List, VoldemortFilter, boolean, long)} + * . Kept for backwards compatibility + */ + public Iterator fetchKeys(int nodeId, + String storeName, + List partitionList, + VoldemortFilter filter, + boolean fetchMasterEntries) { + return fetchKeys(nodeId, storeName, partitionList, filter, fetchMasterEntries, 0); + } + /** * RestoreData from copies on other machines for the given nodeId *

diff --git a/src/java/voldemort/client/protocol/pb/VAdminProto.java b/src/java/voldemort/client/protocol/pb/VAdminProto.java index d4df40d999..8571365e68 100644 --- a/src/java/voldemort/client/protocol/pb/VAdminProto.java +++ b/src/java/voldemort/client/protocol/pb/VAdminProto.java @@ -3432,6 +3432,13 @@ public int getPartitions(int index) { public boolean hasFetchMasterEntries() { return hasFetchMasterEntries; } public boolean getFetchMasterEntries() { return fetchMasterEntries_; } + // optional int64 skip_records = 6; + public static final int SKIP_RECORDS_FIELD_NUMBER = 6; + private boolean hasSkipRecords; + private long skipRecords_ = 0L; + public boolean hasSkipRecords() { return hasSkipRecords; } + public long getSkipRecords() { return skipRecords_; } + @Override public final boolean isInitialized() { if (!hasStore) return false; @@ -3459,6 +3466,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) if (hasFetchMasterEntries()) { output.writeBool(5, getFetchMasterEntries()); } + if (hasSkipRecords()) { + output.writeInt64(6, getSkipRecords()); + } getUnknownFields().writeTo(output); } @@ -3494,6 +3504,10 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeBoolSize(5, getFetchMasterEntries()); } + if (hasSkipRecords()) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(6, getSkipRecords()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -3657,6 +3671,9 @@ public Builder mergeFrom(voldemort.client.protocol.pb.VAdminProto.FetchPartition if (other.hasFetchMasterEntries()) { setFetchMasterEntries(other.getFetchMasterEntries()); } + if (other.hasSkipRecords()) { + setSkipRecords(other.getSkipRecords()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -3716,6 +3733,10 @@ public Builder mergeFrom( setFetchMasterEntries(input.readBool()); break; } + case 48: { + setSkipRecords(input.readInt64()); + break; + } } } } @@ -3848,6 +3869,24 @@ public Builder clearFetchMasterEntries() { result.fetchMasterEntries_ = false; return this; } + + // optional int64 skip_records = 6; + public boolean hasSkipRecords() { + return result.hasSkipRecords(); + } + public long getSkipRecords() { + return result.getSkipRecords(); + } + public Builder setSkipRecords(long value) { + result.hasSkipRecords = true; + result.skipRecords_ = value; + return this; + } + public Builder clearSkipRecords() { + result.hasSkipRecords = false; + result.skipRecords_ = 0L; + return this; + } } static { @@ -15685,121 +15724,122 @@ public Builder clearSwapStoresAndCleanState() { "ror\030\001 \001(\0132\020.voldemort.Error\"-\n\017Voldemort" + "Filter\022\014\n\004name\030\001 \002(\t\022\014\n\004data\030\002 \002(\014\"?\n\032Fe" + "tchPartitionFilesRequest\022\022\n\npartitions\030\001" + - " \003(\005\022\r\n\005store\030\002 \002(\t\"\241\001\n\034FetchPartitionEn" + + " \003(\005\022\r\n\005store\030\002 \002(\t\"\267\001\n\034FetchPartitionEn" + "triesRequest\022\022\n\npartitions\030\001 \003(\005\022\r\n\005stor" + "e\030\002 \002(\t\022*\n\006filter\030\003 \001(\0132\032.voldemort.Vold" + "emortFilter\022\024\n\014fetch_values\030\004 \001(\010\022\034\n\024fet" + - "ch_master_entries\030\005 \001(\010\"\201\001\n\035FetchPartiti" + - "onEntriesResponse\0222\n\017partition_entry\030\001 \001" + - "(\0132\031.voldemort.PartitionEntry\022\013\n\003key\030\002 \001" + - "(\014\022\037\n\005error\030\003 \001(\0132\020.voldemort.Error\"n\n\035D" + - "eletePartitionEntriesRequest\022\r\n\005store\030\001 " + - "\002(\t\022\022\n\npartitions\030\002 \003(\005\022*\n\006filter\030\003 \001(\0132" + - "\032.voldemort.VoldemortFilter\"P\n\036DeletePar" + - "titionEntriesResponse\022\r\n\005count\030\001 \001(\005\022\037\n\005" + - "error\030\002 \001(\0132\020.voldemort.Error\"\224\001\n\035Initia" + - "teFetchAndUpdateRequest\022\017\n\007node_id\030\001 \002(\005" + - "\022\022\n\npartitions\030\002 \003(\005\022\r\n\005store\030\003 \002(\t\022*\n\006f" + - "ilter\030\004 \001(\0132\032.voldemort.VoldemortFilter\022" + - "\023\n\013is_readonly\030\005 \001(\010\"1\n\033AsyncOperationSt" + - "atusRequest\022\022\n\nrequest_id\030\001 \002(\005\"/\n\031Async" + - "OperationStopRequest\022\022\n\nrequest_id\030\001 \002(\005" + - "\"=\n\032AsyncOperationStopResponse\022\037\n\005error\030" + - "\001 \001(\0132\020.voldemort.Error\"M\n\031AsyncOperatio" + - "nListRequest\022\022\n\nrequest_id\030\001 \002(\005\022\034\n\rshow" + - "_complete\030\002 \002(\010:\005false\"R\n\032AsyncOperation" + - "ListResponse\022\023\n\013request_ids\030\001 \003(\005\022\037\n\005err" + - "or\030\002 \001(\0132\020.voldemort.Error\"\276\002\n\034InitiateR" + - "ebalanceNodeRequest\022\022\n\nstealer_id\030\002 \002(\005\022" + - "\020\n\010donor_id\030\003 \002(\005\022\022\n\npartitions\030\004 \003(\005\022\017\n" + - "\007attempt\030\005 \002(\005\022\030\n\020deletePartitions\030\006 \003(\005" + - "\022\030\n\020unbalanced_store\030\007 \003(\t\022\035\n\025stealMaste" + - "rPartitions\030\010 \003(\005\022@\n\027stealer_ro_store_to" + - "_dir\030\t \003(\0132\037.voldemort.ROStoreVersionDir" + - "Map\022>\n\025donor_ro_store_to_dir\030\n \003(\0132\037.vol" + - "demort.ROStoreVersionDirMap\"\212\001\n\034AsyncOpe" + - "rationStatusResponse\022\022\n\nrequest_id\030\001 \001(\005" + - "\022\023\n\013description\030\002 \001(\t\022\016\n\006status\030\003 \001(\t\022\020\n" + - "\010complete\030\004 \001(\010\022\037\n\005error\030\005 \001(\0132\020.voldemo" + - "rt.Error\"\'\n\026TruncateEntriesRequest\022\r\n\005st" + - "ore\030\001 \002(\t\":\n\027TruncateEntriesResponse\022\037\n\005" + - "error\030\001 \001(\0132\020.voldemort.Error\"*\n\017AddStor" + - "eRequest\022\027\n\017storeDefinition\030\001 \002(\t\"3\n\020Add" + - "StoreResponse\022\037\n\005error\030\001 \001(\0132\020.voldemort" + - ".Error\"\'\n\022DeleteStoreRequest\022\021\n\tstoreNam" + - "e\030\001 \002(\t\"6\n\023DeleteStoreResponse\022\037\n\005error\030" + - "\001 \001(\0132\020.voldemort.Error\"P\n\021FetchStoreReq" + - "uest\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_dir\030\002 " + - "\002(\t\022\024\n\014push_version\030\003 \001(\003\"9\n\020SwapStoreRe" + - "quest\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_dir\030\002" + - " \002(\t\"4\n\021SwapStoreResponse\022\037\n\005error\030\001 \001(\013" + - "2\020.voldemort.Error\"@\n\024RollbackStoreReque" + - "st\022\022\n\nstore_name\030\001 \002(\t\022\024\n\014push_version\030\002" + - " \002(\003\"8\n\025RollbackStoreResponse\022\037\n\005error\030\001" + - " \001(\0132\020.voldemort.Error\"=\n\024ROStoreVersion" + - "DirMap\022\022\n\nstore_name\030\001 \002(\t\022\021\n\tstore_dir\030" + - "\002 \002(\t\"/\n\031GetROMaxVersionDirRequest\022\022\n\nst" + - "ore_name\030\001 \003(\t\"y\n\032GetROMaxVersionDirResp" + - "onse\022:\n\021ro_store_versions\030\001 \003(\0132\037.voldem" + - "ort.ROStoreVersionDirMap\022\037\n\005error\030\002 \001(\0132" + - "\020.voldemort.Error\"3\n\035GetROCurrentVersion" + - "DirRequest\022\022\n\nstore_name\030\001 \003(\t\"}\n\036GetROC" + - "urrentVersionDirResponse\022:\n\021ro_store_ver" + - "sions\030\001 \003(\0132\037.voldemort.ROStoreVersionDi" + - "rMap\022\037\n\005error\030\002 \001(\0132\020.voldemort.Error\"\\\n" + - "\036SwapStoresAndCleanStateRequest\022:\n\021ro_st" + - "ore_versions\030\001 \003(\0132\037.voldemort.ROStoreVe" + - "rsionDirMap\"B\n\037SwapStoresAndCleanStateRe" + - "sponse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error\"" + - "\343\n\n\025VoldemortAdminRequest\022)\n\004type\030\001 \002(\0162" + - "\033.voldemort.AdminRequestType\0223\n\014get_meta" + - "data\030\002 \001(\0132\035.voldemort.GetMetadataReques" + - "t\0229\n\017update_metadata\030\003 \001(\0132 .voldemort.U" + - "pdateMetadataRequest\022J\n\030update_partition" + - "_entries\030\004 \001(\0132(.voldemort.UpdatePartiti" + - "onEntriesRequest\022H\n\027fetch_partition_entr" + - "ies\030\005 \001(\0132\'.voldemort.FetchPartitionEntr" + - "iesRequest\022J\n\030delete_partition_entries\030\006" + - " \001(\0132(.voldemort.DeletePartitionEntriesR" + - "equest\022K\n\031initiate_fetch_and_update\030\007 \001(" + - "\0132(.voldemort.InitiateFetchAndUpdateRequ" + - "est\022F\n\026async_operation_status\030\010 \001(\0132&.vo" + - "ldemort.AsyncOperationStatusRequest\022H\n\027i" + - "nitiate_rebalance_node\030\t \001(\0132\'.voldemort" + - ".InitiateRebalanceNodeRequest\022B\n\024async_o" + - "peration_stop\030\n \001(\0132$.voldemort.AsyncOpe" + - "rationStopRequest\022B\n\024async_operation_lis" + - "t\030\013 \001(\0132$.voldemort.AsyncOperationListRe" + - "quest\022;\n\020truncate_entries\030\014 \001(\0132!.voldem" + - "ort.TruncateEntriesRequest\022-\n\tadd_store\030" + - "\r \001(\0132\032.voldemort.AddStoreRequest\0223\n\014del" + - "ete_store\030\016 \001(\0132\035.voldemort.DeleteStoreR" + - "equest\0221\n\013fetch_store\030\017 \001(\0132\034.voldemort." + - "FetchStoreRequest\022/\n\nswap_store\030\020 \001(\0132\033." + - "voldemort.SwapStoreRequest\0227\n\016rollback_s" + - "tore\030\021 \001(\0132\037.voldemort.RollbackStoreRequ" + - "est\022D\n\026get_ro_max_version_dir\030\022 \001(\0132$.vo" + - "ldemort.GetROMaxVersionDirRequest\022L\n\032get" + - "_ro_current_version_dir\030\023 \001(\0132(.voldemor" + - "t.GetROCurrentVersionDirRequest\022D\n\025fetch" + - "_partition_files\030\024 \001(\0132%.voldemort.Fetch" + - "PartitionFilesRequest\022N\n\033swap_stores_and" + - "_clean_state\030\025 \001(\0132).voldemort.SwapStore" + - "sAndCleanStateRequest*\202\004\n\020AdminRequestTy" + - "pe\022\020\n\014GET_METADATA\020\000\022\023\n\017UPDATE_METADATA\020" + - "\001\022\034\n\030UPDATE_PARTITION_ENTRIES\020\002\022\033\n\027FETCH" + - "_PARTITION_ENTRIES\020\003\022\034\n\030DELETE_PARTITION" + - "_ENTRIES\020\004\022\035\n\031INITIATE_FETCH_AND_UPDATE\020" + - "\005\022\032\n\026ASYNC_OPERATION_STATUS\020\006\022\033\n\027INITIAT" + - "E_REBALANCE_NODE\020\007\022\030\n\024ASYNC_OPERATION_ST" + - "OP\020\010\022\030\n\024ASYNC_OPERATION_LIST\020\t\022\024\n\020TRUNCA" + - "TE_ENTRIES\020\n\022\r\n\tADD_STORE\020\013\022\020\n\014DELETE_ST" + - "ORE\020\014\022\017\n\013FETCH_STORE\020\r\022\016\n\nSWAP_STORE\020\016\022\022" + - "\n\016ROLLBACK_STORE\020\017\022\032\n\026GET_RO_MAX_VERSION" + - "_DIR\020\020\022\036\n\032GET_RO_CURRENT_VERSION_DIR\020\021\022\031" + - "\n\025FETCH_PARTITION_FILES\020\022\022\037\n\033SWAP_STORES" + - "_AND_CLEAN_STATE\020\023B-\n\034voldemort.client.p" + - "rotocol.pbB\013VAdminProtoH\001"; + "ch_master_entries\030\005 \001(\010\022\024\n\014skip_records\030" + + "\006 \001(\003\"\201\001\n\035FetchPartitionEntriesResponse\022" + + "2\n\017partition_entry\030\001 \001(\0132\031.voldemort.Par" + + "titionEntry\022\013\n\003key\030\002 \001(\014\022\037\n\005error\030\003 \001(\0132" + + "\020.voldemort.Error\"n\n\035DeletePartitionEntr" + + "iesRequest\022\r\n\005store\030\001 \002(\t\022\022\n\npartitions\030" + + "\002 \003(\005\022*\n\006filter\030\003 \001(\0132\032.voldemort.Voldem" + + "ortFilter\"P\n\036DeletePartitionEntriesRespo" + + "nse\022\r\n\005count\030\001 \001(\005\022\037\n\005error\030\002 \001(\0132\020.vold" + + "emort.Error\"\224\001\n\035InitiateFetchAndUpdateRe" + + "quest\022\017\n\007node_id\030\001 \002(\005\022\022\n\npartitions\030\002 \003" + + "(\005\022\r\n\005store\030\003 \002(\t\022*\n\006filter\030\004 \001(\0132\032.vold" + + "emort.VoldemortFilter\022\023\n\013is_readonly\030\005 \001" + + "(\010\"1\n\033AsyncOperationStatusRequest\022\022\n\nreq" + + "uest_id\030\001 \002(\005\"/\n\031AsyncOperationStopReque" + + "st\022\022\n\nrequest_id\030\001 \002(\005\"=\n\032AsyncOperation" + + "StopResponse\022\037\n\005error\030\001 \001(\0132\020.voldemort." + + "Error\"M\n\031AsyncOperationListRequest\022\022\n\nre" + + "quest_id\030\001 \002(\005\022\034\n\rshow_complete\030\002 \002(\010:\005f" + + "alse\"R\n\032AsyncOperationListResponse\022\023\n\013re" + + "quest_ids\030\001 \003(\005\022\037\n\005error\030\002 \001(\0132\020.voldemo" + + "rt.Error\"\276\002\n\034InitiateRebalanceNodeReques" + + "t\022\022\n\nstealer_id\030\002 \002(\005\022\020\n\010donor_id\030\003 \002(\005\022" + + "\022\n\npartitions\030\004 \003(\005\022\017\n\007attempt\030\005 \002(\005\022\030\n\020" + + "deletePartitions\030\006 \003(\005\022\030\n\020unbalanced_sto" + + "re\030\007 \003(\t\022\035\n\025stealMasterPartitions\030\010 \003(\005\022" + + "@\n\027stealer_ro_store_to_dir\030\t \003(\0132\037.volde" + + "mort.ROStoreVersionDirMap\022>\n\025donor_ro_st" + + "ore_to_dir\030\n \003(\0132\037.voldemort.ROStoreVers" + + "ionDirMap\"\212\001\n\034AsyncOperationStatusRespon" + + "se\022\022\n\nrequest_id\030\001 \001(\005\022\023\n\013description\030\002 " + + "\001(\t\022\016\n\006status\030\003 \001(\t\022\020\n\010complete\030\004 \001(\010\022\037\n" + + "\005error\030\005 \001(\0132\020.voldemort.Error\"\'\n\026Trunca" + + "teEntriesRequest\022\r\n\005store\030\001 \002(\t\":\n\027Trunc" + + "ateEntriesResponse\022\037\n\005error\030\001 \001(\0132\020.vold" + + "emort.Error\"*\n\017AddStoreRequest\022\027\n\017storeD" + + "efinition\030\001 \002(\t\"3\n\020AddStoreResponse\022\037\n\005e" + + "rror\030\001 \001(\0132\020.voldemort.Error\"\'\n\022DeleteSt" + + "oreRequest\022\021\n\tstoreName\030\001 \002(\t\"6\n\023DeleteS" + + "toreResponse\022\037\n\005error\030\001 \001(\0132\020.voldemort." + + "Error\"P\n\021FetchStoreRequest\022\022\n\nstore_name" + + "\030\001 \002(\t\022\021\n\tstore_dir\030\002 \002(\t\022\024\n\014push_versio" + + "n\030\003 \001(\003\"9\n\020SwapStoreRequest\022\022\n\nstore_nam" + + "e\030\001 \002(\t\022\021\n\tstore_dir\030\002 \002(\t\"4\n\021SwapStoreR" + + "esponse\022\037\n\005error\030\001 \001(\0132\020.voldemort.Error" + + "\"@\n\024RollbackStoreRequest\022\022\n\nstore_name\030\001" + + " \002(\t\022\024\n\014push_version\030\002 \002(\003\"8\n\025RollbackSt" + + "oreResponse\022\037\n\005error\030\001 \001(\0132\020.voldemort.E" + + "rror\"=\n\024ROStoreVersionDirMap\022\022\n\nstore_na" + + "me\030\001 \002(\t\022\021\n\tstore_dir\030\002 \002(\t\"/\n\031GetROMaxV" + + "ersionDirRequest\022\022\n\nstore_name\030\001 \003(\t\"y\n\032" + + "GetROMaxVersionDirResponse\022:\n\021ro_store_v" + + "ersions\030\001 \003(\0132\037.voldemort.ROStoreVersion" + + "DirMap\022\037\n\005error\030\002 \001(\0132\020.voldemort.Error\"" + + "3\n\035GetROCurrentVersionDirRequest\022\022\n\nstor" + + "e_name\030\001 \003(\t\"}\n\036GetROCurrentVersionDirRe" + + "sponse\022:\n\021ro_store_versions\030\001 \003(\0132\037.vold" + + "emort.ROStoreVersionDirMap\022\037\n\005error\030\002 \001(" + + "\0132\020.voldemort.Error\"\\\n\036SwapStoresAndClea" + + "nStateRequest\022:\n\021ro_store_versions\030\001 \003(\013" + + "2\037.voldemort.ROStoreVersionDirMap\"B\n\037Swa" + + "pStoresAndCleanStateResponse\022\037\n\005error\030\001 " + + "\001(\0132\020.voldemort.Error\"\343\n\n\025VoldemortAdmin" + + "Request\022)\n\004type\030\001 \002(\0162\033.voldemort.AdminR" + + "equestType\0223\n\014get_metadata\030\002 \001(\0132\035.volde" + + "mort.GetMetadataRequest\0229\n\017update_metada" + + "ta\030\003 \001(\0132 .voldemort.UpdateMetadataReque" + + "st\022J\n\030update_partition_entries\030\004 \001(\0132(.v" + + "oldemort.UpdatePartitionEntriesRequest\022H" + + "\n\027fetch_partition_entries\030\005 \001(\0132\'.voldem" + + "ort.FetchPartitionEntriesRequest\022J\n\030dele" + + "te_partition_entries\030\006 \001(\0132(.voldemort.D" + + "eletePartitionEntriesRequest\022K\n\031initiate" + + "_fetch_and_update\030\007 \001(\0132(.voldemort.Init" + + "iateFetchAndUpdateRequest\022F\n\026async_opera" + + "tion_status\030\010 \001(\0132&.voldemort.AsyncOpera" + + "tionStatusRequest\022H\n\027initiate_rebalance_" + + "node\030\t \001(\0132\'.voldemort.InitiateRebalance" + + "NodeRequest\022B\n\024async_operation_stop\030\n \001(" + + "\0132$.voldemort.AsyncOperationStopRequest\022" + + "B\n\024async_operation_list\030\013 \001(\0132$.voldemor" + + "t.AsyncOperationListRequest\022;\n\020truncate_" + + "entries\030\014 \001(\0132!.voldemort.TruncateEntrie" + + "sRequest\022-\n\tadd_store\030\r \001(\0132\032.voldemort." + + "AddStoreRequest\0223\n\014delete_store\030\016 \001(\0132\035." + + "voldemort.DeleteStoreRequest\0221\n\013fetch_st" + + "ore\030\017 \001(\0132\034.voldemort.FetchStoreRequest\022" + + "/\n\nswap_store\030\020 \001(\0132\033.voldemort.SwapStor" + + "eRequest\0227\n\016rollback_store\030\021 \001(\0132\037.volde" + + "mort.RollbackStoreRequest\022D\n\026get_ro_max_" + + "version_dir\030\022 \001(\0132$.voldemort.GetROMaxVe" + + "rsionDirRequest\022L\n\032get_ro_current_versio" + + "n_dir\030\023 \001(\0132(.voldemort.GetROCurrentVers" + + "ionDirRequest\022D\n\025fetch_partition_files\030\024" + + " \001(\0132%.voldemort.FetchPartitionFilesRequ" + + "est\022N\n\033swap_stores_and_clean_state\030\025 \001(\013" + + "2).voldemort.SwapStoresAndCleanStateRequ" + + "est*\202\004\n\020AdminRequestType\022\020\n\014GET_METADATA" + + "\020\000\022\023\n\017UPDATE_METADATA\020\001\022\034\n\030UPDATE_PARTIT" + + "ION_ENTRIES\020\002\022\033\n\027FETCH_PARTITION_ENTRIES" + + "\020\003\022\034\n\030DELETE_PARTITION_ENTRIES\020\004\022\035\n\031INIT" + + "IATE_FETCH_AND_UPDATE\020\005\022\032\n\026ASYNC_OPERATI" + + "ON_STATUS\020\006\022\033\n\027INITIATE_REBALANCE_NODE\020\007" + + "\022\030\n\024ASYNC_OPERATION_STOP\020\010\022\030\n\024ASYNC_OPER" + + "ATION_LIST\020\t\022\024\n\020TRUNCATE_ENTRIES\020\n\022\r\n\tAD" + + "D_STORE\020\013\022\020\n\014DELETE_STORE\020\014\022\017\n\013FETCH_STO" + + "RE\020\r\022\016\n\nSWAP_STORE\020\016\022\022\n\016ROLLBACK_STORE\020\017" + + "\022\032\n\026GET_RO_MAX_VERSION_DIR\020\020\022\036\n\032GET_RO_C" + + "URRENT_VERSION_DIR\020\021\022\031\n\025FETCH_PARTITION_" + + "FILES\020\022\022\037\n\033SWAP_STORES_AND_CLEAN_STATE\020\023" + + "B-\n\034voldemort.client.protocol.pbB\013VAdmin" + + "ProtoH\001"; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { public com.google.protobuf.ExtensionRegistry assignDescriptors( @@ -15890,7 +15930,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( internal_static_voldemort_FetchPartitionEntriesRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_voldemort_FetchPartitionEntriesRequest_descriptor, - new java.lang.String[] { "Partitions", "Store", "Filter", "FetchValues", "FetchMasterEntries", }, + new java.lang.String[] { "Partitions", "Store", "Filter", "FetchValues", "FetchMasterEntries", "SkipRecords", }, voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest.class, voldemort.client.protocol.pb.VAdminProto.FetchPartitionEntriesRequest.Builder.class); internal_static_voldemort_FetchPartitionEntriesResponse_descriptor = diff --git a/src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java index aaba1b31dd..b3468af160 100644 --- a/src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchEntriesStreamRequestHandler.java @@ -49,7 +49,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, ByteArray key = keyIterator.next(); - if(validPartition(key.get())) { + if(validPartition(key.get()) && counter % skipRecords == 0) { for(Versioned value: storageEngine.get(key, null)) { throttler.maybeThrottle(key.length()); if(filter.accept(key, value)) { diff --git a/src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java index a57b69b626..2b49cc660a 100644 --- a/src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchKeysStreamRequestHandler.java @@ -32,7 +32,8 @@ public FetchKeysStreamRequestHandler(FetchPartitionEntriesRequest request, networkClassLoader); } - public StreamRequestHandlerState handleRequest(DataInputStream inputStream, DataOutputStream outputStream) + public StreamRequestHandlerState handleRequest(DataInputStream inputStream, + DataOutputStream outputStream) throws IOException { if(!keyIterator.hasNext()) return StreamRequestHandlerState.COMPLETE; @@ -40,7 +41,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, Data ByteArray key = keyIterator.next(); throttler.maybeThrottle(key.length()); - if(validPartition(key.get()) && filter.accept(key, null)) { + if(validPartition(key.get()) && filter.accept(key, null) && counter % skipRecords == 0) { VAdminProto.FetchPartitionEntriesResponse.Builder response = VAdminProto.FetchPartitionEntriesResponse.newBuilder(); response.setKey(ProtoUtils.encodeBytes(key)); diff --git a/src/java/voldemort/server/protocol/admin/FetchMasterEntriesStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchMasterEntriesStreamRequestHandler.java index 230a517513..2ed32ddaf6 100644 --- a/src/java/voldemort/server/protocol/admin/FetchMasterEntriesStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchMasterEntriesStreamRequestHandler.java @@ -53,7 +53,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream, // Since Master-Only filter does not need value, we can save some disk // seeks by getting back only Master replica values - if(validPartition(key.get()) && filter.accept(key, null)) { + if(validPartition(key.get()) && filter.accept(key, null) && counter % skipRecords == 0) { for(Versioned value: storageEngine.get(key, null)) { throttler.maybeThrottle(key.length()); fetched++; diff --git a/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java b/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java index ffd29d88a2..ba3f8bf1c0 100644 --- a/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java +++ b/src/java/voldemort/server/protocol/admin/FetchStreamRequestHandler.java @@ -42,7 +42,9 @@ public abstract class FetchStreamRequestHandler implements StreamRequestHandler protected final ClosableIterator keyIterator; - protected int counter; + protected long counter; + + protected long skipRecords; protected int fetched; @@ -76,6 +78,12 @@ protected FetchStreamRequestHandler(VAdminProto.FetchPartitionEntriesRequest req } keyIterator = storageEngine.keys(); startTime = System.currentTimeMillis(); + counter = 0; + + skipRecords = 1; + if(request.hasSkipRecords() && request.getSkipRecords() >= 0) { + skipRecords = request.getSkipRecords() + 1; + } } public final StreamRequestDirection getDirection() { diff --git a/src/java/voldemort/utils/EntropyDetection.java b/src/java/voldemort/utils/EntropyDetection.java new file mode 100644 index 0000000000..2aaafe5e59 --- /dev/null +++ b/src/java/voldemort/utils/EntropyDetection.java @@ -0,0 +1,159 @@ +package voldemort.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import voldemort.client.protocol.RequestFormatType; +import voldemort.client.protocol.admin.AdminClient; +import voldemort.client.protocol.admin.AdminClientConfig; +import voldemort.cluster.Cluster; +import voldemort.cluster.Node; +import voldemort.routing.RoutingStrategy; +import voldemort.routing.RoutingStrategyFactory; +import voldemort.server.RequestRoutingType; +import voldemort.store.Store; +import voldemort.store.StoreDefinition; +import voldemort.store.socket.SocketStoreFactory; +import voldemort.store.socket.clientrequest.ClientRequestExecutorPool; +import voldemort.versioning.Versioned; + +import com.google.common.base.Joiner; + +public class EntropyDetection { + + public static void main(String args[]) throws IOException { + OptionParser parser = new OptionParser(); + parser.accepts("help", "print help information"); + parser.accepts("url", "[REQUIRED] bootstrap URL") + .withRequiredArg() + .describedAs("bootstrap-url") + .ofType(String.class); + parser.accepts("first-id", "[REQUIRED] node id for first node") + .withRequiredArg() + .describedAs("node-id") + .ofType(Integer.class); + parser.accepts("second-id", "[REQUIRED] node id for second node") + .withRequiredArg() + .describedAs("node-id") + .ofType(Integer.class); + parser.accepts("store-name", "[REQUIRED] name of the store") + .withRequiredArg() + .describedAs("store-name") + .ofType(String.class); + parser.accepts("skip-records", "number of records to skip [default: 0 i.e. none]") + .withRequiredArg() + .ofType(Integer.class); + + OptionSet options = parser.parse(args); + + if(options.has("help")) { + parser.printHelpOn(System.out); + System.exit(0); + } + + Set missing = CmdUtils.missing(options, + "url", + "first-id", + "second-id", + "store-name"); + if(missing.size() > 0) { + System.err.println("Missing required arguments: " + Joiner.on(", ").join(missing)); + parser.printHelpOn(System.err); + System.exit(1); + } + + // compulsory params + String url = (String) options.valueOf("url"); + Integer firstId = (Integer) options.valueOf("first-id"); + Integer secondId = (Integer) options.valueOf("second-id"); + String storeName = (String) options.valueOf("store-name"); + + // optional params + Integer skipRecords = CmdUtils.valueOf(options, "skip-records", 0); + + // Use admin client to get cluster / store definition + AdminClient client = new AdminClient(url, new AdminClientConfig()); + + List storeDefs = client.getRemoteStoreDefList(firstId).getValue(); + StoreDefinition storeDef = null; + for(StoreDefinition def: storeDefs) { + if(def.getName().compareTo(storeName) == 0) { + storeDef = def; + } + } + + if(storeDef == null) { + System.err.println("Store name mentioned not found"); + parser.printHelpOn(System.err); + System.exit(1); + } + + // Find partitions which are replicated over to the other node + Cluster cluster = client.getAdminClientCluster(); + Node firstNode = cluster.getNodeById(firstId), secondNode = cluster.getNodeById(secondId); + + RoutingStrategy strategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef, + cluster); + + List firstNodePartitionIds = firstNode.getPartitionIds(); + List secondNodePartitionIds = secondNode.getPartitionIds(); + + // This is list of partitions which we need to retrieve + List finalPartitionIds = new ArrayList(); + + for(Integer firstNodePartition: firstNodePartitionIds) { + List replicatingPartitionIds = strategy.getReplicatingPartitionList(firstNodePartition); + + // Check if replicating partition id is one of the partition ids + for(Integer replicatingPartitionId: replicatingPartitionIds) { + if(secondNodePartitionIds.contains(replicatingPartitionId)) { + finalPartitionIds.add(firstNodePartition); + break; + } + } + } + + if(finalPartitionIds.size() == 0) { + System.out.println("No partition found whose replica is on the other node"); + System.exit(0); + } + Iterator>> iterator = client.fetchEntries(firstId, + storeName, + finalPartitionIds, + null, + false, + skipRecords); + + // Get Socket store for other node + SocketStoreFactory storeFactory = new ClientRequestExecutorPool(10, 1000, 1000, 32 * 1024); + final Store secondStore = storeFactory.create(storeName, + secondNode.getHost(), + secondNode.getSocketPort(), + RequestFormatType.VOLDEMORT_V3, + RequestRoutingType.NORMAL); + + long totalKeyValues = 0, totalCorrect = 0; + while(iterator.hasNext()) { + Pair> entry = iterator.next(); + List> otherValues = secondStore.get(entry.getFirst(), null); + + totalKeyValues++; + for(Versioned value: otherValues) { + if(ByteUtils.compare(value.getValue(), entry.getSecond().getValue()) == 0) { + totalCorrect++; + break; + } + } + } + if(totalKeyValues > 0) + System.out.println("Percent correct = " + (double) (totalCorrect / totalKeyValues) + * 100); + else + System.out.println("Percent correct = 0"); + } +} diff --git a/src/proto/voldemort-admin.proto b/src/proto/voldemort-admin.proto index 73a4135ba6..bfb01ee3e4 100644 --- a/src/proto/voldemort-admin.proto +++ b/src/proto/voldemort-admin.proto @@ -60,6 +60,7 @@ message FetchPartitionEntriesRequest { optional VoldemortFilter filter = 3; optional bool fetch_values = 4; optional bool fetch_master_entries = 5; + optional int64 skip_records = 6; } message FetchPartitionEntriesResponse {