Skip to content

Commit

Permalink
Merge pull request voldemort#83 from pbailis/master
Browse files Browse the repository at this point in the history
Debug messages for initial Voldemort profiling
  • Loading branch information
Zhongjie Wu committed Jul 10, 2012
2 parents 70718b4 + fa3d6f6 commit b687f02
Show file tree
Hide file tree
Showing 12 changed files with 561 additions and 17 deletions.
28 changes: 28 additions & 0 deletions src/java/voldemort/server/niosocket/AsyncRequestHandler.java
Expand Up @@ -76,6 +76,11 @@ public AsyncRequestHandler(Selector selector,
protected void read(SelectionKey selectionKey) throws IOException {
int count = 0;

long startNs = -1;

if(logger.isDebugEnabled())
startNs = System.nanoTime();

if((count = socketChannel.read(inputStream.getBuffer())) == -1)
throw new EOFException("EOF for " + socketChannel.socket());

Expand Down Expand Up @@ -125,6 +130,14 @@ protected void read(SelectionKey selectionKey) throws IOException {
streamRequestHandler = requestHandler.handleRequest(new DataInputStream(inputStream),
new DataOutputStream(outputStream));

if(logger.isDebugEnabled()) {
logger.debug("AsyncRequestHandler:read finished request from "
+ socketChannel.socket().getRemoteSocketAddress() + " handlerRef: "
+ System.identityHashCode(streamRequestHandler) + " at time: "
+ System.currentTimeMillis() + " elapsed time: "
+ (System.nanoTime() - startNs) + " ns");
}

if(streamRequestHandler != null) {
// In the case of a StreamRequestHandler, we handle that separately
// (attempting to process multiple "segments").
Expand Down Expand Up @@ -282,8 +295,23 @@ private StreamRequestHandlerState handleStreamRequestInternal(SelectionKey selec
if(logger.isTraceEnabled())
traceInputBufferState("Before streaming request handler");

// this is the lowest level in the NioSocketServer stack at which we
// still have a reference to the client IP address and port
long startNs = -1;

if(logger.isDebugEnabled())
startNs = System.nanoTime();

state = streamRequestHandler.handleRequest(dataInputStream, dataOutputStream);

if(logger.isDebugEnabled()) {
logger.debug("Handled request from "
+ socketChannel.socket().getRemoteSocketAddress() + " handlerRef: "
+ System.identityHashCode(streamRequestHandler) + " at time: "
+ System.currentTimeMillis() + " elapsed time: "
+ (System.nanoTime() - startNs) + " ns");
}

if(logger.isTraceEnabled())
traceInputBufferState("After streaming request handler");
} catch(Exception e) {
Expand Down
Expand Up @@ -101,6 +101,14 @@ private RequestRoutingType getRoutingType(DataInputStream inputStream) throws IO
private void handleGetVersion(DataInputStream inputStream,
DataOutputStream outputStream,
Store<ByteArray, byte[], byte[]> store) throws IOException {
long startTimeMs = -1;
long startTimeNs = -1;

if(logger.isDebugEnabled()) {
startTimeMs = System.currentTimeMillis();
startTimeNs = System.nanoTime();
}

ByteArray key = readKey(inputStream);
List<Version> results = null;
try {
Expand All @@ -112,11 +120,25 @@ private void handleGetVersion(DataInputStream inputStream,
return;
}
outputStream.writeInt(results.size());

String clockStr = "";

for(Version v: results) {
byte[] clock = ((VectorClock) v).toBytes();

if(logger.isDebugEnabled())
clockStr += clock + " ";

outputStream.writeInt(clock.length);
outputStream.write(clock);
}

if(logger.isDebugEnabled()) {
logger.debug("GETVERSIONS started at: " + startTimeMs + " handlerRef: "
+ System.identityHashCode(this) + " key: " + key + " "
+ (System.nanoTime() - startTimeNs) + " ns, keySize: " + key.length()
+ "clocks: " + clockStr);
}
}

/**
Expand Down Expand Up @@ -269,6 +291,14 @@ private void writeResults(DataOutputStream outputStream, List<Versioned<byte[]>>
private void handleGet(DataInputStream inputStream,
DataOutputStream outputStream,
Store<ByteArray, byte[], byte[]> store) throws IOException {
long startTimeMs = -1;
long startTimeNs = -1;

if(logger.isDebugEnabled()) {
startTimeMs = System.currentTimeMillis();
startTimeNs = System.nanoTime();
}

ByteArray key = readKey(inputStream);

byte[] transforms = null;
Expand All @@ -286,11 +316,22 @@ private void handleGet(DataInputStream inputStream,
return;
}
writeResults(outputStream, results);
if(logger.isDebugEnabled()) {
debugLogReturnValue(key, results, startTimeMs, startTimeNs, "GET");
}
}

private void handleGetAll(DataInputStream inputStream,
DataOutputStream outputStream,
Store<ByteArray, byte[], byte[]> store) throws IOException {
long startTimeMs = -1;
long startTimeNs = -1;

if(logger.isDebugEnabled()) {
startTimeMs = System.currentTimeMillis();
startTimeNs = System.nanoTime();
}

// read keys
int numKeys = inputStream.readInt();
List<ByteArray> keys = new ArrayList<ByteArray>(numKeys);
Expand Down Expand Up @@ -321,18 +362,69 @@ private void handleGetAll(DataInputStream inputStream,

// write back the results
outputStream.writeInt(results.size());

if(logger.isDebugEnabled())
logger.debug("GETALL start");

for(Map.Entry<ByteArray, List<Versioned<byte[]>>> entry: results.entrySet()) {
// write the key
outputStream.writeInt(entry.getKey().length());
outputStream.write(entry.getKey().get());
// write the values
writeResults(outputStream, entry.getValue());

if(logger.isDebugEnabled()) {
debugLogReturnValue(entry.getKey(),
entry.getValue(),
startTimeMs,
startTimeNs,
"GETALL");
}
}

if(logger.isDebugEnabled())
logger.debug("GETALL end");
}

private void debugLogReturnValue(ByteArray key,
List<Versioned<byte[]>> values,
long startTimeMs,
long startTimeNs,
String getType) {
long totalValueSize = 0;
String valueSizeStr = "[";
String valueHashStr = "[";
String versionsStr = "[";
for(Versioned<byte[]> b: values) {
int len = b.getValue().length;
totalValueSize += len;
valueSizeStr += len + ",";
valueHashStr += b.hashCode() + ",";
versionsStr += b.getVersion();
}
valueSizeStr += "]";
valueHashStr += "]";
versionsStr += "]";

logger.debug(getType + " handlerRef: " + System.identityHashCode(this) + " start time: "
+ startTimeMs + " key: " + key + " elapsed time: "
+ (System.nanoTime() - startTimeNs) + " ns, keySize: " + key.length()
+ " numResults: " + values.size() + " totalResultSize: " + totalValueSize
+ " resultSizes: " + valueSizeStr + " resultHashes: " + valueHashStr
+ " versions: " + versionsStr + " current time: " + System.currentTimeMillis());
}

private void handlePut(DataInputStream inputStream,
DataOutputStream outputStream,
Store<ByteArray, byte[], byte[]> store) throws IOException {
long startTimeMs = -1;
long startTimeNs = -1;

if(logger.isDebugEnabled()) {
startTimeMs = System.currentTimeMillis();
startTimeNs = System.nanoTime();
}

ByteArray key = readKey(inputStream);
int valueSize = inputStream.readInt();
byte[] bytes = new byte[valueSize];
Expand All @@ -352,11 +444,28 @@ private void handlePut(DataInputStream inputStream,
} catch(VoldemortException e) {
writeException(outputStream, e);
}

if(logger.isDebugEnabled()) {
logger.debug("PUT started at: " + startTimeMs + " handlerRef: "
+ System.identityHashCode(this) + " key: " + key + " "
+ (System.nanoTime() - startTimeNs) + " ns, keySize: " + key.length()
+ " valueHash: " + value.hashCode() + " valueSize: " + valueSize
+ " clockSize: " + clock.sizeInBytes() + " time: "
+ System.currentTimeMillis());
}
}

private void handleDelete(DataInputStream inputStream,
DataOutputStream outputStream,
Store<ByteArray, byte[], byte[]> store) throws IOException {
long startTimeMs = -1;
long startTimeNs = -1;

if(logger.isDebugEnabled()) {
startTimeMs = System.currentTimeMillis();
startTimeNs = System.nanoTime();
}

ByteArray key = readKey(inputStream);
int versionSize = inputStream.readShort();
byte[] versionBytes = new byte[versionSize];
Expand All @@ -369,6 +478,13 @@ private void handleDelete(DataInputStream inputStream,
} catch(VoldemortException e) {
writeException(outputStream, e);
}

if(logger.isDebugEnabled()) {
logger.debug("DELETE started at: " + startTimeMs + " key: " + key + " handlerRef: "
+ System.identityHashCode(this) + " time: "
+ (System.nanoTime() - startTimeNs) + " ns, keySize: " + key.length()
+ " clockSize: " + version.sizeInBytes());
}
}

private void writeException(DataOutputStream stream, VoldemortException e) throws IOException {
Expand Down
69 changes: 69 additions & 0 deletions src/java/voldemort/store/bdb/BdbStorageEngine.java
Expand Up @@ -211,6 +211,11 @@ private <T> List<T> get(ByteArray key,
Serializer<T> serializer) throws PersistenceFailureException {
StoreUtils.assertValidKey(key);

long startTimeNs = -1;

if(logger.isTraceEnabled())
startTimeNs = System.nanoTime();

Cursor cursor = null;
try {
cursor = getBdbDatabase().openCursor(null, null);
Expand All @@ -227,6 +232,13 @@ private <T> List<T> get(ByteArray key,
logger.error(e);
throw new PersistenceFailureException(e);
} finally {
if(logger.isTraceEnabled()) {
logger.trace("Completed GET from key " + key + " (keyRef: "
+ System.identityHashCode(key) + ") in "
+ (System.nanoTime() - startTimeNs) + " ns at "
+ System.currentTimeMillis());
}

attemptClose(cursor);
}
}
Expand All @@ -252,12 +264,25 @@ private Database getBdbDatabase() {
public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
Map<ByteArray, byte[]> transforms)
throws VoldemortException {

long startTimeNs = -1;

if(logger.isTraceEnabled())
startTimeNs = System.nanoTime();

StoreUtils.assertValidKeys(keys);
Map<ByteArray, List<Versioned<byte[]>>> result = StoreUtils.newEmptyHashMap(keys);
Cursor cursor = null;

String keyStr = "";

try {
cursor = getBdbDatabase().openCursor(null, null);
for(ByteArray key: keys) {

if(logger.isTraceEnabled())
keyStr += key + " ";

List<Versioned<byte[]>> values = get(cursor, key, readLockMode, versionedSerializer);
if(!values.isEmpty())
result.put(key, values);
Expand All @@ -268,6 +293,12 @@ public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
} finally {
attemptClose(cursor);
}

if(logger.isTraceEnabled())
logger.trace("Completed GETALL from keys " + keyStr + " in "
+ (System.nanoTime() - startTimeNs) + " ns at "
+ System.currentTimeMillis());

return result;
}

Expand All @@ -277,6 +308,11 @@ private static <T> List<T> get(Cursor cursor,
Serializer<T> serializer) throws DatabaseException {
StoreUtils.assertValidKey(key);

long startTimeNs = -1;

if(logger.isTraceEnabled())
startTimeNs = System.nanoTime();

DatabaseEntry keyEntry = new DatabaseEntry(key.get());
DatabaseEntry valueEntry = new DatabaseEntry();
List<T> results = Lists.newArrayList();
Expand All @@ -286,13 +322,25 @@ private static <T> List<T> get(Cursor cursor,
lockMode)) {
results.add(serializer.toObject(valueEntry.getData()));
}

if(logger.isTraceEnabled()) {
logger.trace("Completed GET from key " + key + " in "
+ (System.nanoTime() - startTimeNs) + " ns at "
+ System.currentTimeMillis());
}

return results;
}

public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
throws PersistenceFailureException {
StoreUtils.assertValidKey(key);

long startTimeNs = -1;

if(logger.isTraceEnabled())
startTimeNs = System.nanoTime();

DatabaseEntry keyEntry = new DatabaseEntry(key.get());
boolean succeeded = false;
Transaction transaction = null;
Expand Down Expand Up @@ -340,10 +388,23 @@ else if(occurred == Occurred.AFTER)
else
attemptAbort(transaction);
}

if(logger.isTraceEnabled()) {
logger.trace("Completed PUT to key " + key + " (keyRef: "
+ System.identityHashCode(key) + " value " + value + " in "
+ (System.nanoTime() - startTimeNs) + " ns at "
+ System.currentTimeMillis());
}
}

public boolean delete(ByteArray key, Version version) throws PersistenceFailureException {
StoreUtils.assertValidKey(key);

long startTimeNs = -1;

if(logger.isTraceEnabled())
startTimeNs = System.nanoTime();

boolean deletedSomething = false;
Cursor cursor = null;
Transaction transaction = null;
Expand All @@ -368,6 +429,14 @@ public boolean delete(ByteArray key, Version version) throws PersistenceFailureE
logger.error(e);
throw new PersistenceFailureException(e);
} finally {

if(logger.isTraceEnabled()) {
logger.trace("Completed DELETE of key " + key + " (keyRef: "
+ System.identityHashCode(key) + ") in "
+ (System.nanoTime() - startTimeNs) + " ns at "
+ System.currentTimeMillis());
}

try {
attemptClose(cursor);
} finally {
Expand Down

0 comments on commit b687f02

Please sign in to comment.