Skip to content

Commit

Permalink
Refactoring, commenting, logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirk True committed Feb 9, 2010
1 parent 93ca99a commit fb647da
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 105 deletions.
196 changes: 106 additions & 90 deletions src/java/voldemort/server/niosocket/AsyncRequestHandler.java
Expand Up @@ -130,107 +130,132 @@ private void read(SelectionKey selectionKey) throws IOException {
throw new EOFException("EOF for " + socketChannel.socket().getRemoteSocketAddress());

if(logger.isTraceEnabled())
logInputBufferState("Read " + count + " bytes");
traceInputBufferState("Read " + count + " bytes");

if(count == 0)
return;

// Take note of the position after we read the bytes. We'll need it in
// case of incomplete reads later on down the method.
final int position = inputStream.getBuffer().position();

// Flip the buffer, set our limit to the current position and then set
// the position to 0 in preparation for reading in the RequestHandler.
inputStream.getBuffer().flip();

// We have to do this on the first request.
// We have to do this on the first request as we don't know the protocol
// yet.
if(requestHandler == null) {
if(!initRequestHandler(selectionKey)) {
return;
}
}

if(streamRequestHandler != null) {
if(logger.isTraceEnabled())
logInputBufferState("Continuing existing streaming request");

// We're continuing an existing streaming request from our last pass
// through. So handle it and return.
handleStreamRequest(selectionKey);
} else if(requestHandler != null
&& requestHandler.isCompleteRequest(inputStream.getBuffer())) {
// If we have the full request, flip the buffer for reading
// and execute the request
inputStream.getBuffer().rewind();
return;
}

if(logger.isTraceEnabled())
logger.trace("Starting execution for "
+ socketChannel.socket().getRemoteSocketAddress());
if(!requestHandler.isCompleteRequest(inputStream.getBuffer())) {
// Ouch - we're missing some data for a full request, so handle that
// and return.
handleIncompleteRequest(position);
return;
}

streamRequestHandler = requestHandler.handleRequest(new DataInputStream(inputStream),
new DataOutputStream(outputStream));
// At this point we have the full request (and it's not streaming), so
// rewind the buffer for reading and execute the request.
inputStream.getBuffer().rewind();

if(streamRequestHandler != null) {
handleStreamRequest(selectionKey);
} else {
if(logger.isTraceEnabled())
logger.trace("Finished execution for "
+ socketChannel.socket().getRemoteSocketAddress());
if(logger.isTraceEnabled())
logger.trace("Starting execution for "
+ socketChannel.socket().getRemoteSocketAddress());

clearInputBuffer();
prepForWrite(selectionKey);
}
} else {
handleIncompleteRequest(position);
streamRequestHandler = requestHandler.handleRequest(new DataInputStream(inputStream),
new DataOutputStream(outputStream));

if(streamRequestHandler != null) {
// In the case of a StreamRequestHandler, we handle that separately
// (attempting to process multiple "segments").
handleStreamRequest(selectionKey);
return;
}

// At this point we've completed a full stand-alone request. So clear
// our input buffer and prepare for outputting back to the client.
if(logger.isTraceEnabled())
logger.trace("Finished execution for "
+ socketChannel.socket().getRemoteSocketAddress());

prepForWrite(selectionKey);
}

