Skip to content

Commit

Permalink
Stats to understand NIO layer performance + BDB exception counts et al
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed Jan 2, 2013
1 parent fbe6718 commit dd29d0e
Show file tree
Hide file tree
Showing 22 changed files with 549 additions and 112 deletions.
42 changes: 2 additions & 40 deletions config/single_node_cluster/config/stores.xml
Expand Up @@ -14,44 +14,6 @@
</key-serializer>
<value-serializer>
<type>string</type>
</value-serializer>
</store>
<store>
<name>test-evolution</name>
<persistence>bdb</persistence>
<description>Test store</description>
<owners>harry@hogwarts.edu, hermoine@hogwarts.edu</owners>
<routing-strategy>consistent-routing</routing-strategy>
<routing>client</routing>
<replication-factor>1</replication-factor>
<required-reads>1</required-reads>
<required-writes>1</required-writes>
<key-serializer>
<type>string</type>
</key-serializer>
<value-serializer>
<type>avro-generic-versioned</type>
<schema-info version="0">{"type": "record", "name": "myrec","fields": [{ "name": "original", "type": "string" }]}</schema-info>
<schema-info version="1">{"type": "record", "name": "myrec","fields": [{ "name": "original", "type": "string" }, { "name": "new-field", "type": "string", "default":"" }]}</schema-info>
</value-serializer>
</store>
<store>
<name>anagpal-test-old</name>
<persistence>read-only</persistence>
<description>"test store"</description>
<owners>anagpal@linkedin.com</owners>
<routing-strategy>consistent-routing</routing-strategy>
<routing>client</routing>
<replication-factor>1</replication-factor>
<required-reads>1</required-reads>
<required-writes>1</required-writes>
<key-serializer>
<type>json</type>
<schema-info version="0">"string"</schema-info>
</key-serializer>
<value-serializer>
<type>json</type>
<schema-info version="0">{"cnt":"int32", "country":"string"}</schema-info>
</value-serializer>
</store>
</value-serializer>
</store>
</stores>
Expand Up @@ -30,11 +30,11 @@
import voldemort.VoldemortException;
import voldemort.client.protocol.RequestFormat;
import voldemort.common.VoldemortOpCode;
import voldemort.common.nio.ByteBufferBackedInputStream;
import voldemort.server.RequestRoutingType;
import voldemort.store.ErrorCodeMapper;
import voldemort.store.StoreUtils;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteBufferBackedInputStream;
import voldemort.utils.ByteUtils;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Version;
Expand Down
Expand Up @@ -14,12 +14,14 @@
* the License.
*/

package voldemort.utils;
package voldemort.common.nio;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.commons.lang.mutable.MutableLong;

import voldemort.annotations.concurrency.NotThreadsafe;

/**
Expand All @@ -38,16 +40,34 @@ public class ByteBufferBackedInputStream extends InputStream {

private ByteBuffer buffer;

/**
* Reference to a size tracking object, that tracks the size of the buffer
* in bytes
*/
private MutableLong sizeTracker;

public ByteBufferBackedInputStream(ByteBuffer buffer) {
this.buffer = buffer;
this.sizeTracker = null;
}

public ByteBufferBackedInputStream(ByteBuffer buffer, MutableLong sizeTracker) {
this.buffer = buffer;
this.sizeTracker = sizeTracker;
this.sizeTracker.add(buffer.capacity());
}

public ByteBuffer getBuffer() {
return buffer;
}

public void setBuffer(ByteBuffer buffer) {
this.buffer = buffer;
public void setBuffer(ByteBuffer newBuffer) {
// update the size tracker with the new buffer size
if((sizeTracker != null && this.buffer != null && newBuffer != null)) {
sizeTracker.add(newBuffer.capacity());
sizeTracker.subtract(this.buffer.capacity());
}
this.buffer = newBuffer;
}

@Override
Expand All @@ -68,4 +88,9 @@ public int read(byte[] bytes, int off, int len) throws IOException {
return len;
}

public void close() {
if(sizeTracker != null && this.buffer != null) {
sizeTracker.subtract(this.buffer.capacity());
}
}
}
Expand Up @@ -14,13 +14,16 @@
* the License.
*/

package voldemort.utils;
package voldemort.common.nio;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import org.apache.commons.lang.mutable.MutableLong;

import voldemort.annotations.concurrency.NotThreadsafe;
import voldemort.utils.ByteUtils;

/**
* ByteBufferBackedOutputStream serves two purposes:
Expand All @@ -46,17 +49,36 @@ public class ByteBufferBackedOutputStream extends OutputStream {

private boolean wasExpanded;

/**
* Reference to a size tracking object, that tracks the size of the buffer
* in bytes
*/
private MutableLong sizeTracker;

public ByteBufferBackedOutputStream(ByteBuffer buffer) {
this.buffer = buffer;
wasExpanded = false;
this.sizeTracker = null;
}

public ByteBufferBackedOutputStream(ByteBuffer buffer, MutableLong sizeTracker) {
this.buffer = buffer;
wasExpanded = false;
this.sizeTracker = sizeTracker;
this.sizeTracker.add(buffer.capacity());
}

public ByteBuffer getBuffer() {
return buffer;
}

