diff --git a/src/java/voldemort/server/protocol/vold/PutRequestHandler.java b/src/java/voldemort/server/protocol/vold/PutRequestHandler.java index e3fe668976..07842de66e 100644 --- a/src/java/voldemort/server/protocol/vold/PutRequestHandler.java +++ b/src/java/voldemort/server/protocol/vold/PutRequestHandler.java @@ -41,10 +41,10 @@ public static boolean isCompleteRequest(DataInputStream inputStream, public boolean parseRequest(DataInputStream inputStream) throws IOException { key = ClientRequestHandler.readKey(inputStream); int valueSize = inputStream.readInt(); - byte[] bytes = new byte[valueSize]; - ByteUtils.read(inputStream, bytes); - clock = new VectorClock(bytes); - value = ByteUtils.copy(bytes, clock.sizeInBytes(), bytes.length); + clock = VectorClock.createVectorClock(inputStream); + int vectorClockSize = clock.sizeInBytes(); + value = new byte[valueSize - vectorClockSize]; + ByteUtils.read(inputStream, value); transforms = ClientRequestHandler.readSingleTransform(inputStream, protocolVersion); return false; @@ -68,9 +68,8 @@ public int getResponseSize() { @Override public String getDebugMessage() { return "Operation PUT " + ClientRequestHandler.getDebugMessageForKey(key) + " ValueHash" - + (value == null ? "null" : value.hashCode()) + " ClockSize " + " ValueSize " - + (value == null ? "null" : value.length) + - + clock.sizeInBytes(); + + (value == null ? "null" : value.hashCode()) + " ClockSize " + clock.sizeInBytes() + + " ValueSize " + (value == null ? "null" : value.length); } } diff --git a/src/java/voldemort/versioning/VectorClock.java b/src/java/voldemort/versioning/VectorClock.java index 9daff49c07..1fcb7cf6b3 100644 --- a/src/java/voldemort/versioning/VectorClock.java +++ b/src/java/voldemort/versioning/VectorClock.java @@ -16,6 +16,8 @@ package voldemort.versioning; +import java.io.DataInputStream; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; @@ -136,6 +138,32 @@ public VectorClock(byte[] bytes, int offset) { this.timestamp = ByteUtils.readLong(bytes, index); } + public static VectorClock createVectorClock(DataInputStream inputStream) { + try { + final int HEADER_LENGTH = ByteUtils.SIZE_OF_SHORT + ByteUtils.SIZE_OF_BYTE; + byte[] header = new byte[HEADER_LENGTH]; + inputStream.readFully(header); + int numEntries = ByteUtils.readShort(header, 0); + + byte versionSize = header[ByteUtils.SIZE_OF_SHORT]; + + int entrySize = ByteUtils.SIZE_OF_SHORT + versionSize; + int totalEntrySize = numEntries * entrySize; + + byte[] vectorClockBytes = new byte[HEADER_LENGTH + totalEntrySize + + ByteUtils.SIZE_OF_LONG]; + System.arraycopy(header, 0, vectorClockBytes, 0, header.length); + + inputStream.readFully(vectorClockBytes, HEADER_LENGTH, vectorClockBytes.length + - HEADER_LENGTH); + + return new VectorClock(vectorClockBytes); + } catch(IOException e) { + throw new IllegalArgumentException("Can't deserialize vectorclock from stream", e); + } + + } + public byte[] toBytes() { byte[] serialized = new byte[sizeInBytes()]; toBytes(serialized, 0); diff --git a/test/unit/voldemort/versioning/VectorClockTest.java b/test/unit/voldemort/versioning/VectorClockTest.java index b48ea4b1cb..035b574d3d 100644 --- a/test/unit/voldemort/versioning/VectorClockTest.java +++ b/test/unit/voldemort/versioning/VectorClockTest.java @@ -21,6 +21,9 @@ import static org.junit.Assert.fail; import static voldemort.TestUtils.getClock; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; + import org.junit.Test; import voldemort.TestUtils; @@ -121,6 +124,7 @@ public void testSerializationBackwardCompatibility() { knownSerializedHead[index], serialized[index]); } + } /** @@ -139,6 +143,12 @@ public void testDeserializationBackwardCompatibility() { assertEquals("vector clock does not deserialize correctly on given byte array", clock, new VectorClock(knownSerialized)); + + DataInputStream ds = new DataInputStream(new ByteArrayInputStream(knownSerialized)); + VectorClock clock2 = VectorClock.createVectorClock(ds); + assertEquals("vector clock does not deserialize correctly on given input stream", + clock, + clock2); } @Test