Skip to content

Commit

Permalink
Added admin option to query keys on specified nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongjiewu committed Sep 6, 2012
1 parent 614eab9 commit b18840b
Show file tree
Hide file tree
Showing 3 changed files with 429 additions and 0 deletions.
123 changes: 123 additions & 0 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -34,6 +34,8 @@
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -55,6 +57,7 @@
import voldemort.serialization.Serializer;
import voldemort.serialization.SerializerDefinition;
import voldemort.serialization.SerializerFactory;
import voldemort.serialization.StringSerializer;
import voldemort.server.rebalance.RebalancerState;
import voldemort.store.StoreDefinition;
import voldemort.store.compress.CompressionStrategy;
Expand Down Expand Up @@ -130,6 +133,11 @@ public static void main(String[] args) throws Exception {
.describedAs("store-names")
.withValuesSeparatedBy(',')
.ofType(String.class);
parser.accepts("store", "Store name for querying keys")
.withRequiredArg()
.describedAs("store-name")
.withValuesSeparatedBy(',')
.ofType(String.class);
parser.accepts("add-stores", "Add stores in this stores.xml")
.withRequiredArg()
.describedAs("stores.xml containing just the new stores")
Expand Down Expand Up @@ -226,6 +234,11 @@ public static void main(String[] args) throws Exception {
.withRequiredArg()
.describedAs("size-in-mb")
.ofType(Long.class);
parser.accepts("query-keys", "Get values of keys on specific nodes")
.withRequiredArg()
.describedAs("query-keys")
.withValuesSeparatedBy(',')
.ofType(String.class);

OptionSet options = parser.parse(args);

Expand Down Expand Up @@ -323,6 +336,9 @@ public static void main(String[] args) throws Exception {
}
ops += "f";
}
if(options.has("query-keys")) {
ops += "q";
}
if(ops.length() < 1) {
Utils.croak("At least one of (delete-partitions, restore, add-node, fetch-entries, "
+ "fetch-keys, add-stores, delete-store, update-entries, get-metadata, ro-metadata, "
Expand Down Expand Up @@ -496,6 +512,16 @@ public static void main(String[] args) throws Exception {
long reserveMB = (Long) options.valueOf("reserve-memory");
adminClient.reserveMemory(nodeId, storeNames, reserveMB);
}
if(ops.contains("q")) {
List<String> keyList = null;
String storeName = (String) options.valueOf("store");
if(storeName == null) {
throw new VoldemortException("Must specify store name using --store option (NOT --stores)");
}
if(options.hasArgument("query-keys"))
keyList = (List<String>) options.valuesOf("query-keys");
executeQueryKeys(nodeId, adminClient, storeName, keyList);
}
} catch(Exception e) {
e.printStackTrace();
Utils.croak(e.getMessage());
Expand Down Expand Up @@ -599,6 +625,8 @@ public static void printHelp(PrintStream stream, OptionParser parser) throws IOE
stream.println("\t\t./bin/voldemort-admin-tool.sh --fetch-entries --url [url] --node [node-id]");
stream.println("\t9) Update entries for a set of stores using the output from a binary dump fetch entries");
stream.println("\t\t./bin/voldemort-admin-tool.sh --update-entries [folder path from output of --fetch-entries --outdir] --url [url] --node [node-id] --stores [comma-separated list of store names]");
stream.println("\t10) Query a store for a set of keys on a specific node. Notice that the --store option is not prural");
stream.println("\t\t./bin/voldemort-admin-tool.sh --query-keys [comma-separated list of keys] --url [url] --node [node-id] --store [store name]");
stream.println();
stream.println("READ-ONLY OPERATIONS");
stream.println("\t1) Retrieve metadata information of read-only data for a particular node and all stores");
Expand Down Expand Up @@ -1336,4 +1364,99 @@ private static void executeDeletePartitions(Integer nodeId,
adminClient.deletePartitions(nodeId, store, partitionIdList, null);
}
}

private static void executeQueryKeys(Integer nodeId,
AdminClient adminClient,
String storeName,
List<String> keys) throws IOException {
Serializer<String> serializer = new StringSerializer();
List<ByteArray> listKeys = new ArrayList<ByteArray>();
for(String key: keys) {
listKeys.add(new ByteArray(serializer.toBytes(key)));
}
final Iterator<Pair<ByteArray, Pair<List<Versioned<byte[]>>, Exception>>> iterator = adminClient.queryKeys(nodeId.intValue(),
storeName,
listKeys.iterator());
List<StoreDefinition> storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId)
.getValue();
StoreDefinition storeDefinition = null;
for(StoreDefinition storeDef: storeDefinitionList) {
if(storeDef.getName().equals(storeName))
storeDefinition = storeDef;
}

// k-v serializer
SerializerDefinition keySerializerDef = storeDefinition.getKeySerializer();
SerializerDefinition valueSerializerDef = storeDefinition.getValueSerializer();
SerializerFactory serializerFactory = new DefaultSerializerFactory();
@SuppressWarnings("unchecked")
final Serializer<Object> keySerializer = (Serializer<Object>) serializerFactory.getSerializer(keySerializerDef);
@SuppressWarnings("unchecked")
final Serializer<Object> valueSerializer = (Serializer<Object>) serializerFactory.getSerializer(valueSerializerDef);

// compression strategy
final CompressionStrategy keyCompressionStrategy;
final CompressionStrategy valueCompressionStrategy;
if(keySerializerDef != null && keySerializerDef.hasCompression()) {
keyCompressionStrategy = new CompressionStrategyFactory().get(keySerializerDef.getCompression());
} else {
keyCompressionStrategy = null;
}
if(valueSerializerDef != null && valueSerializerDef.hasCompression()) {
valueCompressionStrategy = new CompressionStrategyFactory().get(valueSerializerDef.getCompression());
} else {
valueCompressionStrategy = null;
}

// write to stdout
writeAscii(null, new Writable() {

@Override
public void writeTo(BufferedWriter out) throws IOException {
final StringWriter stringWriter = new StringWriter();
final JsonGenerator generator = new JsonFactory(new ObjectMapper()).createJsonGenerator(stringWriter);

while(iterator.hasNext()) {
Pair<ByteArray, Pair<List<Versioned<byte[]>>, Exception>> kvPair = iterator.next();
// unserialize and write key
byte[] keyBytes = kvPair.getFirst().get();
Object keyObject = keySerializer.toObject((null == keyCompressionStrategy) ? keyBytes
: keyCompressionStrategy.inflate(keyBytes));
generator.writeObject(keyObject);

// iterate through, unserialize and write values
List<Versioned<byte[]>> values = kvPair.getSecond().getFirst();
if(values != null) {
for(Versioned<byte[]> versioned: values) {
VectorClock version = (VectorClock) versioned.getVersion();
byte[] valueBytes = versioned.getValue();
Object valueObject = valueSerializer.toObject((null == valueCompressionStrategy) ? valueBytes
: valueCompressionStrategy.inflate(valueBytes));

stringWriter.write(", ");
stringWriter.write(version.toString());
stringWriter.write('[');
stringWriter.write(new Date(version.getTimestamp()).toString());
stringWriter.write(']');
generator.writeObject(valueObject);

}
}
// write out exception
if(kvPair.getSecond().getSecond() != null) {
stringWriter.write(", ");
stringWriter.write(kvPair.getSecond().getSecond().toString());
}

StringBuffer buf = stringWriter.getBuffer();
if(buf.charAt(0) == ' ') {
buf.setCharAt(0, '\n');
}
out.write(buf.toString());
buf.setLength(0);
}
out.write('\n');
}
});
}
}
64 changes: 64 additions & 0 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -58,10 +58,12 @@
import voldemort.cluster.Node;
import voldemort.routing.RoutingStrategy;
import voldemort.routing.RoutingStrategyFactory;
import voldemort.server.RequestRoutingType;
import voldemort.server.protocol.admin.AsyncOperationStatus;
import voldemort.server.rebalance.RebalancerState;
import voldemort.server.rebalance.VoldemortRebalancingException;
import voldemort.store.ErrorCodeMapper;
import voldemort.store.Store;
import voldemort.store.StoreDefinition;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.metadata.MetadataStore.VoldemortState;
Expand All @@ -73,6 +75,7 @@
import voldemort.store.slop.Slop;
import voldemort.store.slop.Slop.Operation;
import voldemort.store.socket.SocketDestination;
import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
import voldemort.store.views.ViewStorageConfiguration;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
Expand Down Expand Up @@ -510,6 +513,67 @@ public Pair<ByteArray, Versioned<byte[]>> computeNext() {

}

