Permalink
Browse files

Added implementation of isCompleteRequest and tested with unit tests.

  • Loading branch information...
1 parent 6142761 commit 5a00311059cde655b6ff7f1408c08f716490f5de @kirktrue kirktrue committed Aug 2, 2009
Showing with 87 additions and 2 deletions.
  1. +87 −2 src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java
@@ -16,6 +16,7 @@
package voldemort.server.protocol.admin;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
@@ -40,6 +41,7 @@
import voldemort.store.StoreDefinition;
import voldemort.store.metadata.MetadataStore;
import voldemort.utils.ByteArray;
+import voldemort.utils.ByteBufferBackedInputStream;
import voldemort.utils.ByteUtils;
import voldemort.utils.ClosableIterator;
import voldemort.utils.IoThrottler;
@@ -117,8 +119,91 @@ public void handleRequest(DataInputStream inputStream, DataOutputStream outputSt
outputStream.flush();
}
- public boolean isCompleteRequest(ByteBuffer buffer) {
- throw new UnsupportedOperationException();
+ /**
+ * This is pretty ugly. We end up mimicking the request logic here, so this
+ * needs to stay in sync with handleRequest.
+ */
+
+ public boolean isCompleteRequest(final ByteBuffer buffer) {
+ DataInputStream inputStream = new DataInputStream(new ByteBufferBackedInputStream(buffer));
+ DataOutputStream outputStream = new DataOutputStream(new ByteArrayOutputStream());
+
+ try {
+ byte opCode = inputStream.readByte();
+ StorageEngine<ByteArray, byte[]> engine;
+
+ switch(opCode) {
+ case VoldemortOpCode.GET_PARTITION_AS_STREAM_OP_CODE:
+ engine = readStorageEngine(inputStream, outputStream);
+
+ if(engine != null) {
+ int partitionSize = inputStream.readInt();
+
+ for(int i = 0; i < partitionSize; i++)
+ inputStream.readInt();
+ }
+
+ break;
+ case VoldemortOpCode.PUT_ENTRIES_AS_STREAM_OP_CODE:
+ engine = readStorageEngine(inputStream, outputStream);
+
+ if(engine != null) {
+ int keySize = inputStream.readInt();
+
+ while(keySize != -1) {
+ buffer.position(buffer.position() + keySize);
+ int valueSize = inputStream.readInt();
+ buffer.position(buffer.position() + valueSize);
+
+ keySize = inputStream.readInt();
+ }
+ }
+
+ break;
+ case VoldemortOpCode.UPDATE_METADATA_OP_CODE:
+ String keyString = inputStream.readUTF();
+
+ if(keyString.equals(MetadataStore.CLUSTER_KEY)
+ || keyString.equals(MetadataStore.ROLLBACK_CLUSTER_KEY)) {
+ // Read the clusterString but just to skip the bytes...
+ inputStream.readUTF();
+ } else if(keyString.equals(MetadataStore.STORES_KEY)) {
+ // Read the storesString but just to skip the bytes...
+ inputStream.readUTF();
+ }
+
+ break;
+ case VoldemortOpCode.SERVER_STATE_CHANGE_OP_CODE:
+ // Read the new server state, but again, just to skip the
+ // bytes...
+ inputStream.readUTF();
+
+ break;
+ case VoldemortOpCode.REDIRECT_GET_OP_CODE:
+ engine = readStorageEngine(inputStream, outputStream);
+
+ // Read the key to skip the bytes...
+ if(engine != null)
+ readKey(inputStream);
+
+ break;
+ default:
+ // Do nothing, let the request handler address this...
+ }
+
+ // If there aren't any remaining, we've "consumed" all the bytes and
+ // thus have a complete request...
+ return !buffer.hasRemaining();
+ } catch(Exception e) {
+ // This could also occur if the various methods we call into
+ // re-throw a corrupted value error as some other type of exception.
+ // For example, updating the position on a buffer past its limit
+ // throws an InvalidArgumentException.
+ if(logger.isDebugEnabled())
+ logger.debug("Probable partial read occurred causing exception", e);
+
+ return false;
+ }
}
private byte[] readKey(DataInputStream inputStream) throws IOException {

0 comments on commit 5a00311

Please sign in to comment.