Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'master' of git@github.com:ijuma/voldemort

  • Loading branch information...
commit 18236fed4fcbf5544d5fcfda57717504a1ff8493 2 parents 3f9cb46 + 5a00311
@ijuma ijuma authored
View
8 src/java/voldemort/server/niosocket/SelectorManager.java
@@ -21,8 +21,8 @@
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -95,7 +95,7 @@
private final Selector selector;
- private final BlockingQueue<SocketChannel> socketChannelQueue;
+ private final Queue<SocketChannel> socketChannelQueue;
private final RequestHandlerFactory requestHandlerFactory;
@@ -106,7 +106,7 @@
public SelectorManager(RequestHandlerFactory requestHandlerFactory, int socketBufferSize)
throws IOException {
this.selector = Selector.open();
- this.socketChannelQueue = new LinkedBlockingQueue<SocketChannel>();
+ this.socketChannelQueue = new ConcurrentLinkedQueue<SocketChannel>();
this.requestHandlerFactory = requestHandlerFactory;
this.socketBufferSize = socketBufferSize;
}
View
12 src/java/voldemort/server/protocol/RequestHandler.java
@@ -16,6 +16,18 @@
public void handleRequest(DataInputStream inputStream, DataOutputStream outputStream)
throws IOException;
+ /**
+ * This method is used by non-blocking code to determine if the give buffer
+ * represents a complete request. Because the non-blocking code can by
+ * definition not just block waiting for more data, it's possible to get
+ * partial reads, and this identifies that case.
+ *
+ * @param buffer Buffer to check; the buffer is reset to position 0 before
+ * calling this method and the caller must reset it after the call
+ * returns
+ * @return True if the buffer holds a complete request, false otherwise
+ */
+
public boolean isCompleteRequest(ByteBuffer buffer);
}
View
89 src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java
@@ -16,6 +16,7 @@
package voldemort.server.protocol.admin;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
@@ -40,6 +41,7 @@
import voldemort.store.StoreDefinition;
import voldemort.store.metadata.MetadataStore;
import voldemort.utils.ByteArray;
+import voldemort.utils.ByteBufferBackedInputStream;
import voldemort.utils.ByteUtils;
import voldemort.utils.ClosableIterator;
import voldemort.utils.IoThrottler;
@@ -117,8 +119,91 @@ public void handleRequest(DataInputStream inputStream, DataOutputStream outputSt
outputStream.flush();
}
- public boolean isCompleteRequest(ByteBuffer buffer) {
- throw new UnsupportedOperationException();
+ /**
+ * This is pretty ugly. We end up mimicking the request logic here, so this
+ * needs to stay in sync with handleRequest.
+ */
+
+ public boolean isCompleteRequest(final ByteBuffer buffer) {
+ DataInputStream inputStream = new DataInputStream(new ByteBufferBackedInputStream(buffer));
+ DataOutputStream outputStream = new DataOutputStream(new ByteArrayOutputStream());
+
+ try {
+ byte opCode = inputStream.readByte();
+ StorageEngine<ByteArray, byte[]> engine;
+
+ switch(opCode) {
+ case VoldemortOpCode.GET_PARTITION_AS_STREAM_OP_CODE:
+ engine = readStorageEngine(inputStream, outputStream);
+
+ if(engine != null) {
+ int partitionSize = inputStream.readInt();
+
+ for(int i = 0; i < partitionSize; i++)
+ inputStream.readInt();
+ }
+
+ break;
+ case VoldemortOpCode.PUT_ENTRIES_AS_STREAM_OP_CODE:
+ engine = readStorageEngine(inputStream, outputStream);
+
+ if(engine != null) {
+ int keySize = inputStream.readInt();
+
+ while(keySize != -1) {
+ buffer.position(buffer.position() + keySize);
+ int valueSize = inputStream.readInt();
+ buffer.position(buffer.position() + valueSize);
+
+ keySize = inputStream.readInt();
+ }
+ }
+
+ break;
+ case VoldemortOpCode.UPDATE_METADATA_OP_CODE:
+ String keyString = inputStream.readUTF();
+
+ if(keyString.equals(MetadataStore.CLUSTER_KEY)
+ || keyString.equals(MetadataStore.ROLLBACK_CLUSTER_KEY)) {
+ // Read the clusterString but just to skip the bytes...
+ inputStream.readUTF();
+ } else if(keyString.equals(MetadataStore.STORES_KEY)) {
+ // Read the storesString but just to skip the bytes...
+ inputStream.readUTF();
+ }
+
+ break;
+ case VoldemortOpCode.SERVER_STATE_CHANGE_OP_CODE:
+ // Read the new server state, but again, just to skip the
+ // bytes...
+ inputStream.readUTF();
+
+ break;
+ case VoldemortOpCode.REDIRECT_GET_OP_CODE:
+ engine = readStorageEngine(inputStream, outputStream);
+
+ // Read the key to skip the bytes...
+ if(engine != null)
+ readKey(inputStream);
+
+ break;
+ default:
+ // Do nothing, let the request handler address this...
+ }
+
+ // If there aren't any remaining, we've "consumed" all the bytes and
+ // thus have a complete request...
+ return !buffer.hasRemaining();
+ } catch(Exception e) {
+ // This could also occur if the various methods we call into
+ // re-throw a corrupted value error as some other type of exception.
+ // For example, updating the position on a buffer past its limit
+ // throws an InvalidArgumentException.
+ if(logger.isDebugEnabled())
+ logger.debug("Probable partial read occurred causing exception", e);
+
+ return false;
+ }
}
private byte[] readKey(DataInputStream inputStream) throws IOException {
View
22 src/java/voldemort/server/protocol/vold/VoldemortNativeRequestHandler.java
@@ -8,6 +8,8 @@
import java.util.List;
import java.util.Map;
+import org.apache.log4j.Logger;
+
import voldemort.VoldemortException;
import voldemort.serialization.VoldemortOpCode;
import voldemort.server.StoreRepository;
@@ -29,6 +31,8 @@
*/
public class VoldemortNativeRequestHandler extends AbstractRequestHandler implements RequestHandler {
+ private final Logger logger = Logger.getLogger(VoldemortNativeRequestHandler.class);
+
private final int protocolVersion;
public VoldemortNativeRequestHandler(ErrorCodeMapper errorMapper,
@@ -72,18 +76,23 @@ public void handleRequest(DataInputStream inputStream, DataOutputStream outputSt
outputStream.flush();
}
+ /**
+ * This is pretty ugly. We end up mimicking the request logic here, so this
+ * needs to stay in sync with handleRequest.
+ */
+
public boolean isCompleteRequest(final ByteBuffer buffer) {
DataInputStream inputStream = new DataInputStream(new ByteBufferBackedInputStream(buffer));
try {
- byte opCode = buffer.get();
+ byte opCode = inputStream.readByte();
// Read the store name in, but just to skip the bytes.
inputStream.readUTF();
// Read the 'is routed' flag in, but just to skip the byte.
if(protocolVersion > 0)
- buffer.get();
+ inputStream.readBoolean();
switch(opCode) {
case VoldemortOpCode.GET_OP_CODE:
@@ -123,7 +132,14 @@ public boolean isCompleteRequest(final ByteBuffer buffer) {
// If there aren't any remaining, we've "consumed" all the bytes and
// thus have a complete request...
return !buffer.hasRemaining();
- } catch(Throwable t) {
+ } catch(Exception e) {
+ // This could also occur if the various methods we call into
+ // re-throw a corrupted value error as some other type of exception.
+ // For example, updating the position on a buffer past its limit
+ // throws an InvalidArgumentException.
+ if(logger.isDebugEnabled())
+ logger.debug("Probable partial read occurred causing exception", e);
+
return false;
}
}
View
2  src/java/voldemort/utils/ByteBufferBackedInputStream.java
@@ -56,7 +56,7 @@ public int read() throws IOException {
if(!buffer.hasRemaining())
return -1;
- return buffer.get();
+ return buffer.get() & 0xff;
}
@Override
Please sign in to comment.
Something went wrong with that request. Please try again.