Skip to content

Commit

Permalink
Readded peter's change
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongjiewu committed Sep 6, 2012
1 parent a33b280 commit 878af04
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 74 deletions.
11 changes: 7 additions & 4 deletions src/java/voldemort/server/niosocket/AsyncRequestHandler.java
Expand Up @@ -127,13 +127,16 @@ protected void read(SelectionKey selectionKey) throws IOException {
if(logger.isTraceEnabled())
logger.trace("Starting execution for " + socketChannel.socket());

streamRequestHandler = requestHandler.handleRequest(new DataInputStream(inputStream),
new DataOutputStream(outputStream));
DataInputStream dataInputStream = new DataInputStream(inputStream);
DataOutputStream dataOutputStream = new DataOutputStream(outputStream);

streamRequestHandler = requestHandler.handleRequest(dataInputStream,
dataOutputStream);

if(logger.isDebugEnabled()) {
logger.debug("AsyncRequestHandler:read finished request from "
+ socketChannel.socket().getRemoteSocketAddress() + " handlerRef: "
+ System.identityHashCode(streamRequestHandler) + " at time: "
+ System.identityHashCode(dataInputStream) + " at time: "
+ System.currentTimeMillis() + " elapsed time: "
+ (System.nanoTime() - startNs) + " ns");
}
Expand Down Expand Up @@ -307,7 +310,7 @@ private StreamRequestHandlerState handleStreamRequestInternal(SelectionKey selec
if(logger.isDebugEnabled()) {
logger.debug("Handled request from "
+ socketChannel.socket().getRemoteSocketAddress() + " handlerRef: "
+ System.identityHashCode(streamRequestHandler) + " at time: "
+ System.identityHashCode(dataInputStream) + " at time: "
+ System.currentTimeMillis() + " elapsed time: "
+ (System.nanoTime() - startNs) + " ns");
}
Expand Down
Expand Up @@ -135,7 +135,8 @@ private void handleGetVersion(DataInputStream inputStream,

if(logger.isDebugEnabled()) {
logger.debug("GETVERSIONS started at: " + startTimeMs + " handlerRef: "
+ System.identityHashCode(this) + " key: " + key + " "
+ System.identityHashCode(inputStream) + " key: "
+ ByteUtils.toHexString(key.get()) + " "
+ (System.nanoTime() - startTimeNs) + " ns, keySize: " + key.length()
+ "clocks: " + clockStr);
}
Expand Down Expand Up @@ -317,7 +318,7 @@ private void handleGet(DataInputStream inputStream,
}
writeResults(outputStream, results);
if(logger.isDebugEnabled()) {
debugLogReturnValue(key, results, startTimeMs, startTimeNs, "GET");
debugLogReturnValue(inputStream, key, results, startTimeMs, startTimeNs, "GET");
}
}

Expand Down Expand Up @@ -374,7 +375,8 @@ private void handleGetAll(DataInputStream inputStream,
writeResults(outputStream, entry.getValue());

if(logger.isDebugEnabled()) {
debugLogReturnValue(entry.getKey(),
debugLogReturnValue(inputStream,
entry.getKey(),
entry.getValue(),
startTimeMs,
startTimeNs,
Expand All @@ -386,7 +388,8 @@ private void handleGetAll(DataInputStream inputStream,
logger.debug("GETALL end");
}

private void debugLogReturnValue(ByteArray key,
private void debugLogReturnValue(DataInputStream input,
ByteArray key,
List<Versioned<byte[]>> values,
long startTimeMs,
long startTimeNs,
Expand All @@ -406,12 +409,13 @@ private void debugLogReturnValue(ByteArray key,
valueHashStr += "]";
versionsStr += "]";

logger.debug(getType + " handlerRef: " + System.identityHashCode(this) + " start time: "
+ startTimeMs + " key: " + key + " elapsed time: "
+ (System.nanoTime() - startTimeNs) + " ns, keySize: " + key.length()
+ " numResults: " + values.size() + " totalResultSize: " + totalValueSize
+ " resultSizes: " + valueSizeStr + " resultHashes: " + valueHashStr
+ " versions: " + versionsStr + " current time: " + System.currentTimeMillis());
logger.debug(getType + " handlerRef: " + System.identityHashCode(input) + " start time: "
+ startTimeMs + " key: " + ByteUtils.toHexString(key.get())
+ " elapsed time: " + (System.nanoTime() - startTimeNs) + " ns, keySize: "
+ key.length() + " numResults: " + values.size() + " totalResultSize: "
+ totalValueSize + " resultSizes: " + valueSizeStr + " resultHashes: "
+ valueHashStr + " versions: " + versionsStr + " current time: "
+ System.currentTimeMillis());
}

private void handlePut(DataInputStream inputStream,
Expand Down Expand Up @@ -447,9 +451,10 @@ private void handlePut(DataInputStream inputStream,

if(logger.isDebugEnabled()) {
logger.debug("PUT started at: " + startTimeMs + " handlerRef: "
+ System.identityHashCode(this) + " key: " + key + " "
+ System.identityHashCode(inputStream) + " key: "
+ ByteUtils.toHexString(key.get()) + " "
+ (System.nanoTime() - startTimeNs) + " ns, keySize: " + key.length()
+ " valueHash: " + value.hashCode() + " valueSize: " + valueSize
+ " valueHash: " + value.hashCode() + " valueSize: " + value.length
+ " clockSize: " + clock.sizeInBytes() + " time: "
+ System.currentTimeMillis());
}
Expand Down Expand Up @@ -480,8 +485,9 @@ private void handleDelete(DataInputStream inputStream,
}

if(logger.isDebugEnabled()) {
logger.debug("DELETE started at: " + startTimeMs + " key: " + key + " handlerRef: "
+ System.identityHashCode(this) + " time: "
logger.debug("DELETE started at: " + startTimeMs + " key: "
+ ByteUtils.toHexString(key.get()) + " handlerRef: "
+ System.identityHashCode(inputStream) + " time: "
+ (System.nanoTime() - startTimeNs) + " ns, keySize: " + key.length()
+ " clockSize: " + version.sizeInBytes());
}
Expand Down
10 changes: 5 additions & 5 deletions src/java/voldemort/store/bdb/BdbStorageEngine.java
Expand Up @@ -281,7 +281,7 @@ public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
for(ByteArray key: keys) {

if(logger.isTraceEnabled())
keyStr += key + " ";
keyStr += ByteUtils.toHexString(key.get()) + " ";

List<Versioned<byte[]>> values = get(cursor, key, readLockMode, versionedSerializer);
if(!values.isEmpty())
Expand Down Expand Up @@ -324,7 +324,7 @@ private static <T> List<T> get(Cursor cursor,
}

if(logger.isTraceEnabled()) {
logger.trace("Completed GET from key " + key + " in "
logger.trace("Completed GET from key " + ByteUtils.toHexString(key.get()) + " in "
+ (System.nanoTime() - startTimeNs) + " ns at "
+ System.currentTimeMillis());
}
Expand Down Expand Up @@ -390,7 +390,7 @@ else if(occurred == Occurred.AFTER)
}

if(logger.isTraceEnabled()) {
logger.trace("Completed PUT to key " + key + " (keyRef: "
logger.trace("Completed PUT to key " + ByteUtils.toHexString(key.get()) + " (keyRef: "
+ System.identityHashCode(key) + " value " + value + " in "
+ (System.nanoTime() - startTimeNs) + " ns at "
+ System.currentTimeMillis());
Expand Down Expand Up @@ -431,8 +431,8 @@ public boolean delete(ByteArray key, Version version) throws PersistenceFailureE
} finally {

if(logger.isTraceEnabled()) {
logger.trace("Completed DELETE of key " + key + " (keyRef: "
+ System.identityHashCode(key) + ") in "
logger.trace("Completed DELETE of key " + ByteUtils.toHexString(key.get())
+ " (keyRef: " + System.identityHashCode(key) + ") in "
+ (System.nanoTime() - startTimeNs) + " ns at "
+ System.currentTimeMillis());
}
Expand Down
45 changes: 26 additions & 19 deletions src/java/voldemort/store/routed/PipelineRoutedStore.java
Expand Up @@ -254,9 +254,10 @@ public List<Versioned<byte[]>> request(Store<ByteArray, byte[], byte[]> store) {
}

if(logger.isDebugEnabled()) {
logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key " + key
+ " keyRef: " + System.identityHashCode(key) + "; started at "
+ startTimeMs + " took " + (System.nanoTime() - startTimeNs) + " values: "
logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key "
+ ByteUtils.toHexString(key.get()) + " keyRef: "
+ System.identityHashCode(key) + "; started at " + startTimeMs + " took "
+ (System.nanoTime() - startTimeNs) + " values: "
+ formatNodeValuesFromGet(pipelineData.getResponses()));
}

Expand Down Expand Up @@ -358,9 +359,10 @@ public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
throw pipelineData.getFatalError();

if(logger.isDebugEnabled()) {
logger.debug("Finished " + pipeline.getOperation().getSimpleName() + "for keys " + keys
+ " keyRef: " + System.identityHashCode(keys) + "; started at "
+ startTimeMs + " took " + (System.nanoTime() - startTimeNs) + " values: "
logger.debug("Finished " + pipeline.getOperation().getSimpleName() + "for keys "
+ ByteArray.toHexStrings(keys) + " keyRef: "
+ System.identityHashCode(keys) + "; started at " + startTimeMs + " took "
+ (System.nanoTime() - startTimeNs) + " values: "
+ formatNodeValuesFromGetAll(pipelineData.getResponses()));
}

Expand All @@ -372,8 +374,9 @@ private String formatNodeValuesFromGetAll(List<Response<Iterable<ByteArray>, Map
StringBuilder builder = new StringBuilder();
builder.append("{");
for(Response<Iterable<ByteArray>, Map<ByteArray, List<Versioned<byte[]>>>> r: list) {
builder.append("(nodeId=" + r.getNode().getId() + ", key=" + r.getKey()
+ ", retrieved= " + r.getValue() + ")");
builder.append("(nodeId=" + r.getNode().getId() + ", keys="
+ ByteArray.toHexStrings(r.getKey()) + ", retrieved= " + r.getValue()
+ ")");
builder.append(", ");
}
builder.append("}");
Expand Down Expand Up @@ -472,9 +475,10 @@ public List<Version> request(Store<ByteArray, byte[], byte[]> store) {
results.addAll(response.getValue());

if(logger.isDebugEnabled()) {
logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key " + key
+ " keyRef: " + System.identityHashCode(key) + "; started at "
+ startTimeMs + " took " + (System.nanoTime() - startTimeNs) + " values: "
logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key "
+ ByteUtils.toHexString(key.get()) + " keyRef: "
+ System.identityHashCode(key) + "; started at " + startTimeMs + " took "
+ (System.nanoTime() - startTimeNs) + " values: "
+ formatNodeValuesFromGetVersions(pipelineData.getResponses()));
}

Expand All @@ -486,8 +490,9 @@ private <R> String formatNodeValuesFromGetVersions(List<Response<ByteArray, List
StringBuilder builder = new StringBuilder();
builder.append("{");
for(Response<ByteArray, List<Version>> r: results) {
builder.append("(nodeId=" + r.getNode().getId() + ", key=" + r.getKey()
+ ", retrieved= " + r.getValue() + "), ");
builder.append("(nodeId=" + r.getNode().getId() + ", key="
+ ByteUtils.toHexString(r.getKey().get()) + ", retrieved= "
+ r.getValue() + "), ");
}
builder.append("}");

Expand Down Expand Up @@ -578,8 +583,9 @@ public boolean delete(final ByteArray key, final Version version) throws Voldemo

if(logger.isDebugEnabled()) {
logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key "
+ key.get() + " keyRef: " + System.identityHashCode(key) + "; started at "
+ startTimeMs + " took " + (System.nanoTime() - startTimeNs));
+ ByteUtils.toHexString(key.get()) + " keyRef: "
+ System.identityHashCode(key) + "; started at " + startTimeMs + " took "
+ (System.nanoTime() - startTimeNs));
}

if(pipelineData.getFatalError() != null)
Expand Down Expand Up @@ -703,10 +709,11 @@ public void put(ByteArray key, Versioned<byte[]> versioned, byte[] transforms)
}

if(logger.isDebugEnabled()) {
logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key " + key
+ " keyRef: " + System.identityHashCode(key) + "; started at "
+ startTimeMs + " took " + (System.nanoTime() - startTimeNs) + " value: "
+ versioned.getValue() + " (size: " + versioned.getValue().length + ")");
logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key "
+ ByteUtils.toHexString(key.get()) + " keyRef: "
+ System.identityHashCode(key) + "; started at " + startTimeMs + " took "
+ (System.nanoTime() - startTimeNs) + " value: " + versioned.getValue()
+ " (size: " + versioned.getValue().length + ")");
}

if(pipelineData.getFatalError() != null)
Expand Down
11 changes: 7 additions & 4 deletions src/java/voldemort/store/routed/action/AbstractReadRepair.java
Expand Up @@ -24,10 +24,11 @@
import voldemort.store.nonblockingstore.NonblockingStore;
import voldemort.store.routed.NodeValue;
import voldemort.store.routed.Pipeline;
import voldemort.store.routed.Pipeline.Event;
import voldemort.store.routed.PipelineData;
import voldemort.store.routed.ReadRepairer;
import voldemort.store.routed.Pipeline.Event;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;

Expand Down Expand Up @@ -101,7 +102,8 @@ public void execute(Pipeline pipeline) {
try {
if(logger.isDebugEnabled())
logger.debug("Doing read repair on node " + v.getNodeId() + " for key '"
+ v.getKey() + "' with version " + v.getVersion() + ".");
+ ByteUtils.toHexString(v.getKey().get()) + "' with version "
+ v.getVersion() + ".");

NonblockingStore store = nonblockingStores.get(v.getNodeId());
store.submitPutRequest(v.getKey(), v.getVersioned(), null, null, timeoutMs);
Expand All @@ -110,8 +112,9 @@ public void execute(Pipeline pipeline) {
logger.debug("Read repair cancelled due to application level exception on node "
+ v.getNodeId()
+ " for key '"
+ v.getKey()
+ "' with version " + v.getVersion() + ": " + e.getMessage());
+ ByteUtils.toHexString(v.getKey().get())
+ "' with version "
+ v.getVersion() + ": " + e.getMessage());
} catch(Exception e) {
logger.debug("Read repair failed: ", e);
}
Expand Down
Expand Up @@ -93,9 +93,9 @@ public void execute(final Pipeline pipeline) {

public void requestComplete(Object result, long requestTime) {
if(logger.isTraceEnabled())
logger.info(pipeline.getOperation().getSimpleName()
+ " response received (" + requestTime + " ms.) from node "
+ node.getId());
logger.trace(pipeline.getOperation().getSimpleName()
+ " response received (" + requestTime + " ms.) from node "
+ node.getId());

Response<ByteArray, Object> response = new Response<ByteArray, Object>(node,
key,
Expand Down
Expand Up @@ -42,6 +42,7 @@
import voldemort.store.slop.HintedHandoff;
import voldemort.store.slop.Slop;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.Time;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.Versioned;
Expand Down Expand Up @@ -130,7 +131,8 @@ public void requestComplete(Object result, long requestTime) {
responses.put(node.getId(), response);

if(logger.isDebugEnabled())
logger.debug("Finished secondary PUT for key " + key + " (keyRef: "
logger.debug("Finished secondary PUT for key "
+ ByteUtils.toHexString(key.get()) + " (keyRef: "
+ System.identityHashCode(key) + "); took " + requestTime
+ " ms on node " + node.getId() + "(" + node.getHost() + ")");

Expand Down

0 comments on commit 878af04

Please sign in to comment.