Skip to content

Commit

Permalink
refactored streaming and writing of voldemort admin data printing
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongjiewu committed Sep 6, 2012
1 parent 604324d commit 614eab9
Showing 1 changed file with 152 additions and 160 deletions.
312 changes: 152 additions & 160 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -28,10 +28,12 @@
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -993,20 +995,91 @@ private static void executeFetchEntries(Integer nodeId,
+ Joiner.on(", ").join(partitionIdList) + " of " + store);
}

Iterator<Pair<ByteArray, Versioned<byte[]>>> entriesIterator = adminClient.fetchEntries(nodeId,
store,
partitionIdList,
null,
false);
final Iterator<Pair<ByteArray, Versioned<byte[]>>> entriesIterator = adminClient.fetchEntries(nodeId,
store,
partitionIdList,
null,
false);
File outputFile = null;
if(directory != null) {
outputFile = new File(directory, store + ".entries");
}

if(useAscii) {
writeEntriesAscii(entriesIterator, outputFile, storeDefinition);
// k-v serializer
SerializerDefinition keySerializerDef = storeDefinition.getKeySerializer();
SerializerDefinition valueSerializerDef = storeDefinition.getValueSerializer();
SerializerFactory serializerFactory = new DefaultSerializerFactory();
@SuppressWarnings("unchecked")
final Serializer<Object> keySerializer = (Serializer<Object>) serializerFactory.getSerializer(keySerializerDef);
@SuppressWarnings("unchecked")
final Serializer<Object> valueSerializer = (Serializer<Object>) serializerFactory.getSerializer(valueSerializerDef);

// compression strategy
final CompressionStrategy keyCompressionStrategy;
final CompressionStrategy valueCompressionStrategy;
if(keySerializerDef != null && keySerializerDef.hasCompression()) {
keyCompressionStrategy = new CompressionStrategyFactory().get(keySerializerDef.getCompression());
} else {
keyCompressionStrategy = null;
}
if(valueSerializerDef != null && valueSerializerDef.hasCompression()) {
valueCompressionStrategy = new CompressionStrategyFactory().get(valueSerializerDef.getCompression());
} else {
valueCompressionStrategy = null;
}

writeAscii(outputFile, new Writable() {

@Override
public void writeTo(BufferedWriter out) throws IOException {
final StringWriter stringWriter = new StringWriter();
final JsonGenerator generator = new JsonFactory(new ObjectMapper()).createJsonGenerator(stringWriter);

while(entriesIterator.hasNext()) {
Pair<ByteArray, Versioned<byte[]>> kvPair = entriesIterator.next();
byte[] keyBytes = kvPair.getFirst().get();
byte[] valueBytes = kvPair.getSecond().getValue();
VectorClock version = (VectorClock) kvPair.getSecond().getVersion();

Object keyObject = keySerializer.toObject((null == keyCompressionStrategy) ? keyBytes
: keyCompressionStrategy.inflate(keyBytes));
Object valueObject = valueSerializer.toObject((null == valueCompressionStrategy) ? valueBytes
: valueCompressionStrategy.inflate(valueBytes));
generator.writeObject(keyObject);
stringWriter.write(' ');
stringWriter.write(version.toString());
generator.writeObject(valueObject);

StringBuffer buf = stringWriter.getBuffer();
if(buf.charAt(0) == ' ') {
buf.setCharAt(0, '\n');
}
out.write(buf.toString());
buf.setLength(0);
}
out.write('\n');
}
});
} else {
writeEntriesBinary(entriesIterator, outputFile);
writeBinary(outputFile, new Printable() {

@Override
public void printTo(DataOutputStream out) throws IOException {
while(entriesIterator.hasNext()) {
Pair<ByteArray, Versioned<byte[]>> kvPair = entriesIterator.next();
byte[] keyBytes = kvPair.getFirst().get();
byte[] versionBytes = ((VectorClock) kvPair.getSecond().getVersion()).toBytes();
byte[] valueBytes = kvPair.getSecond().getValue();
out.writeInt(keyBytes.length);
out.write(keyBytes);
out.writeInt(versionBytes.length);
out.write(versionBytes);
out.writeInt(valueBytes.length);
out.write(valueBytes);
}
}
});
}

if(outputFile != null)
Expand Down Expand Up @@ -1098,93 +1171,6 @@ protected Pair<ByteArray, Versioned<byte[]>> computeNext() {
};
}

