Permalink
Browse files

admin api to fetch orphaned keys/entries

  • Loading branch information...
1 parent 1b57663 commit 442ca464eb3f47a2acb9071f67c9df8c6179254d @vinothchandar vinothchandar committed Jan 18, 2013
Oops, something went wrong.
@@ -260,6 +260,7 @@ public static void main(String[] args) throws Exception {
.withRequiredArg()
.describedAs("id-of-mirror-node")
.ofType(Integer.class);
+ parser.accepts("fetch-orphaned", "Fetch any orphaned keys/entries in the node");
OptionSet options = parser.parse(args);
@@ -300,54 +301,51 @@ public static void main(String[] args) throws Exception {
}
String ops = "";
- if(options.has("delete-partitions")) {
- ops += "d";
- }
- if(options.has("fetch-keys")) {
- ops += "k";
- }
- if(options.has("fetch-entries")) {
- ops += "v";
- }
- if(options.has("restore")) {
- ops += "r";
- }
+ // Honestly, the most insane code I have seen. Atleast sorting this for
+ // now so its easy to find a spare character
if(options.has("add-stores")) {
ops += "a";
}
- if(options.has("update-entries")) {
- ops += "u";
+ if(options.has("async")) {
+ ops += "b";
}
- if(options.has("delete-store")) {
- ops += "s";
+ if(options.has("check-metadata")) {
+ ops += "c";
}
- if(options.has("get-metadata")) {
- ops += "g";
+ if(options.has("delete-partitions")) {
+ ops += "d";
}
if(options.has("ro-metadata")) {
ops += "e";
}
- if(options.has("truncate")) {
- ops += "t";
- }
- if(options.has("set-metadata")) {
- ops += "m";
+ if(options.has("reserve-memory")) {
+ if(!options.has("stores")) {
+ Utils.croak("Specify the list of stores to reserve memory");
+ }
+ ops += "f";
}
- if(options.has("check-metadata")) {
- ops += "c";
+ if(options.has("get-metadata")) {
+ ops += "g";
}
- if(options.has("key-distribution")) {
- ops += "y";
+ if(options.has("mirror-from-url")) {
+ if(!options.has("mirror-node")) {
+ Utils.croak("Specify the mirror node to fetch from");
+ }
+ ops += "h";
}
if(options.has("clear-rebalancing-metadata")) {
ops += "i";
}
- if(options.has("async")) {
- ops += "b";
+ if(options.has("fetch-keys")) {
+ ops += "k";
}
+
if(options.has("repair-job")) {
ops += "l";
}
+ if(options.has("set-metadata")) {
+ ops += "m";
+ }
if(options.has("native-backup")) {
if(!options.has("backup-dir")) {
Utils.croak("A backup directory must be specified with backup-dir option");
@@ -360,24 +358,32 @@ public static void main(String[] args) throws Exception {
}
ops += "o";
}
- if(options.has("synchronize-metadata-version")) {
- ops += "z";
- }
- if(options.has("reserve-memory")) {
- if(!options.has("stores")) {
- Utils.croak("Specify the list of stores to reserve memory");
- }
- ops += "f";
- }
if(options.has("query-keys")) {
ops += "q";
}
+ if(options.has("restore")) {
+ ops += "r";
+ }
+ if(options.has("delete-store")) {
+ ops += "s";
+ }
- if(options.has("mirror-from-url")) {
- if(!options.has("mirror-node")) {
- Utils.croak("Specify the mirror node to fetch from");
- }
- ops += "h";
+ if(options.has("truncate")) {
+ ops += "t";
+ }
+ if(options.has("update-entries")) {
+ ops += "u";
+ }
+ if(options.has("fetch-entries")) {
+ ops += "v";
+ }
+
+ if(options.has("key-distribution")) {
+ ops += "y";
+ }
+
+ if(options.has("synchronize-metadata-version")) {
+ ops += "z";
}
if(ops.length() < 1) {
Utils.croak("At least one of (delete-partitions, restore, add-node, fetch-entries, "
@@ -425,7 +431,8 @@ public static void main(String[] args) throws Exception {
partitionIdList,
outputDir,
storeNames,
- useAscii);
+ useAscii,
+ options.has("fetch-orphaned"));
}
if(ops.contains("v")) {
boolean useAscii = options.has("ascii");
@@ -438,8 +445,10 @@ public static void main(String[] args) throws Exception {
partitionIdList,
outputDir,
storeNames,
- useAscii);
+ useAscii,
+ options.has("fetch-orphaned"));
}
+
if(ops.contains("a")) {
String storesXml = (String) options.valueOf("add-stores");
executeAddStores(adminClient, storesXml, nodeId);
@@ -810,6 +819,10 @@ public static void printHelp(PrintStream stream, OptionParser parser) throws IOE
stream.println("\t\t./bin/voldemort-admin-tool.sh --mirror-from-url [bootstrap url to mirror from] --mirror-node [node to mirror from] --url [url] --node [node-id] --stores [comma-separated-list-of-store-names]");
stream.println("\t12) Mirror data from another voldemort server (possibly in another cluster) for all stores in current cluster");
stream.println("\t\t./bin/voldemort-admin-tool.sh --mirror-from-url [bootstrap url to mirror from] --mirror-node [node to mirror from] --url [url] --node [node-id]");
+ stream.println("\t13) Fetch all orphaned keys on a particular node");
+ stream.println("\t\t./bin/voldemort-admin-tool.sh --fetch-keys --url [url] --node [node-id] --fetch-orphaned");
+ stream.println("\t14) Fetch all orphaned entries on a particular node");
+ stream.println("\t\t./bin/voldemort-admin-tool.sh --fetch-entries --url [url] --node [node-id] --fetch-orphaned");
stream.println();
stream.println("READ-ONLY OPERATIONS");
stream.println("\t1) Retrieve metadata information of read-only data for a particular node and all stores");
@@ -1152,7 +1165,8 @@ private static void executeFetchEntries(Integer nodeId,
List<Integer> partitionIdList,
String outputDir,
List<String> storeNames,
- boolean useAscii) throws IOException {
+ boolean useAscii,
+ boolean fetchOrphaned) throws IOException {
List<StoreDefinition> storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId)
.getValue();
@@ -1196,16 +1210,23 @@ private static void executeFetchEntries(Integer nodeId,
System.out.println("No store found under the name \'" + store + "\'");
continue;
+ }
+
+ Iterator<Pair<ByteArray, Versioned<byte[]>>> entriesIteratorRef = null;
+ if(fetchOrphaned) {
+ System.out.println("Fetching orphaned entries of " + store);
+ entriesIteratorRef = adminClient.fetchOrphanedEntries(nodeId, store);
} else {
System.out.println("Fetching entries in partitions "
+ Joiner.on(", ").join(partitionIdList) + " of " + store);
+ entriesIteratorRef = adminClient.fetchEntries(nodeId,
+ store,
+ partitionIdList,
+ null,
+ false);
}
- final Iterator<Pair<ByteArray, Versioned<byte[]>>> entriesIterator = adminClient.fetchEntries(nodeId,
- store,
- partitionIdList,
- null,
- false);
+ final Iterator<Pair<ByteArray, Versioned<byte[]>>> entriesIterator = entriesIteratorRef;
File outputFile = null;
if(directory != null) {
outputFile = new File(directory, store + ".entries");
@@ -1391,7 +1412,8 @@ private static void executeFetchKeys(Integer nodeId,
List<Integer> partitionIdList,
String outputDir,
List<String> storeNames,
- boolean useAscii) throws IOException {
+ boolean useAscii,
+ boolean fetchOrphaned) throws IOException {
List<StoreDefinition> storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId)
.getValue();
Map<String, StoreDefinition> storeDefinitionMap = Maps.newHashMap();
@@ -1432,21 +1454,22 @@ private static void executeFetchKeys(Integer nodeId,
if(null == storeDefinition) {
System.out.println("No store found under the name \'" + store + "\'");
continue;
+ }
+
+ Iterator<ByteArray> keyIteratorRef = null;
+ if(fetchOrphaned) {
+ System.out.println("Fetching orphaned keys of " + store);
+ keyIteratorRef = adminClient.fetchOrphanedKeys(nodeId, store);
} else {
System.out.println("Fetching keys in partitions "
+ Joiner.on(", ").join(partitionIdList) + " of " + store);
+ keyIteratorRef = adminClient.fetchKeys(nodeId, store, partitionIdList, null, false);
}
-
- final Iterator<ByteArray> keyIterator = adminClient.fetchKeys(nodeId,
- store,
- partitionIdList,
- null,
- false);
File outputFile = null;
if(directory != null) {
outputFile = new File(directory, store + ".keys");
}
-
+ final Iterator<ByteArray> keyIterator = keyIteratorRef;
if(useAscii) {
final SerializerDefinition serializerDef = storeDefinition.getKeySerializer();
final SerializerFactory serializerFactory = new DefaultSerializerFactory();
@@ -474,6 +474,76 @@ private void initiateFetchRequest(DataOutputStream outputStream,
}
/**
+ * Fetches entries that don't belong to the node, based on current metadata
+ * and yet persisted on the node
+ *
+ * @param nodeId Id of the node to fetch from
+ * @param storeName Name of the store
+ * @return An iterator which allows entries to be streamed as they're being
+ * iterated over.
+ */
+ public Iterator<Pair<ByteArray, Versioned<byte[]>>> fetchOrphanedEntries(int nodeId,
+ String storeName) {
+
+ Node node = this.getAdminClientCluster().getNodeById(nodeId);
+ final SocketDestination destination = new SocketDestination(node.getHost(),
+ node.getAdminPort(),
+ RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
+ final SocketAndStreams sands = pool.checkout(destination);
+ DataOutputStream outputStream = sands.getOutputStream();
+ final DataInputStream inputStream = sands.getInputStream();
+
+ try {
+ VAdminProto.FetchPartitionEntriesRequest.Builder fetchOrphanedRequest = VAdminProto.FetchPartitionEntriesRequest.newBuilder()
+ .setFetchValues(true)
+ .setStore(storeName)
+ .setFetchOrphaned(true);
+
+ VAdminProto.VoldemortAdminRequest request = VAdminProto.VoldemortAdminRequest.newBuilder()
+ .setType(VAdminProto.AdminRequestType.FETCH_PARTITION_ENTRIES)
+ .setFetchPartitionEntries(fetchOrphanedRequest)
+ .build();
+ ProtoUtils.writeMessage(outputStream, request);
+ outputStream.flush();
+ } catch(IOException e) {
+ close(sands.getSocket());
+ pool.checkin(destination, sands);
+ throw new VoldemortException(e);
+ }
+
+ return new AbstractIterator<Pair<ByteArray, Versioned<byte[]>>>() {
+
+ @Override
+ public Pair<ByteArray, Versioned<byte[]>> computeNext() {
+ try {
+ int size = inputStream.readInt();
+ if(size == -1) {
+ pool.checkin(destination, sands);
+ return endOfData();
+ }
+
+ VAdminProto.FetchPartitionEntriesResponse response = responseFromStream(inputStream,
+ size);
+
+ if(response.hasError()) {
+ pool.checkin(destination, sands);
+ throwException(response.getError());
+ }
+
+ VAdminProto.PartitionEntry partitionEntry = response.getPartitionEntry();
+
+ return Pair.create(ProtoUtils.decodeBytes(partitionEntry.getKey()),
+ ProtoUtils.decodeVersioned(partitionEntry.getVersioned()));
+ } catch(IOException e) {
+ close(sands.getSocket());
+ pool.checkin(destination, sands);
+ throw new VoldemortException(e);
+ }
+ }
+ };
+ }
+
+ /**
* Legacy interface for fetching entries. See
* {@link AdminClient#fetchEntries(int, String, HashMap, VoldemortFilter, boolean, Cluster, long)}
* for more information.
@@ -680,6 +750,73 @@ private void initiateFetchRequest(DataOutputStream outputStream,
}
/**
+ * Fetch all the keys on the node that don't belong to it, based on its
+ * current metadata and yet stored on the node. i.e all keys orphaned on the
+ * node due to say not running the repair job after a rebalance
+ *
+ * @param nodeId Id of the node to fetch from
+ * @param storeName Name of the store
+ * @return An iterator which allows keys to be streamed as they're being
+ * iterated over.
+ */
+ public Iterator<ByteArray> fetchOrphanedKeys(int nodeId, String storeName) {
+ Node node = this.getAdminClientCluster().getNodeById(nodeId);
+ final SocketDestination destination = new SocketDestination(node.getHost(),
+ node.getAdminPort(),
+ RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
+ final SocketAndStreams sands = pool.checkout(destination);
+ DataOutputStream outputStream = sands.getOutputStream();
+ final DataInputStream inputStream = sands.getInputStream();
+
+ try {
+ VAdminProto.FetchPartitionEntriesRequest.Builder fetchOrphanedRequest = VAdminProto.FetchPartitionEntriesRequest.newBuilder()
+ .setFetchValues(false)
+ .setStore(storeName)
+ .setFetchOrphaned(true);
+
+ VAdminProto.VoldemortAdminRequest request = VAdminProto.VoldemortAdminRequest.newBuilder()
+ .setType(VAdminProto.AdminRequestType.FETCH_PARTITION_ENTRIES)
+ .setFetchPartitionEntries(fetchOrphanedRequest)
+ .build();
+ ProtoUtils.writeMessage(outputStream, request);
+ outputStream.flush();
+ } catch(IOException e) {
+ close(sands.getSocket());
+ pool.checkin(destination, sands);
+ throw new VoldemortException(e);
+ }
+
+ return new AbstractIterator<ByteArray>() {
+
+ @Override
+ public ByteArray computeNext() {
+ try {
+ int size = inputStream.readInt();
+ if(size == -1) {
+ pool.checkin(destination, sands);
+ return endOfData();
+ }
+
+ VAdminProto.FetchPartitionEntriesResponse response = responseFromStream(inputStream,
+ size);
+
+ if(response.hasError()) {
+ pool.checkin(destination, sands);
+ throwException(response.getError());
+ }
+
+ return ProtoUtils.decodeBytes(response.getKey());
+ } catch(IOException e) {
+ close(sands.getSocket());
+ pool.checkin(destination, sands);
+ throw new VoldemortException(e);
+ }
+
+ }
+ };
+ }
+
+ /**
* Legacy interface for fetching entries. See
* {@link AdminClient#fetchKeys(int, String, HashMap, VoldemortFilter, boolean, Cluster, long)}
* for more information.
Oops, something went wrong.

0 comments on commit 442ca46

Please sign in to comment.