Skip to content

Commit

Permalink
Vector clock deserializer from Input Stream
Browse files Browse the repository at this point in the history
Avoid double allocating the value size for puts which can be potentially
few kilobytes. Vector clock has a deserializer from InputStream and it is
used to avoid the double allocation on the hot path.
  • Loading branch information
arunthirupathi committed Apr 16, 2015
1 parent 94be1a5 commit 3f425ef
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 7 deletions.
13 changes: 6 additions & 7 deletions src/java/voldemort/server/protocol/vold/PutRequestHandler.java
Expand Up @@ -41,10 +41,10 @@ public static boolean isCompleteRequest(DataInputStream inputStream,
public boolean parseRequest(DataInputStream inputStream) throws IOException {
key = ClientRequestHandler.readKey(inputStream);
int valueSize = inputStream.readInt();
byte[] bytes = new byte[valueSize];
ByteUtils.read(inputStream, bytes);
clock = new VectorClock(bytes);
value = ByteUtils.copy(bytes, clock.sizeInBytes(), bytes.length);
clock = VectorClock.createVectorClock(inputStream);
int vectorClockSize = clock.sizeInBytes();
value = new byte[valueSize - vectorClockSize];
ByteUtils.read(inputStream, value);

transforms = ClientRequestHandler.readSingleTransform(inputStream, protocolVersion);
return false;
Expand All @@ -68,9 +68,8 @@ public int getResponseSize() {
@Override
public String getDebugMessage() {
return "Operation PUT " + ClientRequestHandler.getDebugMessageForKey(key) + " ValueHash"
+ (value == null ? "null" : value.hashCode()) + " ClockSize " + " ValueSize "
+ (value == null ? "null" : value.length) +
+ clock.sizeInBytes();
+ (value == null ? "null" : value.hashCode()) + " ClockSize " + clock.sizeInBytes()
+ " ValueSize " + (value == null ? "null" : value.length);
}

}
28 changes: 28 additions & 0 deletions src/java/voldemort/versioning/VectorClock.java
Expand Up @@ -16,6 +16,8 @@

package voldemort.versioning;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -136,6 +138,32 @@ public VectorClock(byte[] bytes, int offset) {
this.timestamp = ByteUtils.readLong(bytes, index);
}

public static VectorClock createVectorClock(DataInputStream inputStream) {
try {
final int HEADER_LENGTH = ByteUtils.SIZE_OF_SHORT + ByteUtils.SIZE_OF_BYTE;
byte[] header = new byte[HEADER_LENGTH];
inputStream.readFully(header);
int numEntries = ByteUtils.readShort(header, 0);

byte versionSize = header[ByteUtils.SIZE_OF_SHORT];

int entrySize = ByteUtils.SIZE_OF_SHORT + versionSize;
int totalEntrySize = numEntries * entrySize;

byte[] vectorClockBytes = new byte[HEADER_LENGTH + totalEntrySize
+ ByteUtils.SIZE_OF_LONG];
System.arraycopy(header, 0, vectorClockBytes, 0, header.length);

inputStream.readFully(vectorClockBytes, HEADER_LENGTH, vectorClockBytes.length
- HEADER_LENGTH);

return new VectorClock(vectorClockBytes);
} catch(IOException e) {
throw new IllegalArgumentException("Can't deserialize vectorclock from stream", e);
}

}

public byte[] toBytes() {
byte[] serialized = new byte[sizeInBytes()];
toBytes(serialized, 0);
Expand Down
10 changes: 10 additions & 0 deletions test/unit/voldemort/versioning/VectorClockTest.java
Expand Up @@ -21,6 +21,9 @@
import static org.junit.Assert.fail;
import static voldemort.TestUtils.getClock;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;

import org.junit.Test;

import voldemort.TestUtils;
Expand Down Expand Up @@ -121,6 +124,7 @@ public void testSerializationBackwardCompatibility() {
knownSerializedHead[index],
serialized[index]);
}

}

/**
Expand All @@ -139,6 +143,12 @@ public void testDeserializationBackwardCompatibility() {
assertEquals("vector clock does not deserialize correctly on given byte array",
clock,
new VectorClock(knownSerialized));

DataInputStream ds = new DataInputStream(new ByteArrayInputStream(knownSerialized));
VectorClock clock2 = VectorClock.createVectorClock(ds);
assertEquals("vector clock does not deserialize correctly on given input stream",
clock,
clock2);
}

@Test
Expand Down

0 comments on commit 3f425ef

Please sign in to comment.