private static void writeEntriesAscii(Iterator<Pair<ByteArray, Versioned<byte[]>>> iterator,
File outputFile,
StoreDefinition storeDefinition) throws IOException {
BufferedWriter writer = null;
CompressionStrategy keyCompressionStrategy = null;
CompressionStrategy valueCompressionStrategy = null;

if(outputFile != null) {
writer = new BufferedWriter(new FileWriter(outputFile));
} else {
writer = new BufferedWriter(new OutputStreamWriter(System.out));
}
SerializerFactory serializerFactory = new DefaultSerializerFactory();
StringWriter stringWriter = new StringWriter();
JsonGenerator generator = new JsonFactory(new ObjectMapper()).createJsonGenerator(stringWriter);

SerializerDefinition keySerializerDef = storeDefinition.getKeySerializer();
if(null != keySerializerDef && keySerializerDef.hasCompression()) {
keyCompressionStrategy = new CompressionStrategyFactory().get(keySerializerDef.getCompression());
}

SerializerDefinition valueSerializerDef = storeDefinition.getValueSerializer();
if(null != valueSerializerDef && valueSerializerDef.hasCompression()) {
valueCompressionStrategy = new CompressionStrategyFactory().get(valueSerializerDef.getCompression());
}

@SuppressWarnings("unchecked")
Serializer<Object> keySerializer = (Serializer<Object>) serializerFactory.getSerializer(storeDefinition.getKeySerializer());
@SuppressWarnings("unchecked")
Serializer<Object> valueSerializer = (Serializer<Object>) serializerFactory.getSerializer(storeDefinition.getValueSerializer());

try {
while(iterator.hasNext()) {
Pair<ByteArray, Versioned<byte[]>> kvPair = iterator.next();
byte[] keyBytes = kvPair.getFirst().get();
VectorClock version = (VectorClock) kvPair.getSecond().getVersion();
byte[] valueBytes = kvPair.getSecond().getValue();

Object keyObject = keySerializer.toObject((null == keyCompressionStrategy) ? keyBytes
: keyCompressionStrategy.inflate(keyBytes));
Object valueObject = valueSerializer.toObject((null == valueCompressionStrategy) ? valueBytes
: valueCompressionStrategy.inflate(valueBytes));

generator.writeObject(keyObject);
stringWriter.write(' ');
stringWriter.write(version.toString());
generator.writeObject(valueObject);

StringBuffer buf = stringWriter.getBuffer();
if(buf.charAt(0) == ' ') {
buf.setCharAt(0, '\n');
}
writer.write(buf.toString());
buf.setLength(0);
}
writer.write('\n');
} finally {
writer.close();
}
}

private static void writeEntriesBinary(Iterator<Pair<ByteArray, Versioned<byte[]>>> iterator,
File outputFile) throws IOException {
DataOutputStream dos = null;
if(outputFile != null) {
dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(outputFile)));
} else {
dos = new DataOutputStream(new BufferedOutputStream(System.out));
}
try {
while(iterator.hasNext()) {
Pair<ByteArray, Versioned<byte[]>> kvPair = iterator.next();
byte[] keyBytes = kvPair.getFirst().get();
byte[] versionBytes = ((VectorClock) kvPair.getSecond().getVersion()).toBytes();
byte[] valueBytes = kvPair.getSecond().getValue();
dos.writeInt(keyBytes.length);
dos.write(keyBytes);
dos.writeInt(versionBytes.length);
dos.write(versionBytes);
dos.writeInt(valueBytes.length);
dos.write(valueBytes);
}
} finally {
dos.close();
}
}

private static void executeFetchKeys(Integer nodeId,
AdminClient adminClient,
List<Integer> partitionIdList,
Expand Down Expand Up @@ -1232,95 +1218,101 @@ private static void executeFetchKeys(Integer nodeId,
+ Joiner.on(", ").join(partitionIdList) + " of " + store);
}

Iterator<ByteArray> keyIterator = adminClient.fetchKeys(nodeId,
store,
partitionIdList,
null,
false);
final Iterator<ByteArray> keyIterator = adminClient.fetchKeys(nodeId,
store,
partitionIdList,
null,
false);
File outputFile = null;
if(directory != null) {
outputFile = new File(directory, store + ".keys");
}

