Skip to content

Commit

Permalink
Reuses --stores option and added multiple store query support. Fixed …
Browse files Browse the repository at this point in the history
…bug that prevents queryKeys, fetchEntries, fetchKeys to print results beyond the first store
  • Loading branch information
zhongjiewu committed Sep 6, 2012
1 parent ef91db7 commit a33b280
Showing 1 changed file with 116 additions and 90 deletions.
206 changes: 116 additions & 90 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -27,6 +27,7 @@
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
Expand Down Expand Up @@ -512,14 +513,11 @@ public static void main(String[] args) throws Exception {
adminClient.reserveMemory(nodeId, storeNames, reserveMB);
}
if(ops.contains("q")) {
List<String> keyList = null;
String storeName = (String) options.valueOf("store");
if(storeName == null) {
throw new VoldemortException("Must specify store name using --store option (NOT --stores)");
List<String> keyList = (List<String>) options.valuesOf("query-keys");
if(storeNames == null || storeNames.size() == 0) {
throw new VoldemortException("Must specify store name using --stores option");
}
if(options.hasArgument("query-keys"))
keyList = (List<String>) options.valuesOf("query-keys");
executeQueryKeys(nodeId, adminClient, storeName, keyList);
executeQueryKeys(nodeId, adminClient, storeNames, keyList);
}
} catch(Exception e) {
e.printStackTrace();
Expand Down Expand Up @@ -1322,8 +1320,18 @@ private abstract static class Writable {
}

private static void writeBinary(File outputFile, Printable printable) throws IOException {
OutputStream outputStream = (outputFile == null) ? System.out
: (new FileOutputStream(outputFile));
OutputStream outputStream = null;
if(outputFile == null) {
outputStream = new FilterOutputStream(System.out) {

@Override
public void close() throws IOException {
flush();
}
};
} else {
outputStream = new FileOutputStream(outputFile);
}
DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(outputStream));
try {
printable.printTo(dataOutputStream);
Expand All @@ -1333,8 +1341,18 @@ private static void writeBinary(File outputFile, Printable printable) throws IOE
}

private static void writeAscii(File outputFile, Writable writable) throws IOException {
Writer writer = (outputFile == null) ? (new OutputStreamWriter(System.out))
: (new FileWriter(outputFile));
Writer writer = null;
if(outputFile == null) {
writer = new OutputStreamWriter(new FilterOutputStream(System.out) {

@Override
public void close() throws IOException {
flush();
}
});
} else {
writer = new FileWriter(outputFile);
}
BufferedWriter bufferedWriter = new BufferedWriter(writer);
try {
writable.writeTo(bufferedWriter);
Expand Down Expand Up @@ -1364,98 +1382,106 @@ private static void executeDeletePartitions(Integer nodeId,
}
}

private static void executeQueryKeys(Integer nodeId,
private static void executeQueryKeys(final Integer nodeId,
AdminClient adminClient,
String storeName,
List<String> storeNames,
List<String> keys) throws IOException {
Serializer<String> serializer = new StringSerializer();
List<ByteArray> listKeys = new ArrayList<ByteArray>();
for(String key: keys) {
listKeys.add(new ByteArray(serializer.toBytes(key)));
}
final Iterator<Pair<ByteArray, Pair<List<Versioned<byte[]>>, Exception>>> iterator = adminClient.queryKeys(nodeId.intValue(),
storeName,
listKeys.iterator());
List<StoreDefinition> storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId)
.getValue();
StoreDefinition storeDefinition = null;
for(StoreDefinition storeDef: storeDefinitionList) {
if(storeDef.getName().equals(storeName))
storeDefinition = storeDef;
}

// k-v serializer
SerializerDefinition keySerializerDef = storeDefinition.getKeySerializer();
SerializerDefinition valueSerializerDef = storeDefinition.getValueSerializer();
SerializerFactory serializerFactory = new DefaultSerializerFactory();
@SuppressWarnings("unchecked")
final Serializer<Object> keySerializer = (Serializer<Object>) serializerFactory.getSerializer(keySerializerDef);
@SuppressWarnings("unchecked")
final Serializer<Object> valueSerializer = (Serializer<Object>) serializerFactory.getSerializer(valueSerializerDef);

// compression strategy
final CompressionStrategy keyCompressionStrategy;
final CompressionStrategy valueCompressionStrategy;
if(keySerializerDef != null && keySerializerDef.hasCompression()) {
keyCompressionStrategy = new CompressionStrategyFactory().get(keySerializerDef.getCompression());
} else {
keyCompressionStrategy = null;
}
if(valueSerializerDef != null && valueSerializerDef.hasCompression()) {
valueCompressionStrategy = new CompressionStrategyFactory().get(valueSerializerDef.getCompression());
} else {
valueCompressionStrategy = null;
}

// write to stdout
writeAscii(null, new Writable() {
for(final String storeName: storeNames) {
final Iterator<Pair<ByteArray, Pair<List<Versioned<byte[]>>, Exception>>> iterator = adminClient.queryKeys(nodeId.intValue(),
storeName,
listKeys.iterator());
List<StoreDefinition> storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId)
.getValue();
StoreDefinition storeDefinition = null;
for(StoreDefinition storeDef: storeDefinitionList) {
if(storeDef.getName().equals(storeName))
storeDefinition = storeDef;
}

@Override
public void writeTo(BufferedWriter out) throws IOException {
final StringWriter stringWriter = new StringWriter();
final JsonGenerator generator = new JsonFactory(new ObjectMapper()).createJsonGenerator(stringWriter);

while(iterator.hasNext()) {
Pair<ByteArray, Pair<List<Versioned<byte[]>>, Exception>> kvPair = iterator.next();
// unserialize and write key
byte[] keyBytes = kvPair.getFirst().get();
Object keyObject = keySerializer.toObject((null == keyCompressionStrategy) ? keyBytes
: keyCompressionStrategy.inflate(keyBytes));
generator.writeObject(keyObject);

// iterate through, unserialize and write values
List<Versioned<byte[]>> values = kvPair.getSecond().getFirst();
if(values != null) {
for(Versioned<byte[]> versioned: values) {
VectorClock version = (VectorClock) versioned.getVersion();
byte[] valueBytes = versioned.getValue();
Object valueObject = valueSerializer.toObject((null == valueCompressionStrategy) ? valueBytes
: valueCompressionStrategy.inflate(valueBytes));
// k-v serializer
SerializerDefinition keySerializerDef = storeDefinition.getKeySerializer();
SerializerDefinition valueSerializerDef = storeDefinition.getValueSerializer();
SerializerFactory serializerFactory = new DefaultSerializerFactory();
@SuppressWarnings("unchecked")
final Serializer<Object> keySerializer = (Serializer<Object>) serializerFactory.getSerializer(keySerializerDef);
@SuppressWarnings("unchecked")
final Serializer<Object> valueSerializer = (Serializer<Object>) serializerFactory.getSerializer(valueSerializerDef);

// compression strategy
final CompressionStrategy keyCompressionStrategy;
final CompressionStrategy valueCompressionStrategy;
if(keySerializerDef != null && keySerializerDef.hasCompression()) {
keyCompressionStrategy = new CompressionStrategyFactory().get(keySerializerDef.getCompression());
} else {
keyCompressionStrategy = null;
}
if(valueSerializerDef != null && valueSerializerDef.hasCompression()) {
valueCompressionStrategy = new CompressionStrategyFactory().get(valueSerializerDef.getCompression());
} else {
valueCompressionStrategy = null;
}

// write to stdout
writeAscii(null, new Writable() {

@Override
public void writeTo(BufferedWriter out) throws IOException {
final StringWriter stringWriter = new StringWriter();
final JsonGenerator generator = new JsonFactory(new ObjectMapper()).createJsonGenerator(stringWriter);
stringWriter.write("Querying keys in node " + nodeId + " of " + storeName
+ "\n");

while(iterator.hasNext()) {
Pair<ByteArray, Pair<List<Versioned<byte[]>>, Exception>> kvPair = iterator.next();
// unserialize and write key
byte[] keyBytes = kvPair.getFirst().get();
Object keyObject = keySerializer.toObject((null == keyCompressionStrategy) ? keyBytes
: keyCompressionStrategy.inflate(keyBytes));
generator.writeObject(keyObject);

// iterate through, unserialize and write values
List<Versioned<byte[]>> values = kvPair.getSecond().getFirst();
if(values != null) {
if(values.size() == 0) {
stringWriter.write(", null");
}
for(Versioned<byte[]> versioned: values) {
VectorClock version = (VectorClock) versioned.getVersion();
byte[] valueBytes = versioned.getValue();
Object valueObject = valueSerializer.toObject((null == valueCompressionStrategy) ? valueBytes
: valueCompressionStrategy.inflate(valueBytes));

stringWriter.write(", ");
stringWriter.write(version.toString());
stringWriter.write('[');
stringWriter.write(new Date(version.getTimestamp()).toString());
stringWriter.write(']');
generator.writeObject(valueObject);
}
} else {
stringWriter.write(", null");
}
// write out exception
if(kvPair.getSecond().getSecond() != null) {
stringWriter.write(", ");
stringWriter.write(version.toString());
stringWriter.write('[');
stringWriter.write(new Date(version.getTimestamp()).toString());
stringWriter.write(']');
generator.writeObject(valueObject);

stringWriter.write(kvPair.getSecond().getSecond().toString());
}
}
// write out exception
if(kvPair.getSecond().getSecond() != null) {
stringWriter.write(", ");
stringWriter.write(kvPair.getSecond().getSecond().toString());
}

StringBuffer buf = stringWriter.getBuffer();
if(buf.charAt(0) == ' ') {
buf.setCharAt(0, '\n');
StringBuffer buf = stringWriter.getBuffer();
if(buf.charAt(0) == ' ') {
buf.setCharAt(0, '\n');
}
out.write(buf.toString());
buf.setLength(0);
}
out.write(buf.toString());
buf.setLength(0);
out.write('\n');
}
out.write('\n');
}
});
});
}
}
}

0 comments on commit a33b280

Please sign in to comment.