Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #83 from pbailis/master

Debug messages for initial Voldemort profiling
  • Loading branch information...
commit b687f0229102fa8893ac4d6d6b84ca0f33c7fbfd 2 parents 70718b4 + fa3d6f6
@zhongjiewu zhongjiewu authored
View
28 src/java/voldemort/server/niosocket/AsyncRequestHandler.java
@@ -76,6 +76,11 @@ public AsyncRequestHandler(Selector selector,
protected void read(SelectionKey selectionKey) throws IOException {
int count = 0;
+ long startNs = -1;
+
+ if(logger.isDebugEnabled())
+ startNs = System.nanoTime();
+
if((count = socketChannel.read(inputStream.getBuffer())) == -1)
throw new EOFException("EOF for " + socketChannel.socket());
@@ -125,6 +130,14 @@ protected void read(SelectionKey selectionKey) throws IOException {
streamRequestHandler = requestHandler.handleRequest(new DataInputStream(inputStream),
new DataOutputStream(outputStream));
+ if(logger.isDebugEnabled()) {
+ logger.debug("AsyncRequestHandler:read finished request from "
+ + socketChannel.socket().getRemoteSocketAddress() + " handlerRef: "
+ + System.identityHashCode(streamRequestHandler) + " at time: "
+ + System.currentTimeMillis() + " elapsed time: "
+ + (System.nanoTime() - startNs) + " ns");
+ }
+
if(streamRequestHandler != null) {
// In the case of a StreamRequestHandler, we handle that separately
// (attempting to process multiple "segments").
@@ -282,8 +295,23 @@ private StreamRequestHandlerState handleStreamRequestInternal(SelectionKey selec
if(logger.isTraceEnabled())
traceInputBufferState("Before streaming request handler");
+ // this is the lowest level in the NioSocketServer stack at which we
+ // still have a reference to the client IP address and port
+ long startNs = -1;
+
+ if(logger.isDebugEnabled())
+ startNs = System.nanoTime();
+
state = streamRequestHandler.handleRequest(dataInputStream, dataOutputStream);
+ if(logger.isDebugEnabled()) {
+ logger.debug("Handled request from "
+ + socketChannel.socket().getRemoteSocketAddress() + " handlerRef: "
+ + System.identityHashCode(streamRequestHandler) + " at time: "
+ + System.currentTimeMillis() + " elapsed time: "
+ + (System.nanoTime() - startNs) + " ns");
+ }
+
if(logger.isTraceEnabled())
traceInputBufferState("After streaming request handler");
} catch(Exception e) {
View
116 src/java/voldemort/server/protocol/vold/VoldemortNativeRequestHandler.java
@@ -101,6 +101,14 @@ private RequestRoutingType getRoutingType(DataInputStream inputStream) throws IO
private void handleGetVersion(DataInputStream inputStream,
DataOutputStream outputStream,
Store<ByteArray, byte[], byte[]> store) throws IOException {
+ long startTimeMs = -1;
+ long startTimeNs = -1;
+
+ if(logger.isDebugEnabled()) {
+ startTimeMs = System.currentTimeMillis();
+ startTimeNs = System.nanoTime();
+ }
+
ByteArray key = readKey(inputStream);
List<Version> results = null;
try {
@@ -112,11 +120,25 @@ private void handleGetVersion(DataInputStream inputStream,
return;
}
outputStream.writeInt(results.size());
+
+ String clockStr = "";
+
for(Version v: results) {
byte[] clock = ((VectorClock) v).toBytes();
+
+ if(logger.isDebugEnabled())
+ clockStr += clock + " ";
+
outputStream.writeInt(clock.length);
outputStream.write(clock);
}
+
+ if(logger.isDebugEnabled()) {
+ logger.debug("GETVERSIONS started at: " + startTimeMs + " handlerRef: "
+ + System.identityHashCode(this) + " key: " + key + " "
+ + (System.nanoTime() - startTimeNs) + " ns, keySize: " + key.length()
+ + "clocks: " + clockStr);
+ }
}
/**
@@ -269,6 +291,14 @@ private void writeResults(DataOutputStream outputStream, List<Versioned<byte[]>>
private void handleGet(DataInputStream inputStream,
DataOutputStream outputStream,
Store<ByteArray, byte[], byte[]> store) throws IOException {
+ long startTimeMs = -1;
+ long startTimeNs = -1;
+
+ if(logger.isDebugEnabled()) {
+ startTimeMs = System.currentTimeMillis();
+ startTimeNs = System.nanoTime();
+ }
+
ByteArray key = readKey(inputStream);
byte[] transforms = null;
@@ -286,11 +316,22 @@ private void handleGet(DataInputStream inputStream,
return;
}
writeResults(outputStream, results);
+ if(logger.isDebugEnabled()) {
+ debugLogReturnValue(key, results, startTimeMs, startTimeNs, "GET");
+ }
}
private void handleGetAll(DataInputStream inputStream,
DataOutputStream outputStream,
Store<ByteArray, byte[], byte[]> store) throws IOException {
+ long startTimeMs = -1;
+ long startTimeNs = -1;
+
+ if(logger.isDebugEnabled()) {
+ startTimeMs = System.currentTimeMillis();
+ startTimeNs = System.nanoTime();
+ }
+
// read keys
int numKeys = inputStream.readInt();
List<ByteArray> keys = new ArrayList<ByteArray>(numKeys);
@@ -321,18 +362,69 @@ private void handleGetAll(DataInputStream inputStream,
// write back the results
outputStream.writeInt(results.size());
+
+ if(logger.isDebugEnabled())
+ logger.debug("GETALL start");
+
for(Map.Entry<ByteArray, List<Versioned<byte[]>>> entry: results.entrySet()) {
// write the key
outputStream.writeInt(entry.getKey().length());
outputStream.write(entry.getKey().get());
// write the values
writeResults(outputStream, entry.getValue());
+
+ if(logger.isDebugEnabled()) {
+ debugLogReturnValue(entry.getKey(),
+ entry.getValue(),
+ startTimeMs,
+ startTimeNs,
+ "GETALL");
+ }
}
+
+ if(logger.isDebugEnabled())
+ logger.debug("GETALL end");
+ }
+
+ private void debugLogReturnValue(ByteArray key,
+ List<Versioned<byte[]>> values,
+ long startTimeMs,
+ long startTimeNs,
+ String getType) {
+ long totalValueSize = 0;
+ String valueSizeStr = "[";
+ String valueHashStr = "[";
+ String versionsStr = "[";
+ for(Versioned<byte[]> b: values) {
+ int len = b.getValue().length;
+ totalValueSize += len;
+ valueSizeStr += len + ",";
+ valueHashStr += b.hashCode() + ",";
+ versionsStr += b.getVersion();
+ }
+ valueSizeStr += "]";
+ valueHashStr += "]";
+ versionsStr += "]";
+
+ logger.debug(getType + " handlerRef: " + System.identityHashCode(this) + " start time: "
+ + startTimeMs + " key: " + key + " elapsed time: "
+ + (System.nanoTime() - startTimeNs) + " ns, keySize: " + key.length()
+ + " numResults: " + values.size() + " totalResultSize: " + totalValueSize
+ + " resultSizes: " + valueSizeStr + " resultHashes: " + valueHashStr
+ + " versions: " + versionsStr + " current time: " + System.currentTimeMillis());
}
private void handlePut(DataInputStream inputStream,
DataOutputStream outputStream,
Store<ByteArray, byte[], byte[]> store) throws IOException {
+ long startTimeMs = -1;
+ long startTimeNs = -1;
+
+ if(logger.isDebugEnabled()) {
+ startTimeMs = System.currentTimeMillis();
+ startTimeNs = System.nanoTime();
+ }
+
ByteArray key = readKey(inputStream);
int valueSize = inputStream.readInt();
byte[] bytes = new byte[valueSize];
@@ -352,11 +444,28 @@ private void handlePut(DataInputStream inputStream,
} catch(VoldemortException e) {
writeException(outputStream, e);
}
+
+ if(logger.isDebugEnabled()) {
+ logger.debug("PUT started at: " + startTimeMs + " handlerRef: "
+ + System.identityHashCode(this) + " key: " + key + " "
+ + (System.nanoTime() - startTimeNs) + " ns, keySize: " + key.length()
+ + " valueHash: " + value.hashCode() + " valueSize: " + valueSize
+ + " clockSize: " + clock.sizeInBytes() + " time: "
+ + System.currentTimeMillis());
+ }
}
private void handleDelete(DataInputStream inputStream,
DataOutputStream outputStream,
Store<ByteArray, byte[], byte[]> store) throws IOException {
+ long startTimeMs = -1;
+ long startTimeNs = -1;
+
+ if(logger.isDebugEnabled()) {
+ startTimeMs = System.currentTimeMillis();
+ startTimeNs = System.nanoTime();
+ }
+
ByteArray key = readKey(inputStream);
int versionSize = inputStream.readShort();
byte[] versionBytes = new byte[versionSize];
@@ -369,6 +478,13 @@ private void handleDelete(DataInputStream inputStream,
} catch(VoldemortException e) {
writeException(outputStream, e);
}
+
+ if(logger.isDebugEnabled()) {
+ logger.debug("DELETE started at: " + startTimeMs + " key: " + key + " handlerRef: "
+ + System.identityHashCode(this) + " time: "
+ + (System.nanoTime() - startTimeNs) + " ns, keySize: " + key.length()
+ + " clockSize: " + version.sizeInBytes());
+ }
}
private void writeException(DataOutputStream stream, VoldemortException e) throws IOException {
View
69 src/java/voldemort/store/bdb/BdbStorageEngine.java
@@ -211,6 +211,11 @@ private boolean reopenBdbDatabase() {
Serializer<T> serializer) throws PersistenceFailureException {
StoreUtils.assertValidKey(key);
+ long startTimeNs = -1;
+
+ if(logger.isTraceEnabled())
+ startTimeNs = System.nanoTime();
+
Cursor cursor = null;
try {
cursor = getBdbDatabase().openCursor(null, null);
@@ -227,6 +232,13 @@ private boolean reopenBdbDatabase() {
logger.error(e);
throw new PersistenceFailureException(e);
} finally {
+ if(logger.isTraceEnabled()) {
+ logger.trace("Completed GET from key " + key + " (keyRef: "
+ + System.identityHashCode(key) + ") in "
+ + (System.nanoTime() - startTimeNs) + " ns at "
+ + System.currentTimeMillis());
+ }
+
attemptClose(cursor);
}
}
@@ -252,12 +264,25 @@ private Database getBdbDatabase() {
public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
Map<ByteArray, byte[]> transforms)
throws VoldemortException {
+
+ long startTimeNs = -1;
+
+ if(logger.isTraceEnabled())
+ startTimeNs = System.nanoTime();
+
StoreUtils.assertValidKeys(keys);
Map<ByteArray, List<Versioned<byte[]>>> result = StoreUtils.newEmptyHashMap(keys);
Cursor cursor = null;
+
+ String keyStr = "";
+
try {
cursor = getBdbDatabase().openCursor(null, null);
for(ByteArray key: keys) {
+
+ if(logger.isTraceEnabled())
+ keyStr += key + " ";
+
List<Versioned<byte[]>> values = get(cursor, key, readLockMode, versionedSerializer);
if(!values.isEmpty())
result.put(key, values);
@@ -268,6 +293,12 @@ private Database getBdbDatabase() {
} finally {
attemptClose(cursor);
}
+
+ if(logger.isTraceEnabled())
+ logger.trace("Completed GETALL from keys " + keyStr + " in "
+ + (System.nanoTime() - startTimeNs) + " ns at "
+ + System.currentTimeMillis());
+
return result;
}
@@ -277,6 +308,11 @@ private Database getBdbDatabase() {
Serializer<T> serializer) throws DatabaseException {
StoreUtils.assertValidKey(key);
+ long startTimeNs = -1;
+
+ if(logger.isTraceEnabled())
+ startTimeNs = System.nanoTime();
+
DatabaseEntry keyEntry = new DatabaseEntry(key.get());
DatabaseEntry valueEntry = new DatabaseEntry();
List<T> results = Lists.newArrayList();
@@ -286,6 +322,13 @@ private Database getBdbDatabase() {
lockMode)) {
results.add(serializer.toObject(valueEntry.getData()));
}
+
+ if(logger.isTraceEnabled()) {
+ logger.trace("Completed GET from key " + key + " in "
+ + (System.nanoTime() - startTimeNs) + " ns at "
+ + System.currentTimeMillis());
+ }
+
return results;
}
@@ -293,6 +336,11 @@ public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
throws PersistenceFailureException {
StoreUtils.assertValidKey(key);
+ long startTimeNs = -1;
+
+ if(logger.isTraceEnabled())
+ startTimeNs = System.nanoTime();
+
DatabaseEntry keyEntry = new DatabaseEntry(key.get());
boolean succeeded = false;
Transaction transaction = null;
@@ -340,10 +388,23 @@ else if(occurred == Occurred.AFTER)
else
attemptAbort(transaction);
}
+
+ if(logger.isTraceEnabled()) {
+ logger.trace("Completed PUT to key " + key + " (keyRef: "
+ + System.identityHashCode(key) + " value " + value + " in "
+ + (System.nanoTime() - startTimeNs) + " ns at "
+ + System.currentTimeMillis());
+ }
}
public boolean delete(ByteArray key, Version version) throws PersistenceFailureException {
StoreUtils.assertValidKey(key);
+
+ long startTimeNs = -1;
+
+ if(logger.isTraceEnabled())
+ startTimeNs = System.nanoTime();
+
boolean deletedSomething = false;
Cursor cursor = null;
Transaction transaction = null;
@@ -368,6 +429,14 @@ public boolean delete(ByteArray key, Version version) throws PersistenceFailureE
logger.error(e);
throw new PersistenceFailureException(e);
} finally {
+
+ if(logger.isTraceEnabled()) {
+ logger.trace("Completed DELETE of key " + key + " (keyRef: "
+ + System.identityHashCode(key) + ") in "
+ + (System.nanoTime() - startTimeNs) + " ns at "
+ + System.currentTimeMillis());
+ }
+
try {
attemptClose(cursor);
} finally {
View
117 src/java/voldemort/store/routed/PipelineRoutedStore.java
@@ -144,6 +144,14 @@ public PipelineRoutedStore(String name,
public List<Versioned<byte[]>> get(final ByteArray key, final byte[] transforms) {
StoreUtils.assertValidKey(key);
+ long startTimeMs = -1;
+ long startTimeNs = -1;
+
+ if(logger.isDebugEnabled()) {
+ startTimeMs = System.currentTimeMillis();
+ startTimeNs = System.nanoTime();
+ }
+
BasicPipelineData<List<Versioned<byte[]>>> pipelineData = new BasicPipelineData<List<Versioned<byte[]>>>();
if(zoneRoutingEnabled)
pipelineData.setZonesRequired(storeDef.getZoneCountReads());
@@ -242,14 +250,42 @@ public PipelineRoutedStore(String name,
results.addAll(value);
}
+ if(logger.isDebugEnabled()) {
+ logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key " + key
+ + " keyRef: " + System.identityHashCode(key) + "; started at "
+ + startTimeMs + " took " + (System.nanoTime() - startTimeNs) + " values: "
+ + formatNodeValuesFromGet(pipelineData.getResponses()));
+ }
+
return results;
}
+ private String formatNodeValuesFromGet(List<Response<ByteArray, List<Versioned<byte[]>>>> results) {
+ // log all retrieved values
+ StringBuilder builder = new StringBuilder();
+ builder.append("{");
+ for(Response<ByteArray, List<Versioned<byte[]>>> r: results) {
+ builder.append("(nodeId=" + r.getNode().getId() + ", key=" + r.getKey()
+ + ", retrieved= " + r.getValue() + "), ");
+ }
+ builder.append("}");
+
+ return builder.toString();
+ }
+
public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
Map<ByteArray, byte[]> transforms)
throws VoldemortException {
StoreUtils.assertValidKeys(keys);
+ long startTimeMs = -1;
+ long startTimeNs = -1;
+
+ if(logger.isDebugEnabled()) {
+ startTimeMs = System.currentTimeMillis();
+ startTimeNs = System.nanoTime();
+ }
+
boolean allowReadRepair = repairReads && (transforms == null || transforms.size() == 0);
GetAllPipelineData pipelineData = new GetAllPipelineData();
@@ -318,12 +354,41 @@ public PipelineRoutedStore(String name,
if(pipelineData.getFatalError() != null)
throw pipelineData.getFatalError();
+ if(logger.isDebugEnabled()) {
+ logger.debug("Finished " + pipeline.getOperation().getSimpleName() + "for keys " + keys
+ + " keyRef: " + System.identityHashCode(keys) + "; started at "
+ + startTimeMs + " took " + (System.nanoTime() - startTimeNs) + " values: "
+ + formatNodeValuesFromGetAll(pipelineData.getResponses()));
+ }
+
return pipelineData.getResult();
}
+ private String formatNodeValuesFromGetAll(List<Response<Iterable<ByteArray>, Map<ByteArray, List<Versioned<byte[]>>>>> list) {
+ // log all retrieved values
+ StringBuilder builder = new StringBuilder();
+ builder.append("{");
+ for(Response<Iterable<ByteArray>, Map<ByteArray, List<Versioned<byte[]>>>> r: list) {
+ builder.append("(nodeId=" + r.getNode().getId() + ", key=" + r.getKey()
+ + ", retrieved= " + r.getValue() + ")");
+ builder.append(", ");
+ }
+ builder.append("}");
+
+ return builder.toString();
+ }
+
public List<Version> getVersions(final ByteArray key) {
StoreUtils.assertValidKey(key);
+ long startTimeMs = -1;
+ long startTimeNs = -1;
+
+ if(logger.isDebugEnabled()) {
+ startTimeMs = System.currentTimeMillis();
+ startTimeNs = System.nanoTime();
+ }
+
BasicPipelineData<List<Version>> pipelineData = new BasicPipelineData<List<Version>>();
if(zoneRoutingEnabled)
pipelineData.setZonesRequired(storeDef.getZoneCountReads());
@@ -385,7 +450,7 @@ public PipelineRoutedStore(String name,
pipeline.addEvent(Event.STARTED);
if(logger.isDebugEnabled()) {
- logger.debug("Operation " + pipeline.getOperation().getSimpleName() + "Key "
+ logger.debug("Operation " + pipeline.getOperation().getSimpleName() + " Key "
+ ByteUtils.toHexString(key.get()));
}
try {
@@ -403,12 +468,40 @@ public PipelineRoutedStore(String name,
for(Response<ByteArray, List<Version>> response: pipelineData.getResponses())
results.addAll(response.getValue());
+ if(logger.isDebugEnabled()) {
+ logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key " + key
+ + " keyRef: " + System.identityHashCode(key) + "; started at "
+ + startTimeMs + " took " + (System.nanoTime() - startTimeNs) + " values: "
+ + formatNodeValuesFromGetVersions(pipelineData.getResponses()));
+ }
+
return results;
}
+ private <R> String formatNodeValuesFromGetVersions(List<Response<ByteArray, List<Version>>> results) {
+ // log all retrieved values
+ StringBuilder builder = new StringBuilder();
+ builder.append("{");
+ for(Response<ByteArray, List<Version>> r: results) {
+ builder.append("(nodeId=" + r.getNode().getId() + ", key=" + r.getKey()
+ + ", retrieved= " + r.getValue() + "), ");
+ }
+ builder.append("}");
+
+ return builder.toString();
+ }
+
public boolean delete(final ByteArray key, final Version version) throws VoldemortException {
StoreUtils.assertValidKey(key);
+ long startTimeMs = -1;
+ long startTimeNs = -1;
+
+ if(logger.isDebugEnabled()) {
+ startTimeMs = System.currentTimeMillis();
+ startTimeNs = System.nanoTime();
+ }
+
BasicPipelineData<Boolean> pipelineData = new BasicPipelineData<Boolean>();
if(zoneRoutingEnabled)
pipelineData.setZonesRequired(storeDef.getZoneCountWrites());
@@ -480,6 +573,12 @@ public boolean delete(final ByteArray key, final Version version) throws Voldemo
throw e;
}
+ if(logger.isDebugEnabled()) {
+ logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key "
+ + key.get() + " keyRef: " + System.identityHashCode(key) + "; started at "
+ + startTimeMs + " took " + (System.nanoTime() - startTimeNs));
+ }
+
if(pipelineData.getFatalError() != null)
throw pipelineData.getFatalError();
@@ -497,6 +596,15 @@ public boolean isHintedHandoffEnabled() {
public void put(ByteArray key, Versioned<byte[]> versioned, byte[] transforms)
throws VoldemortException {
+
+ long startTimeMs = -1;
+ long startTimeNs = -1;
+
+ if(logger.isDebugEnabled()) {
+ startTimeMs = System.currentTimeMillis();
+ startTimeNs = System.nanoTime();
+ }
+
StoreUtils.assertValidKey(key);
PutPipelineData pipelineData = new PutPipelineData();
if(zoneRoutingEnabled)
@@ -591,6 +699,13 @@ public void put(ByteArray key, Versioned<byte[]> versioned, byte[] transforms)
throw e;
}
+ if(logger.isDebugEnabled()) {
+ logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key " + key
+ + " keyRef: " + System.identityHashCode(key) + "; started at "
+ + startTimeMs + " took " + (System.nanoTime() - startTimeNs) + " value: "
+ + versioned.getValue() + " (size: " + versioned.getValue().length + ")");
+ }
+
if(pipelineData.getFatalError() != null)
throw pipelineData.getFatalError();
}
View
14 src/java/voldemort/store/routed/action/AbstractReadRepair.java
@@ -75,6 +75,11 @@ protected void insertNodeValue(Node node, ByteArray key, List<Versioned<byte[]>>
public void execute(Pipeline pipeline) {
insertNodeValues();
+ long startTimeNs = -1;
+
+ if(logger.isTraceEnabled())
+ startTimeNs = System.nanoTime();
+
if(nodeValues.size() > 1 && preferred > 1) {
List<NodeValue<ByteArray, byte[]>> toReadRepair = Lists.newArrayList();
@@ -111,6 +116,15 @@ public void execute(Pipeline pipeline) {
logger.debug("Read repair failed: ", e);
}
}
+
+ if(logger.isDebugEnabled()) {
+ String logStr = "Repaired (node, key, version): (";
+ for(NodeValue<ByteArray, byte[]> v: toReadRepair) {
+ logStr += "(" + v.getNodeId() + ", " + v.getKey() + "," + v.getVersion() + ") ";
+ }
+ logStr += "in " + (System.nanoTime() - startTimeNs) + " ns";
+ logger.debug(logStr);
+ }
}
pipeline.addEvent(completeEvent);
View
5 src/java/voldemort/store/routed/action/PerformParallelPutRequests.java
@@ -129,6 +129,11 @@ public void requestComplete(Object result, long requestTime) {
requestTime);
responses.put(node.getId(), response);
+ if(logger.isDebugEnabled())
+ logger.debug("Finished secondary PUT for key " + key + " (keyRef: "
+ + System.identityHashCode(key) + "); took " + requestTime
+ + " ms on node " + node.getId() + "(" + node.getHost() + ")");
+
if(isHintedHandoffEnabled() && pipeline.isFinished()) {
if(response.getValue() instanceof UnreachableStoreException) {
Slop slop = new Slop(pipelineData.getStoreName(),
View
14 src/java/voldemort/store/routed/action/PerformParallelRequests.java
@@ -95,6 +95,8 @@ public void execute(final Pipeline pipeline) {
final Node node = nodes.get(i);
pipelineData.incrementNodeIndex();
+ final long startMs = logger.isDebugEnabled() ? System.currentTimeMillis() : -1;
+
NonblockingStoreCallback callback = new NonblockingStoreCallback() {
public void requestComplete(Object result, long requestTime) {
@@ -107,6 +109,13 @@ public void requestComplete(Object result, long requestTime) {
key,
result,
requestTime);
+ if(logger.isDebugEnabled())
+ logger.debug("Finished " + pipeline.getOperation().getSimpleName()
+ + " for key " + key + " (keyRef: "
+ + System.identityHashCode(key) + "); started at " + startMs
+ + " took " + requestTime + " ms on node " + node.getId() + "("
+ + node.getHost() + ")");
+
responses.put(node.getId(), response);
latch.countDown();
@@ -164,6 +173,11 @@ else if(pipeline.getOperation() == Operation.GET_VERSIONS)
}
}
+ if(logger.isDebugEnabled())
+ logger.debug("GET for key " + key + " (keyRef: " + System.identityHashCode(key)
+ + "); successes: " + pipelineData.getSuccesses() + " preferred: "
+ + preferred + " required: " + required);
+
if(pipelineData.getSuccesses() < required) {
if(insufficientSuccessesEvent != null) {
pipeline.addEvent(insufficientSuccessesEvent);
View
12 src/java/voldemort/store/routed/action/PerformSerialGetAllRequests.java
@@ -79,6 +79,11 @@ public void execute(Pipeline pipeline) {
boolean zoneRequirement = false;
MutableInt successCount = pipelineData.getSuccessCount(key);
+ if(logger.isDebugEnabled())
+ logger.debug("GETALL for key " + key + " (keyRef: " + System.identityHashCode(key)
+ + ") successes: " + successCount.intValue() + " preferred: " + preferred
+ + " required: " + required);
+
if(successCount.intValue() >= preferred) {
if(pipelineData.getZonesRequired() != null) {
@@ -132,6 +137,13 @@ public void execute(Pipeline pipeline) {
pipelineData.getResponses().add(response);
failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
+ if(logger.isDebugEnabled())
+ logger.debug("GET for key " + key + " (keyRef: "
+ + System.identityHashCode(key) + ") successes: "
+ + successCount.intValue() + " preferred: " + preferred
+ + " required: " + required + " new GET success on node "
+ + node.getId());
+
HashSet<Integer> zoneResponses = null;
if(pipelineData.getKeyToZoneResponse().containsKey(key)) {
zoneResponses = pipelineData.getKeyToZoneResponse().get(key);
View
40 src/java/voldemort/store/routed/action/PerformSerialPutRequests.java
@@ -74,11 +74,20 @@ public void execute(Pipeline pipeline) {
int currentNode = 0;
List<Node> nodes = pipelineData.getNodes();
+ long startMasterMs = -1;
+ long startMasterNs = -1;
+
+ if(logger.isDebugEnabled()) {
+ startMasterMs = System.currentTimeMillis();
+ startMasterNs = System.nanoTime();
+ }
+
if(logger.isDebugEnabled())
logger.debug("Performing serial put requests to determine master");
+ Node node = null;
for(; currentNode < nodes.size(); currentNode++) {
- Node node = nodes.get(currentNode);
+ node = nodes.get(currentNode);
pipelineData.incrementNodeIndex();
VectorClock versionedClock = (VectorClock) versioned.getVersion();
@@ -86,8 +95,8 @@ public void execute(Pipeline pipeline) {
versionedClock.incremented(node.getId(),
time.getMilliseconds()));
- if(logger.isTraceEnabled())
- logger.trace("Attempt #" + (currentNode + 1) + " to perform put (node "
+ if(logger.isDebugEnabled())
+ logger.debug("Attempt #" + (currentNode + 1) + " to perform put (node "
+ node.getId() + ")");
long start = System.nanoTime();
@@ -98,8 +107,8 @@ public void execute(Pipeline pipeline) {
pipelineData.incrementSuccesses();
failureDetector.recordSuccess(node, requestTime);
- if(logger.isTraceEnabled())
- logger.trace("Put on node " + node.getId() + " succeeded, using as master");
+ if(logger.isDebugEnabled())
+ logger.debug("Put on node " + node.getId() + " succeeded, using as master");
pipelineData.setMaster(node);
pipelineData.setVersionedCopy(versionedCopy);
@@ -108,6 +117,12 @@ public void execute(Pipeline pipeline) {
} catch(Exception e) {
long requestTime = (System.nanoTime() - start) / Time.NS_PER_MS;
+ if(logger.isDebugEnabled())
+ logger.debug("Master PUT at node " + currentNode + "(" + node.getHost() + ")"
+ + " failed (" + e.getMessage() + ") in "
+ + (System.nanoTime() - start) + " ns" + " (keyRef: "
+ + System.identityHashCode(key) + ")");
+
if(handleResponseError(e, node, requestTime, pipeline, failureDetector))
return;
}
@@ -157,10 +172,25 @@ public void execute(Pipeline pipeline) {
}
} else {
+ if(logger.isDebugEnabled())
+ logger.debug("Finished master PUT for key " + key + " (keyRef: "
+ + System.identityHashCode(key) + "); started at "
+ + startMasterMs + " took "
+ + (System.nanoTime() - startMasterNs) + " ns on node "
+ + (node == null ? "NULL" : node.getId()) + "("
+ + (node == null ? "NULL" : node.getHost()) + "); now complete");
+
pipeline.addEvent(completeEvent);
}
}
} else {
+ if(logger.isDebugEnabled())
+ logger.debug("Finished master PUT for key " + key + " (keyRef: "
+ + System.identityHashCode(key) + "); started at " + startMasterMs
+ + " took " + (System.nanoTime() - startMasterNs) + " ns on node "
+ + (node == null ? "NULL" : node.getId()) + "("
+ + (node == null ? "NULL" : node.getHost()) + ")");
+
pipeline.addEvent(masterDeterminedEvent);
}
}
View
7 src/java/voldemort/store/routed/action/PerformSerialRequests.java
@@ -98,6 +98,13 @@ public void execute(Pipeline pipeline) {
result,
((System.nanoTime() - start) / Time.NS_PER_MS));
+ if(logger.isDebugEnabled())
+ logger.debug(pipeline.getOperation().getSimpleName() + " for key " + key
+ + " successes: " + pipelineData.getSuccesses() + " preferred: "
+ + preferred + " required: " + required + " new "
+ + pipeline.getOperation().getSimpleName() + " success on node "
+ + node.getId());
+
pipelineData.incrementSuccesses();
pipelineData.getResponses().add(response);
failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
View
43 src/java/voldemort/store/slop/HintedHandoff.java
@@ -101,15 +101,17 @@ public void sendHintParallel(final Node failedNode, final Version version, final
for(final Node node: handoffStrategy.routeHint(failedNode)) {
int nodeId = node.getId();
- if(logger.isTraceEnabled())
- logger.trace("Sending an async hint to " + nodeId);
+
+ if(logger.isDebugEnabled())
+ logger.debug("Sending an async hint to " + nodeId);
if(!failedNodes.contains(node) && failureDetector.isAvailable(node)) {
NonblockingStore nonblockingStore = nonblockingSlopStores.get(nodeId);
Utils.notNull(nonblockingStore);
final long startNs = System.nanoTime();
- if(logger.isTraceEnabled())
- logger.trace("Attempt to write " + slop.getKey() + " for " + failedNode
+
+ if(logger.isDebugEnabled())
+ logger.debug("Slop attempt to write " + slop.getKey() + " for " + failedNode
+ " to node " + node);
NonblockingStoreCallback callback = new NonblockingStoreCallback() {
@@ -127,6 +129,13 @@ public void requestComplete(Object result, long requestTime) {
failedNodes.add(node);
if(response.getValue() instanceof UnreachableStoreException) {
UnreachableStoreException use = (UnreachableStoreException) response.getValue();
+
+ if(logger.isDebugEnabled())
+ logger.debug("Write of key " + slop.getKey() + " for "
+ + failedNode + " to node " + node
+ + " failed due to unreachable: "
+ + use.getMessage());
+
failureDetector.recordException(node,
(System.nanoTime() - startNs)
/ Time.NS_PER_MS,
@@ -136,6 +145,12 @@ public void requestComplete(Object result, long requestTime) {
}
return;
}
+
+ if(logger.isDebugEnabled())
+ logger.debug("Slop write of key " + slop.getKey() + " for "
+ + failedNode + " to node " + node + " succeeded in "
+ + (System.nanoTime() - startNs) + " ns");
+
failureDetector.recordSuccess(node, (System.nanoTime() - startNs)
/ Time.NS_PER_MS);
@@ -151,7 +166,7 @@ public void requestComplete(Object result, long requestTime) {
}
}
}
-
+
/**
* Send a hint of a request originally meant for the failed node to another
* node in the ring, as selected by the {@link HintedHandoffStrategy}
@@ -166,8 +181,8 @@ public boolean sendHintSerial(Node failedNode, Version version, Slop slop) {
boolean persisted = false;
for(Node node: handoffStrategy.routeHint(failedNode)) {
int nodeId = node.getId();
- if(logger.isTraceEnabled())
- logger.trace("Trying to send hint to " + nodeId);
+ if(logger.isDebugEnabled())
+ logger.debug("Trying to send hint to " + nodeId);
if(!failedNodes.contains(node) && failureDetector.isAvailable(node)) {
Store<ByteArray, Slop, byte[]> slopStore = slopStores.get(nodeId);
@@ -175,10 +190,10 @@ public boolean sendHintSerial(Node failedNode, Version version, Slop slop) {
long startNs = System.nanoTime();
try {
- if(logger.isTraceEnabled())
- logger.trace("Attempt to handoff " + slop.getOperation() + " on "
- + slop.getKey() + " for " + failedNode
- + " to node " + node);
+ if(logger.isDebugEnabled())
+ logger.debug("Slop attempt to write " + slop.getKey() + " (keyRef: "
+ + System.identityHashCode(slop.getKey()) + ") for "
+ + failedNode + " to node " + node);
// No transform needs to applied to the slop
slopStore.put(slop.makeKey(), new Versioned<Slop>(slop, version), null);
@@ -197,6 +212,12 @@ public boolean sendHintSerial(Node failedNode, Version version, Slop slop) {
} catch(ObsoleteVersionException e) {
logger.debug(e, e);
}
+
+ if(logger.isDebugEnabled())
+ logger.debug("Slop write of key " + slop.getKey() + " (keyRef: "
+ + System.identityHashCode(slop.getKey()) + " for " + failedNode
+ + " to node " + node + " succeeded in "
+ + (System.nanoTime() - startNs) + " ns");
}
}
View
113 src/java/voldemort/store/socket/SocketStore.java
@@ -100,6 +100,9 @@ public void submitDeleteRequest(ByteArray key,
requestRoutingType,
key,
version);
+ if(logger.isDebugEnabled())
+ logger.debug("DELETE keyRef: " + System.identityHashCode(key) + " requestRef: "
+ + System.identityHashCode(clientRequest));
requestAsync(clientRequest, callback, timeoutMs, "delete");
}
@@ -113,6 +116,9 @@ public void submitGetRequest(ByteArray key,
requestRoutingType,
key,
transforms);
+ if(logger.isDebugEnabled())
+ logger.debug("GET keyRef: " + System.identityHashCode(key) + " requestRef: "
+ + System.identityHashCode(clientRequest));
requestAsync(clientRequest, callback, timeoutMs, "get");
}
@@ -126,6 +132,9 @@ public void submitGetAllRequest(Iterable<ByteArray> keys,
requestRoutingType,
keys,
transforms);
+ if(logger.isDebugEnabled())
+ logger.debug("GETALL keyRef: " + System.identityHashCode(keys) + " requestRef: "
+ + System.identityHashCode(clientRequest));
requestAsync(clientRequest, callback, timeoutMs, "get all");
}
@@ -137,6 +146,9 @@ public void submitGetVersionsRequest(ByteArray key,
requestFormat,
requestRoutingType,
key);
+ if(logger.isDebugEnabled())
+ logger.debug("GETVERSIONS keyRef: " + System.identityHashCode(key) + " requestRef: "
+ + System.identityHashCode(clientRequest));
requestAsync(clientRequest, callback, timeoutMs, "get versions");
}
@@ -152,6 +164,9 @@ public void submitPutRequest(ByteArray key,
key,
value,
transforms);
+ if(logger.isDebugEnabled())
+ logger.debug("PUT keyRef: " + System.identityHashCode(key) + " requestRef: "
+ + System.identityHashCode(clientRequest));
requestAsync(clientRequest, callback, timeoutMs, "put");
}
@@ -162,6 +177,9 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException
requestRoutingType,
key,
version);
+ if(logger.isDebugEnabled())
+ logger.debug("DELETE keyRef: " + System.identityHashCode(key) + " requestRef: "
+ + System.identityHashCode(clientRequest));
return request(clientRequest, "delete");
}
@@ -172,6 +190,9 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException
requestRoutingType,
key,
transforms);
+ if(logger.isDebugEnabled())
+ logger.debug("GET keyRef: " + System.identityHashCode(key) + " requestRef: "
+ + System.identityHashCode(clientRequest));
return request(clientRequest, "get");
}
@@ -184,6 +205,9 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException
requestRoutingType,
keys,
transforms);
+ if(logger.isDebugEnabled())
+ logger.debug("GETALL keyRef: " + System.identityHashCode(keys) + " requestRef: "
+ + System.identityHashCode(clientRequest));
return request(clientRequest, "getAll");
}
@@ -193,6 +217,9 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException
requestFormat,
requestRoutingType,
key);
+ if(logger.isDebugEnabled())
+ logger.debug("GETVERSIONS keyRef: " + System.identityHashCode(key) + " requestRef: "
+ + System.identityHashCode(clientRequest));
return request(clientRequest, "getVersions");
}
@@ -205,6 +232,9 @@ public void put(ByteArray key, Versioned<byte[]> versioned, byte[] transforms)
key,
versioned,
transforms);
+ if(logger.isDebugEnabled())
+ logger.debug("PUT keyRef: " + System.identityHashCode(key) + " requestRef: "
+ + System.identityHashCode(clientRequest));
request(clientRequest, "put");
}
@@ -240,17 +270,40 @@ public void close() throws VoldemortException {
private <T> T request(ClientRequest<T> delegate, String operationName) {
ClientRequestExecutor clientRequestExecutor = pool.checkout(destination);
+
+ long startTimeMs = -1;
+ long startTimeNs = -1;
+
+ if(logger.isDebugEnabled()) {
+ startTimeMs = System.currentTimeMillis();
+ startTimeNs = System.nanoTime();
+ }
+
+ String debugMsgStr = "";
+
BlockingClientRequest<T> blockingClientRequest = null;
try {
blockingClientRequest = new BlockingClientRequest<T>(delegate, timeoutMs);
clientRequestExecutor.addClientRequest(blockingClientRequest, timeoutMs);
blockingClientRequest.await();
+
+ if(logger.isDebugEnabled())
+ debugMsgStr += "success";
+
return blockingClientRequest.getResult();
} catch(InterruptedException e) {
+
+ if(logger.isDebugEnabled())
+ debugMsgStr += "unreachable: " + e.getMessage();
+
throw new UnreachableStoreException("Failure in " + operationName + " on "
+ destination + ": " + e.getMessage(), e);
} catch(IOException e) {
clientRequestExecutor.close();
+
+ if(logger.isDebugEnabled())
+ debugMsgStr += "failure: " + e.getMessage();
+
throw new UnreachableStoreException("Failure in " + operationName + " on "
+ destination + ": " + e.getMessage(), e);
} finally {
@@ -258,6 +311,29 @@ public void close() throws VoldemortException {
// close the executor if we timed out
clientRequestExecutor.close();
}
+
+ if(logger.isDebugEnabled()) {
+ logger.debug("Sync request end, type: "
+ + operationName
+ + " requestRef: "
+ + System.identityHashCode(delegate)
+ + " totalTimeNs: "
+ + (System.nanoTime() - startTimeNs)
+ + " start time: "
+ + startTimeMs
+ + " end time: "
+ + System.currentTimeMillis()
+ + " client:"
+ + clientRequestExecutor.getSocketChannel().socket().getLocalAddress()
+ + ":"
+ + clientRequestExecutor.getSocketChannel().socket().getLocalPort()
+ + " server: "
+ + clientRequestExecutor.getSocketChannel()
+ .socket()
+ .getRemoteSocketAddress() + " outcome: "
+ + debugMsgStr);
+ }
+
pool.checkin(destination, clientRequestExecutor);
}
}
@@ -285,6 +361,23 @@ public void close() throws VoldemortException {
try {
clientRequestExecutor = pool.checkout(destination);
+
+ if(logger.isDebugEnabled()) {
+ logger.debug("Async request start; type: "
+ + operationName
+ + " requestRef: "
+ + System.identityHashCode(delegate)
+ + " time: "
+ + System.currentTimeMillis()
+ + " server: "
+ + clientRequestExecutor.getSocketChannel()
+ .socket()
+ .getRemoteSocketAddress() + " local socket: "
+ + clientRequestExecutor.getSocketChannel().socket().getLocalAddress()
+ + ":"
+ + clientRequestExecutor.getSocketChannel().socket().getLocalPort());
+ }
+
} catch(Exception e) {
// If we can't check out a socket from the pool, we'll usually get
// either an IOException (subclass) or an UnreachableStoreException
@@ -335,6 +428,26 @@ public NonblockingStoreCallbackClientRequest(ClientRequest<T> clientRequest,
private void invokeCallback(Object o, long requestTime) {
if(callback != null) {
try {
+ if(logger.isDebugEnabled()) {
+ logger.debug("Async request end; requestRef: "
+ + System.identityHashCode(clientRequest)
+ + " time: "
+ + System.currentTimeMillis()
+ + " server: "
+ + clientRequestExecutor.getSocketChannel()
+ .socket()
+ .getRemoteSocketAddress()
+ + " local socket: "
+ + clientRequestExecutor.getSocketChannel()
+ .socket()
+ .getLocalAddress()
+ + ":"
+ + clientRequestExecutor.getSocketChannel()
+ .socket()
+ .getLocalPort() + " result: "
+ + o.toString());
+ }
+
callback.requestComplete(o, requestTime);
} catch(Exception e) {
if(logger.isEnabledFor(Level.WARN))
Please sign in to comment.
Something went wrong with that request. Please try again.