if(useAscii) {
writeKeysAscii(keyIterator, outputFile, storeDefinition);
final SerializerDefinition serializerDef = storeDefinition.getKeySerializer();
final SerializerFactory serializerFactory = new DefaultSerializerFactory();
@SuppressWarnings("unchecked")
final Serializer<Object> serializer = (Serializer<Object>) serializerFactory.getSerializer(serializerDef);

final CompressionStrategy keysCompressionStrategy;
if(serializerDef != null && serializerDef.hasCompression()) {
keysCompressionStrategy = new CompressionStrategyFactory().get(serializerDef.getCompression());
} else {
keysCompressionStrategy = null;
}

writeAscii(outputFile, new Writable() {

@Override
public void writeTo(BufferedWriter out) throws IOException {
final StringWriter stringWriter = new StringWriter();
final JsonGenerator generator = new JsonFactory(new ObjectMapper()).createJsonGenerator(stringWriter);

while(keyIterator.hasNext()) {
// Ugly hack to be able to separate text by newlines
// vs. spaces
byte[] keyBytes = keyIterator.next().get();
Object keyObject = serializer.toObject((null == keysCompressionStrategy) ? keyBytes
: keysCompressionStrategy.inflate(keyBytes));
generator.writeObject(keyObject);
StringBuffer buf = stringWriter.getBuffer();
if(buf.charAt(0) == ' ') {
buf.setCharAt(0, '\n');
}
out.write(buf.toString());
buf.setLength(0);
}
out.write('\n');
}
});
} else {
writeKeysBinary(keyIterator, outputFile);
writeBinary(outputFile, new Printable() {

@Override
public void printTo(DataOutputStream out) throws IOException {
while(keyIterator.hasNext()) {
byte[] keyBytes = keyIterator.next().get();
out.writeInt(keyBytes.length);
out.write(keyBytes);
}
}
});
}

if(outputFile != null)
System.out.println("Fetched keys from " + store + " to " + outputFile);
}
}

private static void writeKeysAscii(Iterator<ByteArray> keyIterator,
File outputFile,
StoreDefinition storeDefinition) throws IOException {
BufferedWriter writer = null;
CompressionStrategy keysCompressionStrategy = null;
FileWriter fileWriter = null;
if(outputFile != null) {
fileWriter = new FileWriter(outputFile);
writer = new BufferedWriter(fileWriter);
} else {
writer = new BufferedWriter(new OutputStreamWriter(System.out));
}
private abstract static class Printable {

SerializerDefinition serializerDef = storeDefinition.getKeySerializer();
if(null != serializerDef && serializerDef.hasCompression()) {
keysCompressionStrategy = new CompressionStrategyFactory().get(serializerDef.getCompression());
}
public abstract void printTo(DataOutputStream out) throws IOException;
}

private abstract static class Writable {

public abstract void writeTo(BufferedWriter out) throws IOException;
}

SerializerFactory serializerFactory = new DefaultSerializerFactory();
StringWriter stringWriter = new StringWriter();
JsonGenerator generator = new JsonFactory(new ObjectMapper()).createJsonGenerator(stringWriter);
@SuppressWarnings("unchecked")
Serializer<Object> serializer = (Serializer<Object>) serializerFactory.getSerializer(storeDefinition.getKeySerializer());
private static void writeBinary(File outputFile, Printable printable) throws IOException {
OutputStream outputStream = (outputFile == null) ? System.out
: (new FileOutputStream(outputFile));
DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(outputStream));
try {
while(keyIterator.hasNext()) {
// Ugly hack to be able to separate text by newlines vs. spaces
byte[] keyBytes = keyIterator.next().get();
Object keyObject = serializer.toObject((null == keysCompressionStrategy) ? keyBytes
: keysCompressionStrategy.inflate(keyBytes));
generator.writeObject(keyObject);
StringBuffer buf = stringWriter.getBuffer();
if(buf.charAt(0) == ' ') {
buf.setCharAt(0, '\n');
}
writer.write(buf.toString());
buf.setLength(0);
}
writer.write('\n');
printable.printTo(dataOutputStream);
} finally {
if(fileWriter != null) {
fileWriter.close();
}
writer.close();
dataOutputStream.close();
}
}

private static void writeKeysBinary(Iterator<ByteArray> keyIterator, File outputFile)
throws IOException {
DataOutputStream dos = null;
FileOutputStream outputStream = null;
if(outputFile != null) {
outputStream = new FileOutputStream(outputFile);
dos = new DataOutputStream(new BufferedOutputStream(outputStream));
} else {
dos = new DataOutputStream(new BufferedOutputStream(System.out));
}

private static void writeAscii(File outputFile, Writable writable) throws IOException {
Writer writer = (outputFile == null) ? (new OutputStreamWriter(System.out))
: (new FileWriter(outputFile));
BufferedWriter bufferedWriter = new BufferedWriter(writer);
try {
while(keyIterator.hasNext()) {
byte[] keyBytes = keyIterator.next().get();
dos.writeInt(keyBytes.length);
dos.write(keyBytes);
}
writable.writeTo(bufferedWriter);
} finally {
if(outputStream != null) {
outputStream.close();
}
dos.close();
bufferedWriter.close();
}
}

Expand Down

0 comments on commit 614eab9

Please sign in to comment.