/**
* Fetch key/value tuples belonging to a node with given key values
*
* <p>
* Entries are being queried synchronously <em>as the iteration happens</em>
* i.e. the whole result set is <b>not</b> buffered in memory.
*
* @param nodeId Id of the node to fetch from
* @param storeName Name of the store
* @param keys An Iterable of keys
* @return An iterator which allows entries to be streamed as they're being
* iterated over.
*/
public Iterator<Pair<ByteArray, Pair<List<Versioned<byte[]>>, Exception>>> queryKeys(int nodeId,
String storeName,
final Iterator<ByteArray> keys) {

Node node = this.getAdminClientCluster().getNodeById(nodeId);
ClientConfig clientConfig = new ClientConfig();
final Store<ByteArray, byte[], byte[]> store;
final ClientRequestExecutorPool clientPool = new ClientRequestExecutorPool(clientConfig.getSelectors(),
clientConfig.getMaxConnectionsPerNode(),
clientConfig.getConnectionTimeout(TimeUnit.MILLISECONDS),
clientConfig.getSocketTimeout(TimeUnit.MILLISECONDS),
clientConfig.getSocketBufferSize(),
clientConfig.getSocketKeepAlive());
try {
store = clientPool.create(storeName,
node.getHost(),
node.getSocketPort(),
clientConfig.getRequestFormatType(),
RequestRoutingType.IGNORE_CHECKS);

} catch(Exception e) {
clientPool.close();
throw new VoldemortException(e);
}

return new AbstractIterator<Pair<ByteArray, Pair<List<Versioned<byte[]>>, Exception>>>() {

@Override
public Pair<ByteArray, Pair<List<Versioned<byte[]>>, Exception>> computeNext() {
ByteArray key;
Exception exception = null;
List<Versioned<byte[]>> value = null;
if(!keys.hasNext()) {
clientPool.close();
return endOfData();
} else {
key = keys.next();
}
try {
value = store.get(key, null);
} catch(Exception e) {
exception = e;
}
return Pair.create(key, Pair.create(value, exception));
}
};
}

/**
* Legacy interface for fetching entries. See
* {@link AdminClient#fetchKeys(int, String, HashMap, VoldemortFilter, boolean, Cluster, long)}
Expand Down

0 comments on commit b18840b

Please sign in to comment.