public void setBuffer(ByteBuffer buffer) {
this.buffer = buffer;
public void setBuffer(ByteBuffer newBuffer) {
// update the size tracker with the new buffer size
if((sizeTracker != null && this.buffer != null && newBuffer != null)) {
sizeTracker.add(newBuffer.capacity());
sizeTracker.subtract(this.buffer.capacity());
}
this.buffer = newBuffer;
wasExpanded = false;
}

Expand All @@ -78,13 +100,23 @@ private void expandIfNeeded(int len) {
if(need <= 0)
return;

int newCapacity = buffer.capacity() + need;
buffer = ByteUtils.expand(buffer, newCapacity * 2);
int newCapacity = (buffer.capacity() + need) * 2;
// update the size tracker with the new buffer size
if(sizeTracker != null) {
sizeTracker.add(newCapacity);
sizeTracker.subtract(this.buffer.capacity());
}
buffer = ByteUtils.expand(buffer, newCapacity);
wasExpanded = true;
}

public boolean wasExpanded() {
return wasExpanded;
}

public void close() {
if(sizeTracker != null && this.buffer != null) {
sizeTracker.subtract(this.buffer.capacity());
}
}
}
28 changes: 28 additions & 0 deletions src/java/voldemort/common/nio/CommBufferSizeStats.java
@@ -0,0 +1,28 @@
package voldemort.common.nio;

import org.apache.commons.lang.mutable.MutableLong;

/**
* Statistics object to track the communication buffer sizes across all the
* connections, handled by the selector managers
*
*/
public class CommBufferSizeStats {

private MutableLong commReadBufferSizeBytes;

private MutableLong commWriteBufferSizeBytes;

public CommBufferSizeStats() {
commReadBufferSizeBytes = new MutableLong(0);
commWriteBufferSizeBytes = new MutableLong(0);
}

public MutableLong getCommReadBufferSizeTracker() {
return commReadBufferSizeBytes;
}

public MutableLong getCommWriteBufferSizeTracker() {
return commWriteBufferSizeBytes;
}
}
Expand Up @@ -14,7 +14,7 @@
* the License.
*/

package voldemort.utils;
package voldemort.common.nio;

import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
Expand Down Expand Up @@ -98,6 +98,23 @@ public class SelectorManager implements Runnable {

protected final Logger logger = Logger.getLogger(getClass());

// statistics about the current select loop
/**
* Number of connections selected (meaning they have some data to be
* read/written) in the current processing loop
*/
protected int selectCount = -1;
/**
* Amount of time taken to process all the connections selected in this
* processing loop
*/
protected long processingTimeMs = -1;
/**
* Amount of time spent in the select() call. This is an indicator of how
* busy the thread is
*/
protected long selectTimeMs = -1;

public SelectorManager() {
try {
this.selector = Selector.open();
Expand Down Expand Up @@ -172,7 +189,10 @@ public void run() {
processEvents();

try {
selectTimeMs = System.currentTimeMillis();
int selected = selector.select(SELECTOR_POLL_MS);
selectTimeMs = System.currentTimeMillis() - selectTimeMs;
selectCount = selected;

if(isClosed.get()) {
if(logger.isInfoEnabled())
Expand All @@ -182,6 +202,7 @@ public void run() {
}

if(selected > 0) {
processingTimeMs = System.currentTimeMillis();
Iterator<SelectionKey> i = selector.selectedKeys().iterator();

while(i.hasNext()) {
Expand All @@ -194,6 +215,7 @@ public void run() {
worker.run();
}
}
processingTimeMs = System.currentTimeMillis() - processingTimeMs;
}
} catch(ClosedSelectorException e) {
if(logger.isDebugEnabled())
Expand All @@ -217,5 +239,4 @@ public void run() {
}
}
}

}
Expand Up @@ -14,7 +14,7 @@
* the License.
*/

package voldemort.utils;
package voldemort.common.nio;

import java.io.EOFException;
import java.io.IOException;
Expand All @@ -29,6 +29,8 @@
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import voldemort.utils.ByteUtils;

/**
* SelectorManagerWorker manages a Selector, SocketChannel, and IO streams
* implementation. At the point that the run method is invoked, the Selector
Expand Down Expand Up @@ -62,13 +64,16 @@ public abstract class SelectorManagerWorker implements Runnable {

public SelectorManagerWorker(Selector selector,
SocketChannel socketChannel,
int socketBufferSize) {
int socketBufferSize,
CommBufferSizeStats commBufferStats) {
this.selector = selector;
this.socketChannel = socketChannel;
this.socketBufferSize = socketBufferSize;
this.resizeThreshold = socketBufferSize * 2; // This is arbitrary...
this.inputStream = new ByteBufferBackedInputStream(ByteBuffer.allocate(socketBufferSize));
this.outputStream = new ByteBufferBackedOutputStream(ByteBuffer.allocate(socketBufferSize));
this.inputStream = new ByteBufferBackedInputStream(ByteBuffer.allocate(socketBufferSize),
commBufferStats.getCommReadBufferSizeTracker());
this.outputStream = new ByteBufferBackedOutputStream(ByteBuffer.allocate(socketBufferSize),
commBufferStats.getCommWriteBufferSizeTracker());
this.createTimestamp = System.nanoTime();
this.isClosed = new AtomicBoolean(false);

Expand Down Expand Up @@ -162,6 +167,10 @@ protected void closeInternal() {
logger.warn(e.getMessage(), e);
}
}

// close the streams, so we account for comm buffer frees
inputStream.close();
outputStream.close();
}

public boolean isClosed() {
Expand Down

0 comments on commit dd29d0e

Please sign in to comment.