private void write(SelectionKey selectionKey) throws IOException {
ByteBuffer outputBuffer = outputStream.getBuffer();

if(outputBuffer.hasRemaining()) {
// Write what we can now...
int count = socketChannel.write(outputBuffer);
if(outputStream.getBuffer().hasRemaining()) {
// If we have data, write what we can now...
int count = socketChannel.write(outputStream.getBuffer());

if(logger.isTraceEnabled())
logger.trace("Wrote " + count + " bytes, remaining: " + outputBuffer.remaining()
+ " for " + socketChannel.socket().getRemoteSocketAddress());
logger.trace("Wrote " + count + " bytes, remaining: "
+ outputStream.getBuffer().remaining() + " for "
+ socketChannel.socket().getRemoteSocketAddress());
} else {
if(logger.isTraceEnabled())
logger.trace("Wrote no bytes for "
+ socketChannel.socket().getRemoteSocketAddress());
}

if(outputBuffer.hasRemaining())
// If there's more to write but we didn't write it, we'll take that to
// mean that we're done here. We don't clear or reset anything. We leave
// our buffer state where it is and try our luck next time.
if(outputStream.getBuffer().hasRemaining())
return;

// If we don't have anything else to write, that means we're done with
// the request! So clear the buffers (resizing if necessary) and signal
// the Selector that we're ready to take the next request.
if(outputBuffer.capacity() >= resizeThreshold) {
outputBuffer = ByteBuffer.allocate(socketBufferSize);
outputStream.setBuffer(outputBuffer);
} else {
outputBuffer.clear();
}
// the request! So clear the buffers (resizing if necessary).
if(outputStream.getBuffer().capacity() >= resizeThreshold)
outputStream.setBuffer(ByteBuffer.allocate(socketBufferSize));
else
outputStream.getBuffer().clear();

if(streamRequestHandler != null && !streamRequestHandler.isStreamingReads()) {
// In the case of streaming writes, it's possible we can process
// another segment of the stream. We process streaming writes this
// way because there won't be any other notification for us to do
// work as we won't be notified via reads.
if(logger.isTraceEnabled())
logger.trace("Request is streaming for "
+ socketChannel.socket().getRemoteSocketAddress());

handleStreamRequest(selectionKey);
} else {
// If we're not streaming writes, signal the Selector that we're
// ready to read the next request.
selectionKey.interestOps(SelectionKey.OP_READ);
}
}

private void handleStreamRequest(SelectionKey selectionKey) throws IOException {
// You are not expected to understand this.
DataInputStream dataInputStream = new DataInputStream(inputStream);
DataOutputStream dataOutputStream = new DataOutputStream(outputStream);

// We need to keep track of the last known starting index *before* we
// attempt to service the next segment. This is needed in case of
// partial reads so that we can revert back to this point.
int preRequestPosition = inputStream.getBuffer().position();

StreamRequestHandlerState state = handleStreamRequestInternal(selectionKey,
dataInputStream,
dataOutputStream);

if(state == StreamRequestHandlerState.READING) {
// We've read our request but aren't ready to write anything
// just yet as we're streaming reads from the client.
// We've read our request and handled one segment, but we aren't
// ready to write anything just yet as we're streaming reads from
// the client. So let's keep executing segments as much as we can
// until we're no longer reading anything.
do {
preRequestPosition = inputStream.getBuffer().position();
state = handleStreamRequestInternal(selectionKey, dataInputStream, dataOutputStream);
Expand All @@ -239,37 +264,42 @@ private void handleStreamRequest(SelectionKey selectionKey) throws IOException {

if(state == null) {
// We got an error...
clearInputBuffer();
} else if(state == StreamRequestHandlerState.INCOMPLETE_READ) {
// We need the data that's in there so far and aren't ready
// to write anything out yet, so don't clear the input
// buffer or signal that we're ready to write. But we do want to
// compact the buffer as we don't want it to trigger an increase in
// the buffer if we don't need to...
// A) figure out where we are in the buffer...
return;
}

if(state == StreamRequestHandlerState.INCOMPLETE_READ) {
// We need the data that's in there so far and aren't ready to write
// anything out yet, so don't clear the input buffer or signal that
// we're ready to write. But we do want to compact the buffer as we
// don't want it to trigger an increase in the buffer if we don't
// need to do so.

// We need to do the following steps...
//
// a) ...figure out where we are in the buffer...
int currentPosition = inputStream.getBuffer().position();
// B) position ourselves at the start of the partial "segment"...

// b) ...position ourselves at the start of the incomplete
// "segment"...
inputStream.getBuffer().position(preRequestPosition);
// C) then move the buffer down such that preRequestPosition's data

// c) ...then copy the data starting from preRequestPosition's data
// is at index 0...
inputStream.getBuffer().compact();
// D) and reset the position to be ready for the rest of the reads
// and the limit to allow more data.

// d) ...and reset the position to be ready for the rest of the
// reads and the limit to allow more data.
handleIncompleteRequest(currentPosition - preRequestPosition);
} else if(state == StreamRequestHandlerState.WRITING) {
// We've read our request and are ready to start streaming
// writes to the client.
clearInputBuffer();

prepForWrite(selectionKey);
} else if(state == StreamRequestHandlerState.COMPLETE) {
streamRequestHandler.close(dataOutputStream);
streamRequestHandler = null;

// Treat this as a normal request. Assume that all completed
// requests want to write something back to the client.
clearInputBuffer();

prepForWrite(selectionKey);
}
}
Expand All @@ -282,12 +312,12 @@ private StreamRequestHandlerState handleStreamRequestInternal(SelectionKey selec

try {
if(logger.isTraceEnabled())
logInputBufferState("Before streaming request handler");
traceInputBufferState("Before streaming request handler");

state = streamRequestHandler.handleRequest(dataInputStream, dataOutputStream);

if(logger.isTraceEnabled())
logInputBufferState("After streaming request handler");
traceInputBufferState("After streaming request handler");
} catch(Exception e) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(e.getMessage(), e);
Expand All @@ -296,11 +326,10 @@ private StreamRequestHandlerState handleStreamRequestInternal(SelectionKey selec
: new VoldemortException(e);
streamRequestHandler.handleError(dataOutputStream, error);
streamRequestHandler.close(dataOutputStream);
streamRequestHandler = null;

prepForWrite(selectionKey);

streamRequestHandler = null;

close(selectionKey);
}

Expand All @@ -314,43 +343,30 @@ private StreamRequestHandlerState handleStreamRequestInternal(SelectionKey selec
*/

private void prepForWrite(SelectionKey selectionKey) {
outputStream.getBuffer().flip();
selectionKey.interestOps(SelectionKey.OP_WRITE);
}

/**
* We've written to the buffer in the handleRequest invocation, so we're
* done with the input and can reset/resize.
*
* @param inputBuffer
*/

private void clearInputBuffer() {
ByteBuffer inputBuffer = inputStream.getBuffer();

if(logger.isTraceEnabled())
logInputBufferState("About to clear read buffer");
traceInputBufferState("About to clear read buffer");

if(inputBuffer.capacity() >= resizeThreshold) {
inputBuffer = ByteBuffer.allocate(socketBufferSize);
inputStream.setBuffer(inputBuffer);
} else {
inputBuffer.clear();
}
if(inputStream.getBuffer().capacity() >= resizeThreshold)
inputStream.setBuffer(ByteBuffer.allocate(socketBufferSize));
else
inputStream.getBuffer().clear();

if(logger.isTraceEnabled())
logInputBufferState("Cleared read buffer");
traceInputBufferState("Cleared read buffer");

outputStream.getBuffer().flip();
selectionKey.interestOps(SelectionKey.OP_WRITE);
}

private void handleIncompleteRequest(int newPosition) {
if(logger.isTraceEnabled())
logInputBufferState("Incomplete read request detected, before update");
traceInputBufferState("Incomplete read request detected, before update");

inputStream.getBuffer().position(newPosition);
inputStream.getBuffer().limit(inputStream.getBuffer().capacity());

if(logger.isTraceEnabled())
logInputBufferState("Incomplete read request detected, after update");
traceInputBufferState("Incomplete read request detected, after update");

if(!inputStream.getBuffer().hasRemaining()) {
// We haven't read all the data needed for the request AND we
Expand All @@ -360,7 +376,7 @@ private void handleIncompleteRequest(int newPosition) {
inputStream.getBuffer().capacity() * 2));

if(logger.isTraceEnabled())
logInputBufferState("Expanded input buffer");
traceInputBufferState("Expanded input buffer");
}
}

Expand Down Expand Up @@ -440,7 +456,7 @@ private boolean initRequestHandler(SelectionKey selectionKey) {
}
}

private void logInputBufferState(String preamble) {
private void traceInputBufferState(String preamble) {
logger.trace(preamble + " - position: " + inputStream.getBuffer().position() + ", limit: "
+ inputStream.getBuffer().limit() + ", remaining: "
+ inputStream.getBuffer().remaining() + ", capacity: "
Expand Down

0 comments on commit fb647da

Please sign in to comment.